In this article I want to show how to use RxJS, the Reactive Extensions library for JavaScript, to handle asynchronous event processing in Angular 2.
In many JavaScript frameworks, asynchronous events are handled either by passing a callback function to be called, or by retrieving a “Promise” object that delivers the data.
The RxJS library takes the concept of promises even further with the Observable interface, which is a combination of a Promise and an Iterator. To be precise, an Observable describes a channel on which an arbitrary number of events can be received asynchronously over time.
Angular 2 adopts this paradigm for all major interactions, since all asynchronous actions can be described by Observables:
- DOM Events (e.g. user clicks a button)
- AJAX Requests
- Timeouts and Timers
Basic functions of Observables
Subscription
As I said earlier, the Observable interface behaves like an asynchronous channel, so the first basic thing you can do is subscribe to the Observable. To subscribe, you provide up to three functions, which handle the information received from the Observable.
The first parameter is a function which receives all events in the Observable. Some Observables fire only once (e.g. an HTTP request), but others may fire repeatedly. The second (optional) parameter handles errors in the Observable (e.g. an exception was thrown inside the Observable), and the third (optional) parameter is called when the Observable has no more elements and is finished.
    import {Injectable} from "@angular/core"
    import {Http, Response} from "@angular/http"
    import {Observable} from "rxjs/Observable"

    @Injectable()
    export class HttpCommunicationService {

      constructor(private http: Http) { }

      // Returns the (not yet executed) request as an Observable
      getObservable(): Observable<Response> {
        return this.http.get("/api")
      }

      demo(): void {
        let observable = this.getObservable()
        observable.subscribe(
          (res: Response) => console.log("Success: " + res.json()),
          (error: any) => console.log("Something went wrong", error),
          () => console.log("all finished")
        )
      }
    }
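To make the order in which the three callbacks fire a bit more concrete, here is a minimal, dependency-free sketch. Note that MiniObservable, Observer and the producer function are hypothetical stand-ins written for this illustration; this is not how RxJS is implemented, it only models the behaviour described above (values first, then either an error or the "finished" signal).

```typescript
// Simplified stand-in for an Observable; NOT the RxJS implementation.
type Observer<T> = {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
};

class MiniObservable<T> {
  // "producer" is a hypothetical name for the function that emits events.
  private producer: (observer: Observer<T>) => void;

  constructor(producer: (observer: Observer<T>) => void) {
    this.producer = producer;
  }

  // Same shape as described above: one required and two optional functions.
  subscribe(
    next: (value: T) => void,
    error: (err: unknown) => void = () => {},
    complete: () => void = () => {}
  ): void {
    this.producer({ next, error, complete });
  }
}

const log: string[] = [];

// Fires twice, then finishes.
const twoValues = new MiniObservable<number>((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
});

// Fires once, then fails.
const failing = new MiniObservable<number>((observer) => {
  observer.next(1);
  observer.error(new Error("boom"));
});

twoValues.subscribe(
  (value) => log.push("value " + value),
  (err) => log.push("error " + String(err)),
  () => log.push("finished")
);

failing.subscribe(
  (value) => log.push("value " + value),
  (err) => log.push("error " + (err as Error).message)
);

// log is now:
// ["value 1", "value 2", "finished", "value 1", "error boom"]
```

The second subscription leaves out the third function, which is fine because only the first one is required.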
There can be zero, one or multiple subscriptions to an Observable.
Most Observables are only evaluated if there is an active subscription. For example, if you perform an HTTP request to fetch a resource from the server, the request will not be executed until there is a subscription for it.
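The lazy evaluation can be modelled in a few lines. The following is only a sketch under assumptions: LazyObservable and the requestsSent counter are made-up names that stand in for a real HTTP call, and the class is a simplified model, not RxJS itself. In this model nothing runs with zero subscriptions, and each subscription triggers the work again.

```typescript
// Simplified model of a lazily evaluated Observable; not RxJS itself.
class LazyObservable<T> {
  private producer: (next: (value: T) => void) => void;

  constructor(producer: (next: (value: T) => void) => void) {
    this.producer = producer;
  }

  // The producer only runs here, once per subscription.
  subscribe(next: (value: T) => void): void {
    this.producer(next);
  }
}

let requestsSent = 0; // hypothetical counter standing in for real HTTP calls

const request = new LazyObservable<string>((next) => {
  requestsSent++;
  next("response #" + requestsSent);
});

// Creating the observable has not executed anything yet.
const beforeSubscribe = requestsSent; // 0

const received: string[] = [];
request.subscribe((body) => received.push(body));
request.subscribe((body) => received.push(body));

// requestsSent is now 2
// received is ["response #1", "response #2"]
```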
Mapping
If you imagine Observables as a kind of event channel, it’s no wonder you can also transform events flowing through the channel. The map() operator allows you to apply any function which transforms events in the Observable to a new form.
RxJS follows the functional paradigm here: map() does not change the original Observable, but returns a new Observable.
    import "rxjs/add/operator/map"
    ...
    let observable: Observable<Response> = http.get("api")
    // map() returns a new Observable; the original stays unchanged
    let dataObservable: Observable<string> = observable.map((res: Response) => {
      let json = res.json()
      return json.myData || "data missing"
    })
Note: As Angular 2 does not include all RxJS operators by default, you have to import the operators you use in your TypeScript file (see the first line of the example).
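The fact that map() hands back a new Observable and leaves the source alone can be sketched without any library. MiniObservable and the ApiResponse shape below are assumptions made up for this illustration (ApiResponse stands in for the result of res.json()); the class is a simplified model and not the RxJS implementation.

```typescript
// Simplified model of map(); not the RxJS implementation.
class MiniObservable<T> {
  private producer: (next: (value: T) => void) => void;

  constructor(producer: (next: (value: T) => void) => void) {
    this.producer = producer;
  }

  subscribe(next: (value: T) => void): void {
    this.producer(next);
  }

  // Returns a NEW observable; "this" is left untouched.
  map<R>(transform: (value: T) => R): MiniObservable<R> {
    return new MiniObservable<R>((next) =>
      this.subscribe((value) => next(transform(value)))
    );
  }
}

// Hypothetical response shape, standing in for res.json().
interface ApiResponse {
  myData?: string;
}

const responses = new MiniObservable<ApiResponse>((next) => {
  next({ myData: "hello" });
  next({});
});

const data = responses.map((json) => json.myData || "data missing");

const original: ApiResponse[] = [];
const mapped: string[] = [];
responses.subscribe((json) => original.push(json));
data.subscribe((text) => mapped.push(text));

// mapped is ["hello", "data missing"]
// original still holds the two untransformed objects
```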
Creating message channels
There is a kind of Observable which also allows you to put events into the channel: Subject. Just think of it as a FIFO where you can enter events, and have other components/services/processes subscribe to these events. A Subject can be created for any event type, e.g.:
    import {Observable} from "rxjs/Observable"
    import {Subject} from "rxjs/Subject"
    import "rxjs/add/operator/map"

    let subject: Subject<number> = new Subject<number>()

    // each Subject is also an Observable, so you can use map etc.
    let squares: Observable<number> = subject.map((i: number) => i * i)
    let squareMessages: Observable<string> = squares.map((s: number) => "This is a square: " + s)

    class ConsolePrinter {
      constructor(obs: Observable<string>) {
        obs.subscribe((msg: string) => console.log(msg));
      }
    }

    new ConsolePrinter(squareMessages); // subscribe in another service etc.

    // Process events asynchronously
    subject.next(1);  // Prints: This is a square: 1
    subject.next(2);  // Prints: This is a square: 4
    subject.next(10); // Prints: This is a square: 100
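One detail worth seeing in isolation is that a plain Subject only delivers events to whoever is subscribed at the moment the event is sent. The sketch below uses a made-up MiniSubject class as a simplified model (it is not the RxJS implementation) to show that a late subscriber misses earlier events.

```typescript
// Simplified model of a Subject; not the RxJS implementation.
class MiniSubject<T> {
  private listeners: Array<(value: T) => void> = [];

  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
  }

  // Pushes an event to everyone who is subscribed right now.
  next(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

const subject = new MiniSubject<number>();
const early: number[] = [];
const late: number[] = [];

subject.subscribe((n) => early.push(n));
subject.next(1);
subject.next(2);

// This subscriber arrives after 1 and 2 were sent and never sees them.
subject.subscribe((n) => late.push(n));
subject.next(10);

// early is [1, 2, 10]
// late is [10]
```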
Creating stateful subjects
Sometimes it is not only important to receive new events, but also to have access to the last received event, no matter how late you subscribe. The BehaviorSubject class can be used for this.
BehaviorSubject is very similar to Subject, but it contains a value at every point in time. When you create the BehaviorSubject, you provide an initial value, and each time the subject receives new data, it overwrites the current value. Otherwise it behaves exactly like a Subject and an Observable, so you can still subscribe to it and create derived Observables via map and other operators.
This kind of structure can be very useful when you need to hold a state for something (and provide this state to other, later subscribing services), and want the state to change every time a new value is received, e.g. for building caches.
    import {BehaviorSubject} from "rxjs/BehaviorSubject"

    const initialColors = ["red", "green", "blue"]
    let colors = new BehaviorSubject<Array<string>>(initialColors)

    let reload = function() {
      http.get("/api/morecolors").subscribe((res: Response) => colors.next(res.json().colors))
    }
    reload()

    // Will be executed immediately after subscription with the initial value,
    // and then every time after the API request finished
    colors.subscribe((current: Array<string>) => console.log("Colors:", current))
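The "always holds a value" behaviour can also be sketched without any library. MiniBehaviorSubject below is a made-up, simplified model and not the RxJS implementation; it only illustrates that every subscriber, however late, starts with the current value.

```typescript
// Simplified model of a BehaviorSubject; not the RxJS implementation.
class MiniBehaviorSubject<T> {
  private current: T;
  private listeners: Array<(value: T) => void> = [];

  constructor(initial: T) {
    this.current = initial;
  }

  // A new subscriber immediately receives the current value.
  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
    listener(this.current);
  }

  // Overwrites the current value and notifies all subscribers.
  next(value: T): void {
    this.current = value;
    this.listeners.forEach((listener) => listener(value));
  }

  getValue(): T {
    return this.current;
  }
}

const colors = new MiniBehaviorSubject<string[]>(["red", "green", "blue"]);

const first: string[][] = [];
colors.subscribe((value) => first.push(value)); // gets the initial value

colors.next(["cyan", "magenta"]); // e.g. after a reload finished

const second: string[][] = [];
colors.subscribe((value) => second.push(value)); // late, still gets a value

// first  is [["red", "green", "blue"], ["cyan", "magenta"]]
// second is [["cyan", "magenta"]]
```

This is what makes the structure suitable as a small cache: the late subscriber does not need to know whether a reload has already happened.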
Conclusion
The RxJS library offers a very useful yet simple API for event processing, which Angular 2 makes heavy use of. Having covered these basics of reactive event processing, I will continue with more advanced features of RxJS and real-world applications in Angular 2 in one of the next articles.