Can I use a Node.js PassThrough stream as a simple input buffer?


I am running a subprocess from Node.js; let's call it the generator. I need to read its output and process it. At times, the generator spits out a big burst of data at around 50MB/s, but most of the time it produces data at a much slower rate.

The code that reads the data also occasionally slows down. Overall, my Node.js program reads faster than the generator produces, but fluctuations in both my read speed and the generator's output speed cause occasional backpressure, which slows the generator down.
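For context on where that backpressure comes from: `pipe()` roughly does the following for you. It pauses the source whenever the destination's `write()` returns false and resumes it on `'drain'`. While `childProc.stdout` is paused, Node stops reading from the OS pipe, the pipe fills up, and the generator's own writes block. The sketch below is a simplified illustration only; `pipeWithBackpressure`, `slowConsumer` and `received` are hypothetical names, and the real `pipe()` also handles errors, unpiping and more.

```javascript
const { Readable, Writable } = require("stream");

// Hypothetical, simplified version of the flow control readable.pipe(writable) performs.
function pipeWithBackpressure(source, dest) {
  source.on("data", (chunk) => {
    // write() returns false once dest has buffered >= its highWaterMark
    if (!dest.write(chunk)) {
      source.pause(); // stop pulling data from the source...
      dest.once("drain", () => source.resume()); // ...until dest has caught up
    }
  });
  source.on("end", () => dest.end());
}

// Usage: a fast source feeding a deliberately slow consumer.
const received = [];
const slowConsumer = new Writable({
  highWaterMark: 2, // tiny buffer so backpressure kicks in quickly
  write(chunk, _encoding, callback) {
    received.push(chunk.toString());
    setImmediate(callback); // simulate slow, asynchronous processing
  },
});
pipeWithBackpressure(Readable.from(["a", "b", "c", "d", "e"]), slowConsumer);
slowConsumer.on("finish", () => console.log(received.join("")));
```

No data is lost while the source is paused; it simply stays upstream (in the OS pipe, for a child process) until the consumer drains.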

I want to buffer up to about 50MB of the generator's output in Node.js. I tried the following, but I am not seeing much improvement, and I do not know how to benchmark this accurately:

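const child_process = require("child_process");
const { PassThrough } = require("stream");

/**
 * Reads and processes the stream until it ends.
 * @param {import("stream").Readable} nodeInputStream
 * @returns {Promise<void>} resolves when the end of the stream is reached
 */
async function readAndProcessStream(nodeInputStream)  {
    // implementation omitted
    return;
}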

async function createProcessAndRead() {
    const childArgs = ["arg1", "arg2"];
    const programName = "my_program";
    console.log("Spawn with args: ", programName, childArgs.join(" "));
    const childProc = child_process.spawn(
        programName,
        childArgs,
        {
            stdio:["ignore", "pipe", "ignore"],
            detached: true
        }
    );

    const exitCodePromise = new Promise((resolve, reject) => {
        childProc.once('close', resolve);
    });

    // Try to make a 50MB buffer
    const bufferStream = new PassThrough({emitClose: true, highWaterMark: 50*1024*1024});
    childProc.stdout.pipe(bufferStream);
    await readAndProcessStream(bufferStream);
    // make sure to wait until the process has really exited
    await exitCodePromise;
}

Is the code above a correct way to put 50MB of buffer space between the generator and the function that handles the stream? If not, what is the correct approach?
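One detail relevant to this question: a PassThrough is a Transform (a Duplex), and the single `highWaterMark` option is applied to both its writable and its readable side. The sketch below fills a PassThrough that nobody reads from and reports how much it accepted before `write()` first returned false. `measureCapacity` is a hypothetical helper, and the sizes are scaled down from 50MB to keep it fast. In current Node.js versions the result comes out at roughly twice `highWaterMark`, because the readable side fills first and the writable side then queues about another `highWaterMark` worth of data.

```javascript
const { PassThrough } = require("stream");

// Hypothetical helper: fill a PassThrough without consuming it and report
// how many bytes it accepted before write() signalled backpressure.
function measureCapacity(highWaterMark, chunkSize) {
  const stream = new PassThrough({ highWaterMark });
  const chunk = Buffer.alloc(chunkSize);
  let accepted = 0;
  // Nothing reads from `stream`, so data piles up in its internal buffers.
  while (true) {
    accepted += chunk.length;
    if (!stream.write(chunk)) break; // false = "stop writing, I'm full"
  }
  return {
    accepted,
    readableLength: stream.readableLength, // bytes waiting on the readable side
    writableLength: stream.writableLength, // bytes queued on the writable side
    readableHighWaterMark: stream.readableHighWaterMark,
    writableHighWaterMark: stream.writableHighWaterMark,
  };
}

console.log(measureCapacity(1024, 64));
```

If the goal is about 50MB in total, the two sides can be sized separately with `readableHighWaterMark` and `writableHighWaterMark` (these are ignored when `highWaterMark` is also given). To benchmark the real pipeline, one option is to sample `bufferStream.readableLength` and `bufferStream.writableLength` periodically (for example with `setInterval`) while the generator runs: if the buffer never comes close to full, its size is not what is slowing the generator.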

Answer by Patryk Falba

This can be simpler. Wait until bufferStream emits its 'end' event, or until readAndProcessStream reaches the end of the stream (the null chunk that marks end of data). At that point you have processed the entire output of the subprocess, so there is no need to wait separately for the subprocess to finish.
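A minimal sketch of readAndProcessStream along the lines this answer suggests, assuming a recent Node.js version in which readable streams are async iterable: the `for await` loop finishes exactly when the stream ends, so the returned promise resolves once the subprocess output has been fully consumed. The `processChunk` callback and the byte count are hypothetical additions for illustration, since the question leaves the implementation out.

```javascript
const { PassThrough } = require("stream");

/**
 * Hypothetical implementation: consumes the stream chunk by chunk.
 * @param {import("stream").Readable} nodeInputStream
 * @param {(chunk: Buffer) => (void|Promise<void>)} processChunk hypothetical per-chunk handler
 * @returns {Promise<number>} resolves with the byte count after the stream ends
 */
async function readAndProcessStream(nodeInputStream, processChunk = () => {}) {
  let totalBytes = 0;
  // The loop exits when the stream ends and rethrows if the stream errors.
  for await (const chunk of nodeInputStream) {
    totalBytes += chunk.length;
    // While this await is pending, incoming data accumulates only up to the
    // stream's highWaterMark; beyond that, backpressure applies upstream.
    await processChunk(chunk);
  }
  return totalBytes;
}

// Usage with an in-memory stream standing in for childProc.stdout:
const demo = new PassThrough();
readAndProcessStream(demo).then((bytes) => console.log(`processed ${bytes} bytes`));
demo.write("hello ");
demo.end("world");
```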