RxJS from Scratch: Pipeable Operators
Andrea Bertoli
Posted on December 5, 2019
This article is part of a series where we’ll implement RxJS from scratch step by step. We’ll follow an architecture equivalent to the real RxJS codebase but without all the optimizations and non-essential features.
In the previous chapter we built the fundamental RxJS entity named Observable. Observables are push systems that we can subscribe to in order to be notified of the events they emit. Observables are even more useful when they are chained using pipeable operators. In this article we will implement the Observable chaining logic and some of the most common pipeable operators.
In order to easily follow this post it is strongly recommended to have already read the first one of the series, where we created Observables, Subscriptions and Subscribers.
Introduction
There are two types of operators: creation and pipeable ones. The former are used to easily generate Observables from synchronous and asynchronous data sources (primitive types, arrays, Promises, HTTP requests, intervals, DOM events and more). Pipeable operators allow us to chain together several Observables, forming a “listening” chain able to handle data flows and process values.
We can read from the docs:
A Pipeable Operator is a function that takes an Observable as its input and returns another Observable. It is a pure operation: the previous Observable stays unmodified. Subscribing to the output Observable will also subscribe to the input Observable.
In other words, pipeable operators are the essential pieces that allow complex asynchronous code to be easily composed in a declarative manner. They provide three main benefits:
- clear control flow: pipeable operators help you easily control how the events flow through your Observables
- function purity: pipeable operators are (almost) pure functions, so they can produce and process values “safely”
- value transformation: there are more than 70 operators that allow you to transform emitted values as you want
We start with interval, an Observable that emits data periodically. You can find all the details in my previous post, but just to refresh your memory here’s the implementation.
const interval = (period) => {
  return new Observable(observer => {
    let counter = 0
    const id = setInterval(() => observer.next(++counter), period)
    return () => {
      clearInterval(id)
    }
  })
}
Let’s start working!
RxJS public API
Often it’s useful to start from the end to completely understand the goal we want to achieve. Let's look at how RxJS allows us to use pipeable operators.
const intervalObx = interval(700)
  .pipe(
    throttleTime(1500),
    map(x => `Result: ${x * 3}`),
    take(3)
  )

const subscription = intervalObx.subscribe({
  next: (val) => console.log(val),
  error: (err) => console.warn(err),
  complete: () => console.log('Completed!')
})

// Result: 0
// Result: 9
// Result: 18
// Completed!
The newly created Observable from interval(700) is chained with the throttleTime, map and take operators. This operation creates a new Observable, which starts only when it is subscribed. As in functional programming, the pipe method composes the functions returned by the operators. When and how will the Observable created with pipe emit values?
The source Observable created by interval(700) emits a value every 700 milliseconds. The throttleTime operator receives these events and lets one through only if 1500 milliseconds have passed since the last one it emitted, which thins out the event stream. Each value that gets through is multiplied by 3 by map, which wraps it in a string and passes it to take. The latter simply forwards the data and completes the Observable after three values. In the end, our Observer receives and logs each string via its next method.
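To double check the logged values, the timeline can be replayed with plain numbers and no timers. This is only a sketch: simulateThrottledInterval is a hypothetical helper, it assumes perfectly regular ticks, and it assumes values start from 0 as they do with the RxJS interval.

```javascript
// Hypothetical helper: replays the interval + throttleTime + map + take chain
// using plain arithmetic instead of real timers.
function simulateThrottledInterval(period, throttle, howMany) {
  const results = []
  let lastEmission = -Infinity

  for (let i = 0; results.length < howMany; i++) {
    // the i-th tick fires after (i + 1) periods and carries the value i
    const time = (i + 1) * period

    // throttleTime: let the value through only if the window has elapsed
    if (time - lastEmission >= throttle) {
      lastEmission = time
      // map: multiply by 3 and build the string
      results.push(`Result: ${i * 3}`)
    }
  }

  // take: the loop stops as soon as `howMany` values have been collected
  return results
}

const simulated = simulateThrottledInterval(700, 1500, 3)
console.log(simulated) // [ 'Result: 0', 'Result: 9', 'Result: 18' ]
```

The values that survive are those emitted at 700, 2800 and 4900 milliseconds. With the interval from our refresher, which counts from 1, the same timeline should give Result: 3, Result: 12 and Result: 21.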
Operators as functions
Now that we've seen how the external API is used, let's explore how things work under the hood. The composition logic requires each operator to receive a source Observable (not a destination one).
Pipeable operators are higher-order functions that return another function waiting for a source Observable. When the source Observable is provided, they return a new Observable. This new Observable, when subscribed, in turn subscribes to the original one.
Let's take an example with map.
// MAP operator
const map = (mapFunc) => (sourceObservable) => {
  // return a new Observable
  return new Observable(observer => {
    const sourceSubscription = sourceObservable.subscribe({
      next(val) {
        let next
        try {
          next = mapFunc(val)
        } catch (e) {
          // forward the error and stop here:
          // no value must be emitted after a failure
          observer.error(e)
          return
        }
        observer.next(next)
      },
      error(err) {
        observer.error(err)
      },
      complete() {
        observer.complete()
      }
    })
    return () => {
      // --- operator specific TEARDOWN LOGIC
      // when the new Obx is unsubscribed
      // simply unsubscribe from the source Obx
      sourceSubscription.unsubscribe()
    }
  })
}
When we provide the source Observable, a new Observable is returned. Note again the chaining mechanism: when the new Observable is subscribed (from the outside), its init function subscribes to the source Observable with an inner, operator-specific Observer.
The values emitted by the source Observable are received by the new Observable, which applies its specific logic to each value and passes the result to our “external” Observer. The map operator has a simple and synchronous logic: just apply a function to the value and pass the result to the Observer.
Let's try to use it, in a non-idiomatic way.
// GENERATE A NEW OBSERVABLE from the previous one
// - mapFunc = x => x * 2
// - sourceObservable = intervalObx
const newObx = map(x => x * 2)(intervalObx)

const subscription = newObx.subscribe({
  next: (val) => console.log(val),
  error: (err) => console.warn(err),
  complete: () => console.log('Completed!')
})
The previously returned newObx is itself an Observable, so we can chain it even more.
const newObx2 = map(x => 'Test: ' + x)( map(x => x * 2)(intervalObx) )

const subscription = newObx2.subscribe({
  next: (val) => console.log(val),
  error: (err) => console.error(err),
  complete: () => console.log('Completed!')
})
As you can see, this approach is neither ergonomic nor scalable. Let’s see how to solve the issue. 😁
Operator composition with pipe
We need to pass the result from an operator to the next one, whatever the number of operators is (so basically we need to do f(g(x))). Luckily, there is a convenient way to perform this operation by exploiting functional programming. We are talking about the pipe utility. Using pipe we are going to concatenate n functions, calling each of them with the output of the previous one. This is exactly what we need to chain Observables (to get more details about functional techniques have a look at my previous article about functional programming).
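Before touching Observables, it may help to see the utility at work on plain functions. This is a minimal sketch: double, increment and toLabel are hypothetical helpers introduced only for this example.

```javascript
// pipe calls each function with the output of the previous one, left to right
const pipe = (...fns) => (val) => fns.reduce((acc, f) => f(acc), val)

// hypothetical helpers, used only for illustration
const double = (x) => x * 2
const increment = (x) => x + 1
const toLabel = (x) => `Value: ${x}`

// same as toLabel(increment(double(5)))
const piped = pipe(double, increment, toLabel)(5)
console.log(piped) // Value: 11
```

Note the order: the first function passed to pipe is the first one applied, which is exactly how we read a chain of operators from top to bottom.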
Let’s provide the Observable class with a pipe method that will take care of passing each source Observable to the next operator.
// pipe(f, g, h)(x) = h(g(f(x)))
const pipe = (...fns) => (val) => fns.reduce((acc, f) => f(acc), val)

class Observable {
  constructor(initFunc) {
    this.initFunc = initFunc;
  }
  subscribe(observer) {
    const subscription = new Subscription();
    const subscriber = new Subscriber(observer, subscription);
    const teardown = this.initFunc(subscriber)
    subscription.add(teardown);
    return subscription;
  }
  pipe(...fns) {
    // provide source Obx to each function returned from pipeable operators,
    // to start the chaining operation provide the current source Obx (this)
    return pipe(...fns)(this);
  }
}
The pipe method will return another Observable. When the subscription takes place, the last Observable in the chain starts by calling its init function. Now the previously explained logic takes place for every operator: the init function subscribes to the previous Observable with an operator-specific inner Observer, and the subscription chain carries on until it reaches the first Observable (which is the last one to be subscribed, since subscriptions happen in reverse order). Only then does data emission start.
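The reverse subscription order is easy to observe with a synchronous source. The sketch below rests on a few assumptions: the Observable class is a stripped-down stand-in for the one built in this series (no Subscriber or Subscription), and trace is a hypothetical operator that only records what happens.

```javascript
// Simplified stand-in for the series' Observable (assumption: the Observer is
// passed to the init function as-is, without Subscriber/Subscription wrappers)
class Observable {
  constructor(initFunc) { this.initFunc = initFunc }
  subscribe(observer) {
    const teardown = this.initFunc(observer)
    return { unsubscribe: () => teardown && teardown() }
  }
  pipe(...fns) { return fns.reduce((source, fn) => fn(source), this) }
}

const log = []

// Hypothetical operator: forwards values untouched and records each step
const trace = (name) => (sourceObservable) =>
  new Observable(observer => {
    log.push(`subscribe: ${name}`)
    const sourceSubscription = sourceObservable.subscribe({
      next: (val) => { log.push(`next: ${name} (${val})`); observer.next(val) },
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    })
    return () => sourceSubscription.unsubscribe()
  })

// synchronous source: emits one value and completes
const source$ = new Observable(observer => {
  log.push('subscribe: source')
  observer.next(1)
  observer.complete()
})

source$.pipe(trace('first'), trace('second')).subscribe({
  next: () => {},
  complete: () => log.push('complete')
})

console.log(log)
// [ 'subscribe: second', 'subscribe: first', 'subscribe: source',
//   'next: first (1)', 'next: second (1)', 'complete' ]
```

Subscriptions travel from the last operator back to the source, while values travel from the source forward to the Observer.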
As mentioned, the return value of the pipe method is itself an Observable that we can subscribe to, save in a variable, chain again, pass as an argument or treat as we want! The final syntax of our library API is the one presented at the beginning of the article, shown again in the snippet below. The dollar symbol at the end of the variable name is a common RxJS convention.
const chainedObservable$ = interval(700)
  .pipe(
    throttleTime(1500),
    map(x => x * 3),
    map(x => `Result: ${x}`)
  )

const subscription = chainedObservable$.subscribe({
  next: (val) => console.log(val),
  error: (err) => console.warn(err),
  complete: () => console.log('Completed!')
})
It's easy now to implement some other pipeable operators. Let's have a look at these three below.
// THROTTLE TIME operator
const throttleTime = (time) => (sourceObservable) => {
  return new Observable(observer => {
    // per-subscription state: every subscriber gets its own throttling window
    let lastEventTime = 0
    const sourceSubscription = sourceObservable.subscribe({
      next(val) {
        // rarefy event emission
        if (Date.now() - lastEventTime > time) {
          lastEventTime = Date.now()
          observer.next(val)
        }
      },
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    })
    return () => sourceSubscription.unsubscribe()
  })
}
// DEBOUNCE TIME operator
const debounceTime = (delay) => (sourceObservable) => {
  return new Observable(observer => {
    // per-subscription state: id of the pending timeout, if any
    let timeoutId
    const sourceSubscription = sourceObservable.subscribe({
      next: (val) => {
        // postpone and group rapid sequences of events
        clearTimeout(timeoutId)
        timeoutId = setTimeout(() => observer.next(val), delay)
      },
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    })
    return () => {
      // teardown logic
      clearTimeout(timeoutId)
      sourceSubscription.unsubscribe()
    }
  })
}
// TAKE operator
const take = (howMany) => (sourceObservable) => {
  return new Observable(observer => {
    // per-subscription state: every subscriber counts on its own
    let counter = 0
    const sourceSubscription = sourceObservable.subscribe({
      next: (val) => {
        counter++
        observer.next(val)
        if (counter >= howMany) {
          // complete the outer Observer (an arrow function has no own `this`)
          observer.complete()
          // fine for asynchronous sources such as interval
          sourceSubscription.unsubscribe()
        }
      },
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    })
    return () => sourceSubscription.unsubscribe()
  })
}
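The same pattern extends to other operators. As an example, here is how a filter operator could look. Take it as a sketch under a few assumptions: the Observable class is a stripped-down stand-in for the one built in this series, and fromArray is a hypothetical synchronous source used only to keep the demo deterministic.

```javascript
// Simplified stand-in for the series' Observable (no Subscriber/Subscription)
class Observable {
  constructor(initFunc) { this.initFunc = initFunc }
  subscribe(observer) {
    const teardown = this.initFunc(observer)
    return { unsubscribe: () => teardown && teardown() }
  }
}

// FILTER operator (sketch): forwards only the values accepted by the predicate
const filter = (predicate) => (sourceObservable) =>
  new Observable(observer => {
    const sourceSubscription = sourceObservable.subscribe({
      next(val) {
        let keep
        try {
          keep = predicate(val)
        } catch (e) {
          // a failing predicate becomes an error notification
          observer.error(e)
          return
        }
        if (keep) observer.next(val)
      },
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    })
    return () => sourceSubscription.unsubscribe()
  })

// Hypothetical synchronous source: emits every item, then completes
const fromArray = (items) =>
  new Observable(observer => {
    items.forEach(item => observer.next(item))
    observer.complete()
  })

const evens = []
filter(x => x % 2 === 0)(fromArray([1, 2, 3, 4, 5, 6])).subscribe({
  next: (val) => evens.push(val),
  complete: () => evens.push('done')
})

console.log(evens) // [ 2, 4, 6, 'done' ]
```

The structure is the same as map: subscribe to the source with an inner Observer, apply the operator logic in next, forward error and complete, and unsubscribe from the source on teardown.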
There are many different operations that can be performed when linking Observables, and there are just as many operators: more than 70. The most interesting ones are those that allow us to join, switch and flatten Observables.
A taste of advanced operators
RxJS allows us to manage asynchronous data in a simple and declarative way. To demonstrate that, let's have a look at some more realistic use cases. For example, after an HTTP request we want to start some other asynchronous operation. In general, this situation can be summarized as follows: a value emitted by an Observable must “switch” the data source to a new inner Observable.
The switchMap operator allows us to accomplish this task: whenever it receives a new value from a source it calls a function that returns a new inner Observable. The returned Observable will be subscribed, unsubscribing from the previously created one. Then the values emitted from the inner Observable will carry on to the following operators. The first time I saw this operator it amazed me!
Here’s a rough implementation of the switchMap operator.
// SWITCH MAP operator
const switchMap = (innerObxReturningFunc) => (sourceObx) => {
  return new Observable(observer => {
    // per-subscription state: the currently active inner subscription
    let innerSubscription
    const sourceSubscription = sourceObx.subscribe({
      next(val) {
        // unsubscribe from previous subscription if exists
        innerSubscription && innerSubscription.unsubscribe()
        // subscribe to inner Observable
        const innerObx = innerObxReturningFunc(val)
        innerSubscription = innerObx.subscribe({ // <- start the inner Obx
          next: (_val) => observer.next(_val),
          error: (_err) => observer.error(_err),
          // an inner completion must not complete the output:
          // the source may still switch to a new inner Obx
          complete: () => {},
        })
      },
      error() {
        // simplification: source Obx errors are ignored (RxJS forwards them)
      },
      complete() {
        // simplification: source Obx completion is ignored
      }
    })
    return () => {
      // no inner subscription exists until the source has emitted
      innerSubscription && innerSubscription.unsubscribe()
      sourceSubscription.unsubscribe()
    }
  })
}
To better clarify how switchMap works, in the next example we’ll chain it with a "verbose" interval (with some logging), in order to easily follow the logic flow.
// DEBUG-ONLY CREATION operator
const verboseInterval = (time, name) => {
  return new Observable(observer => {
    let counter = 0
    console.log(`Starting from ${name}`)
    const id = setInterval(() => {
      console.log(`Emitting from ${name}`)
      observer.next(++counter)
    }, time)
    return () => {
      console.log(`Teardown of ${name} Obx`)
      clearInterval(id)
    }
  })
}

const subscription = verboseInterval(2000, 'source')
  .pipe(
    switchMap(ev => {
      console.log('Switching to the inner Obx')
      return verboseInterval(150, 'inner')
    }),
    map(x => x * 2)
  )
  .subscribe({ next: console.log })
/////////// --- CONSOLE
// Starting from source
// --- after 2000ms..
// Emitting from source
// Switching to the inner Obx
// Starting from inner
// Emitting from inner
// 2
// 4
// ...
// ...
// --- after 2000 ms
// Emitting from source
// Teardown of inner Obx <- appears from the second "switch"
// Switching to the inner Obx
// Starting from inner
// Emitting from inner
// 2
// 4
// ...
// ...
Have a look at your console. What is happening? Every 2000 milliseconds the source Observable emits a new value, then switchMap calls the provided function and subscribes to the returned inner Observable, which emits a new event every 150 milliseconds. Each of these values is then passed to the next operator in the chain (map) and to the provided Observer.
At the same time, every 2000 milliseconds switchMap unsubscribes from the previously created inner Observable and subscribes to the newly created one. As you may have guessed, it’s really easy to chain complex and sequential data flows.
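The switching behaviour can also be checked without timers, by pushing values by hand. The sketch below makes some assumptions: the Observable class is a stripped-down stand-in for the one built in this series, manualSource is a hypothetical helper, and switchMap is reduced to its switching logic (errors and completion are left out).

```javascript
// Simplified stand-in for the series' Observable (no Subscriber/Subscription)
class Observable {
  constructor(initFunc) { this.initFunc = initFunc }
  subscribe(observer) {
    const teardown = this.initFunc(observer)
    return { unsubscribe: () => teardown && teardown() }
  }
}

// Hypothetical helper: an Observable whose values are pushed by hand via emit()
const manualSource = () => {
  const observers = new Set()
  const observable = new Observable(observer => {
    observers.add(observer)
    return () => observers.delete(observer)
  })
  return { observable, emit: (val) => [...observers].forEach(o => o.next(val)) }
}

// Reduced switchMap: only the switching logic
const switchMap = (innerObxReturningFunc) => (sourceObx) =>
  new Observable(observer => {
    let innerSubscription
    const sourceSubscription = sourceObx.subscribe({
      next(val) {
        if (innerSubscription) innerSubscription.unsubscribe()
        innerSubscription = innerObxReturningFunc(val).subscribe({
          next: (innerVal) => observer.next(innerVal)
        })
      }
    })
    return () => {
      if (innerSubscription) innerSubscription.unsubscribe()
      sourceSubscription.unsubscribe()
    }
  })

const source = manualSource()
const inners = { a: manualSource(), b: manualSource() }
const received = []

const subscription = switchMap(key => inners[key].observable)(source.observable)
  .subscribe({ next: (val) => received.push(val) })

source.emit('a')      // switch to inner "a"
inners.a.emit('a1')   // forwarded
source.emit('b')      // unsubscribe from "a", switch to inner "b"
inners.a.emit('a2')   // ignored: nobody listens to "a" anymore
inners.b.emit('b1')   // forwarded
subscription.unsubscribe()
inners.b.emit('b2')   // ignored after teardown

console.log(received) // [ 'a1', 'b1' ]
```

Only the values of the currently active inner Observable reach the Observer, and unsubscribing from the outside tears down both the inner and the source subscriptions.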
Another simple but illustrative example is the following one, related to autocomplete hints (obtained from a server) in “search” text inputs. We don't want to make a request to the server at each keystroke, since most of them are just "intermediate". We can manage this stream of events with RxJS, and the mechanism is the same as explained above.
const searchInput = document.querySelector('#mySearchInput')

const subscription = fromEvent(searchInput, 'input')
  .pipe(
    debounceTime(700),
    map(e => e.target.value),
    // encode the user input before placing it in the query string
    switchMap(input => fromFetch(`API_ENDPOINT?query=${encodeURIComponent(input)}`))
  )
  .subscribe({
    next: (result) => {
      // update DOM with autocomplete hints
    }
  })
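The fromEvent creation operator used above is not part of our little library yet. A possible sketch, in the same style as interval, is shown below. It is not the real RxJS implementation: it only assumes that the target exposes addEventListener and removeEventListener, and it uses a stripped-down stand-in for our Observable class.

```javascript
// Simplified stand-in for the series' Observable (no Subscriber/Subscription)
class Observable {
  constructor(initFunc) { this.initFunc = initFunc }
  subscribe(observer) {
    const teardown = this.initFunc(observer)
    return { unsubscribe: () => teardown && teardown() }
  }
}

// Sketch of a fromEvent-like creation operator
const fromEvent = (target, eventName) =>
  new Observable(observer => {
    const handler = (event) => observer.next(event)
    target.addEventListener(eventName, handler)
    // teardown: stop listening when the subscription is closed
    return () => target.removeEventListener(eventName, handler)
  })

// EventTarget and Event are globals in browsers and in recent Node.js versions,
// so a bare EventTarget can play the role of the search input here
const fakeInput = new EventTarget()
const seen = []

const subscription = fromEvent(fakeInput, 'input').subscribe({
  next: (event) => seen.push(event.type)
})

fakeInput.dispatchEvent(new Event('input'))
fakeInput.dispatchEvent(new Event('input'))
subscription.unsubscribe()
fakeInput.dispatchEvent(new Event('input')) // not recorded: listener removed

console.log(seen) // [ 'input', 'input' ]
```

As with interval, the teardown function is what makes the Observable safe to unsubscribe from: without it the listener would stay attached to the element.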
SwitchMap is just one of many useful RxJS operators! I invite you to explore and try to implement some other ones, including concatMap, mergeMap, forkJoin, concat, and exhaustMap! 😉
Conclusions
We have combined some functional programming utilities with Observables, implementing the fundamental chaining mechanism of RxJS. Observable composition is a powerful technique to declaratively manage complex and asynchronous data flows. There are more than 70 RxJS operators, and they allow us to compose event flows in a really clear and precise way.
If you are an Angular developer you will find RxJS everywhere: Observables are first-class citizens of the framework. In any case, RxJS is used more and more in front-end development, regardless of the adopted library or framework.
I hope this explanation may have helped you to understand the philosophy and the power of RxJS, clarifying its most difficult aspects! Now our basic version of RxJS is kind of complete, but in the future I might extend this series to include advanced topics like Subjects and Schedulers.
Thank you for reading! 😁
PS: English is not my mother tongue, so errors are just around the corner. Feel free to comment with corrections!