RxJS Patterns: Creating an Action Stream

cogoo

C.OG

Posted on October 9, 2019


RxJS is an extremely powerful observable-based library, used at the very core of Angular, that allows us to write our applications in a truly reactive paradigm.

Creation operators are standalone functions that can create a new observable.

import { of } from 'rxjs';

of(1, 2, 3)
  .subscribe((val) => {
    console.log('::val', val); // logs 1, then 2, then 3
  });

These types of operators pose a new challenge when you want to follow a truly reactive paradigm.

In #rxjs .subscribe() is where reactive programming ends

— Michael Rx Hladky (@michael_hladky) October 5, 2019

Using observables comes with the added responsibility of always unsubscribing. Fortunately, Angular provides an async pipe that handles the subscription and unsubscription logic for us. This worked great until I had to make an HTTP POST request.

There was no obvious way to use the async pipe in this context, as this request is made upon a user action. And I really, really did not want to use .subscribe().

Cue the action stream:

<!-- some.component.html -->

<ng-container *ngIf="{ postAction: postStream$ | async } as postStream">

We subscribe to postStream$ using the async pipe. The observable will not emit a value until the HTTP request has returned, and until then the async pipe yields null, which on its own would make the *ngIf condition fail. We therefore wrap the subscription in an object literal. An object is always truthy, so the *ngIf condition is always met and our template is always rendered.
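The truthiness argument can be checked in plain TypeScript. This is only a sketch: `beforeFirstEmission` is a hypothetical name standing in for what the async pipe yields before postStream$ has emitted.

```typescript
// Hypothetical stand-in for the async pipe's value before the first emission.
const beforeFirstEmission: string | null = null;

// The same shape the template builds: { postAction: postStream$ | async }
const wrapped = { postAction: beforeFirstEmission };

// A bare null is falsy, so *ngIf="postStream$ | async" would hide the template.
const bareIsTruthy = Boolean(beforeFirstEmission);

// An object literal is truthy no matter what its properties hold.
const wrappedIsTruthy = Boolean(wrapped);

console.log({ bareIsTruthy, wrappedIsTruthy });
```

The inner `*ngIf="postStream.postAction"` still sees the null, which is why the "Saved!" message stays hidden until the request has returned.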

The complete template:

<!-- some.component.html -->

<ng-container *ngIf="{ postAction: postStream$ | async } as postStream">
  <p>Click the button to see the magic happen :)</p>
  <button (click)="onBtnClick()">Post data to back end</button>

  <!-- You can use this to show a success or error message, or loading state -->
  <p *ngIf="postStream.postAction">Saved!</p>
</ng-container>

In our component file:

// app.component.ts

// create your action stream
private readonly postAction$ = new Subject<void>();

We create an action stream: simply a subject that we'll call .next() on when an event in our template has taken place.

// app.component.ts

// subscribe to this post stream (higher-order observable) in your template to start listening
// for values emitted by the action stream
postStream$ = this.postAction$.pipe(
  // pick the correct operator for your use case
  exhaustMap(()=> {
    return of('my post'); // this could be your http post
  })
)

We create a higher-order observable, postStream$, that has an outer observable (the action stream we created earlier) and an inner observable that we map to. For this example we use the creation operator of(), but this could be an HTTP POST request.
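To see why the choice of operator matters, here is a minimal sketch of how exhaustMap treats a second click while the first request is still pending. `fakePost`, `resolve`, and `inFlight` are hypothetical helpers, not part of the original component; they stand in for an HTTP POST whose response we control by hand. The example calls .subscribe() only so it can run outside Angular; in the component the async pipe plays that role.

```typescript
import { Subject } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

const clicks$ = new Subject<number>();
const inFlight = new Map<number, Subject<string>>();

// Hypothetical stand-in for an HTTP POST: stays pending until resolve() is called.
function fakePost(id: number): Subject<string> {
  const response$ = new Subject<string>();
  inFlight.set(id, response$);
  return response$;
}

// Emit the "response" for a pending request and complete it.
function resolve(id: number): void {
  const response$ = inFlight.get(id);
  if (response$) {
    response$.next(`saved ${id}`);
    response$.complete();
  }
}

const saved: string[] = [];
clicks$
  .pipe(exhaustMap((id) => fakePost(id)))
  .subscribe((value) => saved.push(value));

clicks$.next(1); // starts request 1
clicks$.next(2); // ignored: request 1 has not completed yet
resolve(1);
clicks$.next(3); // starts request 3
resolve(3);

console.log(saved);
```

Request 2 is never started, which suits a save button that may be double-clicked. With the same input, switchMap would cancel request 1 in favour of request 2, concatMap would queue request 2 until request 1 completes, and mergeMap would run both in parallel.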

// app.component.ts

onBtnClick() {
  this.postAction$.next()
}

We then have a method on the class that will trigger our action stream.

The complete component:

// app.component.ts

import { Subject, of } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

export class AppComponent {
  // create your action stream
  private readonly postAction$ = new Subject<void>();

  // subscribe to this post stream (higher-order observable) in your template to start listening
  // for values emitted by the action stream
  postStream$ = this.postAction$.pipe(
    // pick the correct operator for your use case
    exhaustMap(()=> {
      return of('my post'); // this could be your http post
    })
  )

  onBtnClick() {
    this.postAction$.next()
  }
}

With this approach, we can avoid having to manually call .subscribe() when making HTTP POST requests.
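The template comment mentions showing a success message, an error message, or a loading state. One possible way to get there, sketched under assumptions, is to map the inner observable to a small state object. `PostState` and `fakePost` are hypothetical names, and the `boolean` payload exists only to force a failure in this demo. As before, .subscribe() is used only so the sketch runs outside Angular.

```typescript
import { Observable, Subject, of } from 'rxjs';
import { catchError, exhaustMap, map, startWith } from 'rxjs/operators';

// Hypothetical state shape the template could switch on.
type PostState =
  | { status: 'loading' }
  | { status: 'success'; body: string }
  | { status: 'error' };

// Hypothetical stand-in for an HTTP POST; errors when shouldFail is true.
function fakePost(shouldFail: boolean): Observable<string> {
  return shouldFail
    ? new Observable<string>((subscriber) => subscriber.error(new Error('request failed')))
    : of('my post');
}

const postAction$ = new Subject<boolean>();

const postStream$: Observable<PostState> = postAction$.pipe(
  exhaustMap((shouldFail) =>
    fakePost(shouldFail).pipe(
      map((body) => ({ status: 'success', body } as PostState)),
      // Catch inside the inner observable so a failed request does not end the outer stream.
      catchError(() => of({ status: 'error' } as PostState)),
      // Emit a loading state as soon as the request starts.
      startWith({ status: 'loading' } as PostState),
    ),
  ),
);

const statuses: string[] = [];
postStream$.subscribe((state) => statuses.push(state.status));

postAction$.next(false); // succeeds
postAction$.next(true);  // fails
postAction$.next(false); // succeeds again: the stream survived the error

console.log(statuses);
```

Placing catchError on the inner observable is the important design choice here. If it sat on the outer pipe instead, the first failed request would complete postStream$ and later clicks would do nothing.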

You can find the full code on StackBlitz
