Understanding Reactive Dataflow by making a Toy Implementation
Alexander Drozdov
Posted on September 12, 2023
Whenever you hear the word "event", reactive programming is worth considering. Unlike the Observer pattern, it does not require you to manage subscriptions imperatively: simply by reading a variable, you subscribe to it. If the execution path changes and that variable is no longer read, the runtime automatically unsubscribes from it.
Let’s implement such a runtime in just 130 LOC.
Implementations differ in how they propagate changes. This post describes lazy, pull-based reactivity: a write only marks dependent nodes as dirty, and values are recomputed when they are read.
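To make the lazy versus eager distinction concrete, here is a minimal sketch that applies both strategies to a single derived value. The names `EagerDouble` and `LazyDouble` are hypothetical and are not part of the runtime built in this post; the `computations` counters exist only to show how often the work is done.

```typescript
// Hypothetical illustration: one input, one derived value (input * 2).

// Eager, push-based: every write recomputes immediately.
class EagerDouble {
  computations = 0;
  private input = 0;
  private output = 0;

  set(value: number): void {
    this.input = value;
    this.output = this.input * 2; // work happens on write
    this.computations++;
  }

  get(): number {
    return this.output;
  }
}

// Lazy, pull-based: a write only flips a dirty flag; the read recomputes.
class LazyDouble {
  computations = 0;
  private input = 0;
  private output = 0;
  private dirty = true;

  set(value: number): void {
    this.input = value;
    this.dirty = true; // no work yet
  }

  get(): number {
    if (this.dirty) {
      this.output = this.input * 2; // work happens on read
      this.computations++;
      this.dirty = false;
    }
    return this.output;
  }
}

const eager = new EagerDouble();
const lazy = new LazyDouble();
for (const n of [1, 2, 3]) {
  eager.set(n);
  lazy.set(n);
}
const eagerResult = eager.get(); // three writes cost three computations
const lazyResult = lazy.get(); // three writes cost one computation, on read
```

Both variants return the same value; they differ only in when, and how often, the computation runs.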
This is what the runtime looks like in practice:
const ctx = new Context();
const a = Cell(ctx, 1);
const b = Cell(ctx, 2);
const sum = Computed(ctx, () => a.value + b.value);
const effect = MicrotaskEffect(ctx, () => {
  console.log(sum.value);
});
// An effect starts out clean, so queue its first run explicitly.
effect.schedule();
The reactivity graph is made up of nodes defined in source code, while edges are dynamic and determined at execution time.
Because we're dealing with data flow, nodes can function as a data Source, a Sink, or both. Compare this to Subject and Observer.
The edges (array of references) are bidirectional:
- from Source to Sink to notify about changes;
- from Sink to Source to unsubscribe.
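The two directions listed above can be sketched in isolation. This is only an illustration under assumed names (`SourceNode`, `SinkNode`, `link`, `unlinkAll` are hypothetical); the actual mixins appear later in the post.

```typescript
// Hypothetical node shapes, reduced to the edge bookkeeping.
interface SourceNode {
  name: string;
  sinks: Set<SinkNode>;
}
interface SinkNode {
  name: string;
  sources: Set<SourceNode>;
}

// Create both halves of an edge at once.
function link(source: SourceNode, sink: SinkNode): void {
  source.sinks.add(sink); // Source -> Sink: used to notify about changes
  sink.sources.add(source); // Sink -> Source: used to unsubscribe
}

// Drop every edge of a sink by walking its back-references.
function unlinkAll(sink: SinkNode): void {
  sink.sources.forEach((source) => source.sinks.delete(sink));
  sink.sources.clear();
}

const srcA: SourceNode = { name: "a", sinks: new Set() };
const srcB: SourceNode = { name: "b", sinks: new Set() };
const sumSink: SinkNode = { name: "sum", sources: new Set() };

link(srcA, sumSink);
link(srcB, sumSink);
const sinksBefore = srcA.sinks.size; // one subscriber
unlinkAll(sumSink);
const sinksAfter = srcA.sinks.size; // no subscribers left
```

Without the Sink-to-Source references, unsubscribing would require scanning every Source in the graph.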
A shared Context allows the Source to know which Sink is calling it in order to establish a bidirectional edge.
export class Context {
  currentSink: SinkLike | null = null;
}

interface SourceLike {
  removeSink(sink: SinkLike): void;
}

interface SinkLike {
  markDirty(): void;
  addSource(source: SourceLike): void;
}
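The role of the shared context can be shown on its own. The sketch below is a stripped-down, hypothetical tracker (`TrackingContext`, `makeSource` and `track` are names I made up for illustration): instead of building edges, it only records which sources a function reads, which is enough to see why dependencies are determined at execution time.

```typescript
// Hypothetical tracker: records the names of the sources a function reads.
class TrackingContext {
  currentReads: Set<string> | null = null;
}

// A named source that reports itself to whoever is currently tracking.
function makeSource<T>(ctx: TrackingContext, name: string, value: T) {
  return {
    get value(): T {
      if (ctx.currentReads) ctx.currentReads.add(name);
      return value;
    },
  };
}

// Run fn and return the sorted names of the sources it read.
function track(ctx: TrackingContext, fn: () => void): string[] {
  const saved = ctx.currentReads; // restore afterwards so tracking can nest
  const reads = new Set<string>();
  ctx.currentReads = reads;
  try {
    fn();
  } finally {
    ctx.currentReads = saved;
  }
  return Array.from(reads).sort();
}

const tctx = new TrackingContext();
const useFallback = makeSource(tctx, "useFallback", false);
const primary = makeSource(tctx, "primary", 1);
const fallback = makeSource(tctx, "fallback", 2);

// Only the branch that actually executes is recorded.
const readsA = track(tctx, () => {
  void (useFallback.value ? fallback.value : primary.value);
});
```

Here `readsA` contains `primary` and `useFallback` but not `fallback`, because the branch that reads it never ran.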
JavaScript's prototype chain doesn't allow multiple inheritance, so I'll use mixins to get around this: factory functions whose results are spread into each node.
const SourceMixin = (ctx: Context) => ({
  _ctx: ctx,
  _sinks: new Set<SinkLike>(),
  removeSink(sink: SinkLike): void {
    this._sinks.delete(sink);
  },
  // Link this source with whichever sink is currently being evaluated.
  _subscribeCaller(): void {
    const { currentSink } = this._ctx;
    if (currentSink) {
      this._sinks.add(currentSink);
      currentSink.addSource(this);
    }
  },
  _notifySinks(): void {
    for (const sink of this._sinks) {
      sink.markDirty();
    }
  },
});
const SinkMixin = (ctx: Context, dirty: boolean) => ({
  _ctx: ctx,
  _dirty: dirty,
  _sources: new Set<SourceLike>(),
  addSource(source: SourceLike): void {
    this._sources.add(source);
  },
  markDirty(): void {
    // Each node type overrides this with its own behavior.
    throw new Error("markDirty() is not implemented");
  },
  // If dirty, drop all old edges and become the current sink.
  // Returns true when the caller has to re-run its function.
  _capture(): boolean {
    if (this._dirty) {
      for (const source of this._sources) {
        source.removeSink(this);
      }
      this._sources.clear();
      this._ctx.currentSink = this;
      return true;
    }
    return false;
  },
});
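One property of object spread matters for this mixin approach: spread copies methods as they are, but it invokes getters and copies their result as a plain data property. That is presumably why the mixins contain only methods and data, while the `value` accessors are declared directly on each node. A small sketch of the behavior, with hypothetical names (`withGetter`, `counterMixin`):

```typescript
let getterCalls = 0;
const withGetter = {
  get value(): number {
    getterCalls++;
    return 42;
  },
};

// Spread calls the getter once and stores the result as a data property.
const copy = { ...withGetter };
const callsAfterSpread = getterCalls;
const descriptor = Object.getOwnPropertyDescriptor(copy, "value");
const isStillGetter = typeof descriptor?.get === "function";

// Methods survive the spread, and `this` is resolved at call time,
// so it refers to the composed object rather than the mixin result.
const counterMixin = () => ({
  count: 0,
  increment(): void {
    this.count++;
  },
});
const composed = { ...counterMixin(), label: "demo" };
composed.increment();
```

After this runs, `copy.value` is a frozen snapshot rather than a live accessor, while `composed.increment()` updates `composed.count` as expected. Note also that each call to a mixin factory creates fresh state, so nodes never share a `Set`.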
Let's define the input node of the graph, which is often referred to as Cell, Observable, Ref, or Signal. When it gets updated, it notifies Sinks, and this propagation occurs recursively.
export const Cell = <T>(ctx: Context, value: T) => ({
  ...SourceMixin(ctx),
  _value: value,
  get value(): T {
    this._subscribeCaller();
    return this._value;
  },
  set value(value: T) {
    this._value = value;
    this._notifySinks();
  },
});
In the middle we have acyclically connected operation nodes, typically named Computed or Transform. Upon receiving a change notification, such a node only marks itself as dirty and passes the notification on to its own Sinks; the actual recomputation is deferred until the value is read.
export const Computed = <T>(ctx: Context, transformFn: () => T) => ({
  ...SourceMixin(ctx),
  ...SinkMixin(ctx, true),
  _value: undefined as T,
  _transformFn: transformFn,
  get value(): T {
    // Remember the outer sink so that nested reads can restore it.
    const saved = this._ctx.currentSink;
    if (this._capture()) {
      this._value = this._transformFn();
      this._ctx.currentSink = saved;
      this._dirty = false;
    }
    this._subscribeCaller();
    return this._value;
  },
  markDirty(): void {
    // Propagate only on the clean-to-dirty transition.
    if (!this._dirty) {
      this._dirty = true;
      this._notifySinks();
    }
  },
});
Finally, the output node, also known as Effect or Reaction, performs side-effects. It schedules itself to run upon receiving a notification.
export const MicrotaskEffect = (ctx: Context, effectFn: () => void) => ({
  ...SinkMixin(ctx, false),
  _effectFn: effectFn,
  _run(): void {
    if (this._capture()) {
      this._effectFn();
      this._ctx.currentSink = null;
      this._dirty = false;
    }
  },
  schedule(): void {
    this.markDirty();
  },
  markDirty(): void {
    // Queue at most one run, no matter how many notifications arrive.
    if (!this._dirty) {
      this._dirty = true;
      queueMicrotask(this._run.bind(this));
    }
  },
});
Scheduling effects asynchronously batches changes to input nodes naturally: several writes made before the microtask runs trigger a single run of the effect. And if an effect never gets to run (e.g., the user closes a browser tab without returning to it first), nothing is recomputed along the path to it.
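The batching behavior can be reproduced without the rest of the graph. In the sketch below, `makeBatchedEffect` is a hypothetical helper, and its injectable `schedule` parameter is my own addition so that the queue can be flushed by hand; it defaults to `queueMicrotask`, as in the effect node above.

```typescript
// Hypothetical helper: coalesces any number of markDirty() calls into one run.
function makeBatchedEffect(
  effectFn: () => void,
  schedule: (task: () => void) => void = queueMicrotask,
) {
  let dirty = false;
  return {
    markDirty(): void {
      if (dirty) return; // already queued: later writes share that run
      dirty = true;
      schedule(() => {
        dirty = false;
        effectFn();
      });
    },
  };
}

// Drive the effect with a manual queue instead of real microtasks.
const pending: Array<() => void> = [];
let runs = 0;
const batched = makeBatchedEffect(
  () => {
    runs++;
  },
  (task) => pending.push(task),
);

batched.markDirty(); // e.g. a.value = 10
batched.markDirty(); // e.g. b.value = 20
batched.markDirty(); // e.g. a.value = 30
const queuedTasks = pending.length; // three notifications, one queued run

pending.splice(0).forEach((task) => task()); // flush the stand-in queue
```

Three notifications produce one queued task, and the effect function runs once when the queue is flushed.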
That’s all: the toy reactivity system is finished, although some bits are still missing, such as performance tuning, disposal, and cycle detection.
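As an example of one missing piece, cycle detection could be approached with a re-entrancy flag. This is only a sketch of one possible approach, not something the toy runtime does: `makeGuardedComputed` is a hypothetical, standalone lazy node with no caching or subscriptions, kept minimal to show just the guard.

```typescript
// Hypothetical lazy node that refuses to be read while it is computing.
function makeGuardedComputed<T>(name: string, compute: () => T) {
  let computing = false;
  return {
    get value(): T {
      if (computing) {
        throw new Error(`Cycle detected while computing "${name}"`);
      }
      computing = true;
      try {
        return compute();
      } finally {
        computing = false; // always reset, even when compute throws
      }
    },
  };
}

// x reads y and y reads x: a cycle.
const x: { readonly value: number } = makeGuardedComputed("x", () => y.value + 1);
const y: { readonly value: number } = makeGuardedComputed("y", () => x.value + 1);

let cycleMessage = "";
try {
  void x.value;
} catch (error) {
  cycleMessage = (error as Error).message;
}
```

Reading `x` starts computing `x`, which reads `y`, which reads `x` again while its flag is still set, so the guard throws instead of recursing until the stack overflows.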
Lastly, a playground has been set up for you to experiment with: https://tsplay.dev/WvyXQm