I have a Readable stream in object mode that I'm pushing data into, like this:
import { Readable } from 'stream';

const getReadStream = () => {
  const stream = new Readable({ objectMode: true, read: () => {} });
  const get = async (page = 1) => {
    const { data } = await client
      .request(...)
      .catch((error) => {
        stream.emit('error', error);
        return { data: undefined };
      });
    const results = parseFn(data);
    if (results.length > 0) {
      results.forEach((row) => stream.push(row));
      get(page + 1);
    } else {
      stream.push(null);
    }
  };
  get();
  return stream;
};
I want to consume it into one or more Writable streams, like this:
import { Writable } from 'stream';
import { pipeline } from 'stream/promises';

const consumeStream = async () => {
  const readableStream = getReadStream();
  const pipeline1 = pipeline(
    readableStream,
    transform1,
    transform2,
    someWritableStream,
  );
  if (!certainCondition) {
    return pipeline1;
  }
  const writeStream2 = new Writable({
    objectMode: true,
    write: (rows: any[], _, callback) => {
      somePromise(rows).then(() => callback());
    },
  });
  const pipeline2 = pipeline(
    readableStream,
    transform3,
    transform4,
    writeStream2,
  );
  return Promise.all([pipeline1, pipeline2]);
};
My question is: in the case where consumeStream proceeds with pipeline2, is this the correct way to implement multiple Writable streams with only one Readable stream (given that the number of objects processed by pipeline1 should equal the number processed by pipeline2)? And if it is not, what is the correct way to implement this?
You're going to have issues with the original stream's data already being in flight, or the stream having potentially ended, by the time you start your second pipeline.
If you use a PassThrough stream to duplicate the data coming off of the original, then your approach will work.
Here's a little script that demonstrates the technique. It's a minimal sketch: the source rows, the makeSink helper, and the sink names are made up for illustration.
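import { PassThrough, Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';

// An object-mode source with three rows.
const source = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]);

// Duplicate the source by piping it into two PassThrough branches.
// Both pipes are set up synchronously, before any data flows, so
// neither branch misses a chunk.
const branch1 = new PassThrough({ objectMode: true });
const branch2 = new PassThrough({ objectMode: true });
source.pipe(branch1);
source.pipe(branch2);

// A sink that counts the rows it receives and reports when it finishes.
const makeSink = (name: string) => {
  let count = 0;
  return new Writable({
    objectMode: true,
    write(_row, _encoding, callback) {
      count += 1;
      callback();
    },
    final(callback) {
      console.log(`${name} processed ${count} rows`);
      callback();
    },
  });
};

const main = async () => {
  await Promise.all([
    pipeline(branch1, makeSink('sink1')),
    pipeline(branch2, makeSink('sink2')),
  ]);
};

main();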
The results printed are (each sink receives every row; the order of the two lines may vary):
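sink1 processed 3 rows
sink2 processed 3 rows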
So in your case you would do something like this. It's a sketch that keeps your transform1–transform4, someWritableStream, somePromise, and certainCondition as-is, and relies on both branches being piped synchronously, before the first awaited request in getReadStream can resolve:
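import { PassThrough, Writable } from 'stream';
import { pipeline } from 'stream/promises';

const consumeStream = async () => {
  const readableStream = getReadStream();

  // Pipe the source into a PassThrough per consumer. All pipes are set
  // up synchronously, before any data has been pushed, so no branch
  // misses any chunks.
  const branch1 = new PassThrough({ objectMode: true });
  readableStream.pipe(branch1);

  const pipeline1 = pipeline(
    branch1,
    transform1,
    transform2,
    someWritableStream,
  );

  if (!certainCondition) {
    return pipeline1;
  }

  const branch2 = new PassThrough({ objectMode: true });
  readableStream.pipe(branch2);

  const writeStream2 = new Writable({
    objectMode: true,
    write: (rows: any[], _, callback) => {
      somePromise(rows).then(() => callback());
    },
  });

  const pipeline2 = pipeline(
    branch2,
    transform3,
    transform4,
    writeStream2,
  );

  return Promise.all([pipeline1, pipeline2]);
};

Note that .pipe() honors backpressure from every destination, so the source only flows as fast as the slower of the two branches.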