Sunday 26 July 2009

GroupBy Rx Combinator

type IObservableGrouping<'a,'b> =
inherit IObservable<'b>
abstract member Key: 'a with get


[<Extension>]
let GroupBy(source : IObservable<'a>, keySelector : Func<'a, 'b>) =
let dict = new System.Collections.Generic.Dictionary<'b, bool * IObservableGrouping<'b, 'a> * ('a -> unit)>()

{ new IObservable<IObservableGrouping<'b, 'a>>
with member o.Subscribe(observer) =
source.Subscribe({ new IObserver<'a> with
member
o.OnNext x =
let y = keySelector.Invoke x
let getValue key value =
if (not (dict.ContainsKey key)) then
dict.Add (key, (false, { new IObservableGrouping<'b, 'a> with
member
o.Key = y
member o.Subscribe(observer') =
observer'.OnNext value
let (_, obv, _) = dict.[y]
dict.[y] <- (true, obv, fun x' -> observer'.OnNext x')
null }, fun x' -> ()))
dict.[key]
let (subs, obv, f) = getValue y x
if (not subs) then observer.OnNext(obv)
f(x) }) }

No comments: