TIL: RxJS Observables are unicast and we can multicast them.

pedrostc

Pedro Osternack Corrêa

Posted on March 12, 2021

TIL: RxJS Observables are unicast and we can multicast them.

While working on an Angular + NgRx project I started to deal with RxJS Observables more and more, and with that, I've started to notice some interesting behaviours that were not making a lot of sense for me, a complete newcomer to the world of RxJS. One of them was during the debugging process on a test where I was asserting the number of times that an observable side-effect was being triggered, but I had subscribed to the observable to check the value being emitted.

// my observable
observable$ = event$.pipe(
  tap((val) => this.dataService.handleEvent(val))
);

// the test
it('should use dataService to handle the eventData', () => {
  // I've added this line for debugging purposes 
  component.observable$.subscribe((eventVal) => console.log(eventVal));

  // this was falling because the handleEvent was being called twice
  expect(mockDataService.handleEvent).toHaveBeenCalledTimes(1); 
});
Enter fullscreen mode Exit fullscreen mode

I've noticed that that test started to fail when I've added my new subscription, and would start to work again when I remove that line. That's when the concept of observable being unicast finally made sense in my head. What I was seeing was the because each of my subscribers to observable$ were getting their own copy of the observable chain or Producer (think of everything inside of the observable pipe call). So since I had 2 subscribers to my observable, every time the event$ observable emitted a value, the tap on the observable$ would be called twice, one for each subscriber.

A more concrete example

Consider the following block of code:

// creating an observable that emits once every 2 seconds.
// we want to use on the 3 first values
// and every time a value is emitted we will log a random number in the console
const observable$ = interval(2000).pipe(
  take(3),
  tap(() => console.log(Math.random()))
);

observable$.subscribe(
  val => console.log(`subscriber 1: value = ${val}`)
);
observable$.subscribe(
  val => console.log(`subscriber 2: value = ${val}`)
);
Enter fullscreen mode Exit fullscreen mode

This code will generate the following output on the console:

0.9843346569918552 ​​​​​
 subscriber 1: value = 0 
0.34568357780332915 ​​​​​
 subscriber 2: value =  0 
0.6003854545732459 ​​​​​
 subscriber 1: value = 1 
0.12901900745674388 ​​​​​
 subscriber 2: value =  1 
0.3773574643703079 ​​​​​
 subscriber 1: value = 2 
0.5661793730325613 ​​​​​
 subscriber 2: value =  2 
Enter fullscreen mode Exit fullscreen mode

So we get one random number (the tap of the original observable) for each execution of one of our subscribers. That's because each subscriber has their own version of the producer and its values (or the observable chain as I called before).

Multicasting our values

RxJS offers some operators that allow us to turn our regular unicast observables into multicast, which means that all subscribers will share the same instance of the producer and its values.
Two of them are share and shareReplay. There are others, but I find these to be the most useful ones for my use cases.

If we change the previous block of code and add the share operator to our observable like this:

const observable$ = interval(2000).pipe(
  take(3),
  tap(() => console.log(Math.random())),
  share()
);

observable$.subscribe(
  val => console.log(`subscriber 1: value = ${val}`)
);
observable$.subscribe(
  val => console.log(`subscriber 2: value = ${val}`)
);
Enter fullscreen mode Exit fullscreen mode

It will now produce the following output in the console:

0.15433905642184453 ​​​​​ 
 subscriber 1: value = 0 
 subscriber 2: value =  0 
0.6301263674328053 ​​​​​
 subscriber 1: value = 1 
 subscriber 2: value =  1 
0.20325573662904373 ​​​​​
 subscriber 1: value = 2 
 subscriber 2: value =  2 
Enter fullscreen mode Exit fullscreen mode

So now we have one random number for each execution of all of our subscribers. That's because now our producer is the same for all of our subscribers.

shareReplay does the same but it also replays the last N emitted values for new subscribers. Let's take a look at another example to compare them.

ex.:

// share
const observable$ = new Subject().pipe(
    map(() => Math.random()),
    share()
  );

observable$.subscribe(
  val => console.log(`subscriber 1: value = ${val}`)
);
observable$.subscribe(
  val => console.log(`subscriber 2: value = ${val}`)
);

// lets emit one value
observable$.next();

// creating a third "late" subscriber
observable$.subscribe(
  val => console.log(`subscriber 3: value = ${val}`)
);

// emitting a new value
observable$.next();
Enter fullscreen mode Exit fullscreen mode

This will produce the following output:

subscriber 1: value = 0.20616823116285787 
subscriber 2: value = 0.20616823116285787 
 
subscriber 1: value = 0.3700900273970813 
subscriber 2: value = 0.3700900273970813 
subscriber 3: value = 0.3700900273970813 
Enter fullscreen mode Exit fullscreen mode

So everyone is getting the same value as expected. Also, the third subscriber only received the value emitted after its creation. We can use shareReplay to cache the last emitted value (or N last values) so every new subscriber can process it.
Let's change the example above and replace the share operator with the shareReplay one.

// shareReplay
const observable$ = new Subject().pipe(
    map(() => Math.random()),
    shareReplay(1) // we can define how many values we want to cache and emit to our new subscribers, in this case we will keep only the last one.
  );

observable$.subscribe(
  val => console.log(`subscriber 1: value = ${val}`)
);
observable$.subscribe(
  val => console.log(`subscriber 2: value = ${val}`)
);

// lets emit one value
observable$.next();

// creating a third "late" subscriber
observable$.subscribe(
  val => console.log(`subscriber 3: value = ${val}`)
);

// emitting a new value
observable$.next();
Enter fullscreen mode Exit fullscreen mode

Now our code will generate the following:

subscriber 1: value = 0.990542441625698 
subscriber 2: value = 0.990542441625698 
subscriber 3: value = 0.990542441625698 
 
subscriber 1: value = 0.8445875342331315 
subscriber 2: value = 0.8445875342331315 
subscriber 3: value = 0.8445875342331315 
Enter fullscreen mode Exit fullscreen mode

As we can see, the third observable received the first emitted value even though it was late to the party.

This is all for now, I hope this can be helpful to someone else.
Cheers and bye for now.

References:
https://rxjs.dev/api/operators/share
https://rxjs.dev/api/operators/shareReplay

💖 💪 🙅 🚩
pedrostc
Pedro Osternack Corrêa

Posted on March 12, 2021

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related