Reactive Extensions Or Rx (More On IEnumerable, IQueryable, IObservable and IQbservable) - Awesome Libraries For C# Developers #2

By Anoop Madhusudanan

Vote on HN

In my last post – we had a look at Interactive Extensions. In this post, we’ll do a recap of Reactive Extensions and LINQ to Event streams.

imageReactive Extensions are out there in the wild for some time, and I had a series about Reactive Extensions few years back. How ever, after my last post on Interactive Extensions, I thought we should discuss Reactive extensions in a bit more detail. Also, in this post we’ll touch IQbservables – the most mysteriously named thing/interface in the world, may be after Higgs Boson. Push and Pull sequences are everywhere – and now with the devices on one end and the cloud at the other end, most of the data transactions happen via push/pull sequences. Hence, it is essential to grab the basic concepts regarding the programming models around them.

First Things First

Let us take a step back and discuss IEnumerable and IQueryable first, before discussing further about Reactive IObservable and IQbservable (Qbservables = Queryable Observables – Oh yea, funny name).

IEnumerable<T>

As you may be aware, the IEnumerable model can be viewed as a pull operation. You are getting an enumerator, and then you iterate the collection by moving forward using MoveNext on a set of items till you reach the final item. And Pull models are useful when the environment is requesting data from an external source. To cover some basics - IEnumerable has a GetEnumerator method which returns an enumerator with a MoveNext() method and a Current property. Offline tip - A C# for each statement can iterate on any dumb thing that can return a GetEnumerator.  Anyway, here is what the non generic version of IEnumerable looks like.

public interface IEnumerable
{
    IEnumerator GetEnumerator();
}

public interface IEnumerator
{
    Object Current {get;}
    bool MoveNext();
    void Reset();
}

Now, LINQ defines a set of operators as extension methods, on top of the generic version of IEnumerable – i.e,  IEnumerable<T>  - So by leveraging the type inference support for Generic Methods, you can invoke these methods on any IEnumerable with out specifying the type. I.e, you could say someStringArray.Count() instead of someStringArray.Count<String>(). You can explore Enumerable class to find these static extensions.

The actual query operators in this case (like Where, Count etc) with related expressions are compiled to IL, and they operate in process much like any IL code is executed by CLR. From an implementation point of view, the parameters of LINQ clauses like Where is a lambda expression (As you may be already knowing, the from.. select is just Syntax sugar that gets expanded to extension methods of IEnumerable<T>), and in most cases a delegate like Func<T,..> can represent an expression from an in memory perspective. But what if you want apply query operators on items sitting some where else? For example, how to apply LINQ operators on top of a set of data rows stored in a table in a database that may be in the cloud, instead of an in memory collection that is an IEnumerable<T>? That is exactly what IQueryable<T> is for.

IQueryable<T>

IQueryable<T> is an IEnumerable<T> (It inherits from IEnumerable<T>) and it points to a query expression that can be executed in a remote world. The LINQ operators for querying objects of type IQueryable<T> are defined in Queryable class, and returns Expression<Func<T..>> when you apply them on an IQueryable<T>, which is a System.Linq.Expressions.Expression (you can read about expression trees here). This will be translated to the remote world (say a SQL system) via a query provider. So, essentially, IQueryable concrete implementations points to a query expression and a Query Provider – it is the job of Query Provider to translate the query expression to the query language of the remote world where it gets executed. From an implementation point of view, the parameters you pass for LINQ that is applied on an IQueryable is assigned to an Expression<T,..> instead. Expression trees in .NET provides a way to represent code as data or kind of Abstract Syntax Trees. Later, the query provider will walk through this to construct an equivalent query in the remote world.

    public interface IQueryable : IEnumerable {       
        Type ElementType { get; }
        Expression Expression { get; }
        IQueryProvider Provider { get; }
    }
    public interface IQueryable<T> : IEnumerable<T>, IQueryable, IEnumerable {
       ..
    }

For example, in LINQ to Entity Framework or LINQ to SQL, the query provider will convert the expressions to SQL and hand it over to the database server. You can even view the translation to the target query language (SQL), just by looking at the  Or in short, the LINQ query operators you apply on IQueryable will be used to build an expression tree, and this will be translated by the query provider to build and execute a query in a remote world. Read this article if you are not clear about how an expression trees are built using Expression<T> from Lambdas. 

Reactive Extensions

So, now let us get into the anatomy and philosophy of observables.

IObservable <T>

As we discussed, objects of type IEnumerable<T>  are pull sequences. But then, in real world, at times we push things as well – not just pull. (Health Alert – when you do both together, make sure you do it safe). In  a lot of scenarios, push pattern makes a lot of sense – for example, instead of you waiting in a queue infinitely day and night with your neighbors in front of the local post office to collect snail mails, the post office agent will just push you the mails to your home when they arrive.

Now, one of the cool things about push and pull sequences are, they are duals. This also means, IObservable<T> is a dual of IEnumerable<T> – See the code below. So, to keep the story short, the dual interface of IEnumerable, derived using the Categorical Duality is IObservable. The story goes like some members in Erik’s team (he was with Microsoft then) had a well deserved temporal meglomaniac hyperactive spike when they discovered this duality. Here is a beautiful paper from Erik on that if you are more interested – A brief summary of Erik’s paper is below.

//Generic version of IEnumerable, ignoring the non generic IEnumerable base

