TIL: RxJS Observables are unicast and we can multicast them.
Pedro Osternack Corrêa
Posted on March 12, 2021
While working on an Angular + NgRx project I started dealing with RxJS Observables more and more, and I began to notice some interesting behaviours that did not make much sense to me, a complete newcomer to the world of RxJS. One of them came up while debugging a test in which I was asserting how many times an observable's side effect was triggered; to check the value being emitted, I had also subscribed to that observable.
// my observable
observable$ = event$.pipe(
  tap((val) => this.dataService.handleEvent(val))
);

// the test
it('should use dataService to handle the eventData', () => {
  // I've added this line for debugging purposes
  component.observable$.subscribe((eventVal) => console.log(eventVal));
  // this was failing because handleEvent was being called twice
  expect(mockDataService.handleEvent).toHaveBeenCalledTimes(1);
});
I noticed that the test started to fail when I added my new subscription, and passed again once I removed that line. That's when the concept of observables being unicast finally made sense in my head. What I was seeing happened because each subscriber to observable$ was getting its own copy of the observable chain, or producer (think of everything inside the observable's pipe call). Since I had 2 subscribers to my observable, every time the event$ observable emitted a value, the tap on observable$ was called twice, once for each subscriber.
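To make the "own copy of the producer" idea more tangible, here is a minimal sketch that does not use RxJS at all. `MiniObservable`, `Listener` and `sideEffectCalls` are hypothetical names invented for this illustration, and the class is far simpler than a real RxJS Observable (no completion, errors or unsubscription). It only assumes that an observable stores a producer function and runs it again on every `subscribe` call.

```typescript
type Listener<T> = (value: T) => void;

// A hand-rolled stand-in for an observable: it only stores a producer function.
class MiniObservable<T> {
  private readonly producer: (listener: Listener<T>) => void;

  constructor(producer: (listener: Listener<T>) => void) {
    this.producer = producer;
  }

  // Every call runs the producer again, so each subscriber gets its own execution.
  subscribe(listener: Listener<T>): void {
    this.producer(listener);
  }
}

let sideEffectCalls = 0;

// Comparable to event$.pipe(tap(...)): the side effect lives inside the producer.
const unicast$ = new MiniObservable<number>((listener) => {
  sideEffectCalls += 1; // stands in for dataService.handleEvent(val)
  listener(42);
});

const received: string[] = [];
unicast$.subscribe((val) => received.push(`subscriber 1: ${val}`));
unicast$.subscribe((val) => received.push(`subscriber 2: ${val}`));

console.log(received);        // both subscribers received the value
console.log(sideEffectCalls); // 2: the producer ran once per subscriber
```

Seen this way, the extra debugging subscription in my test was simply a second run of the producer, which is why the side effect was counted twice.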
A more concrete example
Consider the following block of code:
import { interval } from 'rxjs';
import { take, tap } from 'rxjs/operators';

// creating an observable that emits once every 2 seconds.
// we only want the first 3 values,
// and every time a value is emitted we will log a random number in the console
const observable$ = interval(2000).pipe(
  take(3),
  tap(() => console.log(Math.random()))
);

observable$.subscribe(
  val => console.log(`subscriber 1: value = ${val}`)
);
observable$.subscribe(
  val => console.log(`subscriber 2: value = ${val}`)
);
This code will generate the following output on the console:
0.9843346569918552
subscriber 1: value = 0
0.34568357780332915
subscriber 2: value = 0
0.6003854545732459
subscriber 1: value = 1
0.12901900745674388
subscriber 2: value = 1
0.3773574643703079
subscriber 1: value = 2
0.5661793730325613
subscriber 2: value = 2
So we get one random number (from the tap of the original observable) for each value delivered to each subscriber. That's because each subscriber has its own instance of the producer and its values (or the observable chain, as I called it before).
Multicasting our values
RxJS offers some operators that turn our regular unicast observables into multicast ones, which means that all subscribers share the same instance of the producer and its values.
Two of them are share and shareReplay. There are others, but I find these to be the most useful for my use cases.
If we change the previous block of code and add the share operator to our observable like this:
const observable$ = interval(2000).pipe(
  take(3),
  tap(() => console.log(Math.random())),
  share()
);

observable$.subscribe(
  val => console.log(`subscriber 1: value = ${val}`)
);
observable$.subscribe(
  val => console.log(`subscriber 2: value = ${val}`)
);
It will now produce the following output in the console:
0.15433905642184453
subscriber 1: value = 0
subscriber 2: value = 0
0.6301263674328053
subscriber 1: value = 1
subscriber 2: value = 1
0.20325573662904373
subscriber 1: value = 2
subscriber 2: value = 2
So now we have one random number per emitted value, shared by all of our subscribers. That's because the producer is now the same for all of them.
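The idea behind sharing a producer can be sketched without RxJS as well. This is only an illustration of the concept, not how the share operator is actually implemented: `shareSketch`, `Sink`, `SubscribeFn`, `pushSource` and `emitValue` are hypothetical names, and the sketch leaves out unsubscription, completion and error handling, all of which the real operator deals with.

```typescript
type Sink<T> = (value: T) => void;
type SubscribeFn<T> = (sink: Sink<T>) => void;

// Hypothetical helper: subscribe to the source only once and fan every value
// out to all registered sinks.
function shareSketch<T>(source: SubscribeFn<T>): SubscribeFn<T> {
  const sinks: Sink<T>[] = [];
  let connected = false;
  return (sink) => {
    sinks.push(sink);
    if (!connected) {
      connected = true;
      source((value) => sinks.forEach((s) => s(value)));
    }
  };
}

// A source we can push values into. Each subscription to it registers its own
// handler, and each handler runs the side effect (comparable to the tap).
const upstreamHandlers: Sink<number>[] = [];
let tapCalls = 0;
const pushSource: SubscribeFn<number> = (sink) => {
  upstreamHandlers.push((value) => {
    tapCalls += 1;
    sink(value);
  });
};
const emitValue = (value: number): void =>
  upstreamHandlers.forEach((handler) => handler(value));

const sharedSubscribe = shareSketch(pushSource);
const sharedOutput: string[] = [];
sharedSubscribe((val) => sharedOutput.push(`subscriber 1: value = ${val}`));
sharedSubscribe((val) => sharedOutput.push(`subscriber 2: value = ${val}`));

emitValue(0);
console.log(sharedOutput); // both subscribers received the value 0
console.log(tapCalls);     // 1: a single upstream subscription serves both
```

Subscribing twice to `pushSource` directly would register two handlers and run the side effect twice per value, which is the unicast behaviour from the first example.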
shareReplay does the same, but it also replays the last N emitted values to new subscribers. Let's take a look at another example to compare them:
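// share
import { Subject } from 'rxjs';
import { map, share } from 'rxjs/operators';

// the Subject is the source we push values into; pipe() returns a plain
// Observable, which has no next() method, so we keep a reference to both
const subject$ = new Subject<void>();
const observable$ = subject$.pipe(
  map(() => Math.random()),
  share()
);

observable$.subscribe(
  val => console.log(`subscriber 1: value = ${val}`)
);
observable$.subscribe(
  val => console.log(`subscriber 2: value = ${val}`)
);

// let's emit one value
subject$.next();

// creating a third "late" subscriber
observable$.subscribe(
  val => console.log(`subscriber 3: value = ${val}`)
);

// emitting a new value
subject$.next();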
This will produce the following output:
subscriber 1: value = 0.20616823116285787
subscriber 2: value = 0.20616823116285787
subscriber 1: value = 0.3700900273970813
subscriber 2: value = 0.3700900273970813
subscriber 3: value = 0.3700900273970813
So everyone is getting the same value, as expected. Also, the third subscriber only received the value emitted after it subscribed. We can use shareReplay to cache the last emitted value (or the last N values) so every new subscriber can process it.
Let's change the example above and replace the share operator with shareReplay.
// shareReplay
import { Subject } from 'rxjs';
import { map, shareReplay } from 'rxjs/operators';

// as before, values are pushed through the Subject, not the piped Observable
const subject$ = new Subject<void>();
const observable$ = subject$.pipe(
  map(() => Math.random()),
  // we can define how many values we want to cache and emit to our new
  // subscribers; in this case we will keep only the last one
  shareReplay(1)
);

observable$.subscribe(
  val => console.log(`subscriber 1: value = ${val}`)
);
observable$.subscribe(
  val => console.log(`subscriber 2: value = ${val}`)
);

// let's emit one value
subject$.next();

// creating a third "late" subscriber
observable$.subscribe(
  val => console.log(`subscriber 3: value = ${val}`)
);

// emitting a new value
subject$.next();
Now our code will generate the following:
subscriber 1: value = 0.990542441625698
subscriber 2: value = 0.990542441625698
subscriber 3: value = 0.990542441625698
subscriber 1: value = 0.8445875342331315
subscriber 2: value = 0.8445875342331315
subscriber 3: value = 0.8445875342331315
As we can see, the third subscriber received the first emitted value even though it was late to the party.
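The replay part can also be pictured as a small buffer sitting in front of the subscribers. The sketch below is not RxJS code: `ReplayHub` and `Handler` are hypothetical names, and the class only assumes the behaviour described above (keep the last N values and hand them to anyone who subscribes later).

```typescript
type Handler<T> = (value: T) => void;

// Hypothetical class: remembers the last `bufferSize` values and replays them
// to late subscribers before they start receiving live values.
class ReplayHub<T> {
  private readonly handlers: Handler<T>[] = [];
  private readonly buffer: T[] = [];
  private readonly bufferSize: number;

  constructor(bufferSize: number) {
    this.bufferSize = bufferSize;
  }

  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift(); // drop the oldest cached value
    }
    this.handlers.forEach((handler) => handler(value));
  }

  subscribe(handler: Handler<T>): void {
    this.buffer.forEach((cached) => handler(cached)); // replay cached values first
    this.handlers.push(handler);
  }
}

const hub = new ReplayHub<string>(1);
const replayLog: string[] = [];

hub.subscribe((val) => replayLog.push(`subscriber 1: value = ${val}`));
hub.next("a");
// the late subscriber receives the cached "a" as soon as it subscribes
hub.subscribe((val) => replayLog.push(`late subscriber: value = ${val}`));
hub.next("b");

console.log(replayLog);
```

With a buffer size of 1 the late subscriber only sees the most recent value; a larger buffer would replay more of the history, at the cost of keeping those values in memory.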
That's all for now. I hope this is helpful to someone else.
Cheers and bye for now.
References:
https://rxjs.dev/api/operators/share
https://rxjs.dev/api/operators/shareReplay