Streams in NodeJS

Rajkumar Gaur

In NodeJS applications, data can be moved from one location to another using streams: sequences of chunks (Buffers or strings) that are read or written one at a time. Because the data is processed as it is being read or written, it never has to be held in memory all at once, which keeps memory usage low.

Streams in NodeJS come in different types, including readable, writable, duplex, and transform streams, each with unique characteristics and use cases.

In the following sections, we will explore the different types of streams in NodeJS and how they can be used to process data efficiently in various scenarios.

Readable Streams

Readable streams are used for reading data from various sources such as files, HTTP requests, or even other streams. Readable streams emit a data event every time there is data available to be read, and an end event when the stream has ended.

To use a readable stream in NodeJS, you can create an instance of the stream and then register a data event listener to handle incoming data.

Here’s an example of using a readable stream to read data from a file and write it to the console:

// data.txt

Far far away, behind the word mountains, far from the countries Vokalia and Consonantia, there live the blind texts. Separated they live in Bookmarksgrove right at the coast of the Semantics, a large language ocean.

const fs = require('fs');

const readableStream = fs.createReadStream('data.txt');

readableStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data -> ${chunk}`);
});

readableStream.on('end', () => {
  console.log('End of stream.');
});

// Output

Received 215 bytes of data -> Far far away, behind the word mountains, far from the countries Vokalia and Consonantia, there live the blind texts. Separated they live in Bookmarksgrove right at the coast of the Semantics, a large language ocean.
End of stream.

Chunks where? The benefit of a stream is hard to see here: the file is only 215 bytes, so it is read in a single chunk, because the default chunk size (highWaterMark) for fs.createReadStream() is 64 kilobytes. We can lower the highWaterMark to 100 bytes to see the same file get split into 3 chunks.

const readableStream = fs.createReadStream("data.txt", {
  highWaterMark: 100,
});

// Output

Received 100 bytes of data -> Far far away, behind the word mountains, far from the countries Vokalia and Consonantia, there live
Received 100 bytes of data -> the blind texts. Separated they live in Bookmarksgrove right at the coast of the Semantics, a large
Received 15 bytes of data -> language ocean.
End of stream.

We can also create our own readable stream from scratch with custom streaming logic using the Readable class from the stream module.

We create a new instance of Readable and pass an object that has a read function. The read function gets called when data is requested from the stream.

The read function pushes the data string in chunks of 50 characters at a time. Once all the data has been read, we push a null value to signal the end of the stream.

const { Readable } = require("stream");

const data =
  "Far far away, behind the word mountains, far from the countries Vokalia and Consonantia, there live the blind texts.";

const readableStream = new Readable({
  read() {
    // Track the current offset on the stream instance
    if (!this.i) this.i = 0;
    if (this.i >= data.length) {
      // No data left: pushing null signals the end of the stream
      this.push(null);
    } else {
      const chunk = data.substring(this.i, this.i + 50);
      this.push(chunk);
    }
    this.i += 50;
  },
});

readableStream.on("data", (chunk) => {
  console.log(`Received ${chunk.length} bytes of data -> ${chunk}`);
});

readableStream.on("end", () => {
  console.log("End of stream.");
});

// output

Received 50 bytes of data -> Far far away, behind the word mountains, far from
Received 50 bytes of data -> the countries Vokalia and Consonantia, there live
Received 16 bytes of data -> the blind texts.
End of stream.
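
Event listeners are not the only way to consume a readable stream. Readable streams are also async iterables, so they can be read with a for await...of loop. The following is a minimal sketch; the collectChunks helper and the in-memory source built with Readable.from() are assumptions made for illustration and are not part of the example above.

```javascript
const { Readable } = require("stream");

// Hypothetical helper: collects every chunk of a readable stream into an array.
async function collectChunks(readable) {
  const chunks = [];
  for await (const chunk of readable) {
    chunks.push(chunk.toString());
  }
  return chunks;
}

// Readable.from() builds a readable stream from any iterable.
const source = Readable.from(["Far far away, ", "behind the word mountains"]);

const collected = collectChunks(source);
collected.then((chunks) => {
  console.log(`Received ${chunks.length} chunks -> ${chunks.join("")}`);
});
```

The loop finishes when the stream ends, so no separate end listener is needed.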

Writable Streams

Writable streams are instances of the Writable class in Node.js. They provide methods such as write() and end() that can be used to write data to the stream. When data is written to a writable stream, it is buffered and written to the underlying destination in chunks.

Some examples of built-in Node.js modules that use writable streams include:

  • fs.createWriteStream(): used to write data to a file
  • http.ServerResponse: used to write HTTP responses

const fs = require('fs');
const writeStream = fs.createWriteStream('example.txt');

writeStream.write('Hello, world!\n');
writeStream.write('This is an example file.\n');
writeStream.end();

But why use streams when we can write using writeFile?

When data is written with fs.writeFile() or fs.appendFile(), everything you want to write has to be passed in a single call, so the complete string or Buffer must already be in memory. This can be a problem when dealing with large amounts of data, as it can quickly consume a significant amount of memory. With streams, data is written to the file in chunks, so only a small portion of the data needs to be in memory at any given time.

Let’s see how to create a stream using the Writable class from the stream module.

const { Writable } = require("stream");

// Create a writable stream to append data to a string
let output = "";
const writeStream = new Writable({
  write(chunk, encoding, callback) {
    // Append the chunk to the output string
    output += chunk.toString();
    callback();
  },
});

// Write data to the stream
writeStream.write("Hello, world!\n", "utf-8");
writeStream.write("This is an example string.\n", "utf-8");
writeStream.end();

// Output the final string
writeStream.on("finish", () => {
  console.log(output);
});

// output
Hello, world!
This is an example string.

The code is self-descriptive, but what is the callback function?

By calling the callback function, the write method signals to the stream that it is ready to receive more data. If there is an error while processing the current chunk, the callback function should be called with an error object as the first argument, which will cause the stream to emit an error event.
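
The error path described above can be sketched as follows. This is only an illustration: the rule that rejects the chunk "fail", and the names strictStream and errors, are hypothetical.

```javascript
const { Writable } = require("stream");

const errors = [];

// Hypothetical writable stream that rejects the chunk "fail".
const strictStream = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString() === "fail") {
      // Passing an Error as the first argument reports the failure.
      callback(new Error("Could not process chunk"));
    } else {
      callback();
    }
  },
});

// The error passed to the callback surfaces here.
strictStream.on("error", (err) => {
  errors.push(err.message);
  console.error(`Stream error: ${err.message}`);
});

strictStream.write("Hello");
strictStream.write("fail");
```

Without an error listener, the emitted error would be thrown and crash the process, so it is worth registering one on every stream you create.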

We can take this to the next level by adding the highWaterMark option while constructing the writable stream. writableStream.write() returns false once the total amount of buffered data reaches or exceeds the highWaterMark, letting the writer know to slow down and stop sending chunks until the buffer drains.

const { Writable } = require('stream');

// Create a writable stream with a highWaterMark of 5 bytes
const writableStream = new Writable({
  highWaterMark: 5,
  write(chunk, encoding, callback) {
    // Log the chunk received
    console.log(`Received chunk: ${chunk.toString()}`);

    // Delay the processing of the chunk by 1 second
    setTimeout(() => {
      console.log(`Processed chunk: ${chunk.toString()}`);

      // Call the callback function to signal that the chunk has been processed
      callback();
    }, 1000);
  }
});

Let’s also create a helper function called writer which respects the buffer size of the stream and slows down till the stream is ready to process more chunks.

If the data cannot be written to the stream because the buffer is full, the writer function waits for the drain event to be emitted by the stream, which indicates that the buffer has been emptied and it is safe to write more data.

// Helper function to write data
function writer(chunk) {
  return new Promise((resolve) => {
    if (!writableStream.write(chunk)) {
      // Buffer is full: wait for a single "drain" event before resolving
      writableStream.once("drain", () => {
        resolve(chunk);
      });
    } else {
      resolve(chunk);
    }
  });
}

writer("1234")
  .then(() => writer("67890"))
  .then(() => writer("abcdef"))
  .then(() => console.log("All chunks processed!"));

// output
Received chunk: 1234
Processed chunk: 1234
Received chunk: 67890
Processed chunk: 67890
Received chunk: abcdef
Processed chunk: abcdef
All chunks processed!
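
The return value of write() can also be inspected directly. Here is a minimal sketch, assuming a hypothetical slowStream with the same 5-byte highWaterMark as above; it only records what write() returns for two consecutive writes.

```javascript
const { Writable } = require("stream");

// Hypothetical stream that takes 100 ms to process each chunk.
const slowStream = new Writable({
  highWaterMark: 5,
  write(chunk, encoding, callback) {
    setTimeout(callback, 100);
  },
});

// 4 bytes buffered: still below the 5-byte highWaterMark.
const first = slowStream.write("1234");
// 4 + 5 = 9 bytes buffered: the highWaterMark has been reached.
const second = slowStream.write("67890");

console.log(first, second); // true false

slowStream.once("drain", () => {
  console.log("Buffer drained, safe to write again");
});
```

Note that the second chunk is still accepted even though write() returned false. The return value is a hint to the writer, not a rejection.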

Duplex Streams

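Duplex streams implement both the readable and the writable interface. They are useful for situations where data needs to be both read and written over the same channel, such as in network communication (a TCP socket is a duplex stream) or real-time data processing.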

To create a duplex stream in Node.js, you can use the Duplex class provided by the stream module. The Duplex class extends both the Readable and Writable classes, making it possible to both read and write data to the stream.

const { Duplex } = require("stream");

// Define a duplex stream that transforms each chunk to uppercase
const upperCaseStream = new Duplex({
  write(chunk, encoding, callback) {
    const upperCaseChunk = chunk.toString().toUpperCase();
    this.push(upperCaseChunk);
    callback();
  },
  read() {},
});

// Write data to the stream
upperCaseStream.write("hello");
upperCaseStream.write("world");

// Read the transformed data from the stream
upperCaseStream.on("data", (chunk) => {
  console.log(chunk.toString()); // Logs HELLO, then WORLD (one chunk per event)
});

// End the stream
upperCaseStream.end();

In this example, we define a Duplex stream called upperCaseStream that transforms each chunk to uppercase using the write method. We implement the read method as a no-op function because we don't need to do anything special when the stream is being read from.
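
The two sides of a Duplex stream do not have to be connected at all. The following sketch assumes a hypothetical stream whose readable side serves a fixed list of messages while its writable side only collects what it receives; the names independentStream, outgoing, and received are made up for the example.

```javascript
const { Duplex } = require("stream");

const outgoing = ["ping 1", "ping 2"]; // data served by the readable side
const received = []; // data collected by the writable side

const independentStream = new Duplex({
  read() {
    // Serve the next message, or null to end the readable side.
    this.push(outgoing.length > 0 ? outgoing.shift() : null);
  },
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  },
});

independentStream.on("data", (chunk) => {
  console.log(`Read: ${chunk.toString()}`);
});

independentStream.write("pong");
independentStream.end();
```

Here, what is written never shows up on the readable side, which is how a network socket behaves: what you send and what you receive are separate flows.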

Combining Streams

It is often necessary to send or transform data from one stream to another to create complex pipelines.

Following is the most basic example of sending the data from one stream to another:

const { createReadStream, createWriteStream } = require('fs');

const readStream = createReadStream('input.txt');
const writeStream = createWriteStream('output.txt');

readStream.pipe(writeStream);

It is also possible to have multiple writable streams listening to a single readable stream:

const { createReadStream, createWriteStream } = require('fs');

const readStream = createReadStream('input.txt');
const writeStream1 = createWriteStream('output1.txt');
const writeStream2 = createWriteStream('output2.txt');

// pipe() returns its destination, so attach each writable stream separately
readStream.pipe(writeStream1);
readStream.pipe(writeStream2);

If you need to do any kind of transformation between the reading and writing steps, Transform streams can be used. Transform streams are a special kind of stream that extends the Duplex class and allows the transformation of the data.

const { createReadStream, createWriteStream } = require('fs');
const { Transform } = require('stream');

const readStream = createReadStream('input.txt');
const writeStream = createWriteStream('output.txt');

const transformStream = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

readStream
  .pipe(transformStream)
  .pipe(writeStream);

The Duplex stream from the previous section could also be used here instead of the Transform stream. However, that example never ends its readable side, so a Transform stream is the better fit for this job.

Another way to make the data flow between the streams is the pipeline method.

Pipelines are implemented using the pipeline() method, which is provided by the stream module in Node.js. The pipeline() method takes an arbitrary number of streams and connects them together so that data flows from the first stream to the last stream.

const { pipeline } = require('stream');

pipeline(
  // input stream
  readableStream,
  
  // intermediate streams
  transformStream1,
  transformStream2,
  
  // output stream
  writableStream,
  
  // callback function
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);
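
To make the template above concrete, here is a minimal runnable sketch. The in-memory source, the upperCase transform, and the collecting sink are stand-ins chosen for illustration; they are not part of the original example.

```javascript
const { pipeline, Readable, Transform, Writable } = require("stream");

// Input stream: an in-memory source built from an array.
const source = Readable.from(["hello ", "world"]);

// Intermediate stream: uppercases every chunk.
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// Output stream: collects the result in a string.
let result = "";
const sink = new Writable({
  write(chunk, encoding, callback) {
    result += chunk.toString();
    callback();
  },
});

pipeline(source, upperCase, sink, (err) => {
  if (err) {
    console.error("Pipeline failed", err);
  } else {
    console.log(`Pipeline succeeded: ${result}`);
  }
});
```

Unlike chained pipe() calls, pipeline() forwards an error from any stream to the final callback and destroys the other streams, which is why it is usually the safer choice.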

Conclusion

Wise use of streams can make the handling of large data elegant and more performant. I hope this helped you in understanding streams.

Thank you for reading and see you at the next one!
