Deep Dive with RxJS in Angular
Mohammad Quanit
Posted on March 22, 2020
Before we deep dive in RxJS or Reactive Extension For Javascript in Angular, we should know what exactly is RxJS. RxJs is a Powerful Javascript library for reactive programming using the concept of Observables. It is one of the hottest libraries in web development, Offering a powerful, functional approach for dealing with events and with integration points into a growing number of frameworks, libraries, and utilities, the case for learning Rx has never been more appealing.
According to its Documentation
Think of RxJS as Lodash for events.
ReactiveX or RxJS works internally with Observer Pattern in which an Object, we call as Subject maintains it's dependencies and notifies when any of its state changes.
Why RxJS
As RxJS, follows functional programming fundamentals, it provides every type of Pure Function for events. This simply means your code is less prone to errors. Normally we create impure functions that could possibly mess up your code when it grows.
Streams
RxJS works as Streams for your app on any event. Streams are basically the definition of Observables which we cover right after it. Stream API allows you to get a sequence of data in the form of chunks, where we usually get large data from API in little pieces of data. RxJS Stream itself contains many sub-API's which makes it easier for everyday tasks related to web APIs like mouse events, Keyboard events or any kind of data that is coming right from the backend services.
Now let's move onto some basic concepts which RxJS is based on for async event management.
Observables
As we have discussed above, Observables are a definition or declaration of Streams and by its means is that it is a collection of future events or values, which we get continuously from time to time. You can create an observable from nearly anything, but the most common use case in RxJS is from events. The easiest ways to create Observables is by using built-in functions provided by RxJS. Angular ships this cool library by default so you don't need to install it explicitly.
Let's see a code snippet:
Note: Try code snippets online in ng-run.com so you don't have to create angular project just for these snippets.
import { Component, VERSION, OnInit } from '@angular/core';
import { interval, fromEvent } from "rxjs"; // <----------- importing rxjs lib
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
ngOnInit() {
const interval$ = interval(2000); //<-- interval func. same as setinterval in vanilla javascript
interval$.subscribe(val => console.log(val)) // subscribed to listen our stream of numbers
}
}
After running this code, open chrome debugging tools by pressing the F-12
key and check the console tab. You will see numbers after 2 seconds of delay.
You have noticed that I have created a constant variable interval$
, and you may be wondering why I added $
with the variable name. It's just an standard for Observables means that this variable is an Observable.
Let's see another simple code example:
import { Component, VERSION, OnInit } from '@angular/core';
import { interval, fromEvent } from "rxjs"; // <----------- importing rxjs lib
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
ngOnInit() {
const clickEvt$ = fromEvent(document, 'click');
clickEvt$.subscribe(evt => console.log(evt))
}
}
After executing this code, when you click anywhere on the browser document, you'll see mouse click event
on console as it creates an Stream of click events to listen on every click.
Subscription
Subscription is what sets everything in motion. We could say that it's the execution of Observable, where you get to subscribe to events and map or transform data as you want. To create a subscription, you call the subscribe method, supplying a function (or object) - also known as an observer. A Subscription has one important method known as unsubscribe()
which takes no argument and is responsible for disposing / exiting of subscription. In previous versions of RxJS, Subscription was called "Disposable".
import { Component, OnInit } from '@angular/core';
import { fromEvent } from "rxjs";
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
name = 'Angular';
ngOnInit() {
const clickEvt$ = fromEvent(document, 'click');
clickEvt$.subscribe(evt => console.log(evt))
}
}
In the above code snippet, we set up a click event listener on anywhere on the document, then we passed the subscribe method on each click of the document and then it returns an object with Unsbscribe which contains clean up logic, like removing events.
It's important to note that each subscription will create it's own execution context which means calling subscribe
method a second time will create a new event listener
import { Component, OnInit } from '@angular/core';
import { fromEvent } from "rxjs";
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
name = 'Angular';
ngOnInit() {
const clickEvt$ = fromEvent(document, 'click');
const keyUpEvt$ = fromEvent(document, 'keyup');
clickEvt$.subscribe(evt => console.log(evt));
keyUpEvt$.subscribe(evt => console.log(evt));
}
}
Subscriptions create one on one, one-sided conversation between the Observable & Observer, which also known as Unicasting. It's worth noting that when we discuss an Observable source emitting data to observers, this is a push-based model. The source doesn't know or care what subscribers do with the data, it simply pushes it down the line.
Operators
RxJS is incomplete without its operators, even though Observables are the foundation. Operators are some pure functions in RxJS, which is responsible for manipulating data from source returning an Observable of the transformed values. Many of the RxJS operators are similar to vanilla javascript functions like map
for Arrays. Here's what it looks like in Rxjs code:
import { Component, OnInit } from '@angular/core';
import { fromEvent, of } from "rxjs";
import { map } from "rxjs/operators";
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
name = 'Angular';
ngOnInit() {
const transformedData = of(1,2,3,4,5,6)
.pipe(map((val: any) => val * 5))
.subscribe(data => console.log(data));
}
}
You'll see all these numbers are multiplied with 5
in subscription, and if you console transformedData
, it will show that specific Observable.
There are many sheer numbers of Operators which could be overwhelming at first as you starting to learn RxJS. We Obviously don't cover all these operators but will provide details of mostly used ones which you could probably use in your applications.
Let's start with the most common one,
Pipe
The Pipe function is the assembly line from your observable data source through your operators. It's for using multiple operators within an observable chain, contained within the pipe function. We can implement multiple operators in the pipe
function for better readability.
import { Component, OnInit } from '@angular/core';
import { fromEvent, of } from "rxjs";
import { map } from "rxjs/operators";
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
name = 'Angular';
ngOnInit() {
const transformedData = of(1,2,3,4,5,6)
.pipe(map((val: any) => val * 5))
.subscribe(data => console.log(data));
}
}
Of
Another most common & simplest RxJS operator is Of
function. It simply emits each value in a sequence from a source of data and then emits a complete notification.
official marble image from rxjs official site
Code Snippet for Of
operator
import { Component, OnInit } from '@angular/core';
import { of } from "rxjs";
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
name = 'Angular';
ngOnInit() {
const person = { name: 'John Doe', age: 22 }; //<-- simple object
const personObs = of(person); //<-- convert object into stream
personObs.subscribe(data => console.log(data)) //<-- execute observable
}
}
There are 6 types of operators that RxJS based upon.
1) Creation Operators
2) Combination Operators
3) Error Handling Operators
4) Filtering Operators
5) MultiCasting Operators
6) Transforming Operators
Creation Operators
Creation operators are functions that can be used to create Observable from any other data type or convert it into an Observable, like in the above example we did. From generic to specific use-cases you are free, and encouraged, to turn everything into a stream. There are many other operators included in Creation Operators.
Here's an example of Simple Creation Operators with RxJS Ajax module:
import { Component, VERSION, OnInit } from '@angular/core';
import { ajax } from 'rxjs/ajax';
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
name = 'Angular ' + VERSION.full;
githubUsers = `https://api.github.com/users`;
users = ajax({ url: this.githubUsers, method: "GET" })
ngOnInit() {
const subscribe = this.users.subscribe(
res => console.log(res.response),
err => console.error(err)
);
}
}
Combination Operators
Combination Operators also known as Join Operators allows the join of data from multiple observables. Emitted values are the primary variation among these operators. There are many other operators included in Combination Operators.
Here's the example of the most common combination operator,
import { Component, VERSION, OnInit } from '@angular/core';
import { fromEvent, interval } from 'rxjs';
import { map, combineAll, take } from 'rxjs/operators';
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
name = 'Angular ' + VERSION.full;
ngOnInit() {
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
map(
ev => interval(Math.random() * 2000).pipe(take(3))
), take(2)
);
const result = higherOrder.pipe(combineAll())
result.subscribe(data => console.log(data));
}
}
In this example, we have combined the result of clicks
and higherOrder
observables and show it into the console by subscribing result
observable.
Error Handling Operators
Errors are an unfortunate side-effect of development. These operators provide effective ways to gracefully handle errors and retry logic, should they occur. Some of the other operators are included in Error Handling Operators.
Here's the example of the catchError
handling operator, which catches errors on the observable to be handled by returning a new observable or throwing an error.
import { Component, VERSION, OnInit } from '@angular/core';
import { of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
name = 'Angular ' + VERSION.full;
ngOnInit() {
of(1, 2, 3, 4, 5).pipe(
map(num => {
if (num == 4) throw 'Four!'
return num
}),
catchError(err => of('I', 'II', 'III', 'IV', 'V')),
)
.subscribe(data => console.log(data))
}
}
Filtering Operators
The filtering operators provide techniques for accepting - or declining - values from an observable source and dealing with the build-up of values within a stream. This operator is similar to Array.prototype.filter
, which yields true for emitted values.
Here's the simplest filter
operator example from RxJS,
import { Component, VERSION, OnInit } from '@angular/core';
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
name = 'Angular ' + VERSION.full;
ngOnInit() {
const source = from([
{ name: 'Joe', age: 31 },
{ name: 'Bob', age: 25 }
]);
//filter out people with age under 30
const example = source.pipe(filter(person => person.age >= 30));
//output: "Over 30: Joe"
const subscribe = example.subscribe(val => console.log(`Over 30: ${val.name}`))
}
}
Multicasting Operators
In RxJS observables are cold, or unicast (one source per subscriber) by default. These operators can make an observable hot, or multicast, allowing side-effects to be shared among multiple subscribers.
Example of multicast
operator with standard Subject,
import { Component, VERSION, OnInit } from '@angular/core';
import { Subject, interval, ConnectableObservable } from 'rxjs';
import { take, tap, multicast, mapTo } from 'rxjs/operators';
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
name = 'Angular ' + VERSION.full;
ngOnInit() {
//emit every 2 seconds, take 5
const source = interval(2000).pipe(take(5));
const example = source.pipe(
//since we are multicasting below, side effects will be executed once
tap(() => console.log('Side Effect #1')),
mapTo('Result!')
);
//subscribe subject to source upon connect()
const multi = example.pipe(multicast(() => new Subject())) as ConnectableObservable<number>;
/*
subscribers will share source
output:
"Side Effect #1"
"Result!"
"Result!"
...
*/
const subscriberOne = multi.subscribe(val => console.log(val));
const subscriberTwo = multi.subscribe(val => console.log(val));
//subscribe subject to source
multi.connect()
}
}
Here in the above example we use connectObservable<number>
as type for our pipe
function because pipe
function only returns an Observable
but mutlicast
operator returns connectObservable
, so that's how we get connect
funtion with multi
named observable. Here you can leran more about Connectable Observable
Transformation Operators
Transforming values as they pass through the operator chain is a common task. These operators provide transformation techniques for nearly any use-case you will encounter. In some of our examples above we used some of the transformation operators like mapTo
, map
, scan
& mergeMap
. Here are all the operators in transformation operators.
Let's see an example of the most common transformation operator,
import { Component, VERSION, OnInit } from '@angular/core';
import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { mergeMap } from 'rxjs/operators';
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
name = 'Angular ' + VERSION.full;
ngOnInit() {
// free api url
const API_URL = 'https://jsonplaceholder.typicode.com/todos/1';
// streams
const click$ = fromEvent(document, 'click');
click$
.pipe(
/*
* Using mergeMap for example, but generally for GET requests
* you will prefer switchMap.
* Also, if you do not need the parameter like
* below you could use mergeMapTo instead.
* ex. mergeMapTo(ajax.getJSON(API_URL))
*/
mergeMap(() => ajax.getJSON(API_URL))
)
// { userId: 1, id: 1, ...}
.subscribe(console.log);
}
}
Here's on above example, we are merging our click$
observable with response which we are getting from ajax.getJSON()
. When we click on anywhere on document, we will get a response from API in console.
Here are all the main operators, described in this article, and I hope you learned something new regarding RxJS. Here are some more resources of RxJS,
https://www.learnrxjs.io/
https://rxjs.dev/
https://www.learnrxjs.io/learn-rxjs/recipes
https://www.youtube.com/playlist?list=PL55RiY5tL51pHpagYcrN9ubNLVXF8rGVi
If you like it please share it in your circle and follow me for more of this kinda brief article.
Peace ✌️✌️✌️
Posted on March 22, 2020
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.