Wednesday, 22 July 2009

Reactive Linq in F#

// Module LinqEnabler in namespace FSReactiveLinq.
// The module itself carries ExtensionAttribute in addition to the individual
// [<Extension>] functions it contains (Select, SelectMany, Subscribe).
// NOTE(review): presumably this is what lets C#/VB callers see those functions
// as extension methods and use LINQ query syntax over IObservable - confirm.
namespace FSReactiveLinq
[<System.Runtime.CompilerServices.Extension>]
module LinqEnabler

open System
open System.Runtime.CompilerServices

// IObservable interfaces

/// Receiver side of a push-based stream of 'a values.
/// This local definition is deliberately minimal: it exposes only OnNext and
/// has no error or completion callbacks.
/// NOTE(review): with `open System` in scope on a framework that ships
/// System.IObserver<'T>, this definition shadows it - confirm that is intended.
type IObserver<'a> =
    /// Called by the source once for every value it produces.
    abstract OnNext : 'a -> unit

/// Source side of a push-based stream of 'a values.
type IObservable<'a> =
    /// Attaches an observer to this source.
    /// Returns an IDisposable handle for the subscription.
    abstract Subscribe : IObserver<'a> -> IDisposable

/// Builds an observer that transforms every incoming value with `f` and
/// forwards the result to `next`.
///   f    - transformation applied to each received value
///   next - downstream observer that receives `f value`
let CreateObserver f (next: IObserver<'b>) =
    { new IObserver<'a> with
        member this.OnNext(value) = value |> f |> next.OnNext }
/// Projects every value of `observable` through `f`.
/// The returned observable subscribes to `observable` each time it is itself
/// subscribed to, wrapping the supplied observer so that it sees `f value`.
/// The IDisposable returned by the underlying Subscribe call is passed through.
let Map(observable: IObservable<'a>, f) =
    { new IObservable<'b> with
        member this.Subscribe(downstream) =
            let mapped = CreateObserver f downstream
            observable.Subscribe(mapped) }

/// Monadic bind (SelectMany) over observables.
/// For every value `x` pushed by `observable`, subscribes to `selector x` and
/// forwards each inner value `y` to the downstream observer as `projection x y`.
///   observable - outer source
///   selector   - maps an outer value to the inner observable to subscribe to
///   projection - curried combiner of the outer value and an inner value
/// The IDisposable returned from Subscribe detaches the outer subscription AND
/// every inner subscription created so far; disposing it more than once is a
/// no-op. (Previously the inner subscription handles were discarded, so they
/// could never be released.)
let Bind(observable: IObservable<'a>, selector : 'a -> IObservable<'b>, projection) =

    // Wraps the downstream observer so inner values arrive already combined
    // with the outer value that spawned them.
    let project (observer:IObserver<'c>) x = CreateObserver (projection x) observer

    { new IObservable<'c> with
        member this.Subscribe(observer) =
            // State is per subscription: each Subscribe call gets its own set
            // of inner handles. `gate` guards that state because Dispose may
            // be called while the outer source is still pushing values.
            let gate = obj()
            let inner = System.Collections.Generic.List<IDisposable>()
            let disposed = ref false

            // Remember an inner subscription, or release it immediately when
            // the composite subscription has already been disposed.
            // This IObserver has no completion callback, so finished inner
            // subscriptions cannot be pruned; they are held until Dispose.
            let track (subscription: IDisposable) =
                let keep =
                    lock gate (fun () ->
                        if disposed.Value then
                            false
                        else
                            inner.Add subscription
                            true)
                if not keep then subscription.Dispose()

            let outerObserver =
                { new IObserver<'a> with
                    member o.OnNext x =
                        track ((selector x).Subscribe(project observer x)) }

            let outer = observable.Subscribe(outerObserver)

            { new IDisposable with
                member d.Dispose() =
                    // Take the snapshot under the lock, dispose outside it so
                    // user Dispose code never runs while the gate is held.
                    let pending =
                        lock gate (fun () ->
                            if disposed.Value then
                                None
                            else
                                disposed.Value <- true
                                let snapshot = inner.ToArray()
                                inner.Clear()
                                Some snapshot)
                    match pending with
                    | Some subscriptions ->
                        // Stop the outer source first so no new inner
                        // subscriptions are started during teardown.
                        outer.Dispose()
                        for subscription in subscriptions do
                            subscription.Dispose()
                    | None -> () } }


/// LINQ `Select` for observables: adapts the Func delegate to an F# function
/// and delegates to Map.
[<Extension>]
let Select(observable : IObservable<'a>, selector : Func<'a,'b>) =
    let apply value = selector.Invoke(value)
    Map(observable, apply)

/// LINQ `SelectMany` for observables: adapts both Func delegates to F#
/// functions (the two-argument projection becomes curried) and delegates to Bind.
[<Extension>]
let SelectMany(observable : IObservable<'a>, selector : Func<'a, IObservable<'b>>, projection : Func<'a, 'b, 'c>) =
    let toInner outerValue = selector.Invoke(outerValue)
    let combine outerValue innerValue = projection.Invoke(outerValue, innerValue)
    Bind(observable, toInner, combine)


/// Subscribes an Action delegate to `observable`: every pushed value is passed
/// to `action.Invoke`. Returns the IDisposable produced by the underlying
/// Subscribe call.
[<Extension>]
let Subscribe<'a> (observable : IObservable<'a>) (action:Action<'a>) =
    { new IObserver<'a> with
        member this.OnNext value = action.Invoke value }
    |> observable.Subscribe

No comments: