RxJS Patterns: Creating an Action Stream
C.OG
Posted on October 9, 2019
RxJS is an extremely powerful observable based library, used at the very core of Angular, that allows us to write our applications in a truly reactive paradigm.
Creation operators are standalone functions that can create a new observable.
of(1,2,3)
.subscribe((val) => {
console.log('::val', val) // output: 1, 2, 3
})
These type of operators pose a new challenge when you want to follow a truly reactive paradigm.
In #rxjs .subscribe() is where reactive programming ends
— Michael Rx Hladky (@michael_hladky) October 5, 2019
Using observables comes with a greater responsibility of always unsubscribing. Fortunately Angular provides an async
pipe that handles the subscription and un-subscription logic for us. This worked great until I had to make a http post request.
There was no obvious way to use the async
pipe in this context, as this request is made upon a user action. And I really, really, did not want to use .subscribe()
.
Cue the action stream:
<!-- some.component.html -->
<ng-container *ngIf="{ postAction: postStream$ | async } as postStream">
We subscribe to a postStream$
using the async
pipe. The observable will not emit any values until the http request has returned, therefore, we wrap the subscription in an object. This will be a truthy value, so the *ngIf
condition will be met, which allows our template to always be rendered.
The complete template:
<!-- some.component.html -->
<ng-container *ngIf="{ postAction: postStream$ | async } as postStream">
<p>Click the button to see the magic happen :)</p>
<button (click)="onBtnClick()">Post data to back end</button>
<!-- You can use this to show a success or error message, or loading state -->
<p *ngIf="postStream.postAction">Saved!</p>
</ng-container>
In our component file:
// app.component.ts
// create your action stream
private readonly postAction$ = new Subject();
We create an action stream; simply a subject that we'll call .next()
on when an event in our template has taken place.
// app.component.ts
// subscribe to this post stream (high order observable) in your template to start listening
// for values emitted by the action stream
postStream$ = this.postAction$.pipe(
// pick the correct operator for your use case
exhaustMap(()=> {
return of('my post'); // this could be your http post
})
)
We create a higher order observable postStream$
, that has an outer observable, our action stream we created earlier, and an inner observable that we map to. For this example we use a creation operator of()
but this could be a http post request.
// app.component.ts
onBtnClick() {
this.postAction$.next()
}
We then have a method on the class that will trigger our action stream.
The complete component:
// app.component.ts
export class AppComponent {
// create your action stream
private readonly postAction$ = new Subject();
// subscribe to this post stream (high order observable) in your template to start listening
// for values emitted by the action stream
postStream$ = this.postAction$.pipe(
// pick the correct operator for your use case
exhaustMap(()=> {
return of('my post'); // this could be your http post
})
)
onBtnClick() {
this.postAction$.next()
}
}
With this approach, we can avoid having to manually call .subscribe()
when making http post requests.
You can find the full code on StackBlitz
Posted on October 9, 2019
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.