avatar

Derek Zeng

A programmer

Play with Reactive programming (1)

by coderek

Rxjs is a popular library now. I used it when working on my Radio Player project. The Angular uses it as a way to pass data between components. Because the data passed can be async and continuous, it's quite important to have a consistent way to handle them together with ordinary data source.

With Rxjs, the data source can be packaged as an observable object and passed around. The data source could be one time data produced synchronously by function computation, or async API call, or an UI element that produces a series of UI actions in non-deterministic intervals, or a mix of them. The subscribers or observers do not need to care about the nature of the source. To handle the emitted data, it just need to supply a callback when subscribing the source.

The idea sounds similar to Promise. However, Promise has its major limitation when handling streaming data which is the selling point of Rxjs. So I would say Rxjs is a super-set of Promise, plus a lot of handy library tools to work with such data sources. For non-trivial applications, I highly recommend using Rxjs in place of Promises.

As an example of how useful Rxjs can be, I want to discuss how I implment the random-play feature for my Radio Player. The server streams meta data to the Player whenever a new program starts,. E.g. a song or talk show. I have a "thread" listening to this streaming of data and updates the display accordingly. Something like this in my radio.service.ts:

this.socket = Observable.webSocket(wsSrc);
this.nextProgram = this.socket.filter(meta=>meta!==undefined).map(meta=> {
    if (isEmpty(meta.coverUrl)) {
        meta.coverUrl = this.getDefaultCover(this.station);
    }
    if (this.station) {
        meta.stationName = this.station.get('name');
    }
    return meta;
})

Rxjs provides a webSocket class that I can subscribe to. This method abstract away the open event for me. I only need to subscribe it and start receiving messages from server. filter and map method works just like it for normal JS array.

When random playing mode is on, I want to switch to a random station automatically. And this switch should only happen when a program ends. Thus I have to listen to the meta source in order to know a program ends. But I do not want to mix this random "thread" with the display "thread", so I would like to abstract it to a separate "thread".

private randomRequest = new Subject<void>();

...
this.nextProgram = this.nextProgram.do(program=> {
    if (previousProgram === null || previousProgram.title !== program.title) {
        if (this.getPreferenceFromLocalStorage().playRandom) {
            this.randomRequest.next(); // push data to subject
        }
    }
});

this.randomRequest.subscribe(this.switchRandomStation.bind(this));

Here, I create a Rxjs Subject. The Subject is basically an Observable that you can push and pull. The Subject doesn't produce any real data here. But it still triggers the subscribers' callback function which is switchRandomStation. This is the same as event trigger mechanism in DOM. Now I can pass this.randomRequest to my radio controller and this.nextProgrm to my display component separately even they are essetionally from the same data source.

Notice that the do method returns an Obervable. do is an operator in rxjs. There are many such operators that can be chained to modify the source Observable. Since I need to assign the modified Obervable later, I need to keep a reference of the latest Observable.

Later I find that, when I first connect to websocket server, it may not start sending messages immediately, so my reactive display component will show blank view temporarily until receiving the first message. It's easy to fix this in Rxjs. Just manually add an empty message to the stream.

...
this.nextProgram = Observable.just(<ProgramMeta>{}).concat(this.nextProgram);    
...

How simple is that!

I'll look at RxJava in my next post.

(End of article)