Stream Pipeline stops midway

65 views Asked by At

So, I wanted to execute a process in a stream. At first, it works well with just few data/iterations but if there's more data involved, the pipeline just stops midway.

Here's what my code looks like.

Basically, for this example, I wanted to process around 200 data.

but only 30 data are processed successfully. It stops after that. No error message and I don't have any clue why.

import { pipeline } from "stream/promises"

async function* func() {
  for (let i = 0; i < 200; i++) {
    console.log(`func ${i}`)
    yield i
  }
}
async function* func2(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
;    console.log(`func2 ${i}`)
    yield i
  }
}
async function* func3(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
    console.log(`func3 ${i}`)
    yield i
  }
}

async function main() {
  await pipeline(
    func,
    func2,
    func3
  )
}

main();

Expected output:

Iteration completes 200

Actual output:

Iteration stops at 30

2

There are 2 answers

0
TouchA On BEST ANSWER

After constant debugging, I found the solution and it's actually pretty simple lol.

The solution was to remove 'yield' on the last function (which this is the PipelineDestination).

I believe what happened was that all previous data just got accumulated in the stream since nothing consumed them.

import { pipeline } from "stream/promises"

async function* func() {
  for (let i = 0; i < 200; i++) {
    console.log(`func ${i}`)
    yield i
  }
}
async function* func2(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
;    console.log(`func2 ${i}`)
    yield i
  }
}
async function* func3(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
    console.log(`func3 ${i}`)
    // yield i <---- I removed this!
  }
}

async function main() {
  await pipeline(
    func,
    func2,
    func3
  )
}

main();
0
Mohsen Mahoski On

I add a writableStream to your code and it works as you expected:

import { pipeline } from 'stream/promises';
import { Writable } from 'stream';

async function* func() {
  for (let i = 0; i < 200; i++) {
    console.log(`func ${i}`)
    yield i;
  }
}

async function* func2(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
    console.log(`func2 ${i}`);
    yield i;
  }
}
async function* func3(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
    console.log(`func3 ${i}`);
    yield i.toString();
  }
}

const writableStream = new Writable({
  write(chunk, encoding, callback) {
    callback();
  }
});

async function main() {
  try {
    await pipeline(
      func,
      func2,
      func3,
      writableStream
    );
    console.log('Pipeline finished successfully');
  } catch (e) {
    console.error('Pipeline failed', e);
  }
}

main();