Deep Dive into Observables
Muhammad Muhktar Musa
Posted on August 19, 2021
Introduction
This article aims to give a better understanding of observables: how to subscribe to them, and how observables and observers relate to each other. It also shows how to create an observable from scratch, how to unsubscribe, and how to handle errors in an observable.
Observables
Observables represent a stream of data or events that arrive over time. Observables and operators are combined to create a pipeline of transformations.
As the name suggests, observables are used for observing data. The pattern comes in two parts: the observable and the observer.
An observable produces and sends data, while an observer works with that data. An observable is lazy: it does nothing until an observer subscribes to it. Once subscribed, it keeps delivering values until it completes, fails with an error, or the observer unsubscribes.
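To make the relationship between the two parts concrete, here is a minimal sketch in plain TypeScript. It is a hand-rolled model and not RxJS itself: the names ValueObserver, LazySource and countToThree are made up for this illustration, and an observable is reduced to a function that takes an observer. It shows that defining the source does nothing, and that the producer only runs once an observer is attached.

```typescript
// Hand-rolled model for illustration only; this is not the RxJS implementation.
// ValueObserver, LazySource and countToThree are hypothetical names.

// An observer, reduced to a single callback that receives each value.
type ValueObserver<T> = (value: T) => void;

// An "observable", reduced to a function that starts producing values
// for whichever observer it is called with.
type LazySource<T> = (observer: ValueObserver<T>) => void;

const activityLog: string[] = [];

const countToThree: LazySource<number> = (observer) => {
  activityLog.push('producer started');
  observer(1);
  observer(2);
  observer(3);
};

// Defining the source above did not run the producer.
activityLog.push('before subscribe');

// "Subscribing" means calling the source with an observer.
countToThree((value) => {
  activityLog.push(`received ${value}`);
});

console.log(activityLog);
```

The log starts with 'before subscribe', which is the point of the sketch: the producer is only triggered by the subscription.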
Subscribing to an Observable
Let us take a look at some example code from an Angular project.
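import { OnInit } from '@angular/core';
import { ActivatedRoute } from '@angular/router';

export class TestComponent implements OnInit {
  constructor(private route: ActivatedRoute) { }

  ngOnInit(): void {
    // Log the route data every time the route emits it
    this.route.data.subscribe((data) => {
      console.log(data);
    });
  }
}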
In the above code, this.route.data is the observable, and the callback passed to the subscribe method acts as the observer
.subscribe((data)
Any time the route emits data, the callback passed to the subscribe method receives it. The observable above is built into Angular. RxJS also provides many functions that we can use to create observables. Let us take a look at the interval function.
The interval function creates an observable that emits sequential numbers at a specified interval of time, on a specified scheduler.
It returns an Observable that emits an infinite sequence of ascending integers, with a constant interval of time of your choosing between those emissions. The first emission is not sent immediately, but only after the first period has passed. By default this operator uses the async SchedulerLike to provide a notion of time. You may also pass any SchedulerLike to it.
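import { interval } from 'rxjs';

export class TestComponent implements OnInit {
  constructor(private route: ActivatedRoute) { }

  ngOnInit(): void {
    // Creates the observable; nothing is emitted until something subscribes
    interval(2000);
  }
}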
The number passed to interval in the code block above
interval(2000)
is the period, given in milliseconds. Here the observable is set to emit a value every two seconds. We can receive these values by subscribing to the observable that interval returns.
import { interval } from 'rxjs';

export class TestComponent implements OnInit {
  constructor(private route: ActivatedRoute) { }

  ngOnInit(): void {
    // Logs 0, 1, 2, ... once every two seconds
    interval(2000).subscribe(c => {
      console.log(c);
    });
  }
}
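The behaviour described for interval (ascending integers, a first emission only after one full period, and a swappable notion of time) can be sketched without RxJS. What follows is a hand-rolled model under stated assumptions, not the library's implementation: miniInterval, RepeatScheduler, timerScheduler and ManualClock are hypothetical names. The manual clock stands in for a scheduler so that the example runs instantly instead of waiting for real timers.

```typescript
// Hand-rolled sketch for illustration; this is not the RxJS implementation.
// miniInterval, RepeatScheduler, timerScheduler and ManualClock are hypothetical names.

type Cancel = () => void;
// A scheduler decides how "every periodMs" is actually measured.
type RepeatScheduler = (tick: () => void, periodMs: number) => Cancel;

// Default scheduler backed by real timers.
const timerScheduler: RepeatScheduler = (tick, periodMs) => {
  const handle = setInterval(tick, periodMs);
  return () => clearInterval(handle);
};

function miniInterval(periodMs: number, schedule: RepeatScheduler = timerScheduler) {
  return {
    // Counting starts at 0; the first value arrives after one full period.
    subscribe(next: (count: number) => void): Cancel {
      let count = 0;
      return schedule(() => next(count++), periodMs);
    },
  };
}

// A manually advanced clock, so the demo below is deterministic.
class ManualClock {
  private currentMs = 0;
  private tasks: { tick: () => void; periodMs: number; dueMs: number; active: boolean }[] = [];

  schedule: RepeatScheduler = (tick, periodMs) => {
    const task = { tick, periodMs, dueMs: this.currentMs + periodMs, active: true };
    this.tasks.push(task);
    return () => { task.active = false; };
  };

  advance(ms: number): void {
    const targetMs = this.currentMs + ms;
    for (;;) {
      // Pick the earliest active task that falls due within the window.
      const due = this.tasks
        .filter((t) => t.active && t.dueMs <= targetMs)
        .sort((a, b) => a.dueMs - b.dueMs)[0];
      if (!due) break;
      this.currentMs = due.dueMs;
      due.dueMs += due.periodMs;
      due.tick();
    }
    this.currentMs = targetMs;
  }
}

const manualClock = new ManualClock();
const emitted: number[] = [];
const stopCounting = miniInterval(2000, manualClock.schedule).subscribe((n) => {
  emitted.push(n);
});

manualClock.advance(1999);  // the first period has not fully passed yet
const emittedBeforeFirstPeriod = emitted.length;
manualClock.advance(1);     // 2000 ms reached: the first value arrives
manualClock.advance(4000);  // two more periods pass
stopCounting();             // cancel the subscription
manualClock.advance(10000); // nothing more is emitted after cancelling

console.log(emittedBeforeFirstPeriod, emitted);
```

Passing the scheduler in as a parameter is a design choice of this sketch. It mirrors the idea that interval can be given a different scheduler, which is what makes time-based code testable.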
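An observable such as interval is not tied to the lifetime of a component. It keeps emitting after the component is gone unless we unsubscribe, which makes it a common source of memory leaks.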
Unsubscribing an Observable
Memory leaks can degrade the performance of a project. They can be prevented by unsubscribing from the observable. Let us create a variable of type Subscription
intervalSubscription: Subscription;
Subscription is imported from rxjs. The variable we have created above gives us a place to store the subscription, so we can assign the return value of the subscribe call to it.
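import { interval, Subscription } from 'rxjs';

export class TestComponent implements OnInit {
  intervalSubscription: Subscription;

  constructor(private route: ActivatedRoute) { }

  ngOnInit(): void {
    // Keep the Subscription returned by subscribe() so it can be cancelled later
    this.intervalSubscription = interval(2000).subscribe(c => {
      console.log(c);
    });
  }
}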
This variable gives us a handle on the running subscription. When we leave the component, we can use it to unsubscribe from the observable that is generating the data, so that no more values are produced.
To do this, we use a lifecycle hook called OnDestroy. Its ngOnDestroy method is called when a directive, pipe or service is destroyed. It can also be used for any custom clean-up that needs to occur when the instance is destroyed. We add OnDestroy to the interfaces that our exported class implements
export class TestComponent implements OnInit, OnDestroy {
To unsubscribe from an observable, the ngOnDestroy method is used. This method is a callback that performs custom clean-up. It is invoked immediately before a directive, pipe, or service instance is destroyed.
import { interval, Subscription } from 'rxjs';

export class TestComponent implements OnInit, OnDestroy {
  intervalSubscription: Subscription;

  constructor(private route: ActivatedRoute) { }

  ngOnInit(): void {
    this.intervalSubscription = interval(2000).subscribe(c => {
      console.log(c);
    });
  }

  ngOnDestroy(): void {
    // Stop the interval when Angular destroys the component
    this.intervalSubscription.unsubscribe();
  }
}
In this way, we can prevent memory leakage.
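Why a forgotten subscription leaks can be shown with a small model in plain TypeScript. This is a sketch under stated assumptions: TickSource is a hypothetical stand-in for a long-lived source such as interval, and LeakyWidget and TidyWidget are made-up classes that only imitate the ngOnInit and ngOnDestroy hooks. Nothing here is Angular or RxJS code.

```typescript
// Hand-rolled model for illustration; TickSource, LeakyWidget and TidyWidget
// are hypothetical names and not part of Angular or RxJS.

class TickSource {
  private listeners: Array<(n: number) => void> = [];

  subscribe(listener: (n: number) => void): { unsubscribe(): void } {
    this.listeners.push(listener);
    return {
      unsubscribe: () => {
        this.listeners = this.listeners.filter((l) => l !== listener);
      },
    };
  }

  emit(n: number): void {
    this.listeners.forEach((listener) => listener(n));
  }

  listenerCount(): number {
    return this.listeners.length;
  }
}

const ticks = new TickSource();

// Subscribes but never cleans up.
class LeakyWidget {
  received: number[] = [];
  ngOnInit(): void {
    ticks.subscribe((n) => { this.received.push(n); });
  }
  ngOnDestroy(): void {}
}

// Keeps the subscription and releases it on destroy.
class TidyWidget {
  received: number[] = [];
  private subscription?: { unsubscribe(): void };
  ngOnInit(): void {
    this.subscription = ticks.subscribe((n) => { this.received.push(n); });
  }
  ngOnDestroy(): void {
    this.subscription?.unsubscribe();
  }
}

const leaky = new LeakyWidget();
const tidy = new TidyWidget();
leaky.ngOnInit();
tidy.ngOnInit();

ticks.emit(0);        // both widgets are alive and receive the value
leaky.ngOnDestroy();
tidy.ngOnDestroy();
ticks.emit(1);        // only the leaky widget is still being called

console.log(leaky.received, tidy.received, ticks.listenerCount());
```

After both widgets are "destroyed", the source still holds a reference to the leaky widget's callback and keeps calling it. That retained reference is the leak that unsubscribing avoids.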
Observables that Angular manages for us, such as the ones on ActivatedRoute, are cleaned up by the framework when the component is destroyed, so we do not have to unsubscribe from them. Observables that we create ourselves with rxjs, such as interval, do not behave that way. You need to unsubscribe from those yourself. Unsubscribing manually from an Angular-managed observable is possible, but it is usually unnecessary.
As the small examples above show, observables help us handle data that arrives over time in a predictable way. We use them to write declarative code, which makes an app easier to reason about and more resilient.
We can also think of observables as wrappers around data sources, including asynchronous ones. The observer executes some code whenever a new value or error is received from the observable, or when the observable completes.
Observable method and Observer methods
A subscription ties an observer to the observable's stream of values. The observer listens for these values. To do so, the observer implements three methods that the observable calls. These are
next();
error();
complete();
next(): the next() method will be executed whenever a new value is received
error(): the error() method is called whenever the observable encounters an error
complete(): This method is called whenever the observable is done
Some observables will never complete. This happens especially when they wrap something like the click event of a button, because the user can always click the button again.
The contract between an observable and an observer is the subscription. The observable knows that the observer provides the next(), error() and complete() methods, which it can call. The observer knows that the observable will call only these three methods.
We can have a single value or multiple values from a data stream. Whatever the case may be, we have an observer which can handle multiple values. The stream might have an endpoint, when the observable is done, or the end might never occur, as in the case of the click event. When the observable is done, it calls complete() on the observer object. Note that if the stream completes, it cannot error out afterwards. If the stream errors out, it cannot complete afterwards.
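The rule that nothing may follow error() or complete() can be sketched as a small wrapper around an observer. This is a hand-rolled model in plain TypeScript and not the RxJS source: MiniObserver and guard are hypothetical names, and the sketch only assumes the contract described above.

```typescript
// Hand-rolled model of the observer contract; not the RxJS implementation.
// MiniObserver and guard are hypothetical names.

interface MiniObserver<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

// Wraps an observer so that nothing gets through once the stream has ended.
function guard<T>(observer: MiniObserver<T>): MiniObserver<T> {
  let finished = false;
  return {
    next(value: T): void {
      if (!finished) observer.next(value);
    },
    error(err: unknown): void {
      if (finished) return;
      finished = true;
      observer.error(err);
    },
    complete(): void {
      if (finished) return;
      finished = true;
      observer.complete();
    },
  };
}

const notifications: string[] = [];
const guarded = guard<number>({
  next: (value: number) => { notifications.push(`next ${value}`); },
  error: (err: unknown) => { notifications.push(`error ${String(err)}`); },
  complete: () => { notifications.push('complete'); },
});

guarded.next(1);
guarded.next(2);
guarded.complete();
guarded.next(3);                       // ignored: the stream already completed
guarded.error(new Error('too late'));  // ignored: it cannot error after completing

console.log(notifications);
```

Only the two values and the completion reach the wrapped observer. The late next() and error() calls are dropped, which is the mutual exclusion described in the text.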
Let us take a look at an example in an Angular component. Let us create a button in our HTML file
<button>Click me</button>
In the ts file,
constructor(private route: ActivatedRoute) { }

ngOnInit(): void {
  this.route.data.subscribe((data) => {
    console.log(data);
  });
}
The function passed to the subscribe method above plays the role of the observer's next() function. We can also write the observer as an object and store it in a variable. In that case it will look like this
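const observer = {
  // Called for every value the observable emits
  next: function (data: unknown) {
    console.log(data);
  },
  // Called once if the observable fails
  error: function (error: unknown) {
    console.log(error);
  },
  // Called once when the observable is done
  complete: function () {
    console.log("done");
  }
};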
The variable can easily be passed to a subscribe method. Example
ngOnInit(): void {
  // Define the observer first, then pass it to subscribe()
  const observer = {
    next: function (data: unknown) {
      console.log(data);
    },
    error: function (error: unknown) {
      console.log(error);
    },
    complete: function () {
      console.log("done");
    }
  };
  this.route.data.subscribe(observer);
}
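The two ways of subscribing shown so far, with a bare function or with an observer object, can be related with a short sketch. It is a hand-rolled model in plain TypeScript, not the RxJS source: PartialHandlers, FullHandlers, normalize and emitOneTwo are hypothetical names. The assumption it illustrates is that a bare function is treated as the next() handler and that missing handlers simply do nothing.

```typescript
// Hand-rolled model for illustration; not the RxJS implementation.
// PartialHandlers, FullHandlers, normalize and emitOneTwo are hypothetical names.

interface PartialHandlers<T> {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
}

interface FullHandlers<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

// Turns either a bare function or a partial observer object into a full observer.
function normalize<T>(arg: ((value: T) => void) | PartialHandlers<T>): FullHandlers<T> {
  const handlers: PartialHandlers<T> = typeof arg === 'function' ? { next: arg } : arg;
  return {
    next(value: T): void { if (handlers.next) handlers.next(value); },
    error(err: unknown): void { if (handlers.error) handlers.error(err); },
    complete(): void { if (handlers.complete) handlers.complete(); },
  };
}

// A tiny source that emits 1 and 2, then completes.
function emitOneTwo(arg: ((value: number) => void) | PartialHandlers<number>): void {
  const observer = normalize(arg);
  observer.next(1);
  observer.next(2);
  observer.complete();
}

const seen: string[] = [];

// Style 1: a bare function, used as next().
emitOneTwo((value: number) => { seen.push(`fn ${value}`); });

// Style 2: an observer object; the error handler is left out here.
emitOneTwo({
  next(value: number) { seen.push(`obj ${value}`); },
  complete() { seen.push('obj done'); },
});

console.log(seen);
```

With the bare function, completion passes silently because no complete() handler was supplied. With the object, each notification goes to the matching method.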
Creating an Observable from scratch
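To build an observable from scratch, an rxjs method called create() is used. The create() method takes only one argument: a function that receives the observer. Let us create an observable using this method. Note that recent versions of rxjs deprecate create() in favour of calling new Observable() with the same function.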
We will use reactive.io to get our observable instance.
NB: Check reactive.io for documentation
We will pass an anonymous function to the create() method
Rx.Observable.create((obs) => {
  // Push an example value to whoever subscribes
  obs.next('Hello');
}).subscribe(observer);
This anonymous function takes an argument obs, which is the observer that was passed to subscribe. This is how an observable is created.
The next() method can now be called in the anonymous function. The observable knows that the observer has the next(), error() and complete() methods.
All these methods can be called inside the anonymous function. If an error occurs, the observable is finished. It won't call another next() or complete() method.
The complete() method completes an observable. Once it has been called, no other method is called afterwards. The error() and complete() methods are mutually exclusive. They can never both be called on the same observable. Once one of them is triggered, the other can no longer be called.
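The idea behind creating an observable from a function can be sketched in a few lines of plain TypeScript. This is a hand-rolled model under stated assumptions, not the RxJS class: MiniObservable, SimpleObserver, makeObserver and greetings are hypothetical names. The sketch stores the producer function and runs it once for every subscribe call.

```typescript
// Hand-rolled model for illustration; not the RxJS implementation.
// MiniObservable, SimpleObserver, makeObserver and greetings are hypothetical names.

interface SimpleObserver<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

class MiniObservable<T> {
  private producer: (observer: SimpleObserver<T>) => void;

  // The producer is only stored here; it does not run yet.
  constructor(producer: (observer: SimpleObserver<T>) => void) {
    this.producer = producer;
  }

  // Each subscribe() call runs the producer with that observer.
  subscribe(observer: SimpleObserver<T>): void {
    this.producer(observer);
  }
}

let producerRuns = 0;

const greetings = new MiniObservable<string>((obs) => {
  producerRuns++;
  obs.next('Hello');
  obs.next('World');
  obs.complete();
});

const received: string[] = [];

function makeObserver(label: string): SimpleObserver<string> {
  return {
    next(value: string): void { received.push(`${label}: ${value}`); },
    error(err: unknown): void { received.push(`${label}: error ${String(err)}`); },
    complete(): void { received.push(`${label}: done`); },
  };
}

greetings.subscribe(makeObserver('A'));
greetings.subscribe(makeObserver('B'));

console.log(producerRuns, received);
```

Each subscriber gets its own run of the producer, so observer B receives the same values as observer A even though it subscribed later.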
Error handling
Observables can produce values asynchronously. We can handle errors by specifying an error callback on the observer. When an observable produces an error, it cleans up its subscriptions and stops producing values. Any given stream of observable data can only error out once, and the error ends the stream's life cycle.
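Rx.Observable.create((obs) => {
  obs.next(1);
  // Signal a failure; no further values are delivered after this
  obs.error(new Error('Something went wrong'));
}).subscribe({
  next(num) { console.log('Next num: ' + num) },
  error(err) { console.log('Received an error: ' + err) }
});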
Handling errors using the subscribe call is sometimes all that we need. But this error handling approach is limited. It is difficult to recover from the error or emit an alternative fallback value that will replace the value we were expecting.
catchError
The catchError operator is used for advanced error handling strategies. This error handling functionality is provided by rxjs. The catchError method is a function that takes in an input observable and outputs an output observable. With each call to catchError, a function is passed which we will call the error handling function.
The catchError operator takes as input an observable that might error out. It starts emitting the values of the input observable in its output observable. If no error occurs, the output observable produced by catchError works exactly the same way as the input observable.
If an error occurs, the catchError logic handles it. It returns an observable which is a replacement observable for the stream that errored out. This replacement observable is going to be subscribed to and its values are going to be used in place of the errored out input observable.
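Rx.Observable.create((obs) => {
  obs.error(new Error('Something went wrong'));
}).pipe(
  // Replace the failed stream with one that emits an empty array
  catchError(() => of([]))
).subscribe({
  next(num) { console.log('Next num: ' + num) }
});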
Only when an error occurs in the input observable of catchError, will the error handling function be called.
This error handling strategy is called catch and replace. Let us break it down
We are passing to the catchError operator an error handling function.
The error handling function is not called immediately and in general it is usually not called.
Only when an error occurs in the input observable of the catchError will the error handling function be called.
If an error occurs in the input stream, the function returns an observable built using the of([ ]) function.
The of() function builds an observable that emits only one value ([ ]) and then completes.
The error handling function returns the recovery observable (of([ ])), which the catchError operator then subscribes to.
The values of the recovery observable are then emitted as replacement values in the output observable returned by the catchError operator.
As a result, the observable we are subscribing to will no longer error out. Instead, an empty array value [ ] will be emitted, and the observable then completes.
This means that the error callback of the observer will no longer be called. If there is an error, the catchError operator handles it.
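The catch and replace steps above can be sketched without rxjs to show the mechanics. This is a hand-rolled model in plain TypeScript, not the library's catchError: Handlers, Source, single and catchAndReplace are hypothetical names, with single loosely modelled on of() for one value.

```typescript
// Hand-rolled model of catch and replace; not the RxJS implementation.
// Handlers, Source, single and catchAndReplace are hypothetical names.

interface Handlers<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

// An observable, reduced to a function that feeds an observer.
type Source<T> = (handlers: Handlers<T>) => void;

// Emits one value and then completes.
function single<T>(value: T): Source<T> {
  return (handlers) => {
    handlers.next(value);
    handlers.complete();
  };
}

// Forwards values and completion from the input. If the input errors,
// the error is not forwarded; the replacement source is subscribed to instead.
function catchAndReplace<T, R>(
  input: Source<T>,
  onError: (err: unknown) => Source<R>
): Source<T | R> {
  return (handlers) => {
    input({
      next: (value) => { handlers.next(value); },
      error: (err) => { onError(err)(handlers); },
      complete: () => { handlers.complete(); },
    });
  };
}

// An input stream that emits two values and then fails.
const failing: Source<number> = (handlers) => {
  handlers.next(1);
  handlers.next(2);
  handlers.error(new Error('boom'));
};

const outputLog: string[] = [];

catchAndReplace(failing, () => single<number[]>([]))({
  next: (value) => { outputLog.push(`next ${JSON.stringify(value)}`); },
  error: (err) => { outputLog.push(`error ${String(err)}`); },
  complete: () => { outputLog.push('complete'); },
});

console.log(outputLog);
```

The output stream delivers the two original values, then the replacement value [ ], and then completes. The error handler of the final observer is never reached.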