/// An observable sequence of 'b values that additionally exposes a
/// read-only key of type 'a.
type IObservableGrouping<'a,'b> =
    interface
        inherit IObservable<'b>
        /// The key associated with this grouping.
        abstract Key : 'a
    end
/// Splits `source` into one observable grouping per distinct key.
///
/// `keySelector` is applied to every item of `source`. The first time a key
/// is seen, a new grouping for it is pushed to the outer observer; the item
/// is then forwarded to whichever observer is subscribed to that grouping.
///
/// Returns an observable of groupings. The disposable returned by the outer
/// `Subscribe` is the one returned by `source.Subscribe`.
///
/// NOTE(review): a grouping only forwards to the most recent observer that
/// subscribed to it (a later subscription replaces the earlier forwarder).
/// NOTE(review): if the outer observer does not subscribe to a grouping while
/// it is being announced, the grouping is announced again on the next item
/// with the same key; a late subscriber is replayed the first item of the
/// group and items seen in between are dropped. This is preserved as-is.
[<Extension>]
let GroupBy(source : IObservable<'a>, keySelector : Func<'a, 'b>) =
    { new IObservable<IObservableGrouping<'b, 'a>> with
        member o.Subscribe(observer) =
            // State is created per subscription. Sharing one dictionary between
            // subscribers would hide already-seen groups from later subscribers
            // and deliver every item once per subscriber to the same group.
            // Entry layout: (group has an observer, the grouping, forwarder).
            let groups =
                new System.Collections.Generic.Dictionary<'b, bool * IObservableGrouping<'b, 'a> * ('a -> unit)>()
            let ignoreValue (_ : 'a) = ()

            // Builds the grouping for `key`; `first` is the item that created it
            // and is replayed to an observer when it subscribes.
            let createGroup key first =
                { new IObservableGrouping<'b, 'a> with
                    member g.Key = key
                    member g.Subscribe(observer') =
                        observer'.OnNext first
                        let (_, grouping, _) = groups.[key]
                        groups.[key] <- (true, grouping, fun x' -> observer'.OnNext x')
                        // Return a real disposable (never null) so callers can
                        // safely dispose; disposing stops forwarding for this key.
                        { new System.IDisposable with
                            member d.Dispose() =
                                let (_, current, _) = groups.[key]
                                groups.[key] <- (true, current, ignoreValue) } }

            source.Subscribe(
                { new IObserver<'a> with
                    member s.OnNext x =
                        let key = keySelector.Invoke x
                        // Single lookup; create and register the group on first sight.
                        let (subscribed, grouping, forward) =
                            match groups.TryGetValue key with
                            | true, entry -> entry
                            | _ ->
                                let entry = (false, createGroup key x, ignoreValue)
                                groups.Add(key, entry)
                                entry
                        if not subscribed then observer.OnNext grouping
                        // `forward` was read before the grouping was announced, so
                        // an item replayed during Subscribe is not delivered twice.
                        forward x }) }
Useful snippets of F# code, formatted so that they are easy to copy and paste into the F# Interactive editor.
Sunday, 26 July 2009
GroupBy Rx Combinator
Wednesday, 22 July 2009
Reactive Linq in F#
namespace FSReactiveLinq
[<System.Runtime.CompilerServices.Extension>]
module LinqEnabler
open System
open System.Runtime.CompilerServices
// IObservable interfaces
/// Receiver of values of type 'a. This interface declares only OnNext;
/// it has no error or completion callbacks.
type IObserver<'a> =
    interface
        /// Invoked with each value delivered to the observer.
        abstract OnNext : 'a -> unit
    end
/// A source of 'a values that observers can subscribe to.
type IObservable<'a> =
    interface
        /// Registers an observer and returns an IDisposable for the subscription.
        abstract Subscribe : IObserver<'a> -> IDisposable
    end
/// Builds an observer that applies `f` to every value it receives and
/// passes the result on to `next`.
let CreateObserver f (next: IObserver<'b>) =
    let forward value = next.OnNext(f value)
    { new IObserver<'a> with
        member this.OnNext(value) = forward value }
/// Returns an observable whose subscribers receive each value of
/// `observable` after it has been transformed by `f`.
let Map(observable: IObservable<'a>, f) =
    // Subscribes to the source through an observer that applies `f` first.
    let subscribeMapped (observer: IObserver<'b>) =
        observable.Subscribe(CreateObserver f observer)
    { new IObservable<'b> with
        member this.Subscribe(observer) = subscribeMapped observer }
/// Monadic bind for observables.
///
/// For every value `x` from `observable`, subscribes to `selector x` and
/// pushes `projection x y` to the downstream observer for every value `y`
/// produced by that inner observable.
///
/// The disposable returned by `Subscribe` disposes the outer subscription
/// and every inner subscription opened so far; once disposed, further outer
/// values no longer open new inner subscriptions.
let Bind(observable: IObservable<'a>, selector : 'a -> IObservable<'b>, projection) =
    // Observer for an inner sequence: combines each inner value with the outer value x.
    let project (observer: IObserver<'c>) x = CreateObserver (projection x) observer
    // A Subscribe implementation may return null instead of a disposable; tolerate that.
    let disposeIfPresent (d: IDisposable) =
        if not (obj.ReferenceEquals(d, null)) then d.Dispose()
    { new IObservable<'c> with
        member o.Subscribe(observer) =
            // Inner subscriptions belonging to this downstream observer.
            let inner = new ResizeArray<IDisposable>()
            let disposed = ref false
            let outer =
                observable.Subscribe(
                    { new IObserver<'a> with
                        member s.OnNext x =
                            if not disposed.Value then
                                inner.Add((selector x).Subscribe(project observer x)) })
            { new IDisposable with
                member d.Dispose() =
                    if not disposed.Value then
                        disposed.Value <- true
                        disposeIfPresent outer
                        for subscription in inner do
                            disposeIfPresent subscription
                        inner.Clear() } }
/// LINQ `Select` for observables: projects each value of `observable`
/// through `selector`. Delegates to `Map`.
[<Extension>]
let Select(observable : IObservable<'a>, selector : Func<'a,'b>) =
    // Adapt the .NET delegate to an F# function value.
    let apply value = selector.Invoke(value)
    Map(observable, apply)
/// LINQ `SelectMany` for observables: for each outer value, `selector`
/// yields an inner observable, and `projection` combines the outer value
/// with each inner value. Delegates to `Bind`.
[<Extension>]
let SelectMany(observable : IObservable<'a>, selector : Func<'a, IObservable<'b>>, projection : Func<'a, 'b, 'c>) =
    // Adapt the .NET delegates to the function shapes that Bind expects.
    let toInner outer = selector.Invoke(outer)
    let combine outer inner = projection.Invoke(outer, inner)
    Bind(observable, toInner, combine)
/// Subscribes `action` to `observable`: the action is invoked with every
/// value the observable delivers. Returns the subscription's IDisposable.
[<Extension>]
let Subscribe<'a> (observable : IObservable<'a>) (action:Action<'a>) =
    observable.Subscribe(
        { new IObserver<'a> with
            member this.OnNext value = action.Invoke value })
Subscribe to:
Posts (Atom)