NodeJS - Understanding Streams


Arsalan Khattak

Posted on July 27, 2020


Suppose you have to convert the data in a file to uppercase. You would first need to load all of the data from the file into memory, convert it to uppercase, and write it back. Easy, right? But if the file is 2GB, all of that data has to be held in memory at once. Isn't that a terrible thing to do? Yes, it is. So what's the solution? Streams.
In this article, I'll talk about Streams in Nodejs.

Table of Contents

  • What are Streams?
  • Types of Streams
  • Creating a Readable Stream
  • Creating a Writable Stream
  • Stream Modes
  • Transform
  • Zipping and Unzipping of Streams
  • Conclusion

What are Streams?

Streams are one of the most powerful concepts in Nodejs. They are used to read and write data in chunks, which allows us to work with large amounts of data without consuming too much memory. Streams are not limited to working with files, either: streaming websites like Netflix and Youtube use them to send chunks of data to the client.
Note: Streams are not limited to Nodejs; they are available in other languages too, but in this post we will only talk about Nodejs streams.
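
To make the memory point concrete, here's a small sketch contrasting the two approaches. It assumes a hypothetical large file bigfile.txt; fs.createReadStream() is covered in detail below.

const fs = require("fs");

// Without streams: the entire file is loaded into memory at once.
fs.readFile("./bigfile.txt", (err, data) => {
  if (err) throw err;
  console.log(`Loaded all ${data.length} bytes into memory`);
});

// With streams: only one chunk (64 KB by default for file streams)
// is held in memory at a time.
fs.createReadStream("./bigfile.txt").on("data", (chunk) => {
  console.log(`Processing a ${chunk.length}-byte chunk`);
});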

Types of Streams

There are four types of streams in Nodejs:

  • Readable: Streams from which we can read data (fs.createReadStream())
  • Writable: Streams to which we can write data (fs.createWriteStream())
  • Duplex: Streams that are both readable and writable (net.Socket)
  • Transform: Streams that can transform or modify data as it is written and read (zlib.createGzip())

We will cover Readable, Writable, and a bit of Transform streams in this post.

Creating a Readable Stream

Readable streams, as mentioned in the Types of Streams section, are used to read data. A simple example of a readable stream:

const fs = require("fs");

var readableStream = fs.createReadStream("./myfile.txt");
readableStream.setEncoding("utf-8");

readableStream.on("data", (res) => {
  console.log(res);
});

In this example, we are reading the file myfile.txt and logging its data to the console. That's it. .setEncoding(), as the name suggests, is a function that sets the character encoding in which the data is read.
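
Readable streams also emit end and error events. A minimal sketch, assuming the same myfile.txt, that listens for both:

const fs = require("fs");

var readableStream = fs.createReadStream("./myfile.txt");
readableStream.setEncoding("utf-8");

readableStream.on("data", (res) => {
  console.log(res);
});

// "end" fires once there is no more data to consume.
readableStream.on("end", () => {
  console.log("Finished reading");
});

// "error" fires if, for example, the file does not exist.
readableStream.on("error", (err) => {
  console.error(err.message);
});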

Let's look into another example.

const fs = require("fs");

var readableStream = fs.createReadStream("./myfile.txt");

readableStream.on("readable", () => {
  let chunk;
  while (null !== (chunk = readableStream.read())) {
    console.log(chunk.length);
  }
});

In the above example, we are reading the file myfile.txt, and it logs the number of bytes in each chunk it receives.
.on() is a function that registers an event listener and runs the callback passed as the second argument whenever the event fires.
.read() is a function that reads some data out of the internal buffer and returns it.
If we log chunk instead of chunk.length, we receive a Buffer.
For example, in my case, chunk.length was 11 bytes and console.log(chunk) returned

<Buffer 48 65 6c 6c 6f 20 57 6f 72 6c 64>

As you may have noticed, the first example printed human-readable content while this example shows a Buffer. That's because in the first example we used .setEncoding().
If you want to convert this buffer into a human-readable format, you can simply call .toString() on it, which returns the original content, or you can use the .setEncoding() function.
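
For example, here's the same readable-event loop sketched with .toString(), so the chunks print as text instead of raw bytes:

const fs = require("fs");

var readableStream = fs.createReadStream("./myfile.txt");

readableStream.on("readable", () => {
  let chunk;
  while (null !== (chunk = readableStream.read())) {
    // Decode the Buffer into a UTF-8 string,
    // e.g. <Buffer 48 65 6c 6c 6f ...> -> "Hello World"
    console.log(chunk.toString());
  }
});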

Creating a Writable Stream

Now that you know how to create a readable stream, writable streams will be easy to pick up.
Here's a simple example of a writable stream.

const fs = require("fs");

var writeableStream = fs.createWriteStream("./newfile.txt");

writeableStream.write("Hello Humans\n");
writeableStream.write("Hello Aliens\n");
writeableStream.write("Hello Other Creatures");

This will create a new file newfile.txt (if it does not exist) or overwrite it (if it does) and store the content we passed to .write(). On executing this example, the content of newfile.txt will be

Hello Humans
Hello Aliens
Hello Other Creatures
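
When you are done writing, it's a good idea to close the stream with .end(). Here's a sketch of the same example; .end() optionally accepts a final chunk, and the finish event fires once all the data has been flushed:

const fs = require("fs");

var writeableStream = fs.createWriteStream("./newfile.txt");

writeableStream.write("Hello Humans\n");
writeableStream.write("Hello Aliens\n");
// .end() writes a final chunk and closes the stream.
writeableStream.end("Hello Other Creatures");

// "finish" fires after all data has been flushed to the file.
writeableStream.on("finish", () => {
  console.log("All writes are complete");
});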

At the beginning of this post, I talked about a scenario where you have to convert file data to uppercase. Let's do that by creating a readable and a writable stream.

const fs = require("fs");

var readableStream = fs.createReadStream("./newfile.txt");
var writeableStream = fs.createWriteStream("./uppercasefile.txt");
readableStream.setEncoding("utf-8");

readableStream.on("data", (data) => {
  writeableStream.write(data.toUpperCase());
});

Note: You can't read from and write to the same file simultaneously; doing so will result in losing the file content.
The above example will

  • Read newfile.txt data in chunks
  • Convert each chunk to uppercase
  • Store the uppercase chunk in uppercasefile.txt

Stream Modes

There are two stream modes:

  • Flowing Mode: In this mode, data is read automatically and provided as soon as possible via event emitters (.on()).
  • Paused Mode: In this mode, .read() must be called explicitly to read the data.

In the Creating a Readable Stream section, we used two examples. The first one used flowing mode, where we used an event emitter (.on("data", ...)) to read the data as soon as it became available, while in the second example we used .read() to read the content explicitly.

By default, a readable stream starts in paused mode, but we can switch it from one mode to the other.
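
One way to check the current mode is the readableFlowing property: it is null before any consumer is attached, true in flowing mode, and false after a pause. A small sketch, assuming myfile.txt exists:

const fs = require("fs");

var readable = fs.createReadStream("./myfile.txt");
console.log(readable.readableFlowing); // null: no consumer attached yet

readable.on("data", () => {});
console.log(readable.readableFlowing); // true: the "data" handler switched it to flowing

readable.pause();
console.log(readable.readableFlowing); // false: explicitly paused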

Pause to Flowing

The paused mode can be converted to the flowing mode in one of the following ways:

  • Attaching a .on("data", ...) event handler
  • Calling the .resume() method
  • Calling the .pipe() method

We have already seen the first method. Let's see the other two.

.resume()

const { Readable } = require("stream");

async function* values() {
  yield "Hello";
  yield "World";
  yield "Hi";
  yield "There";
  yield "Man";
}

var readable = Readable.from(values());
readable.on("end", () => {
  console.log("Reached the end, but did not read anything.");
});

In the above example, we are using Readable from the stream module instead of createReadStream from fs; Readable.from() creates a readable stream from an iterable, in this case a generator.
You might expect this example to print Reached the end, but did not read anything. when the stream ends, but it prints nothing at all. The reason is that the stream is in paused mode.

To convert it into flowing mode and print that message, we need to add .resume() before .on().

var readable = Readable.from(values());
readable.resume().on("end", () => {
  console.log("Reached the end, but did not read anything.");
});

This now works because .resume() switches the stream to flowing mode. If generators are new to you, or this example is unclear because of generators or Readable, let's convert it to use fs.createReadStream().

const fs = require("fs");

var readable = fs.createReadStream("./myfile.txt");
readable.resume().on("end", () => {
  console.log("Reached the end, but did not read anything.");
});

.pipe()

The job of the .pipe() function is to take data from a readable stream and write it to a writable stream.

const fs = require("fs");

var readableStream = fs.createReadStream("./newfile.txt");
var writeableStream = fs.createWriteStream("./secondfile.txt");

readableStream.pipe(writeableStream);

In this example, we are reading data from newfile.txt and piping it to secondfile.txt. This means that as soon as a chunk is available in readableStream, it is automatically written to writeableStream.
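
One caveat worth knowing: .pipe() does not forward errors from one stream to the next, so each stream needs its own error handler. For this reason, Nodejs also provides stream.pipeline(), which pipes streams together and reports any error in a single callback. A minimal sketch of the same copy operation:

const fs = require("fs");
const { pipeline } = require("stream");

pipeline(
  fs.createReadStream("./newfile.txt"),
  fs.createWriteStream("./secondfile.txt"),
  (err) => {
    // Called once, with an error if any stream in the chain failed.
    if (err) console.error("Pipeline failed:", err.message);
    else console.log("Pipeline succeeded");
  }
);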

You might be thinking: what if we want to operate on that data before we store it (for example, convert it to uppercase)? For that, we have to use Transform streams. We will talk about those in a bit, but first let's see how to convert a flowing mode into a paused mode.

.pause()

We can switch a stream from flowing mode back to paused mode by calling .pause() on the readable stream.

const { Readable } = require("stream");

async function* values() {
  yield "Hello";
  yield "World";
  yield "Hi";
  yield "There";
  yield "Man";
}

var readable = Readable.from(values());
var count = 0;
readable.on("data", (res) => {
  console.log(res);
  if (count == 0) readable.pause();
  count++;
});

When executed, this example reads the first value, Hello, from the generator and switches the stream into paused mode. Because of this, we only see Hello on the screen.
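
If you want to continue reading later, call .resume() again. Here's a sketch that pauses after every chunk and resumes one second later, assuming we want to throttle consumption:

const { Readable } = require("stream");

async function* values() {
  yield "Hello";
  yield "World";
}

var readable = Readable.from(values());
readable.on("data", (res) => {
  console.log(res);
  readable.pause();
  // Resume after one second, so chunks arrive at most once per second.
  setTimeout(() => readable.resume(), 1000);
});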

Transform

We will not go into much detail, but let's cover this type of stream with a basic example. As mentioned in the Types of Streams section, a Transform stream can modify data as it is written and read. Let's use pipe and convert the data to uppercase before storing it.

const fs = require("fs");
const { Transform } = require("stream");
var readableStream = fs.createReadStream("./myfile.txt");
var createableStream = fs.createWriteStream("./uppercasefile.txt");

var processData = new Transform({
  transform(chunk, encoding, cb) {
    this.push(chunk.toString().toUpperCase());
    cb(); // signal that this chunk has been fully processed
  },
});
readableStream.pipe(processData).pipe(createableStream);

In this example, we pipe readableStream into processData, which converts the data to uppercase, and then pipe it again into createableStream, which stores it. processData is a Transform stream that converts each available chunk to uppercase.

transform(chunk, encoding, cb) {
    this.push(chunk.toString().toUpperCase());
    cb(); // signal that this chunk has been fully processed
  },

This piece of code modifies each chunk of data the stream receives. In this example, we modify the chunk by converting it to uppercase and pushing it to the stream; calling cb() tells the stream we are done with the current chunk.
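
Instead of calling this.push() and then cb() yourself, you can hand the transformed chunk directly to the callback: its signature is cb(error, data), and passing data pushes it and signals completion in one step. An equivalent sketch:

const { Transform } = require("stream");

var processData = new Transform({
  transform(chunk, encoding, cb) {
    // First argument is an error (or null), second is the transformed chunk.
    cb(null, chunk.toString().toUpperCase());
  },
});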

Zipping and Unzipping of Streams

Nodejs has a built-in module, zlib, which helps with zipping and unzipping files, and it's pretty simple to use.
Let's see it in action by compressing a local file.

const fs = require("fs");
const zlib = require("zlib");

var readableStream = fs.createReadStream("./myfile.txt");
var compressedStream = zlib.createGzip();
var writeableStream = fs.createWriteStream("./myfile.txt.gz");

readableStream.pipe(compressedStream).pipe(writeableStream);

This simple example takes data from myfile.txt, compresses it, and stores it in myfile.txt.gz.
If you run cat myfile.txt.gz you will see some unreadable characters, but you can open the file with vim (vim myfile.txt.gz) to see the original content, since Vim automatically unzips files with a .txt.gz extension.

Extracting is also as easy as compressing.

const fs = require("fs");
const zlib = require("zlib");

var readableStream = fs.createReadStream("./myfile.txt.gz");
var extractStream = zlib.createGunzip();
var writeableStream = fs.createWriteStream("./extractfile.txt");

readableStream.pipe(extractStream).pipe(writeableStream);

This will extract myfile.txt.gz and store the data in extractfile.txt.

Conclusion

That's it for streams in Nodejs. By now you should have a good understanding of streams and be able to work with them.

  • You have learned what problem streams solve and why they are so useful.
  • You have learned the different types of streams and their modes.
  • You have learned how to read and write file data using streams.
  • You have learned how to compress and extract data using zlib.