Observables and Observers in RxJS
Rajesh Rathore
Posted on January 9, 2024
RxJS is a library for composing asynchronous and event-based programs by using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects) and operators inspired by Array methods (map, filter, reduce, every, etc) to allow handling asynchronous events as collections.
Think of RxJS as Lodash for events.
The essential concepts in RxJS which solve async event management are:
- Observable: represents the idea of an invokable collection of future values or events.
- Observer: is a collection of callbacks that knows how to listen to values delivered by the Observable.
- Subscription: represents the execution of an Observable, and is primarily useful for cancelling the execution.
- Operators: are pure functions that enable a functional programming style of dealing with collections with operations like map, filter, concat, reduce, etc.
- Subject: is equivalent to an EventEmitter, and the only way of multicasting a value or event to multiple Observers.
- Schedulers: are centralized dispatchers to control concurrency, allowing us to coordinate when computation happens on e.g. setTimeout or requestAnimationFrame or others.
Before digging deeper into Observables and Promises, we need to understand Pull and Push.
Pull versus Push
Pull and Push are two different protocols that describe how a data Producer can communicate with a data Consumer.
What is Pull? In Pull systems, the Consumer determines when it receives data from the data Producer. The Producer itself is unaware of when the data will be delivered to the Consumer.
Every JavaScript Function is a Pull system. The function is a Producer of data, and the code that calls the function is consuming it by "pulling" out a single return value from its call.
What is Push? In Push systems, the Producer determines when to send data to the Consumer. The Consumer is unaware of when it will receive that data.
Promises are the most common type of Push system in JavaScript today. A Promise (the Producer) delivers a resolved value to registered callbacks (the Consumers), but unlike functions, it is the Promise which is in charge of determining precisely when that value is "pushed" to the callbacks.
RxJS introduces Observables, a new Push system for JavaScript. An Observable is a Producer of multiple values, "pushing" them to Observers (Consumers).
| | Single value | Multiple values |
|---|---|---|
| Pull | Function | Iterator |
| Push | Promise | Observable |
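To make the table a little more concrete, here is a minimal sketch of three of the four cells in plain JavaScript, with no RxJS involved. The names getAnswer, countToThree and log are made up for illustration. It shows the consumer pulling from a function and an iterator, and a Promise pushing its value to a callback at a moment the consumer does not control.

```javascript
// Pull, single value: the caller decides when to ask by calling the function.
function getAnswer() {
  return 42;
}

// Pull, multiple values: the consumer asks the iterator for each value.
function* countToThree() {
  yield 1;
  yield 2;
  yield 3;
}

const log = [];

log.push('pulled ' + getAnswer());
for (const n of countToThree()) {
  log.push('pulled ' + n);
}

// Push, single value: the Promise decides when the callback runs.
Promise.resolve(42).then((value) => {
  log.push('pushed ' + value);
  console.log(log);
});

// This line runs before the callback above, because Promise callbacks
// are deferred until the current synchronous code has finished.
log.push('registered callback');
```

The last entry in the printed log is 'pushed 42', even though the callback was registered before 'registered callback' was recorded. The fourth cell, the Observable, is what the rest of this post covers.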
Observable:
Observables are lazy Push collections of multiple values.
Observables are like functions with zero arguments, but generalize those to allow multiple values.
Consider the following:
function foo() {
console.log('Hello');
return 42;
}
const x = foo.call(); // same as foo()
console.log(x);
const y = foo.call(); // same as foo()
console.log(y);
We expect to see as output:
"Hello"
42
"Hello"
42
You can write the same behavior above, but with Observables:
import { Observable } from 'rxjs';
const foo = new Observable((subscriber) => {
console.log('Hello');
subscriber.next(42);
});
foo.subscribe((x) => {
console.log(x);
});
foo.subscribe((y) => {
console.log(y);
});
And the output is the same:
"Hello"
42
"Hello"
42
This happens because both functions and Observables are lazy computations. If you don't call the function, the console.log('Hello') won't happen. The same goes for Observables: if you don't "call" one (with subscribe), the console.log('Hello') won't happen.
Subscribing to an Observable is analogous to calling a Function.
Some people claim that Observables are asynchronous. That is not true. If you surround a function call with logs, like this:
console.log('before');
console.log(foo.call());
console.log('after');
You will see the output:
"before"
"Hello"
42
"after"
And this is the same behavior with Observables:
console.log('before');
foo.subscribe((x) => {
console.log(x);
});
console.log('after');
And the output is:
"before"
"Hello"
42
"after"
This shows that the subscription to foo was entirely synchronous, just like a function call.
Observables are able to deliver values either synchronously or asynchronously.
What is the difference between an Observable and a function? Observables can "return" multiple values over time, something which functions cannot. You can't do this:
function foo() {
console.log('Hello');
return 42;
return 100; // dead code. will never happen
}
Functions can only return one value. Observables, however, can do this:
import { Observable } from 'rxjs';
const foo = new Observable((subscriber) => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100); // "return" another value
subscriber.next(200); // "return" yet another
});
console.log('before');
foo.subscribe((x) => {
console.log(x);
});
console.log('after');
With synchronous output:
"before"
"Hello"
42
100
200
"after"
But you can also "return" values asynchronously:
import { Observable } from 'rxjs';
const foo = new Observable((subscriber) => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100);
subscriber.next(200);
setTimeout(() => {
subscriber.next(300); // happens asynchronously
}, 1000);
});
console.log('before');
foo.subscribe((x) => {
console.log(x);
});
console.log('after');
Output:
"before"
"Hello"
42
100
200
"after"
300
Conclusion:
- func.call() means "give me one value synchronously"
- observable.subscribe() means "give me any amount of values, either synchronously or asynchronously"
Core Observable concerns:
- Creating Observables: The Observable constructor takes one argument: the subscribe function.
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
const id = setInterval(() => {
subscriber.next('hi');
}, 1000);
});
Observables can be created with new Observable. Most commonly, observables are created using creation functions, like of, from, interval, etc.
- Subscribing to Observables: The Observable named observable in the example above can be subscribed to like this:
observable.subscribe((x) => console.log(x));
Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.
- Executing the Observable: The code inside new Observable(function subscribe(subscriber) {...}) represents an "Observable execution", a lazy computation that only happens for each Observer that subscribes. The execution produces multiple values over time, either synchronously or asynchronously.
There are three types of values an Observable Execution can deliver:
- "Next" notification: sends a value such as a Number, a String, an Object, etc.
- "Error" notification: sends a JavaScript Error or exception.
- "Complete" notification: does not send a value.
In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.
Disposing Observables:
Because Observable Executions may be infinite, and it's common for an Observer to want to abort execution in finite time, we need an API for canceling an execution. Since each execution is exclusive to one Observer only, once the Observer is done receiving values, it has to have a way to stop the execution, in order to avoid wasting computation power or memory resources.
import { from } from 'rxjs';
const observable = from([10, 20, 30]);
const subscription = observable.subscribe((x) => console.log(x));
// Later:
subscription.unsubscribe();
When you subscribe, you get back a Subscription, which represents the ongoing execution. Just call unsubscribe() to cancel the execution.
Observer:
What is an Observer? An Observer is a consumer of values delivered by an Observable. Observers are simply a set of callbacks, one for each type of notification delivered by the Observable: next, error, and complete. The following is an example of a typical Observer object:
const observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
To use the Observer, pass it to the subscribe method of an Observable:
observable.subscribe(observer);
Observers are just objects with three callbacks, one for each type of notification that an Observable may deliver.
NOTE: Observers in RxJS may also be partial. If you don't provide one of the callbacks, the execution of the Observable will still happen normally, except some types of notifications will be ignored, because they don't have a corresponding callback in the Observer.