Understanding Reactive Dataflow by making a Toy Implementation

snosme

Alexander Drozdov

Posted on September 12, 2023

Whenever you hear the word "event", reactive programming is worth a look. Unlike the Observer pattern, it does not require you to manage subscriptions imperatively: simply reading a variable subscribes you to it. If the execution path changes and that variable is no longer read, no worries, the runtime will automatically unsubscribe from it.

Let’s implement such a runtime in just 130 LOC.
Implementations differ in how they propagate changes. The one described here is lazy and pull-based: a change only marks the dependent nodes as dirty, and values are recomputed when they are read.
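
To make "lazy, pull-based" concrete before building the real runtime, here is a minimal standalone sketch. `LazyValue` and its `invalidate()` method are hypothetical names used only for this illustration and are not part of the runtime below. A write merely flips a dirty flag, and the computation runs on the next read.

```typescript
// Hypothetical illustration only: not part of the runtime built in this post.
class LazyValue<T> {
    runs = 0; // how many times compute() actually ran
    private dirty = true;
    private cached: T | undefined = undefined;
    private readonly compute: () => T;

    constructor(compute: () => T) {
        this.compute = compute;
    }

    // "Push" side: only record that the cached value is stale.
    invalidate(): void {
        this.dirty = true;
    }

    // "Pull" side: recompute at most once per invalidation, on read.
    get value(): T {
        if (this.dirty) {
            this.cached = this.compute();
            this.runs += 1;
            this.dirty = false;
        }
        return this.cached as T;
    }
}

let input = 1;
const doubled = new LazyValue(() => input * 2);

input = 2;
doubled.invalidate();
input = 3;
doubled.invalidate(); // two writes so far, still no computation

const first = doubled.value;  // computes now
const second = doubled.value; // served from the cache
console.log(first, second, doubled.runs); // 6 6 1
```

The intermediate state `input = 2` is never computed at all, which is the main payoff of pulling values instead of pushing them.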

This is what it looks like in practice:

const ctx = new Context();
const a = Cell(ctx, 1);
const b = Cell(ctx, 2);
const sum = Computed(ctx, () => a.value + b.value);
const effect = MicrotaskEffect(ctx, () => {
    console.log(sum.value);
});
effect.schedule(); // effects start idle; this queues the first run

The reactivity graph is made up of nodes defined in source code, while edges are dynamic and determined at execution time.

[Figure: Reactivity graph]

Because we're dealing with data flow, nodes can function as a data Source, a Sink, or both. Compare this to Subject and Observer.

The edges (sets of references) are bidirectional:

  • from Source to Sink to notify about changes;
  • from Sink to Source to unsubscribe.

A shared Context allows the Source to know which Sink is calling it in order to establish a bidirectional edge.

export class Context {
    currentSink: SinkLike | null = null;
}

interface SourceLike {
    removeSink(sink: SinkLike): void;
}

interface SinkLike {
    markDirty(): void;
    addSource(source: SourceLike): void;
}

Since JavaScript objects have a single prototype chain and therefore no multiple inheritance, I'll use mixins to get around this.

const SourceMixin = (ctx: Context) => ({
    _ctx: ctx,
    _sinks: new Set<SinkLike>(),

    removeSink(sink: SinkLike): void {
        this._sinks.delete(sink);
    },

    _subscribeCaller(): void {
        const { currentSink } = this._ctx;
        if (currentSink) {
            this._sinks.add(currentSink);
            currentSink.addSource(this);
        }
    },

    _notifySinks(): void {
        for (const sink of this._sinks) {
            sink.markDirty();
        }
    },
});

const SinkMixin = (ctx: Context, dirty: boolean) => ({
    _ctx: ctx,
    _dirty: dirty,
    _sources: new Set<SourceLike>(),

    addSource(source: SourceLike): void {
        this._sources.add(source);
    },

    markDirty(): void {
        // Placeholder: every concrete node overrides this method.
        throw new Error("markDirty() must be overridden by the node");
    },

    _capture(): boolean {
        if (this._dirty) {
            for (const source of this._sources) {
                source.removeSink(this);
            }
            this._sources.clear();
            this._ctx.currentSink = this;
            return true;
        }
        return false;
    },
});
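
As a side note on the mixin technique itself: composing with object spread copies each mixin's own properties into a fresh object, and methods see the final object as `this`. The sketch below uses hypothetical mixins (`CounterMixin`, `LabelMixin`) that have nothing to do with the runtime. It also shows a caveat that fits with the nodes in this post declaring `get value()` directly on the final object: spread reads a getter once and copies the result as a plain value.

```typescript
// Hypothetical mixins, only to illustrate composition via object spread.
const CounterMixin = () => ({
    _count: 0,
    increment(): number {
        // `this` is whatever object the method is called on.
        this._count += 1;
        return this._count;
    },
});

const LabelMixin = (label: string) => ({
    _label: label,
    describe(): string {
        return `label=${this._label}`;
    },
});

// Each call builds a fresh object with its own copy of the mixin state.
const makeNode = (label: string) => ({
    ...CounterMixin(),
    ...LabelMixin(label),
});

const n1 = makeNode("first");
const n2 = makeNode("second");
n1.increment();
n1.increment();
n2.increment();

// Caveat: spread invokes a getter and copies the resulting value.
let reads = 0;
const withGetter = {
    get now(): number {
        reads += 1;
        return reads;
    },
};
const copied = { ...withGetter };          // the getter runs once, here
const snapshot = [copied.now, copied.now]; // plain data property from now on

console.log(n1._count, n2._count, n1.describe(), snapshot, reads);
// 2 1 label=first [ 1, 1 ] 1
```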

Let's define the input node of the graph, which is often referred to as Cell, Observable, Ref, or Signal. When it gets updated, it notifies Sinks, and this propagation occurs recursively.

export const Cell = <T>(ctx: Context, value: T) => ({
    ...SourceMixin(ctx),

    _value: value,

    get value(): T {
        this._subscribeCaller();
        return this._value;
    },

    set value(value: T) {
        this._value = value;
        this._notifySinks();
    },
});
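
To see the edge being created, the following self-contained sketch inlines a condensed copy of `Cell` (the mixin methods are folded into the object so the block runs on its own) and pairs it with a hypothetical `probe` sink that only logs what happens to it. Setting `ctx.currentSink` by hand stands in for what `_capture()` does for a real sink.

```typescript
interface SourceLike { removeSink(sink: SinkLike): void; }
interface SinkLike { markDirty(): void; addSource(source: SourceLike): void; }
class Context { currentSink: SinkLike | null = null; }

// Condensed copy of Cell with the SourceMixin methods inlined.
const Cell = <T>(ctx: Context, value: T) => ({
    _sinks: new Set<SinkLike>(),
    _value: value,
    removeSink(sink: SinkLike): void {
        this._sinks.delete(sink);
    },
    get value(): T {
        // Reading while a sink is active creates the edge in both directions.
        const { currentSink } = ctx;
        if (currentSink) {
            this._sinks.add(currentSink);
            currentSink.addSource(this);
        }
        return this._value;
    },
    set value(next: T) {
        this._value = next;
        this._sinks.forEach((sink) => sink.markDirty());
    },
});

// Hypothetical sink that just records what happens to it.
const log: string[] = [];
const probe: SinkLike = {
    markDirty: () => { log.push("dirty"); },
    addSource: () => { log.push("subscribed"); },
};

const ctx = new Context();
const count = Cell(ctx, 1);

void count.value;         // read outside any sink: no edge is created
count.value = 2;          // nobody to notify yet

ctx.currentSink = probe;  // stands in for _capture()
const seen = count.value; // read inside a sink: the edge is created
ctx.currentSink = null;

count.value = 3;          // probe is notified
console.log(seen, log);   // 2 [ 'subscribed', 'dirty' ]
```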

In the middle we have acyclically connected operation nodes, typically named Computed or Transform. Upon receiving a change notification, such a node only marks itself as dirty and passes the notification on to its own Sinks; the actual recomputation is performed on read.

export const Computed = <T>(ctx: Context, transformFn: () => T) => ({
    ...SourceMixin(ctx),
    ...SinkMixin(ctx, true),

    _value: undefined as T,
    _transformFn: transformFn,

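    get value(): T {
        const saved = this._ctx.currentSink;
        // _capture() drops stale edges and makes this node the active sink.
        if (this._capture()) {
            try {
                // Sources read inside transformFn re-subscribe this node.
                this._value = this._transformFn();
                this._dirty = false;
            } finally {
                // Restore the caller even if transformFn throws.
                this._ctx.currentSink = saved;
            }
        }
        // Link this node to whichever sink is reading it, if any.
        this._subscribeCaller();
        return this._value;
    },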

    markDirty(): void {
        if (!this._dirty) {
            this._dirty = true;
            this._notifySinks();
        }
    },
});
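
The two properties claimed so far, dynamic edges and recomputation only on read, can be checked with a small experiment. The block below is a condensed, class-based restatement written for this illustration (an assumption made for brevity; the post's own nodes are the mixin-based objects above), with a hypothetical `runs` counter added to `Computed` so the number of recomputations is visible.

```typescript
interface SourceLike { removeSink(sink: SinkLike): void; }
interface SinkLike { markDirty(): void; addSource(source: SourceLike): void; }
class Context { currentSink: SinkLike | null = null; }

class Source implements SourceLike {
    sinks = new Set<SinkLike>();
    ctx: Context;
    constructor(ctx: Context) { this.ctx = ctx; }
    removeSink(sink: SinkLike): void { this.sinks.delete(sink); }
    subscribeCaller(): void {
        const sink = this.ctx.currentSink;
        if (sink) { this.sinks.add(sink); sink.addSource(this); }
    }
    notifySinks(): void { this.sinks.forEach((sink) => sink.markDirty()); }
}

class Cell<T> extends Source {
    private current: T;
    constructor(ctx: Context, value: T) { super(ctx); this.current = value; }
    get value(): T { this.subscribeCaller(); return this.current; }
    set value(next: T) { this.current = next; this.notifySinks(); }
}

class Computed<T> extends Source implements SinkLike {
    runs = 0; // hypothetical counter: how many times fn() actually ran
    private dirty = true;
    private cached: T | undefined = undefined;
    private sources = new Set<SourceLike>();
    private fn: () => T;
    constructor(ctx: Context, fn: () => T) { super(ctx); this.fn = fn; }
    addSource(source: SourceLike): void { this.sources.add(source); }
    markDirty(): void {
        if (!this.dirty) { this.dirty = true; this.notifySinks(); }
    }
    get value(): T {
        if (this.dirty) {
            // Drop every old edge, then re-collect edges while fn() runs.
            this.sources.forEach((source) => source.removeSink(this));
            this.sources.clear();
            const saved = this.ctx.currentSink;
            this.ctx.currentSink = this;
            try { this.cached = this.fn(); this.runs += 1; }
            finally { this.ctx.currentSink = saved; }
            this.dirty = false;
        }
        this.subscribeCaller();
        return this.cached as T;
    }
}

const ctx = new Context();
const useA = new Cell<boolean>(ctx, true);
const a = new Cell(ctx, 1);
const b = new Cell(ctx, 10);
const picked = new Computed(ctx, () => (useA.value ? a.value : b.value));

const r1 = picked.value; // first read computes; edges: useA, a
b.value = 20;            // b is not a dependency, picked stays clean
const r2 = picked.value; // cached
useA.value = false;      // marks picked dirty, nothing is computed yet
const r3 = picked.value; // recomputes; edges are now: useA, b
a.value = 2;             // a was unsubscribed, picked stays clean
const r4 = picked.value; // cached

console.log(r1, r2, r3, r4, picked.runs, a.sinks.size, b.sinks.size);
// 1 1 20 20 2 0 1
```

After the branch flips, `a` has no sinks left and `b` has one, without any explicit subscribe or unsubscribe call in user code.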

Finally, the output node, also known as Effect or Reaction, performs side-effects. It schedules itself to run upon receiving a notification.

export const MicrotaskEffect = (ctx: Context, effectFn: () => void) => ({
    ...SinkMixin(ctx, false),

    _effectFn: effectFn,

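    _run(): void {
        if (this._capture()) {
            try {
                this._effectFn();
            } finally {
                // Reset even if effectFn throws, so later reads are not
                // attributed to this effect and it can be scheduled again.
                this._ctx.currentSink = null;
                this._dirty = false;
            }
        }
    },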

    schedule(): void {
        this.markDirty();
    },

    markDirty(): void {
        if (!this._dirty) {
            this._dirty = true;
            queueMicrotask(this._run.bind(this));
        }
    },
});

Scheduling effects asynchronously creates a natural batching of changes to input nodes. And if an effect never gets to run, nothing is recomputed along the path to it (e.g., user closes a browser tab without returning to it first).
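
The batching can be observed with a deterministic variant of the effect. In the sketch below, a hand-drained `pending` queue and `flush()` are hypothetical stand-ins for the microtask queue, and `QueuedEffect` is a condensed, class-based effect written for this illustration rather than the `MicrotaskEffect` above. The coalescing logic is the same: while the effect is dirty, further notifications do not queue another run.

```typescript
interface SourceLike { removeSink(sink: SinkLike): void; }
interface SinkLike { markDirty(): void; addSource(source: SourceLike): void; }
class Context { currentSink: SinkLike | null = null; }

// Hypothetical stand-in for the microtask queue, drained by hand.
const pending: Array<() => void> = [];
const flush = (): void => {
    for (let job = pending.shift(); job; job = pending.shift()) job();
};

class Cell<T> implements SourceLike {
    private sinks = new Set<SinkLike>();
    private current: T;
    private ctx: Context;
    constructor(ctx: Context, value: T) { this.ctx = ctx; this.current = value; }
    removeSink(sink: SinkLike): void { this.sinks.delete(sink); }
    get value(): T {
        const sink = this.ctx.currentSink;
        if (sink) { this.sinks.add(sink); sink.addSource(this); }
        return this.current;
    }
    set value(next: T) {
        this.current = next;
        this.sinks.forEach((sink) => sink.markDirty());
    }
}

class QueuedEffect implements SinkLike {
    runs = 0; // how many times fn() actually ran
    private dirty = false;
    private sources = new Set<SourceLike>();
    private ctx: Context;
    private fn: () => void;
    constructor(ctx: Context, fn: () => void) { this.ctx = ctx; this.fn = fn; }
    addSource(source: SourceLike): void { this.sources.add(source); }
    markDirty(): void {
        // Only the first notification since the last run queues a job.
        if (!this.dirty) {
            this.dirty = true;
            pending.push(() => this.run());
        }
    }
    private run(): void {
        this.sources.forEach((source) => source.removeSink(this));
        this.sources.clear();
        this.ctx.currentSink = this;
        try { this.fn(); this.runs += 1; }
        finally { this.ctx.currentSink = null; this.dirty = false; }
    }
}

const ctx = new Context();
const a = new Cell(ctx, 1);
const b = new Cell(ctx, 2);
const seen: number[] = [];
const effect = new QueuedEffect(ctx, () => { seen.push(a.value + b.value); });

effect.markDirty(); // queue the first run explicitly
flush();            // runs once and subscribes to a and b

a.value = 10;       // queues one run
b.value = 20;       // already dirty: no second job
a.value = 100;      // still the same single job
const queued = pending.length;
flush();

console.log(seen, effect.runs, queued); // [ 3, 120 ] 2 1
```

Three writes result in a single run that sees only the final values; the intermediate sums are never produced.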

That’s all: the toy reactivity system is finished, though some bits are still missing, such as performance optimizations, disposal, and cycle detection.

Lastly, a playground has been set up for you to experiment with: https://tsplay.dev/WvyXQm

