RxJS - Join Operators
Luca Del Puppo
Posted on September 22, 2021
Hi Guys and Welcome Back,
Today I'll talk about the Join Operators. Some of them resemble operators covered in the Creation Operators article, but they are used in different contexts. OK, let's start!
combineLatestAll
Flattens an Observable-of-Observables by applying combineLatest when the Observable-of-Observables completes.
import { interval, timer } from 'rxjs';
import { combineLatestAll, map, take } from 'rxjs/operators';

const A_CHAR_CODE = 65

const createSource = (index: number) =>
  interval((index + 1) * 1000).pipe(
    take(3),
    map(value => `${String.fromCharCode(A_CHAR_CODE + index)}${String.fromCharCode(A_CHAR_CODE + value)}`)
  )

console.log(`${new Date().toLocaleTimeString()}: combineLatestAll start`)

const source$ = timer(0, 1000).pipe(
  take(3)
);

source$.pipe(
  map(createSource),
  combineLatestAll()
).subscribe({
  next: val => console.log(`${new Date().toLocaleTimeString()}: combineLatestAll`, val)
});
16:22:07: combineLatestAll start
16:22:12: combineLatestAll [ 'AB', 'BA', 'CA' ]
16:22:12: combineLatestAll [ 'AC', 'BA', 'CA' ]
16:22:13: combineLatestAll [ 'AC', 'BB', 'CA' ]
16:22:15: combineLatestAll [ 'AC', 'BB', 'CB' ]
16:22:15: combineLatestAll [ 'AC', 'BC', 'CB' ]
16:22:18: combineLatestAll [ 'AC', 'BC', 'CC' ]
As you can see, this operator is the sibling of the combineLatest operator discussed in the previous article, RxJS - Creation Operators.
Its job is to convert an Observable of Observables into a stream of arrays, where each item of an array is the latest value of one inner observable.
When we use this operator we need to remember that it subscribes to the inner observables only after the outer observable completes, and that it emits its first value only once every inner observable has emitted at least one value.
This operator is useful when an earlier operator in the pipe chain emits a list of observables; with it we can flatten their values into a single array that holds the latest value of each observable.
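To make the "wait until everyone has emitted" rule concrete, here is a minimal sketch in plain TypeScript. It does not call RxJS at all; it only models the rule. The names `InnerEvent` and `combineLatestModel` are made up for this illustration, and the event order is one that is consistent with the log above (A = 0, B = 1, C = 2).

```typescript
// Plain-TypeScript model of the combineLatest rule (no RxJS involved).
// `InnerEvent` and `combineLatestModel` are hypothetical names for this sketch.
interface InnerEvent {
  source: number; // index of the inner observable that emitted
  value: string;  // the emitted value
}

function combineLatestModel(events: InnerEvent[], sourceCount: number): string[][] {
  const latest: (string | undefined)[] = [];
  for (let i = 0; i < sourceCount; i++) {
    latest.push(undefined);
  }
  const output: string[][] = [];
  for (const event of events) {
    latest[event.source] = event.value;
    // Collect the latest value of every inner observable that has emitted so far.
    const snapshot: string[] = [];
    for (const item of latest) {
      if (item !== undefined) {
        snapshot.push(item);
      }
    }
    // Emit only once every inner observable has produced at least one value.
    if (snapshot.length === sourceCount) {
      output.push(snapshot);
    }
  }
  return output;
}

// Assumed emission order, chosen to be consistent with the log above.
const combined = combineLatestModel(
  [
    { source: 0, value: 'AA' },
    { source: 0, value: 'AB' },
    { source: 1, value: 'BA' },
    { source: 2, value: 'CA' },
    { source: 0, value: 'AC' },
    { source: 1, value: 'BB' },
    { source: 2, value: 'CB' },
    { source: 1, value: 'BC' },
    { source: 2, value: 'CC' },
  ],
  3
);
console.log(combined);
```

Nothing is produced for 'AA', 'AB' and 'BA', because C has not emitted yet; the first array appears with 'CA', and from then on every new value produces a new array.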
concatAll
Converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.
import { interval, timer } from 'rxjs';
import { concatAll, map, take } from 'rxjs/operators';

const createSource = (index: number) => interval(index * 1000).pipe(
  take(3),
  map(value => `${index}-${value + 1}`)
)

console.log(`${new Date().toLocaleTimeString()}: concatAll start`)

const source$ = timer(0, 1000).pipe(
  take(3)
);

source$.pipe(
  map(v => createSource(v + 1)),
  concatAll()
).subscribe({
  next: val => console.log(`${new Date().toLocaleTimeString()}: concatAll`, val)
});
15:38:29: concatAll start
15:38:30: concatAll 1-1
15:38:31: concatAll 1-2
15:38:32: concatAll 1-3
15:38:34: concatAll 2-1
15:38:36: concatAll 2-2
15:38:38: concatAll 2-3
15:38:41: concatAll 3-1
15:38:44: concatAll 3-2
15:38:47: concatAll 3-3
The concatAll operator flattens an Observable of Observables and emits the values of the inner observables in sequence.
It's important to remember that when we use this operator, the inner observables run one after the other: the operator subscribes to the first observable and stays on it until it completes, then it moves on to the second observable, and so on.
This operator is useful when an operator in the pipe chain returns a list of observables and we need to execute them in sequence.
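The timestamps in the log above can be reproduced with a small model in plain TypeScript. This is only a sketch of the scheduling rule, not RxJS code; `InnerSpec` and `concatTimeline` are hypothetical names, and times are expressed in seconds from the "start" line.

```typescript
// Plain-TypeScript model of concatAll's scheduling (no RxJS involved).
// `InnerSpec` and `concatTimeline` are hypothetical names for this sketch.
interface InnerSpec {
  label: number;   // the index passed to createSource
  arrival: number; // second at which the outer observable emits this inner
  period: number;  // seconds between the inner's values
  count: number;   // number of values before the inner completes
}

function concatTimeline(inners: InnerSpec[]): [string, number][] {
  const output: [string, number][] = [];
  let previousEnd = 0;
  for (const inner of inners) {
    // An inner is subscribed only after it has arrived AND the previous one has completed.
    const start = Math.max(inner.arrival, previousEnd);
    for (let i = 1; i <= inner.count; i++) {
      output.push([`${inner.label}-${i}`, start + i * inner.period]);
    }
    previousEnd = start + inner.count * inner.period;
  }
  return output;
}

// Same setup as the example above: inners arrive at 0s, 1s and 2s.
const concatResult = concatTimeline([
  { label: 1, arrival: 0, period: 1, count: 3 },
  { label: 2, arrival: 1, period: 2, count: 3 },
  { label: 3, arrival: 2, period: 3, count: 3 },
]);
console.log(concatResult);
```

The second inner arrives after 1 second but only starts at second 3, when the first one completes; that queueing is what separates concatAll from the other flattening operators.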
exhaustAll
Converts a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed.
import { interval, timer } from 'rxjs';
import { exhaustAll, map, take } from 'rxjs/operators';

const createSource = (index: number) => interval(index * 1000).pipe(
  take(3),
  map(value => `${index}-${value + 1}`)
)

console.log(`${new Date().toLocaleTimeString()}: exhaustAll start`)

const source$ = timer(0, 2500).pipe(
  take(3)
);

source$.pipe(
  map(v => createSource(v + 1)),
  exhaustAll()
).subscribe({
  next: val => console.log(`${new Date().toLocaleTimeString()}: exhaustAll`, val)
});
16:27:05: exhaustAll start
16:27:07: exhaustAll 1-1
16:27:08: exhaustAll 1-2
16:27:09: exhaustAll 1-3
16:27:14: exhaustAll 3-1
16:27:17: exhaustAll 3-2
16:27:20: exhaustAll 3-3
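Notice that no 2-x value appears in the log above. A minimal plain-TypeScript sketch of the dropping rule shows why; it is a model only, not RxJS code. `InnerWindow` and `exhaustAccepted` are hypothetical names, each inner is assumed to last period × 3 seconds, and an inner that arrives exactly when the previous one completes is assumed to be accepted.

```typescript
// Plain-TypeScript model of exhaustAll's dropping rule (no RxJS involved).
// `InnerWindow` and `exhaustAccepted` are hypothetical names for this sketch.
interface InnerWindow {
  label: number;    // the index passed to createSource
  arrival: number;  // second at which the outer observable emits this inner
  duration: number; // seconds the inner needs to complete once subscribed
}

function exhaustAccepted(inners: InnerWindow[]): number[] {
  const accepted: number[] = [];
  let busyUntil = 0; // second at which the currently active inner completes
  for (const inner of inners) {
    if (inner.arrival >= busyUntil) {
      // No inner is active: subscribe to this one.
      accepted.push(inner.label);
      busyUntil = inner.arrival + inner.duration;
    }
    // Otherwise an inner is still active, so this one is dropped for good.
  }
  return accepted;
}

// Same setup as the example above: inners arrive every 2.5 seconds.
const acceptedInners = exhaustAccepted([
  { label: 1, arrival: 0, duration: 3 },
  { label: 2, arrival: 2.5, duration: 6 },
  { label: 3, arrival: 5, duration: 9 },
]);
console.log(acceptedInners); // [1, 3]
```

The second inner arrives at 2.5 seconds, while the first is still running until second 3, so it is discarded rather than queued.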
mergeAll
Converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables.
import { interval, timer } from 'rxjs';
import { map, mergeAll, take } from 'rxjs/operators';

const createSource = (index: number) => interval(index * 1000).pipe(
  take(3),
  map(value => `${index}-${value + 1}`),
)

console.log(`${new Date().toLocaleTimeString()}: mergeAll start`)

const source$ = timer(0, 1000).pipe(
  take(3),
);

source$.pipe(
  map(v => createSource(v + 1)),
  mergeAll()
).subscribe({
  next: val => console.log(`${new Date().toLocaleTimeString()}: mergeAll`, val)
});
15:41:46: mergeAll start
15:41:47: mergeAll 1-1
15:41:48: mergeAll 1-2
15:41:49: mergeAll 2-1
15:41:49: mergeAll 1-3
15:41:51: mergeAll 3-1
15:41:51: mergeAll 2-2
15:41:53: mergeAll 2-3
15:41:54: mergeAll 3-2
15:41:57: mergeAll 3-3
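The interleaving in the log above follows from one rule: every inner observable is subscribed as soon as it arrives. Here is a rough plain-TypeScript model of that rule (not RxJS code). `InnerSpec` and `mergeTimeline` are hypothetical names, times are in seconds from the "start" line, and the order of values that share the same second is not meaningful in this sketch.

```typescript
// Plain-TypeScript model of mergeAll's scheduling (no RxJS involved).
// `InnerSpec` and `mergeTimeline` are hypothetical names for this sketch.
interface InnerSpec {
  label: number;   // the index passed to createSource
  arrival: number; // second at which the outer observable emits this inner
  period: number;  // seconds between the inner's values
  count: number;   // number of values before the inner completes
}

function mergeTimeline(inners: InnerSpec[]): [string, number][] {
  const output: [string, number][] = [];
  for (const inner of inners) {
    // Every inner is subscribed the moment it arrives; nothing waits, nothing is dropped.
    for (let i = 1; i <= inner.count; i++) {
      output.push([`${inner.label}-${i}`, inner.arrival + i * inner.period]);
    }
  }
  // Deliver values in time order (ties are left in an arbitrary order).
  output.sort((a, b) => a[1] - b[1]);
  return output;
}

// Same setup as the example above: inners arrive at 0s, 1s and 2s.
const mergeResult = mergeTimeline([
  { label: 1, arrival: 0, period: 1, count: 3 },
  { label: 2, arrival: 1, period: 2, count: 3 },
  { label: 3, arrival: 2, period: 3, count: 3 },
]);
console.log(mergeResult);
```

All nine values are delivered, and the computed seconds (1, 2, 3, 3, 5, 5, 7, 8, 11) line up with the timestamps in the log.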
switchAll
Converts a higher-order Observable into a first-order Observable producing values only from the most recent observable sequence.
import { interval, timer } from 'rxjs';
import { map, switchAll, take } from 'rxjs/operators';

const createSource = (index: number) => interval(index * 1000).pipe(
  take(3),
  map(value => `${index}-${value + 1}`)
)

console.log(`${new Date().toLocaleTimeString()}: switchAll start`)

const source$ = timer(0, 1000).pipe(
  take(3)
);

source$.pipe(
  map(v => createSource(v + 1)),
  switchAll()
).subscribe({
  next: val => console.log(`${new Date().toLocaleTimeString()}: switchAll`, val)
});
15:42:48: switchAll start
15:42:49: switchAll 1-1
15:42:53: switchAll 3-1
15:42:56: switchAll 3-2
15:42:59: switchAll 3-3
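Only four values survive in the log above, because each new inner observable replaces the previous one. The sketch below models that rule in plain TypeScript (it is not RxJS code). `InnerSpec` and `switchTimeline` are hypothetical names, and the choice to still deliver a value that lands exactly on the switch-over second is an assumption taken from the log, where 1-1 is printed.

```typescript
// Plain-TypeScript model of switchAll's scheduling (no RxJS involved).
// `InnerSpec` and `switchTimeline` are hypothetical names for this sketch.
interface InnerSpec {
  label: number;   // the index passed to createSource
  arrival: number; // second at which the outer observable emits this inner
  period: number;  // seconds between the inner's values
  count: number;   // number of values before the inner completes
}

function switchTimeline(inners: InnerSpec[]): [string, number][] {
  const output: [string, number][] = [];
  inners.forEach((inner, k) => {
    // An inner stays subscribed only until the next inner arrives.
    const next = inners[k + 1];
    const cutoff = next !== undefined ? next.arrival : Infinity;
    for (let i = 1; i <= inner.count; i++) {
      const time = inner.arrival + i * inner.period;
      // Assumption: a value landing exactly on the cutoff is still delivered.
      if (time <= cutoff) {
        output.push([`${inner.label}-${i}`, time]);
      }
    }
  });
  return output;
}

// Same setup as the example above: inners arrive at 0s, 1s and 2s.
const switchResult = switchTimeline([
  { label: 1, arrival: 0, period: 1, count: 3 },
  { label: 2, arrival: 1, period: 2, count: 3 },
  { label: 3, arrival: 2, period: 3, count: 3 },
]);
console.log(switchResult);
```

The second inner never gets to emit: its first value would come at second 3, but it is unsubscribed at second 2 when the third inner arrives.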
startWith
Returns an observable that, at the moment of subscription, will synchronously emit all values provided to this operator, then subscribe to the source and mirror all of its emissions to subscribers.
import { of } from 'rxjs';
import { startWith } from 'rxjs/operators';

const source1$ = of(1, 2, 3).pipe(
  startWith(1000)
)

source1$.subscribe({
  next: val => console.log(`${new Date().toLocaleTimeString()}: startWith`, val)
});
15:43:47: startWith 1000
15:43:47: startWith 1
15:43:47: startWith 2
15:43:47: startWith 3
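For a synchronous source like the one above, the effect boils down to putting the seed values in front of the source values. A tiny plain-TypeScript model (not RxJS code; `startWithModel` is a hypothetical name) could look like this:

```typescript
// Plain-TypeScript model of startWith on a synchronous source (no RxJS involved).
// `startWithModel` is a hypothetical name for this sketch.
function startWithModel<T>(source: T[], ...initial: T[]): T[] {
  // The seed values come first, in the order given, followed by the source values.
  return initial.concat(source);
}

const started = startWithModel([1, 2, 3], 1000);
console.log(started); // [1000, 1, 2, 3]

// Several seed values keep their order.
const startedWithMany = startWithModel([1, 2, 3], 998, 999);
console.log(startedWithMany); // [998, 999, 1, 2, 3]
```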
withLatestFrom
Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.
import { interval } from 'rxjs';
import { take, withLatestFrom } from 'rxjs/operators';

const source1$ = interval(1000).pipe(
  take(5)
)

const source2$ = interval(2500).pipe(
  take(5)
)

console.log(`${new Date().toLocaleTimeString()}: withLatestFrom start`)

source1$.pipe(
  withLatestFrom(source2$)
).subscribe({
  next: val => console.log(`${new Date().toLocaleTimeString()}: withLatestFrom`, val)
});
15:44:11: withLatestFrom start
15:44:14: withLatestFrom [ 2, 0 ]
15:44:15: withLatestFrom [ 3, 0 ]
15:44:16: withLatestFrom [ 4, 1 ]
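The log above starts at [ 2, 0 ] because the first two source values arrive before source2$ has emitted anything, so they are skipped. Here is a minimal plain-TypeScript sketch of that rule (not RxJS code). `TimelineEvent` and `withLatestFromModel` are hypothetical names, and the event order at second 5, where both observables fire, is an assumption taken from the log.

```typescript
// Plain-TypeScript model of the withLatestFrom rule (no RxJS involved).
// `TimelineEvent` and `withLatestFromModel` are hypothetical names for this sketch.
interface TimelineEvent {
  from: 'source' | 'other'; // which observable emitted
  value: number;            // the emitted value
}

function withLatestFromModel(events: TimelineEvent[]): [number, number][] {
  const output: [number, number][] = [];
  let latestOther: number | undefined = undefined;
  for (const event of events) {
    if (event.from === 'other') {
      // The other observable only updates the remembered value; it never triggers output.
      latestOther = event.value;
    } else if (latestOther !== undefined) {
      // The source triggers output, but only once the other observable has emitted.
      output.push([event.value, latestOther]);
    }
  }
  return output;
}

// Events in time order: source1$ ticks every 1s, source2$ every 2.5s.
const paired = withLatestFromModel([
  { from: 'source', value: 0 }, // 1s: skipped, source2$ has no value yet
  { from: 'source', value: 1 }, // 2s: skipped
  { from: 'other', value: 0 },  // 2.5s
  { from: 'source', value: 2 }, // 3s
  { from: 'source', value: 3 }, // 4s
  { from: 'other', value: 1 },  // 5s (assumed to fire just before the source)
  { from: 'source', value: 4 }, // 5s
]);
console.log(paired); // [[2, 0], [3, 0], [4, 1]]
```

Unlike combineLatest, a new value from the other observable never produces an output by itself; only the source does.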
Ok guys, that's all for today!
You can find the code of this article here
See you soon!