Understanding Observables
Jan van Brügge
Posted on September 1, 2017
Reactive programming has gained a lot of traction lately. Libraries like RxJS and Most.js and frameworks like Cycle.js make it easy to compose complex async behavior. But how those observables or streams (I will use both terms interchangeably from now on) work is often difficult to explain. In my experience, if you can build something yourself, you have understood it. That's why we will build a toy RxJS in this article!
What are we trying to achieve
As many people are not familiar with streams, here is a short summary: streams are arrays over time. Here is what I mean by this:
const myArray = [1, 2, 3, 4];
const myValue = myArray
  .map(i => i * 2)
  .reduce((acc, curr) => acc + curr, 0);
console.log(myValue);
In this code snippet we take an array, double every element and sum up the results. But what if we get the values from an external source, like an API? Then we could use promises:
const myValuePromise = getData() // uses a promise-based API
  .then(data => data
    .map(i => i * 2)
    .reduce((acc, curr) => acc + curr, 0)
  )
  .then(console.log);
This also works very well. But what if we get the data from a websocket? A websocket is not a single value in the future like a Promise, but many values! This is where streams become helpful:
let websocket = new WebSocket(/* ... */);
const websocketStream = Observable.create(observer => {
  // assumes every message carries a number
  websocket.onmessage = (msg) => observer.next(Number(msg.data));
  websocket.onclose = () => observer.complete();
  return () => websocket.close();
});
const myValueStream = websocketStream
  .map(i => i * 2)
  .scan((acc, curr) => acc + curr, 0)
  .subscribe(console.log);
Now, every time a new value arrives via the websocket, scan will emit the new sum. If you want to wait until the websocket closes and then just print the final sum, you can use reduce.
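The difference between the two operators can be illustrated on a plain array. The following is only a sketch: scanArray is a hypothetical helper written for this illustration (arrays have no built-in scan), and it records every intermediate sum where reduce keeps only the last one.

```typescript
// Hypothetical array analogue of a stream's scan: it keeps every
// intermediate accumulator instead of only the final one.
function scanArray<T, U>(values: T[], fn: (acc: U, curr: T) => U, seed: U): U[] {
  const emitted: U[] = [];
  let acc = seed;
  values.forEach(value => {
    acc = fn(acc, value);
    emitted.push(acc); // a stream's scan would emit at this point
  });
  return emitted;
}

const incoming = [1, 2, 3, 4].map(i => i * 2);
const runningSums = scanArray(incoming, (acc: number, curr: number) => acc + curr, 0);
const finalSum = incoming.reduce((acc, curr) => acc + curr, 0);

console.log(runningSums); // [2, 6, 12, 20]
console.log(finalSum);    // 20
```

The last running sum equals the reduced value, which is why reduce can be seen as "scan, but only report once the input has ended".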
Building a toy RxJS
Now that we know how to use streams, it's time to start building a stream library. Let's first ask ourselves what we want to happen, and when. We want to have some observer that can subscribe to an observable. The observer will then receive the values from upstream. So, to start simple, we will first define our observer. I will use TypeScript here, as it helps to understand what's going on.
interface Observer<T> {
  next(t: T): void;
  complete(): void;
}
As you can see, an observer is an object with a next and a complete function. Now we need the observable. We will build it bottom up, which means that for now our observable just needs a subscribe method.
interface Observable<T> {
  subscribe(observer: Observer<T>): void;
}
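To see the two interfaces working together without any network involved, here is a minimal sketch. The numbers source and the log array are made up for this illustration and are not part of the library we are building.

```typescript
interface Observer<T> {
  next(t: T): void;
  complete(): void;
}

interface Observable<T> {
  subscribe(observer: Observer<T>): void;
}

// Hypothetical synchronous source: emits 1, 2, 3 and then completes.
const numbers: Observable<number> = {
  subscribe(observer) {
    [1, 2, 3].forEach(n => observer.next(n));
    observer.complete();
  }
};

// Record everything the observer is told, in order.
const log: string[] = [];

numbers.subscribe({
  next: value => { log.push(`next ${value}`); },
  complete: () => { log.push("complete"); }
});

console.log(log); // ["next 1", "next 2", "next 3", "complete"]
```

Nothing happens until subscribe is called; the observable is just a description of how to feed an observer.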
So to use that naively, we would just create an object with a single method. Let's replicate our websocket example:
let websocket = new WebSocket(/* ... */);
const websocketStream = {
  subscribe(observer) {
    // assumes every message carries a number
    websocket.onmessage = msg => observer.next(Number(msg.data));
    websocket.onclose = () => observer.complete();
  }
};
Okay, that looks almost like the real RxJS example. The only difference is the missing cleanup, but for simplicity's sake, I won't cover that. Next, we have to define a map function that takes a function and an observable and returns a new observable:
function map<T, U>(fn: (t: T) => U): (s: Observable<T>) => Observable<U> {
  return stream => ({
    subscribe(observer: Observer<U>) {
      stream.subscribe({
        next: (value: T) => observer.next(fn(value)),
        // wrap the call so that `this` inside complete still points at the observer
        complete: () => observer.complete()
      });
    }
  });
}
We are basically just creating a factory function that subscribes to the previous observable with an internal observer, which applies the function and passes the results on to the next observer. Again, TypeScript helps to understand what's going on.
Now we can do this (extending the previous example):
const myValueStream = map(i => i * 2)(websocketStream);
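If you want to try this call style without a websocket, the following sketch repeats the definitions so that it runs on its own. The source object is a hypothetical stand-in that emits three numbers synchronously.

```typescript
interface Observer<T> {
  next(t: T): void;
  complete(): void;
}

interface Observable<T> {
  subscribe(observer: Observer<T>): void;
}

// Same shape as the map operator from the article, repeated for self-containment.
function map<T, U>(fn: (t: T) => U): (s: Observable<T>) => Observable<U> {
  return stream => ({
    subscribe(observer: Observer<U>) {
      stream.subscribe({
        next: (value: T) => observer.next(fn(value)),
        complete: () => observer.complete()
      });
    }
  });
}

// Hypothetical stand-in for the websocket: three numbers, then completion.
const source: Observable<number> = {
  subscribe(observer) {
    [1, 2, 3].forEach(n => observer.next(n));
    observer.complete();
  }
};

const doubled: number[] = [];
map((i: number) => i * 2)(source).subscribe({
  next: value => { doubled.push(value); },
  complete: () => console.log(doubled) // [2, 4, 6]
});
```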
While this works, it's not the most beautiful API. We are used to calling functions on the observable. Luckily, this can be fixed quite easily:
class Stream<T> implements Observable<T> {
  constructor(public subscribe: (o: Observer<T>) => void) {}
  public compose<U>(operator: (s: Stream<T>) => Stream<U>): Stream<U> {
    return operator(this);
  }
  public map<U>(fn: (t: T) => U): Stream<U> {
    // map(fn) returns a plain Observable, so we wrap it in a Stream to keep chaining
    return this.compose(stream => new Stream<U>(o => map(fn)(stream).subscribe(o)));
  }
}
Now we have an ES6 class that gets a subscribe method as constructor argument and has map on its prototype. This means our example looks like this:
 let websocket = new WebSocket(/* ... */);
-const websocketStream = {
-  subscribe(observer) {
+const websocketStream = new Stream(observer => {
     websocket.onmessage = msg => observer.next(Number(msg.data));
     websocket.onclose = () => observer.complete();
-  }
-}
+});
 const myValueStream = websocketStream
   .map(i => i * 2);
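Here is a self-contained sketch of the method-chaining style. It is a simplification, not the exact class from the article: map is written inline instead of delegating to a separate map function, the constructor assigns the field explicitly, and ticks is a hypothetical synchronous source standing in for the websocket.

```typescript
interface Observer<T> {
  next(t: T): void;
  complete(): void;
}

class Stream<T> {
  // Explicit field instead of the `public subscribe` constructor shorthand.
  readonly subscribe: (o: Observer<T>) => void;

  constructor(subscribe: (o: Observer<T>) => void) {
    this.subscribe = subscribe;
  }

  // Apply an operator (a function from stream to stream) to this stream.
  compose<U>(operator: (s: Stream<T>) => Stream<U>): Stream<U> {
    return operator(this);
  }

  map<U>(fn: (t: T) => U): Stream<U> {
    return this.compose(stream => new Stream<U>(observer => {
      stream.subscribe({
        next: value => observer.next(fn(value)),
        complete: () => observer.complete()
      });
    }));
  }
}

// Hypothetical stand-in for the websocket stream.
const ticks = new Stream<number>(observer => {
  [1, 2, 3].forEach(n => observer.next(n));
  observer.complete();
});

const labels: string[] = [];
ticks
  .map(i => i * 2)
  .map(i => `value ${i}`)
  .subscribe({
    next: label => { labels.push(label); },
    complete: () => console.log(labels) // ["value 2", "value 4", "value 6"]
  });
```

Because every operator returns a new Stream, calls can be chained as often as needed, and the element type changes along the way (number to string here).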
Implementing scan is rather easy, so we will instead implement reduce (called fold in the code below), which waits until the last value has arrived and then emits the result once:
function fold<T, U>(fn: (acc: U, curr: T) => U, seed: U): (s: Stream<T>) => Stream<U> {
  return stream => new Stream(observer => {
    let accumulator = seed;
    stream.subscribe({
      next: value => {
        accumulator = fn(accumulator, value);
      },
      complete: () => {
        observer.next(accumulator);
        observer.complete();
      }
    });
  });
}
It can be seen that we have an internal state that gets updated on every event from the previous stream. Once the previous stream completes, we emit the value and complete too. We could implement scan the same way, except that we would emit every time there is a new value and not on completion.
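A possible scan along those lines could look like the following sketch. It is self-contained, so it uses a stripped-down Stream class without map or compose, and evens is a hypothetical synchronous source used only to show the output.

```typescript
interface Observer<T> {
  next(t: T): void;
  complete(): void;
}

// Stripped-down Stream, just enough for this sketch.
class Stream<T> {
  readonly subscribe: (o: Observer<T>) => void;

  constructor(subscribe: (o: Observer<T>) => void) {
    this.subscribe = subscribe;
  }
}

// Like fold, but the accumulator is emitted on every incoming value.
function scan<T, U>(fn: (acc: U, curr: T) => U, seed: U): (s: Stream<T>) => Stream<U> {
  return stream => new Stream<U>(observer => {
    let accumulator = seed; // fresh state for every subscription
    stream.subscribe({
      next: value => {
        accumulator = fn(accumulator, value);
        observer.next(accumulator); // emit right away instead of on completion
      },
      complete: () => observer.complete()
    });
  });
}

const evens = new Stream<number>(observer => {
  [2, 4, 6].forEach(n => observer.next(n));
  observer.complete();
});

const sums: number[] = [];
scan((acc: number, curr: number) => acc + curr, 0)(evens).subscribe({
  next: sum => { sums.push(sum); },
  complete: () => console.log(sums) // [2, 6, 12]
});
```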
With that we can now replicate our websocket example (assume we have added scan to the Stream class just like map):
let websocket = new WebSocket(/* ... */);
const websocketStream = new Stream(observer => {
  // assumes every message carries a number
  websocket.onmessage = (msg) => observer.next(Number(msg.data));
  websocket.onclose = () => observer.complete();
});
const myValueStream = websocketStream
  .map(i => i * 2)
  .scan((acc, curr) => acc + curr, 0)
  .subscribe({
    next: console.log,
    complete: () => {}
  });
Let's take it even a step further. We want an initial HTTP request and future updates via websocket. Without streams this is difficult to do. For this we first need something to convert a Promise into a stream:
function fromPromise<T>(p: Promise<T>): Stream<T> {
  return new Stream<T>(observer => {
    p.then(data => {
      observer.next(data);
      observer.complete(); // a promise resolves only once, so we are done
    });
  });
}
Then, we need a way to convert a stream of arrays into a stream of individual items (assuming our API returns an array of data and the websocket just singular items). We can split this into one function that converts an array into a stream and a second function that "flattens" a stream:
function fromArray<T>(array: T[]): Stream<T> {
  return new Stream(observer => {
    array.forEach(e => {
      observer.next(e);
    });
    observer.complete();
  });
}
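function flatMap<T, U>(fn: (t: T) => Stream<U>): (s: Stream<T>) => Stream<U> {
  return stream => new Stream<U>(observer => {
    stream.subscribe({
      next(value: T) {
        // turn the outer value into an inner stream and forward its values
        fn(value).subscribe({
          next: inner => observer.next(inner),
          complete: () => {} // an inner stream ending does not end the result
        });
      },
      complete: () => observer.complete()
    });
  });
}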
As you can see, in fromArray we just take every element and push it into the stream. flatMap is a lot more interesting here. We first subscribe to the outer stream, and for every inner stream it yields we subscribe to that too and forward all of its values to the next observer.
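The flattening step can be tried out with synchronous data. This is a sketch under a few assumptions: Stream is reduced to the bare minimum, flatMap applies fn to every outer value to obtain the inner stream, and batches is a hypothetical stream of arrays standing in for an API response.

```typescript
interface Observer<T> {
  next(t: T): void;
  complete(): void;
}

class Stream<T> {
  readonly subscribe: (o: Observer<T>) => void;

  constructor(subscribe: (o: Observer<T>) => void) {
    this.subscribe = subscribe;
  }
}

function fromArray<T>(array: T[]): Stream<T> {
  return new Stream<T>(observer => {
    array.forEach(e => observer.next(e));
    observer.complete();
  });
}

function flatMap<T, U>(fn: (t: T) => Stream<U>): (s: Stream<T>) => Stream<U> {
  return stream => new Stream<U>(observer => {
    stream.subscribe({
      next: value => {
        // Each outer value becomes an inner stream whose values are forwarded.
        fn(value).subscribe({
          next: inner => observer.next(inner),
          complete: () => {} // inner completion is ignored
        });
      },
      complete: () => observer.complete()
    });
  });
}

// Hypothetical stream of arrays.
const batches = fromArray([[1, 2], [3], [4, 5]]);

const flattened: number[] = [];
flatMap((batch: number[]) => fromArray(batch))(batches).subscribe({
  next: n => { flattened.push(n); },
  complete: () => console.log(flattened) // [1, 2, 3, 4, 5]
});
```

Note that this version completes as soon as the outer stream completes. That is fine for synchronous inner streams like the ones here, but an asynchronous inner stream could still emit afterwards, which a production library would have to handle.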
Let's use our new methods (assume we have added flatMap to the Stream class):
let websocket = new WebSocket(/* ... */);
const websocketStream = new Stream(observer => {
  // assumes every message carries a number
  websocket.onmessage = (msg) => observer.next(Number(msg.data));
  websocket.onclose = () => observer.complete();
});
let httpStream = fromPromise(getData())
  .flatMap(data => fromArray(data));
const myValueStream = websocketStream
  .map(i => i * 2)
  .scan((acc, curr) => acc + curr, 0)
  .subscribe({
    next: console.log,
    complete: () => {}
  });
The last bit missing is something to merge those two streams:
function merge<T>(...streams: Stream<T>[]): Stream<T> {
  return new Stream(observer => {
    let numCompleted = 0;
    streams.forEach(s => {
      s.subscribe({
        next: value => observer.next(value),
        complete: () => {
          numCompleted++;
          if (numCompleted === streams.length) {
            observer.complete();
          }
        }
      });
    });
  });
}
As you can see, we simply subscribe to all streams and emit a value whenever any one of them emits. We complete the stream once all streams have completed. With this we can finally finish our example:
let websocket = new WebSocket(/* ... */);
const websocketStream = new Stream(observer => {
  // assumes every message carries a number
  websocket.onmessage = (msg) => observer.next(Number(msg.data));
  websocket.onclose = () => observer.complete();
});
let httpStream = fromPromise(getData())
  .flatMap(data => fromArray(data));
const myValueStream = merge(httpStream, websocketStream)
  .map(i => i * 2)
  .scan((acc, curr) => acc + curr, 0)
  .subscribe({
    next: console.log,
    complete: () => {}
  });
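The merge behavior (forward every value, complete only once all inputs have completed) can be checked without a network. In the sketch below, manualSource is a hypothetical helper invented for this illustration: it hands back a stream together with functions that drive its single subscriber by hand, so the two inputs play the roles of the HTTP and websocket streams.

```typescript
interface Observer<T> {
  next(t: T): void;
  complete(): void;
}

class Stream<T> {
  readonly subscribe: (o: Observer<T>) => void;

  constructor(subscribe: (o: Observer<T>) => void) {
    this.subscribe = subscribe;
  }
}

function merge<T>(...streams: Stream<T>[]): Stream<T> {
  return new Stream<T>(observer => {
    let numCompleted = 0;
    streams.forEach(s => {
      s.subscribe({
        next: value => observer.next(value),
        complete: () => {
          numCompleted++;
          if (numCompleted === streams.length) {
            observer.complete();
          }
        }
      });
    });
  });
}

// Hypothetical helper: a stream whose single subscriber is driven by hand.
function manualSource<T>(): { stream: Stream<T>; emit(value: T): void; end(): void } {
  let current: Observer<T> | undefined;
  return {
    stream: new Stream<T>(observer => { current = observer; }),
    emit: value => { if (current) current.next(value); },
    end: () => { if (current) current.complete(); }
  };
}

const httpLike = manualSource<number>();
const socketLike = manualSource<number>();
const seen: string[] = [];

merge(httpLike.stream, socketLike.stream).subscribe({
  next: value => { seen.push(`next ${value}`); },
  complete: () => { seen.push("complete"); }
});

httpLike.emit(1);
socketLike.emit(10);
httpLike.end();      // only one input has finished, so no completion yet
socketLike.emit(20);
socketLike.end();    // now both inputs are done

console.log(seen); // ["next 1", "next 10", "next 20", "complete"]
```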
Wrapping it up
Observables can be extremely useful if you have complex async behavior. They are not that hard to write yourself either! The toy RxJS I showed here is not how the major stream libraries are implemented, because closures are expensive in terms of performance in JavaScript. But the core ideas stay the same.
I hope you liked the article and learned something new. If you are interested in reactive programming, take a look at Cycle.js, a fully reactive framework where I am part of the core team.