Sunday, 26 July 2009

GroupBy Rx Combinator

type IObservableGrouping<'a,'b> =
inherit IObservable<'b>
abstract member Key: 'a with get


[<Extension>]
let GroupBy(source : IObservable<'a>, keySelector : Func<'a, 'b>) =
let dict = new System.Collections.Generic.Dictionary<'b, bool * IObservableGrouping<'b, 'a> * ('a -> unit)>()

{ new IObservable<IObservableGrouping<'b, 'a>>
with member o.Subscribe(observer) =
source.Subscribe({ new IObserver<'a> with
member
o.OnNext x =
let y = keySelector.Invoke x
let getValue key value =
if (not (dict.ContainsKey key)) then
dict.Add (key, (false, { new IObservableGrouping<'b, 'a> with
member
o.Key = y
member o.Subscribe(observer') =
observer'.OnNext value
let (_, obv, _) = dict.[y]
dict.[y] <- (true, obv, fun x' -> observer'.OnNext x')
null }, fun x' -> ()))
dict.[key]
let (subs, obv, f) = getValue y x
if (not subs) then observer.OnNext(obv)
f(x) }) }

Wednesday, 22 July 2009

Reactive Linq in F#

namespace FSReactiveLinq
[<System.Runtime.CompilerServices.Extension>]
module LinqEnabler

open System
open System.Runtime.CompilerServices

// IObservable interfaces

type IObserver<'a> =
abstract member OnNext : 'a -> unit

type IObservable<'a> =
abstract member Subscribe : IObserver<'a> -> IDisposable

let CreateObserver f (next: IObserver<'b>) = { new IObserver<'a> with
member
o.OnNext(x) =
next.OnNext(f x) }
let Map(observable: IObservable<'a>, f)
= { new IObservable<'b> with
member
o.Subscribe(observer) =
observable.Subscribe(CreateObserver f observer) }

let Bind(observable: IObservable<'a>, selector : 'a -> IObservable<'b>, projection) =

let project (observer:IObserver<'c>) x = CreateObserver (projection x) observer

let Subscribe observer = { new IObserver<'a> with
member
o.OnNext x =
//TODO: Add support for IDisposable
ignore ((selector x).Subscribe(project observer x)) }

{ new IObservable<'c> with
member
o.Subscribe(observer) =
observable.Subscribe(Subscribe observer) }


[<Extension>]
let Select(observable : IObservable<'a>, selector : Func<'a,'b>) =
Map(observable, fun x -> selector.Invoke(x))

[<Extension>]
let SelectMany(observable : IObservable<'a>, selector : Func<'a, IObservable<'b>>, projection : Func<'a, 'b, 'c>) =
Bind(observable, (fun x -> selector.Invoke(x)), (fun x -> fun y -> projection.Invoke(x, y)))


[<Extension>]
let Subscribe<'a> (observable : IObservable<'a>) (action:Action<'a>) =
let observer = { new IObserver<'a>
with member o.OnNext x = action.Invoke x }
observable.Subscribe(observer)