RxJS 7 - Pipeable Operators
Barış BALLI
Posted on March 29, 2023
Here are my detailed notes from the fantastic course created by Jurek Wozniak, called RxJS 7 and Observables: Introduction.
To really understand the concepts, please consider buying the course yourself; these notes are only a supplement.
Pipeable Operators
Operator Stacking
Pipeable operators transform or filter notifications before they actually reach the observer.
[Event Source → operator 1 → operator 2 → operator 3 → …] → observer
The important point is that the observer only sees the final value, after it has passed through all of those operators.
This works much like chaining JavaScript array utility methods such as filter and map.
Note: Applying a pipeable operator creates a new observable with some additional logic; it doesn’t modify the existing observable.
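To make the note above concrete, here is a minimal sketch. The names numbers$ and doubled$ are made up for illustration, and it borrows the map operator that is covered later in these notes. Subscribing to both observables shows that the original still emits its untouched values.

```typescript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical source observable, used only for this illustration.
const numbers$ = of(1, 2, 3);

// pipe() returns a brand-new observable; numbers$ itself is left untouched.
const doubled$ = numbers$.pipe(map((value) => value * 2));

const fromOriginal: number[] = [];
const fromDoubled: number[] = [];

numbers$.subscribe((value) => fromOriginal.push(value));
doubled$.subscribe((value) => fromDoubled.push(value));

console.log(fromOriginal); // [1, 2, 3]
console.log(fromDoubled); // [2, 4, 6]
```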
filter
When a value reaches the filter operator, it is either passed on (to the next operator or to the observer) or discarded.
filter only screens next notifications; error and complete notifications always pass through.
We can call the pipe method on any observable and pass it whichever operators we want.
import { Observable } from 'rxjs';
import { filter } from 'rxjs/operators';
interface NewsItem {
category: 'Business' | 'Sports';
content: string;
}
const newsFeed$ = new Observable<NewsItem>((subscriber) => {
setTimeout(() => {
subscriber.next({ category: 'Business', content: 'A' });
}, 1000);
setTimeout(() => {
subscriber.next({ category: 'Sports', content: 'B' });
}, 3000);
setTimeout(() => {
subscriber.next({ category: 'Business', content: 'C' });
}, 4000);
setTimeout(() => {
subscriber.next({ category: 'Sports', content: 'D' });
}, 5000);
setTimeout(() => {
subscriber.next({ category: 'Business', content: 'E' });
}, 6000);
});
newsFeed$
.pipe(filter((item) => item.category === 'Sports'))
.subscribe((item) => console.log(item));
If we want, we can extract the sports feed into its own observable and reuse it anywhere we like.
import { Observable } from "rxjs";
import { filter } from "rxjs/operators";
interface NewsItem {
category: 'Business' | 'Sports';
content: string;
}
const newsFeed$ = new Observable<NewsItem>(subscriber => {
setTimeout(() =>
subscriber.next({ category: 'Business', content: 'A' }), 1000);
setTimeout(() =>
subscriber.next({ category: 'Sports', content: 'B' }), 3000);
setTimeout(() =>
subscriber.next({ category: 'Business', content: 'C' }), 4000);
setTimeout(() =>
subscriber.next({ category: 'Sports', content: 'D' }), 6000);
setTimeout(() =>
subscriber.next({ category: 'Business', content: 'E' }), 7000);
});
const sportsNewsFeed$ = newsFeed$.pipe(
filter(item => item.category === 'Sports')
);
sportsNewsFeed$.subscribe(
item => console.log(item)
);
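The point that filter only screens next notifications can be checked with a small synchronous sketch. The source here is hypothetical: four numbers followed by an error, built with concat and throwError. The odd numbers are dropped, but the error still reaches the observer.

```typescript
import { concat, of, throwError } from 'rxjs';
import { filter } from 'rxjs/operators';

const filterLog: string[] = [];

// Hypothetical source: four numbers followed by an error notification.
const numbersThenError$ = concat(
  of(1, 2, 3, 4),
  throwError(() => new Error('source failed'))
);

numbersThenError$
  .pipe(filter((value) => value % 2 === 0)) // keep even numbers only
  .subscribe({
    next: (value) => filterLog.push(`next: ${value}`),
    error: (err: Error) => filterLog.push(`error: ${err.message}`),
    complete: () => filterLog.push('complete'),
  });

console.log(filterLog);
// ['next: 2', 'next: 4', 'error: source failed']
```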
map
The map operator works much like JavaScript's array map method: it takes each emitted value and emits the transformed value in its place.
import { forkJoin } from "rxjs";
// Mike is from New Delhi and likes to eat pasta.
import { ajax } from "rxjs/ajax";
import { map } from "rxjs/operators";
const randomFirstName$ = ajax<any>('https://random-data-api.com/api/name/random_name').pipe(
map(ajaxResponse => ajaxResponse.response.first_name)
);
const randomCapital$ = ajax<any>('https://random-data-api.com/api/nation/random_nation').pipe(
map(ajaxResponse => ajaxResponse.response.capital)
);
const randomDish$ = ajax<any>('https://random-data-api.com/api/food/random_food').pipe(
map(ajaxResponse => ajaxResponse.response.dish)
);
forkJoin([randomFirstName$, randomCapital$, randomDish$]).subscribe(
([firstName, capital, dish]) =>
console.log(`${firstName} is from ${capital} and likes to eat ${dish}.`)
);
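For a side-by-side view of the similarity with the array method, here is a small sketch. The toLabel helper is made up for illustration. The same function is applied once through Array.prototype.map and once through the RxJS map operator.

```typescript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical transformation used for both the array and the observable.
const toLabel = (value: number) => `item-${value}`;

// Plain array: map returns a new array right away.
const fromArray = [1, 2, 3].map(toLabel);

// Observable: map transforms each value as it is emitted.
const fromObservable: string[] = [];
of(1, 2, 3)
  .pipe(map(toLabel))
  .subscribe((label) => fromObservable.push(label));

console.log(fromArray); // ['item-1', 'item-2', 'item-3']
console.log(fromObservable); // ['item-1', 'item-2', 'item-3']
```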
tap
The tap operator works like a spy: it lets us perform side effects without affecting the notifications.
With map, the value we return becomes the input of the next operator in the chain. For debugging, for example, we can drop a tap() into the middle of the chain to see what is passing through at that point, without changing it.
import { of } from "rxjs";
import { filter, map, tap } from "rxjs/operators";
of(1, 7, 3, 6, 2).pipe(
filter(value => value > 5),
map(value => value * 2),
tap({
next: value => console.log('Spy:', value)
}),
).subscribe(value => console.log('Output:', value));
Of course, if we wrote the map callback with curly braces and an explicit return, we could also log the value just before returning it.
But keep in mind that the tap operator is more than just a console log: it is there for any kind of side effect, which makes it quite a useful tool.
One more thing…
Starting from RxJS 7.3.0, the tap() operator can do even more. You can see when the Subscription starts and ends at the level of the tap() operator.
On top of the ‘next’, ‘error’ and ‘complete’ handlers which we have discussed above, the tap() operator introduces three more handlers:
- subscribe: called when a new Subscription is made,
- unsubscribe: called when the Subscription is closed by unsubscribing (calling unsubscribe); this will NOT be executed when ‘error’ or ‘complete’ is emitted,
- finalize: called when the Subscription ends, no matter what the cause: it can be either unsubscribing, an error or a complete notification; all will cause the ‘finalize’ handler to be called.
...
tap({
subscribe: () => { console.log('New Subscription'); },
unsubscribe: () => { console.log('Unsubscribed'); },
finalize: () => { console.log('Subscription ended'); }
}),
...
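The difference between the three handlers is easier to see when the same tap() is attached to a source that completes and to one that is unsubscribed by hand. The sketch below assumes RxJS 7.3.0 or later. The spyOnLifecycle helper and the Subject used as a never-completing source are made up for illustration.

```typescript
import { of, Subject } from 'rxjs';
import { tap } from 'rxjs/operators';

// Hypothetical helper: a tap() that records which lifecycle handlers fire.
const spyOnLifecycle = (log: string[]) =>
  tap<number>({
    subscribe: () => log.push('subscribe'),
    unsubscribe: () => log.push('unsubscribe'),
    finalize: () => log.push('finalize'),
  });

// Case 1: the source completes on its own, so 'unsubscribe' is not called.
const completedLog: string[] = [];
of(1, 2).pipe(spyOnLifecycle(completedLog)).subscribe();

// Case 2: the source stays open and we unsubscribe manually.
const unsubscribedLog: string[] = [];
const openSource$ = new Subject<number>();
const subscription = openSource$
  .pipe(spyOnLifecycle(unsubscribedLog))
  .subscribe();
subscription.unsubscribe();

console.log(completedLog); // ['subscribe', 'finalize']
console.log(unsubscribedLog); // ['subscribe', 'unsubscribe', 'finalize']
```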
debounceTime
debounceTime holds back emitted values and only emits the most recent one once the source has been silent for the given amount of time.
For example, say notifications arrive at a rate of 5 per second and we are only interested in the last value. This happens when we listen to an input field: the user types quickly, and only when they pause for a moment do we finally emit the value and work on it.
This way we avoid excessive processing.
import { fromEvent } from 'rxjs';
import { map, debounceTime } from 'rxjs/operators';
const sliderInput = document.querySelector('input#slider');
fromEvent<any>(sliderInput, 'input')
.pipe(
debounceTime(500),
map((event) => event.target['value'])
)
.subscribe((value) => console.log(value));
In this example, instead of emitting a notification for every single slider movement (potentially thousands), it emits only a handful (3 in this case), which is a great performance gain.
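The slider example needs a browser, so here is a rough sketch that simulates typing in virtual time instead. It is not part of the course material: it assumes RxJS's VirtualTimeScheduler is passed as the optional second argument of debounceTime, and the keystrokes$ subject and typeAt helper are made up for illustration. Three quick keystrokes are collapsed into one emission, and a later keystroke is emitted on its own.

```typescript
import { Subject, VirtualTimeScheduler } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

// Virtual clock, so the example runs instantly and deterministically.
const scheduler = new VirtualTimeScheduler();

// Hypothetical stream of input field values.
const keystrokes$ = new Subject<string>();
const emitted: string[] = [];

keystrokes$
  .pipe(debounceTime(500, scheduler))
  .subscribe((value) => emitted.push(value));

// Hypothetical helper: emit a value at a given virtual time (in ms).
const typeAt = (text: string, ms: number) =>
  scheduler.schedule(() => keystrokes$.next(text), ms);

typeAt('r', 0);
typeAt('rx', 100);
typeAt('rxj', 200); // then a pause longer than 500 ms
typeAt('rxjs', 1000);

// Run all scheduled work in virtual time.
scheduler.flush();

console.log(emitted); // ['rxj', 'rxjs']
```

In real code the scheduler argument is normally left out, and debounceTime uses real timers.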
catchError
The catchError operator provides a fallback observable in case of an error. It passes next and complete notifications through untouched; it only reacts to errors.
If an error notification occurs, instead of passing it down the chain, catchError hands the error to a function that returns a fallback observable, and subscribes to that observable.
The fallback observable may then emit its own next and complete notifications, which are passed on to the rest of the chain.
import { Observable, of } from 'rxjs';
import { catchError } from 'rxjs/operators';
const failingHttpRequest$ = new Observable((subscriber) => {
setTimeout(() => {
subscriber.error(new Error('Timeout'));
}, 3000);
return () => console.log('Teardown');
});
console.log('App started');
failingHttpRequest$
.pipe(
catchError((error) =>
of(`Fallback value and err message: ${error.message}`)
)
)
.subscribe({
next: (value) => console.log(value),
error: (err) => console.log(err.message),
complete: () => console.log('Completed'),
});
We see that the error was not passed down the chain. Instead, we created a new observable with of and emitted the error message as a regular next notification.
We can also see that when the observable created by of completes, that complete notification is passed on to our main pipeline as well.
Sometimes we may want to hide the error or do nothing at all when an error occurs. For those cases RxJS provides the EMPTY observable, which, as the name suggests, emits no values and immediately emits a complete notification.
import { EMPTY, Observable } from "rxjs";
import { catchError } from "rxjs/operators";
const failingHttpRequest$ = new Observable(subscriber => {
setTimeout(() => {
subscriber.error(new Error('Timeout'));
}, 3000);
});
console.log('App started');
failingHttpRequest$.pipe(
catchError(error => EMPTY)
).subscribe({
next: value => console.log(value),
complete: () => console.log('Completed')
});
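The two fallback strategies can be compared without waiting for timers. In this sketch the failing$ source and the record helper are hypothetical. The source emits two values and then errors, and each variant records exactly what reaches the observer.

```typescript
import { concat, EMPTY, Observable, of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

// Hypothetical source: two values, then an error.
const failing$ = concat(
  of('A', 'B'),
  throwError(() => new Error('boom'))
);

// Hypothetical helper: subscribes and records every notification received.
function record(source$: Observable<string>): string[] {
  const log: string[] = [];
  source$.subscribe({
    next: (value) => log.push(value),
    error: (err: Error) => log.push(`error: ${err.message}`),
    complete: () => log.push('complete'),
  });
  return log;
}

const unhandled = record(failing$);
const withFallback = record(
  failing$.pipe(catchError((err: Error) => of(`fallback for ${err.message}`)))
);
const withEmpty = record(failing$.pipe(catchError(() => EMPTY)));

console.log(unhandled); // ['A', 'B', 'error: boom']
console.log(withFallback); // ['A', 'B', 'fallback for boom', 'complete']
console.log(withEmpty); // ['A', 'B', 'complete']
```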
Flattening Operators
Flattening operators work like catchError, but for next notifications instead of errors.
A flattening operator reacts to each next notification by creating and subscribing to a new inner observable. As long as the source keeps emitting values, the flattening operator keeps subscribing to new inner observables.
Flattening Operators Static Example
With a flattening operator, the source first emits a value, and that value is used to create a new inner observable, which starts emitting values of its own into the output. When the inner observable completes, its complete notification is not passed on to the output.
Let’s look at an example with concatMap.
import { Observable, of } from 'rxjs';
import { concatMap } from 'rxjs/operators';
const source$ = new Observable((subscriber) => {
setTimeout(() => subscriber.next('A'), 2000);
setTimeout(() => subscriber.next('B'), 3000);
});
console.log('App has started');
source$
.pipe(concatMap((value) => of(1, 2)))
.subscribe((value) => console.log(value));
As we can see, each of the values ‘A’ and ‘B’ was mapped to an inner observable, so the observer receives 1, 2, 1, 2 and never sees ‘A’ or ‘B’ themselves.
Mapping to static numbers is not a very realistic use case, but once the inner observables do real work that depends on the source value (an HTTP request, for example), these flattening operators become quite useful.
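The claim that an inner complete notification does not end the output can be checked with a synchronous variant of the static example. This is only a sketch: the source is replaced by of('A', 'B'), and the inner observables reuse the source letter so that it is visible which inner observable each value came from.

```typescript
import { of } from 'rxjs';
import { concatMap } from 'rxjs/operators';

const flattened: string[] = [];

of('A', 'B')
  .pipe(
    // Each source value is replaced by the values of its inner observable.
    concatMap((letter) => of(`${letter}1`, `${letter}2`))
  )
  .subscribe({
    next: (value) => flattened.push(value),
    complete: () => flattened.push('complete'),
  });

console.log(flattened); // ['A1', 'A2', 'B1', 'B2', 'complete']
```

The single 'complete' at the end comes from the source completing. The two inner completions are absorbed by concatMap.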
Flattening Operators - Dynamic HTTP Request
This time, let’s build a concatMap example that sends an HTTP request based on the provided value.
import { fromEvent } from "rxjs";
import { ajax } from "rxjs/ajax";
import { concatMap, map } from "rxjs/operators";
const endpointInput: HTMLInputElement = document.querySelector('input#endpoint');
const fetchButton = document.querySelector('button#fetch');
fromEvent(fetchButton, 'click').pipe(
map(() => endpointInput.value),
concatMap(value =>
ajax(`https://random-data-api.com/api/${value}/random_${value}`)
)
).subscribe(
value => console.log(value)
);
Flattening Operators - Error Handling #1
Even though complete notifications from the inner observable are not passed to the main stream, error notifications are!
So we need to handle them carefully so that they don’t kill our main pipeline.
import { EMPTY, fromEvent } from "rxjs";
import { ajax } from "rxjs/ajax";
import { catchError, concatMap, map } from "rxjs/operators";
const endpointInput: HTMLInputElement = document.querySelector('input#endpoint');
const fetchButton = document.querySelector('button#fetch');
fromEvent(fetchButton, 'click').pipe(
map(() => endpointInput.value),
concatMap(value =>
ajax(`https://random-data-api.com/api/${value}/random_${value}`)
),
catchError(() => EMPTY)
).subscribe({
next: value => console.log(value),
error: err => console.log('Error:', err),
complete: () => console.log('Completed')
});
This approach has a problem: even though we catch the error, we catch it in the main pipeline and convert it into a complete notification. That still ends the main subscription, so further button clicks no longer trigger any requests.
Flattening Operators - Error Handling #2
To solve the problem above, we need to apply catchError to the inner observable, inside concatMap.
import { fromEvent, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { catchError, concatMap, map } from 'rxjs/operators';
const endpointInput: HTMLInputElement =
document.querySelector('input#endpoint');
const fetchButton = document.querySelector('button#fetch');
fromEvent(fetchButton, 'click')
.pipe(
map(() => endpointInput.value),
concatMap((value) =>
ajax(`https://random-data-api.com/api/${value}/random_${value}`).pipe(
catchError((error) => of(`Could not fetch data: ${error}`))
)
)
)
.subscribe({
next: (value) => console.log(value),
error: (err) => console.log('Error:', err),
complete: () => console.log('Completed'),
});
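The difference between the two error handling approaches can be reproduced without a browser or network. In the sketch below, fakeRequest is a made-up stand-in for the ajax call, and clicks$ simulates three button clicks, with the second one targeting a failing endpoint.

```typescript
import { EMPTY, Observable, of, throwError } from 'rxjs';
import { catchError, concatMap } from 'rxjs/operators';

// Hypothetical stand-in for an HTTP call: the 'bad' endpoint fails, others succeed.
const fakeRequest = (endpoint: string): Observable<string> =>
  endpoint === 'bad'
    ? throwError(() => new Error(`404 for ${endpoint}`))
    : of(`data from ${endpoint}`);

// Three simulated button clicks, each carrying an endpoint name.
const clicks$ = of('food', 'bad', 'name');

// Approach #1: catchError on the main pipeline.
const outerCatch: string[] = [];
clicks$
  .pipe(
    concatMap((endpoint) => fakeRequest(endpoint)),
    catchError(() => EMPTY)
  )
  .subscribe((value) => outerCatch.push(value));

// Approach #2: catchError on the inner observable.
const innerCatch: string[] = [];
clicks$
  .pipe(
    concatMap((endpoint) =>
      fakeRequest(endpoint).pipe(
        catchError((err: Error) => of(`Could not fetch data: ${err.message}`))
      )
    )
  )
  .subscribe((value) => innerCatch.push(value));

console.log(outerCatch);
// ['data from food']
console.log(innerCatch);
// ['data from food', 'Could not fetch data: 404 for bad', 'data from name']
```

With the outer catchError, the third click is never processed. With the inner one, the failure is contained and the pipeline keeps going.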
Flattening Operators - Concurrency - concatMap
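concatMap does not subscribe to the next inner observable until the current one completes; source values that arrive in the meantime wait in a queue.
Because only one inner subscription is active at a time, we don't pile up subscriptions that could leak memory over time. The flip side is that an inner observable that never completes blocks everything queued behind it.
concatMap guarantees that the inner observables are handled in the order in which the source values arrived. This matters a lot whenever order is significant (think of multiple HTTP requests or mouse positions, for example).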
Flattening Operators - switchMap
switchMap gives us almost the same behavior as concatMap, but with one big difference.
If a new value is emitted while the previous inner observable has not yet completed, switchMap cancels the previous inner observable by unsubscribing from it, and subscribes to a new one created from the new value.
Because the previous inner subscription is always cleaned up on cancellation, nothing is left running in the background, so switchMap does not leak memory.
It can also react really quickly to new values, which is useful when only the latest result matters.
Flattening Operators - mergeMap
mergeMap is also like concatMap, but it allows multiple inner observables to be open at once. It neither cancels the previous one like switchMap nor waits for it to complete; they all run concurrently,
and sometimes this is exactly what we need.
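To compare the three concurrency behaviors side by side, the sketch below replays the same scripted sequence through each operator. Everything in it is made up for illustration: the inner observables are Subjects so that the script decides exactly when they emit and complete, and runScenario is a hypothetical helper.

```typescript
import { Observable, OperatorFunction, Subject } from 'rxjs';
import { concatMap, mergeMap, switchMap } from 'rxjs/operators';

type Project = (key: string) => Observable<string>;

// Hypothetical helper: runs one scripted sequence through a flattening operator.
function runScenario(
  flatten: (project: Project) => OperatorFunction<string, string>
): string[] {
  const output: string[] = [];
  const source$ = new Subject<string>();
  const innerA$ = new Subject<string>();
  const innerB$ = new Subject<string>();
  const pick: Project = (key) => (key === 'A' ? innerA$ : innerB$);

  source$.pipe(flatten(pick)).subscribe((value) => output.push(value));

  source$.next('A');
  source$.next('B'); // arrives while inner A is still open
  innerA$.next('A1');
  innerB$.next('B1');
  innerA$.complete();
  innerB$.next('B2');
  innerB$.complete();

  return output;
}

const concatResult = runScenario((project) => concatMap(project));
const switchResult = runScenario((project) => switchMap(project));
const mergeResult = runScenario((project) => mergeMap(project));

console.log(concatResult); // ['A1', 'B2']
console.log(switchResult); // ['B1', 'B2']
console.log(mergeResult); // ['A1', 'B1', 'B2']
```

concatMap misses 'B1' only because a Subject does not replay values emitted before anyone subscribed to it. With an inner observable that starts its work on subscription, such as an ajax request, the request would simply start later. switchMap drops 'A1' because inner A was cancelled as soon as 'B' arrived, and mergeMap keeps everything.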
Let's keep in touch
Hey, thanks a lot if you have read this far.
I'd love for you to keep in touch for more sweet content.
Please subscribe on Dev.to and other channels (🦸♂️ especially Twitter)
Twitter 🐦 https://twitter.com/barisbll_dev
Linkedin 📎 https://www.linkedin.com/in/barisbll-dev/
Github 👨🏻💻 https://github.com/barisbll
Medium 📰 https://medium.com/@barisbll