From RxJS to 𝗥𝘅𝑓𝑥, building an Event Bus for reliable async on any platform.

deanius

Dean Radcliffe

Posted on September 10, 2021

From RxJS to 𝗥𝘅𝑓𝑥, building an Event Bus for reliable async on any platform.

Intro

In the This Dot Labs post titled
How To Implement An Event Bus in Typescript, Luis Aviles showed us a type-safe implementation of an event bus. He also did a great job elaborating on how and when you can one: for reliable passing of information around an application, and triggering effects in a framework-free way that works, with or without a DOM, in Node, or React Native! 𝗥𝘅𝑓𝑥 can be the platform for async and data-sharing that does not dictate the rest of your application structure.

For a React application I was developing, my team could realize benefits of less prop-drilling by using a bus. So, I thought that with Luis' article as a reference, I might detail how an implementation might go, from scratch, starting only with an RxJS Subject. The code below gives only a glimpse of what an event bus can do, and weighs in at only 6Kb!

So let's dive in!

Instance management

An event bus will frequently be a singleton object within an app. To manage a singleton instance, we could write typical OO singleton code.

export class EventBus {
  private static instance?: EventBus = undefined;

  private constructor() {}

  public static getInstance(): EventBus {
    if (this.instance === undefined) {
      this.instance = new EventBus();
    }

    return this.instance;
  }
}
Enter fullscreen mode Exit fullscreen mode

Then the usage of it everywhere would look like:

EventBus.getInstance().some-method()
Enter fullscreen mode Exit fullscreen mode

But that's a bit verbose. Let's instead use the ES2015 module system to export a single object from a bus.ts file

// bus.ts
export const bus = EventBus.getInstance()
Enter fullscreen mode Exit fullscreen mode

and then callers could simply do:

import { bus } from './bus'
bus.some_method()
Enter fullscreen mode Exit fullscreen mode

No matter from which file you import bus, you get the same reference, so we are effectively using the module system to enforce a single reference without complicating callers.

Now, what should callers have as the interface to our EventBus?

To listen or not to listen?

Event Buses implement the pub-sub (publish-subscribe) metaphor, so there are two sides to them. First we'll create a fresh API for for the side that responds to events. In Luis' article, the method name was register. In the DOM, you listen for events with addEventListener. Let's shorten that to just listen. Let's assume strings are on the bus for now, and assume that TypeScript knows this. When the bus is in scope we have:

bus.listen(
  item => item.startsWith('hello-'),
  (item) => {
     const who = item.replace('hello-','');
     console.log('Hello '+ who);
  }
);
Enter fullscreen mode Exit fullscreen mode

The first argument is a function that returns true or false - called a Predicate. A predicate function is the most flexible way to match an item, and allows for any synchronous function which returns a Boolean. Here we use the nice string method startsWith, from ES2015. Never again do we have to test with indexOf, whew! Now let's see how to put an event on the bus.

Trigger Me!

While dispatch is a popular term from Redux (and the DOM) for sending an action, it's definition implies an action is directed at something or someone. However, the essence of an event bus is that the sender of an event doesn't know who or what is listening. So let's name our method for sending an undirected event trigger.

The argument to trigger should be the type of item the bus allows. Assuming the bus instance was already typed to string, then the code to trigger is simply:

bus.trigger('hello-dave');
Enter fullscreen mode Exit fullscreen mode

Notice you don't have to provide a type argument at the moment of triggering, because the bus knows it. Now- what about when a listener needs to be unregistered?

Will You Stop Listening, Already?

Just as the DOM has removeEventListener, an Event Bus should be able to stop listening - for cleanup purposes at least. In Luis' article, he demonstrated a style of un-listening like this:

const registry = EventBus.getInstance().register(matcher, fn);
registry.unregister();
Enter fullscreen mode Exit fullscreen mode

It's an improvement over removeEventListener because the return value lets you cancel without the original arguments. I like how it resembles getting a Subscription object from an RxJS Observable, on which you can call unsubscribe. So in our example, let's actually return a Subscription:

const listener:Subscription = bus.listen(matcher, fn);
listener.unsubscribe();
Enter fullscreen mode Exit fullscreen mode

This gives us some things for free such as a .closed property on listener, and the ability to shut down several listeners at once by creating an aggregate Subscription via Subscription#add. Since a Subscription object can represent any process that can be shut down, it's actually a great fit for a listener that can be shut down to stop listening.

Implementation

Although a production-ready library that implements this pattern is at @rxfx/bus, the code that implements only what's shown in this article is simply this:

class Bus<T> {
  private events: Subject<T>;
  constructor() {
    this.events = new Subject();
  }
  listen(matcher: Predicate<T>, handler: (item: T) => void): Subscription {
    return this.events
      .asObservable()
      .pipe(filter(matcher), tap(handler))
      .subscribe();
  }
  trigger(item: T) {
    this.events.next(item);
  }
}
export const bus = new Bus<string>();
// now bus.listen / bus.trigger
Enter fullscreen mode Exit fullscreen mode

Play with a working version of this in the 𝗥𝘅𝑓𝑥 Repl at Repl.it

Next Steps

To truly be useful, an event bus needs:

  • True error isolation (between listeners, between triggerers and listeners)
  • Async listeners which return Promises or Observables
  • The ability to re-trigger from listeners' results
  • The ability to queue, or otherwise deal with listeners that overlap
  • To be able to tie listeners to component lifetimes as in React
  • A way to validate what events are allowed on the bus at runtime

You can read about them more in the documentation and source on Github.

I hope you enjoyed seeing this use of Typescript to create a powerful, simple Event Bus, with a friendly and interoperable API. Let me know what more you'd like an Event Bus to do for you!

💖 💪 🙅 🚩
deanius
Dean Radcliffe

Posted on September 10, 2021

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related