Memo on how to use streams in Node.js

When processing data, we tend to take the easy route and read all of it into memory before processing it.

This is fine while the data is small, but with huge data it consumes a lot of memory and takes a long time, because other reads and writes are blocked until the entire read or write of that one piece of data has finished.

In such cases stream processing, where data is read in a little at a time and processed as it arrives, is convenient: it only needs enough memory for a buffer, and reads and writes run in parallel, so processing finishes quickly.
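
For example, with the fs module the two approaches look roughly like this (a minimal sketch; input.txt stands in for some large file):

import * as fs from "fs";

// Whole-file approach: the entire file is held in memory at once
const whole = fs.readFileSync('input.txt');
console.log(whole.length);

// Stream approach: only one chunk (64 KiB by default for file streams)
// is in memory at a time
let total = 0;
fs.createReadStream('input.txt')
    .on('data', (chunk) => { total += chunk.length; })
    .on('end', () => { console.log(total); });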

Perhaps because stream processing is a good fit for asynchronous processing, Node.js includes stream support out of the box, and many external libraries can also handle streams.

Stream processing comes up often in Node.js, so it is useful to know how to use it, but because it deals with asynchronous callbacks, I think it is a little harder to grasp than reading the whole file first and processing it afterwards.

Here, I would like to make a note of how to use such Node.js streams.

Read Stream

A read stream wraps a data source and provides the ability to read data from it little by little.

  • The read data is received via the data event.
  • When the data reaches the end, an end event is emitted.

Example: Reading a file

import * as fs from "fs";

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('input.txt');

        // Read Stream
        rs.on('data', (chunk) => {
            console.log(chunk.toString());
        });

        rs.on('end', () => {
            console.log("READ STREAM END");
            resolve();
        });

        // Fail the promise if reading fails
        rs.on('error', reject);

    });

    console.log("PROGRAM END");
})();
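
As a side note, read streams are also async iterable, so on recent Node.js versions (10 and later) the same loop can be written with for await instead of the data/end events:

import * as fs from "fs";

(async () => {
    // A read stream can be consumed with for await,
    // one chunk per iteration, without event handlers
    for await (const chunk of fs.createReadStream('input.txt')) {
        console.log(chunk.toString());
    }

    console.log("READ STREAM END");
})();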

Write Stream

A write stream wraps an output destination and provides the ability to write data out to it little by little.

  • Data is written to the write stream with the write() function.
  • After all the data has been sent, the end() function tells the write stream that the data is finished.
  • When the write stream has finished outputting the data, a finish event is emitted.

Example: Writing to a file

import * as fs from "fs";

(async () => {
    await new Promise((resolve, reject) => {

        const writeData = ["data_1", "data_2", "data_3"];

        const ws = fs.createWriteStream("output.txt");

        // Write Data
        writeData.forEach((v) => {
            ws.write(v);
        });
        ws.end();

        // Write Stream
        ws.on('finish', () => {
            console.log("WRITE STREAM END");
            resolve();
        });

        // Fail the promise if writing fails
        ws.on('error', reject);

    });

    console.log("PROGRAM END");
})();
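
As a side note, write() returns false when the stream's internal buffer is full. A well-behaved producer then stops writing and resumes on the drain event; this is what keeps memory usage bounded when the data source is faster than the destination. A minimal sketch (the chunk count is arbitrary):

import * as fs from "fs";

const ws = fs.createWriteStream("output.txt");

let i = 0;
function writeChunks() {
    while (i < 100000) {
        i++;
        // write() returns false once the internal buffer is full
        if (!ws.write(`data_${i}\n`)) {
            // resume writing after the buffer has drained
            ws.once('drain', writeChunks);
            return;
        }
    }
    ws.end();
}
writeChunks();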

Connecting read and write streams

Reading data from a read stream and outputting it directly to a write stream can be implemented by hand as follows.

  • The data event receives data from the read stream, and write() writes it to the write stream.
  • When the read stream reaches the end of the data (the end event), end() tells the write stream that the data is finished.
  • All work is complete when the finish event is received, indicating that the write stream has finished writing.

Writing to a file while reading a file (manual implementation)

import * as fs from "fs";

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('input.txt');
        const ws = fs.createWriteStream("output.txt");

        // Read Stream
        rs.on('data', (chunk) => {
            ws.write(chunk);
        });

        rs.on('end', () => {
            ws.end();
        });

        // Write Stream
        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });

    });

    console.log("PROGRAM END");
})();

pipe

The above connection from a read stream to a write stream is done automatically by using the read stream's pipe() function; it also handles the backpressure described above (pausing the read stream while the write stream's buffer drains).

As before, all work is complete when the finish event is received, indicating that the write stream has finished writing.

Writing to a file while reading a file (pipe)

import * as fs from "fs";

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('input.txt');
        const ws = fs.createWriteStream("output.txt");

        rs.pipe(ws);

        // Write Stream
        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });
    });

    console.log("PROGRAM END");
})();

Duplex / Transform stream

This type of stream has both read and write functionality.

It is mainly used for streams that perform data conversion, as in zlib, which compresses and decompresses data.

It is a bit complicated, but the data to be converted is written to the "write" side, and the converted data is retrieved from the "read" side.

It is usually used as an intermediate stream in the pipe() chains described below, and is rarely used by itself.

  • Start by writing the input data to the write side, retrieve the result data from the read side, and finish when reading from the read side is complete.
  • The end of the input data is communicated with the write side's end() function.
  • All work is complete when the end event is received, indicating that the end of the read side has been reached.

Example: Converting data to Base64 (using the external base64-stream package)

import { Base64Encode } from 'base64-stream';

(async () => {
    await new Promise((resolve, reject) => {

        const base64 = new Base64Encode();

        // Write Data
        base64.write("data");
        base64.end();

        // Read Stream
        base64.on('data', (chunk) => {
            console.log(chunk.toString());
        });

        base64.on('end', () => {
            console.log("READ STRAM END");
            resolve();
        });

    });

    console.log("PROGRAM END");
})();

The pipe() function of a read stream returns the stream that was passed as its argument.

Passing a Duplex/Transform stream to pipe() therefore returns that Duplex/Transform stream, on which pipe() can be called again to connect the next stream.

As mentioned above, Duplex and Transform streams are mostly used this way, with pipe() connecting the streams together.

Example: Compressing and saving a file

import * as fs from "fs";
import * as zlib from "zlib";

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('input.txt');
        const ws = fs.createWriteStream("input.txt.gz");
        const gzip = zlib.createGzip();

        rs.pipe(gzip).pipe(ws);

        // Write Stream
        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });
    });

    console.log("PROGRAM END");
})();

Duplex and Transform streams can also be chained to each other with pipe().

The completion of the whole process is detected by the finish event of the final write stream.

Example: Converting a file to Base64 and saving it compressed

import * as fs from "fs";
import * as zlib from "zlib";
import { Base64Encode } from 'base64-stream';

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('input.txt');
        const ws = fs.createWriteStream("output.txt.gz");
        const base64 = new Base64Encode();
        const gzip = zlib.createGzip();

        rs.pipe(base64).pipe(gzip).pipe(ws);

        // Write Stream
        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });
    });

    console.log("PROGRAM END");
})();
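
For reference, the reverse direction works the same way. The base64-stream package also provides a Base64Decode transform, so decompressing and decoding is just another pipe() chain. A minimal sketch, assuming output.txt.gz was produced by the example above:

import * as fs from "fs";
import * as zlib from "zlib";
import { Base64Decode } from 'base64-stream';

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('output.txt.gz');
        const ws = fs.createWriteStream("decoded.txt");
        const gunzip = zlib.createGunzip();
        const base64 = new Base64Decode();

        // Undo the gzip first, then the Base64 encoding
        rs.pipe(gunzip).pipe(base64).pipe(ws);

        ws.on('finish', () => {
            console.log("WRITE STREAM END");
            resolve();
        });
    });

    console.log("PROGRAM END");
})();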

Streaming data from memory

There are times when you want to pass in-memory data to a write stream.

You can write it directly to the write stream with write(), or you can create a read stream from arbitrary data and connect it to the write stream with pipe().

To create a read stream from arbitrary data, follow these steps:

  1. First, create an empty read stream.
  2. Write the data to be streamed into it with push().
  3. When there is no more data, signal the end by pushing null.

Example: Stream memory data to a file

import * as fs from "fs";
import { Readable } from 'stream';

(async () => {
    await new Promise((resolve, reject) => {

        // A no-op read() implementation; the data is supplied with push() below
        const rs = new Readable({ read() {} });
        const ws = fs.createWriteStream("output.txt");

        rs.pipe(ws);

        // create read stream data
        rs.push("data");
        rs.push(null);

        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });
    });

    console.log("PROGRAM END");
})();
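
As a side note, on Node.js 12 and later the same read stream can be created in one step with Readable.from(), which accepts any iterable or async iterable:

import * as fs from "fs";
import { Readable } from 'stream';

// Readable.from() builds a read stream from any (async) iterable,
// with no push()/null bookkeeping needed
const rs = Readable.from(["data_1", "data_2", "data_3"]);
rs.pipe(fs.createWriteStream("output.txt"));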

Impressions, etc.

It is possible to create your own stream with a custom input source, output destination, or conversion, but the difficulty is on the high side, so I think you can usually get by with the streams that are already available.
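
That said, a simple conversion stream is not too hard: passing a transform() function to the stream.Transform constructor is enough. A minimal sketch of a Transform that upper-cases text, purely as an illustration:

import { Transform } from 'stream';

// A Transform stream that upper-cases whatever text flows through it
const upperCase = new Transform({
    transform(chunk, encoding, callback) {
        // hand the converted data to the readable side
        callback(null, chunk.toString().toUpperCase());
    }
});

process.stdin.pipe(upperCase).pipe(process.stdout);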

Streams require the use of callback functions, which feels a bit old-fashioned in these days of async and await.
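
For what it's worth, newer Node.js versions (15 and later) ship a promise-based pipeline() in stream/promises. It can be awaited directly, and unlike pipe() it also propagates errors from every stream in the chain:

import * as fs from "fs";
import * as zlib from "zlib";
import { pipeline } from 'stream/promises';

(async () => {
    // pipeline() wires the streams together and resolves on completion,
    // rejecting if any stream in the chain emits an error
    await pipeline(
        fs.createReadStream('input.txt'),
        zlib.createGzip(),
        fs.createWriteStream("input.txt.gz")
    );

    console.log("PROGRAM END");
})();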