interface IEnumerable<out T>
{
	IEnumerator<T> GetEnumerator();
}

interface IEnumerator<out T>: IDisposable
{
	bool MoveNext(); // throws Exception
	T Current { get; } 
}


//Its dual IObservable

interface IObservable<out T>
{
	IDisposable Subscribe(IObserver<T> observer);
}

interface IObserver<in T>
{
	void OnCompleted(bool done);
	void OnError(Exception exception);
	T OnNext { set; } 
}

Surprisingly, the IObservable implementation looks like the Observer pattern.

Now, LINQ operators are cool. They are very expressive, and provide an abstraction to query things. So the crazy guys in the Reactive Team thought they should take LINQ to work against event streams. Event streams are in fact push sequences, instead of pull sequences. So, they built IObservable. IObservable fabric lets you write LINQ operators on top of push sequences like event streams, much like the same way you query IEnumerable<T>.  The LINQ operators for an object of type IObservable<T> are defined in Observable class. So, how will you implement a LINQ operator, like where, on an observer to do some filtering? Here is a simple example of the filter operator Where for an IEnumerable and an IObservable (simplified for comparison). In the case of IEnumerable, you dispose the enumerator when we are done with traversing.

 //Where for IEnumerable

        static IEnumerable<T> Where<T>(IEnumerable<T> source, Func<T, bool> predicate)
        {
            // foreach(var element in source)
            //   if (predicate(element))
            //        yield return element;
                    
            using (var enumerator = source.GetEnumerator())
            {
                while (enumerator.MoveNext())
                {
                    var value= enumerator.Current;
                    if (predicate(value))
                    {
                        yield return value;
                    }
                }
            }
        }

//Where for IObservable

        static  IObservable<T> Where<T>(this IObserver<T> source, Func<T, bool> predicate)
        {
           return Observable.Create<T>(observer =>
               {
                   return source.Subscribe(Observer.Create<T>(value =>
                       {
                           try
                           {
                               if (predicate(value)) observer.OnNext(value);
                           }
                           catch (Exception e)
                           {
                               observer.OnError(e);
                           }
                       }));
               });
       }

Now, look at the IObservable’s Where implementation. In this case, we return the IDisposable handle to an Observable so that we can dispose it to stop  subscription. For filtering, we are simply creating an inner observable that we are subscribing to the source to apply our filtering logic inside that - and then creating another top level observable that subscribes to the inner observable we created. Now, you can have any concrete implementation for IObservable<T> that wraps an event source, and then you can query that using Where!! Cool. Observable class in Reactive extensions has few helper methods to create observables from events, like FromEvent. Let us create an observable, and query the events now. Fortunately, the Rx Team already has the entire implementation of Observables and related Query operators so that we don’t end up in writing customer query operators like this.

You can do a nuget for install-package Rx-Main  to install Rx, and try out this example that shows event filtering.

  //Let us print all ticks between 5 seconds and 20 seconds
            
            //Interval in milli seconds
            var timer = new Timer() { Interval = 1000 };
            timer.Start();

            //Create our event stream which is an Observable
            var eventStream = Observable.FromEventPattern<ElapsedEventArgs>(timer, "Elapsed");
            var nowTime = DateTime.Now;

            //Same as eventStream.Where(item => ...);

            var filteredEvents = from e in eventStream
                                 let time = e.EventArgs.SignalTime
                                 where
                                     time > nowTime.AddSeconds(5) &&
                                     time < nowTime.AddSeconds(20)
                                 select e;

            //Subscribe to our observable
            filteredEvents.Subscribe(t => Console.WriteLine(DateTime.Now));

            Console.WriteLine("Let us wait..");
	    
            //Dispose filteredEvents explicitly if you want
            Console.ReadKey();

Obviously, in the above example, we could’ve used Observable.Timer – but I just wanted to show how to wrap an external event source with observables. Similarly, you can wrap your Mouse Events or WPF events.  You can explore more about Rx and observables, and few applications here. Let us move on now to IQbservables.

IQbservable<T>

Now, let us  focus on IQbservable<T>. IQbservable<T> is the counterpart to IObserver<T> to represent a query on push sequences/event sources as an expression, much like IQueryable<T> is the counterpart of IEnumerable<T>. So, what exactly this means?  If you inspect IQbservable, you can see that

public interface IQbservable<out T> : IQbservable, IObservable<T>
    {
    }

    public interface IQbservable
    {
        Type ElementType { get; }
        Expression Expression { get; }
        IQbservableProvider Provider { get; }
    }

You can see that it has an Expression property to represent the LINQ to Observable query much like how IQueryable had an Expression to represent the AST of a LINQ query. The IQbservableProvider is responsible for translating the expression to the language of a remote event source (may be a stream server in the cloud).

Conclusion

This post is a very high level summary of Rx Extensions, and here is an awesome talk from Bart De Smet that you cannot miss.

And let me take the liberty of embedding the drawing created by Charles that is a concrete representation of the abstract drawing Bart did in the white board. This is the summary of this post.

representation of the three dimensional graph of Rx's computational fabric

We’ll discuss more practical scenarios where Rx and Ix comes so handy in future – mainly for device to cloud interaction scenarios, complex event processing, task distribution using ISheduler etc - along with some brilliant add on libraries others are creating on top of Rx. But this one was for a quick introduction. Happy Coding!!

© 2012. All Rights Reserved. Amazedsaint.com