Saturday, 9 January 2010

Weak Subscribe to an IObservable Source



namespace Reactive
[<System.Runtime.CompilerServices.Extension>]
module Extensions =

    open System
    open System.Runtime.CompilerServices

    type IObserver<'a> =
        abstract member OnNext : 'a -> unit

    type IObservable<'a> =
        abstract member Subscribe : IObserver<'a> -> IDisposable

    [<Extension>]
    let Subscribe<'a> (o:IObservable<'a>, f) =
        o.Subscribe({ new IObserver<'a> with
                          member this.OnNext x = f x })

    [<Extension>]
    let SubscribeWeakly<'a> (o:IObservable<'a>, a:Action<'a>) =
        let (m, r) = (a.Method, new WeakReference(a.Target))
        let rec disposable = Subscribe(o, fun x -> if (r.IsAlive) then
                                                      ignore (m.Invoke (r.Target, [|x|]))
                                                   else
                                                      disposable.Dispose ())

        disposable

1 comment:

Nick said...

I think there's a race in SubscribeWeakly - the target referenced by r can be collected between the calls to IsAlive and Target. It would be safer to take a local reference to Target first and check that for null.