Using Observables to transform data in TypeScript
Matt Angelosanto
Posted on October 21, 2021
Written by Emmanuel John ✏️
Introduction
Event processing is a common task when building TypeScript applications.
Registering an event and running some action when it fires is a natural fit for the Observable pattern, which provides a unidirectional flow for streams of events and makes debugging and error handling on those streams much easier.
In this article, we will explore Observables and how they are used to handle event-driven data, errors, and asynchronous data.
- Introduction to Observables
- Observable operators
- Transforming arrays with Observable
- Combining multiple operators for event stream transformation
- Handling errors with Observables
Introduction to Observables
One of the interesting, but sometimes brain-teasing, parts of JavaScript in general is handling events.
Consider a real-time application like Facebook, in which multiple events take place within a particular time frame. A user could be typing some text on their news feed while receiving notifications and messages from friends in no particular order. It is the responsibility of the application to handle these events. This is where Observables come in.
An Observable is a stream of values emitted over time that can be processed using operators modeled on array methods such as map, reduce, filter, and so on. It comes in handy when handling asynchronous operations such as HTTP requests, user-input events, and so on.
The Observable pattern is a design pattern for registering interest in an event and running some process when that event is triggered. One of the most powerful and popular JavaScript libraries that specializes in event processing is the Reactive Extensions for JavaScript library, also known as RxJS.
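Before bringing in RxJS, the pattern itself can be sketched in a few lines of plain TypeScript. This is only an illustration of the idea of registering handlers and notifying them; the names SimpleSubject, Handler, and messages are hypothetical and are not part of RxJS.

```typescript
// Hypothetical, dependency-free sketch of the pattern (not the RxJS implementation).
type Handler<T> = (value: T) => void;

class SimpleSubject<T> {
  private handlers: Handler<T>[] = [];

  // Register a handler; the returned function removes it again.
  subscribe(handler: Handler<T>): () => void {
    this.handlers.push(handler);
    return () => {
      this.handlers = this.handlers.filter((h) => h !== handler);
    };
  }

  // Notify every registered handler that an event occurred.
  emit(value: T): void {
    for (const handler of this.handlers) {
      handler(value);
    }
  }
}

const messages = new SimpleSubject<string>();
const received: string[] = [];

const unsubscribe = messages.subscribe((text) => received.push(`got: ${text}`));
messages.emit("hello");
unsubscribe();
messages.emit("ignored"); // no handler is registered any more

console.log(received); // ["got: hello"]
```

RxJS builds on the same idea and adds operators, error propagation, and completion on top of it.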
To get started, we need the RxJS library installed. We can install it with this command:
npm install rxjs
The library ships with all the declaration files that TypeScript needs, so there’s no need to install them separately.
In order to create an Observable, we need the Observable type and the of function from RxJS, as follows:
import { of, Observable } from "rxjs";
const emitter : Observable<string> = of("Sam", "Ray", "Thomas");
In the above snippet, we imported the of function and the Observable type from the RxJS library, then created an Observable from the strings "Sam", "Ray", and "Thomas".
Next, we will subscribe to an Observable as follows:
emitter.subscribe((value: string) => {
  console.log(`Name: ${value}`);
});
In the above snippet, we registered an Observer by calling the subscribe method on the emitter variable. Since the emitter variable is of type Observable, the subscribe method is available on it. The subscribe method receives a function as a parameter, and this function is called once for each value emitted by the Observable.
The above code will produce the following output:
Name: Sam
Name: Ray
Name: Thomas
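To see what "emitting a value" means, the same stream can be built by hand with the Observable constructor instead of of. This is a minimal sketch; the names manualEmitter and seenNames are introduced here for illustration only.

```typescript
import { Observable } from "rxjs";

// A hand-built Observable that pushes the same three names as the `of` example.
// The function passed to the constructor runs when subscribe is called.
const manualEmitter = new Observable<string>((subscriber) => {
  subscriber.next("Sam");
  subscriber.next("Ray");
  subscriber.next("Thomas");
  subscriber.complete(); // signal that no more values will follow
});

const seenNames: string[] = [];
manualEmitter.subscribe((value: string) => {
  seenNames.push(`Name: ${value}`);
});

console.log(seenNames); // ["Name: Sam", "Name: Ray", "Name: Thomas"]
```

Each call to subscriber.next triggers the function passed to subscribe once, which matches the three lines of output shown above.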
Observable operators
Pipeable Operators and Creation Operators are the two kinds of operators in RxJS.
Pipeable Operators are functions that take an Observable as input and return another Observable. They can be applied to an Observable using the syntax observableInstance.pipe(operator()). Examples include the filter(...) and mergeMap(...) operators.
Creation Operators are operators that create a new Observable when called.
Creation Operators include the of and from functions used in this article. You can reference the RxJS official docs for a complete list of operators.
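As a small illustration of the two kinds of operators working together, the sketch below uses the of creation operator to build two streams and then applies the filter and mergeMap pipeable operators to them. The variable names evens and expanded are chosen for this example only.

```typescript
import { of } from "rxjs";
import { filter, mergeMap } from "rxjs/operators";

// filter: only values that pass the predicate are emitted downstream.
const evens: number[] = [];
of(1, 2, 3, 4, 5, 6)
  .pipe(filter((n: number) => n % 2 === 0))
  .subscribe((n: number) => evens.push(n));

// mergeMap: each input value is mapped to an inner Observable,
// and the values of all inner Observables are merged into one stream.
const expanded: string[] = [];
of("a", "b")
  .pipe(mergeMap((letter: string) => of(`${letter}1`, `${letter}2`)))
  .subscribe((s: string) => expanded.push(s));

console.log(evens); // [2, 4, 6]
console.log(expanded); // ["a1", "a2", "b1", "b2"]
```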
Transforming arrays with Observable
The RxJS from function creates an Observable from an array. It receives the array as input and emits each element of the array, in order, as a separate value of the resulting Observable.
Let’s consider the code snippet below:
import { from, Observable } from "rxjs";
const transformArray: Observable<number> = from([1, 2, 3, 4]);
transformArray.subscribe((value: number) => {
  console.log(`value: ${value}`);
});
The above snippet uses the from function to turn the values in the array into an Observable, then calls the subscribe method on the transformArray variable. The subscribe method accepts a function that gets executed for each value emitted by the Observable.
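A quick way to see what from does with an array is to compare it with of given the same array. In this sketch (variable names are illustrative), from emits each element separately, while of emits the whole array as a single value.

```typescript
import { from, of } from "rxjs";

// from unpacks the array: four separate emissions.
const viaFrom: number[] = [];
from([1, 2, 3, 4]).subscribe((value: number) => viaFrom.push(value));

// of treats the array as one value: a single emission.
const viaOf: number[][] = [];
of([1, 2, 3, 4]).subscribe((value: number[]) => viaOf.push(value));

console.log(viaFrom.length); // 4
console.log(viaOf.length); // 1
```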
Combining multiple operators for event stream transformation
RxJS provides us with the pipe method, which allows multiple operators to be combined for complex transformations of event streams.
Let’s consider the following code:
import { of } from "rxjs";
import { map } from "rxjs/operators";
const emitter = of(4, 9, 16, 25);
const mapValue = emitter.pipe(
  map((value: number) => {
    return Math.sqrt(value);
  }),
  map((value: number) => {
    return `square root: ${value}`;
  })
);
mapValue.subscribe((value: string) => {
  console.log(`string value emitted ${value}`);
});
Here, the RxJS of function is used to create an Observable from the numbers 4, 9, 16, and 25. The pipe method takes in two map operators. The first map returns the square root of the input value, and the second map transforms the output of the first into a string.
Finally, the subscribe method is called on the resulting mapValue Observable, and its callback runs once for each transformed value.
Running the above code produces the following output:
string value emitted square root: 2
string value emitted square root: 3
string value emitted square root: 4
string value emitted square root: 5
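The same approach extends to different kinds of operators in one pipe call. As a sketch, the following chain keeps the even numbers, squares them, and sums the squares with reduce, which emits a single value once the source completes. The name totals is used only for this example.

```typescript
import { of } from "rxjs";
import { filter, map, reduce } from "rxjs/operators";

const totals: number[] = [];

of(1, 2, 3, 4, 5, 6)
  .pipe(
    filter((n: number) => n % 2 === 0), // keeps 2, 4, 6
    map((n: number) => n * n), // 4, 16, 36
    reduce((sum: number, n: number) => sum + n, 0) // emits the sum on completion
  )
  .subscribe((total: number) => totals.push(total));

console.log(totals); // [56]
```

Operators run in the order they are listed, so each one receives the output of the operator before it.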
Handling errors with Observables
Handling exceptions within an Observable stream requires a well-structured mechanism to catch these exceptions.
Let's consider the code snippet below:
interface IName {
  value: string;
}
interface IObj {
  name?: IName;
}
const emitObj: Observable<IObj> = of(
  { name: { value: "Bob" } },
  {},
  { name: { value: "Sam" } }
);
In the above snippet, we created the IName interface with a value property of type string. We also created the IObj interface with an optional name property of type IName. We then created the emitObj Observable, which emits three values.
Now let’s consider the following Observable stream:
import { map } from "rxjs/operators";
const returnName = emitObj.pipe(
  map((value: IObj) => {
    return value.name!.value;
  })
);
returnName.subscribe((value: string) => {
  console.log(`name: ${value} `);
});
In the above snippet, we created an Observable stream, returnName, which returns the name.value property of each input value. Then, we subscribe to this stream and log the received value to the console.
When we run this snippet, we will get the following output:
name: Bob
TypeError: Cannot read property 'value' of undefined
This error occurs because the second value emitted in our Observable stream does not have a name property, resulting in an undefined value.
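It is worth noting that the non-null assertion (!) only silences the compiler and performs no check at runtime. As an aside, and not part of the approach this article takes, a plain TypeScript sketch with optional chaining shows how the missing property could be read safely. The helper readName and the samples array are hypothetical names for this example.

```typescript
interface IName {
  value: string;
}
interface IObj {
  name?: IName;
}

// Hypothetical helper: returns null instead of throwing when name is absent.
function readName(obj: IObj): string | null {
  return obj.name?.value ?? null;
}

const samples: IObj[] = [
  { name: { value: "Bob" } },
  {},
  { name: { value: "Sam" } },
];

const names = samples.map(readName);
console.log(names); // ["Bob", null, "Sam"]
```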
To handle this error, we can pass an error handler to the subscribe function of our Observable stream.
Let’s consider the following:
returnName.subscribe(
  // called for each observable value
  (value: string | null) => {
    console.log(`name: ${value} `);
  },
  // called if an error occurs
  (error: unknown) => {
    console.log(`error : ${error}`);
  },
  // called when execution is done
  () => {
    console.log(`done`);
  }
);
Here, we have provided the subscribe method with three functions as arguments: the first is called for each value emitted by the Observable stream, the second is called if an error occurs, and the last is called when the Observable stream completes.
When we run this snippet, we will get the following output:
name: Bob
error : TypeError: Cannot read property 'value' of undefined
What happens here is that the first value emitted by the Observable stream is handled by the first function provided to the subscribe method. The second value emitted by the Observable stream causes an error, which is trapped by the error function provided to the subscribe method. The last function is not called, because a stream that ends with an error never completes.
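The same three handlers can also be passed as a single observer object with next, error, and complete properties, which recent RxJS releases recommend over positional callbacks. The following self-contained sketch assumes that form; the names objs and events are illustrative, and from is used with a typed array only to keep the example short.

```typescript
import { from } from "rxjs";
import { map } from "rxjs/operators";

interface IName {
  value: string;
}
interface IObj {
  name?: IName;
}

const objs: IObj[] = [{ name: { value: "Bob" } }, {}, { name: { value: "Sam" } }];
const events: string[] = [];

from(objs)
  .pipe(map((obj: IObj) => obj.name!.value)) // throws for the second object
  .subscribe({
    // called for each emitted value
    next: (value: string) => events.push(`name: ${value}`),
    // called once if the stream errors; the stream ends here
    error: (err: unknown) =>
      events.push(`error: ${err instanceof TypeError ? "TypeError" : "other"}`),
    // called only if the stream finishes without an error
    complete: () => events.push("done"),
  });

console.log(events); // ["name: Bob", "error: TypeError"]
```

As with the positional form, "done" is never recorded because the stream ends with an error.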
catchError
Another way to handle Observable errors is to use the catchError operator within the Observable stream itself, so that we can catch these errors as early as possible.
Let’s consider the following code snippet:
import { catchError, map } from "rxjs/operators";
const returnName = emitObj.pipe(
  map((value: IObj) => {
    return value!.name!.value;
  }),
  catchError((error: unknown) => {
    console.log(`stream caught : ${error}`);
    return of(null);
  })
);
Here, we have added a catchError operator to the Observable stream. Within this function, we log the error message to the console and then return an Observable that emits null. With this implementation, the completion function passed to subscribe gets triggered even when an error occurs in the Observable.
When we run this snippet, we will get the following output:
name: Bob
stream caught : TypeError: Cannot read property 'value' of undefined
name: null
done
What happens here is that the map operator throws an error for the second value of the Observable stream, after which our catchError function is called with the following error: "Cannot read property 'value' of undefined." The null value returned from catchError is then delivered to the subscriber, and the completion function gets triggered.
Once an error occurs within an Observable stream, the source stream stops emitting values, even when catchError replaces the error. This is why the emitObj Observable stream never emits its last value, { name: { value: "Sam" } }.
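If the remaining values should still be processed, one common workaround is to move the risky step into an inner Observable so that catchError only ends that inner stream. This is a sketch of that alternative rather than the approach shown above; it uses mergeMap, and the names objs and events are illustrative.

```typescript
import { from, of } from "rxjs";
import { catchError, map, mergeMap } from "rxjs/operators";

interface IName {
  value: string;
}
interface IObj {
  name?: IName;
}

const objs: IObj[] = [{ name: { value: "Bob" } }, {}, { name: { value: "Sam" } }];
const events: string[] = [];

from(objs)
  .pipe(
    mergeMap((obj: IObj) =>
      // Each object gets its own short-lived inner stream.
      of(obj).pipe(
        map((o: IObj) => o.name!.value), // may throw for this object only
        catchError(() => of(null)) // replaces the failed inner stream with null
      )
    )
  )
  .subscribe({
    next: (value: string | null) => events.push(`value: ${value}`),
    complete: () => events.push("done"),
  });

console.log(events); // ["value: Bob", "value: null", "value: Sam", "done"]
```

Because the error never reaches the outer stream, the third value is still emitted and the stream completes normally.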
Conclusion
In this article, we’ve explored data transformation with Observable streams and how to implement them. Also, we’ve discussed error handling with Observables and provided strategies to implement error handlers when working with Observable streams.
Hopefully you’ve found this post informative and helpful. You can also check out the official RxJS documentation for a deep dive into the RxJS Observable module. Also, I recommend Brian Troncone’s learnrxjs.io for a more visual explanation of RxJS concepts.