Show Buttons
Share On Facebook
Share On Twitter
Share On Google Plus
Share On Linkdin
Share On Reddit
Share On Stumbleupon
Contact us
Hide Buttons

Getting started with Rxjs and streams

When I first started learn­ing about Rxjs observ­ables and streams, I found it really dif­fi­cult to under­stand the flow of data across the sys­tem. Not because its dif­fi­cult, but mainly because it is sim­ple yet dif­fer­ent. In this arti­cle, I will try to explain my per­cep­tion of the con­cept of streams and observ­ables. Hope­fully, it would assist you in your under­stand­ing and usage of this pow­er­ful library.

Whats in a stream?

The sim­plest way to think about a stream is in terms of a timeline.

Its easy to con­cep­tu­al­ize a stream as a time­line on which events occur.

To visu­al­ize a stream, we usu­ally cre­ate a marble diagram as follows

Stream: ---x------x-----x--------x--x---

In the above dia­gram, the mark­ers x are are con­sid­ered as data points. Hence a stream is clearly noth­ing else but data points sep­a­rated by time.

Now think about that for a moment. What can a stream rep­re­sent in the real world?
Well, the answer is quite sim­ple. A stream can eas­ily rep­re­sent asyn­cro­nous data arriv­ing from a source. The best exam­ples would be — users click­ing on a but­ton, ajax responses, an inter­val etc.

How to cre­ate a stream in Rxjs?

To cre­ate a stream, you have to do two things
– a) define the source of its data.
– b) emit the data.

RXjs pro­vides you sev­eral util­ity func­tions to cre­ate streams from com­monly event sources like — but­ton clicks, mouse scrolls, timed inter­vals etc. In the fol­low­ing exam­ple, we will see how to cre­ate an observ­able stream using the Observable.create func­tion which cre­ates an stream based upon a func­tion which acts as a dataSource.

function dataSource(observer) {; // Emit this value instantaneously

  setTimeout(function () {
    // Emit this value after some time;
    observer.complete(); // Indicate that there will be no more data
  }, 1000);

let numberStream$ = Rx.Observable.create(dataSource);

As you can see in the above exam­ple, the func­tion dataSource must have a cer­tain sig­na­ture in order to pro­duce data points on the stream.
– The first argu­ment it receives is con­sid­ered the observer — i.e. the receiver of the data.
– In order to pro­duce data, you must call the next() method on the observer.
– When there is no more data, the data source can indi­cate com­ple­tion by invok­ing the complete() method.

TIP: The $ suf­fix in the vari­able numberStream$ is just a con­ven­tion for nam­ing streams.

Now that we have a stream, all we have to do is sub­scribe to the stream. You can do so by, guess what, the subscribe() func­tion which is avail­able on the stream itself.

function logger(data) { console.log(data); }

The above code can be read as — When data is pro­duced on the numberStream$, the logger is inter­ested in receiv­ing that data. The log­ger func­tion, is there­fore the observer.

Cre­at­ing new streams from exist­ing streams

The other inter­est­ing thing about data streams is that you can cre­ate new streams out of exist­ing ones. What that means is — when­ever a data point arrives on a stream, you can cre­ate another stream from it by writ­ing another tran­for­ma­tion func­tion that pro­duces a cor­re­spond­ing data point.

Lets see an example.

// Create a stream of clicks.
var source$ = Rx.Observable.fromEvent(document.querySelector('body'), 'click');

// Create another stream of the x coordinate of the clicks.
xCoordinate$ = source$.map(function(e) { return e.x; });

// logs the click event object
source$.subscribe(data => console.log(data) );

// logs the x coordinates
xCoordinate$.subscribe(data => console.log(data) );

A mar­ble dia­gram of the above streams would be rep­re­sented as follows

source$:     ---e---e----e-e----

xCoordinate$:     x---x----x-x---

As seen in the exam­ple above, we used an rxjs oper­a­tor called map to cre­ate a new stream from an exist­ing stream. The map oper­a­tor takes a func­tion as an argu­ment, and invokes it when­ever data arrives on the under­ly­ing stream(source$). It then cre­ates a stream that con­tains the val­ues returned by our trans­for­ma­tion func­tion. In our case, the trans­for­ma­tion func­tion returns the x coor­di­nate of the event.

The inter­est­ing part here is that the orig­i­nal stream source$ is left intact, which as seen in the exam­ple above can be sub­scribed to inde­pen­dent of the newly cre­ated xCoordinate$ stream.

Now that you know how to cre­ate sim­ple streams, in the next arti­cle, I will cover the nature of exe­cu­tion of streams to lay the ground­work for more advanced con­cepts like sub­jects and stream combination.

Ryan Sukale

Ryan is a UX engineer living in San Francisco, California.