I want to run a multi-step process as a stream pipeline. It works fine with just a few items, but once more data is involved, the pipeline simply stops midway.
Here's what my code looks like.
In this example, I want to process 200 items,
but only 30 are processed successfully. It stops after that, with no error message, and I have no clue why.
import { pipeline } from "stream/promises"

async function* func() {
  for (let i = 0; i < 200; i++) {
    console.log(`func ${i}`)
    yield i
  }
}

async function* func2(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
    console.log(`func2 ${i}`)
    yield i
  }
}

async function* func3(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
    console.log(`func3 ${i}`)
    yield i
  }
}

async function main() {
  await pipeline(
    func,
    func2,
    func3
  )
}

main();
Expected output:
Iteration completes 200
Actual output:
Iteration stops at 30
After a lot of debugging, I found the solution, and it's actually pretty simple lol.
The solution was to remove the 'yield' from the last function (which is the PipelineDestination). I believe what happened was that everything the last function yielded just accumulated in the stream, since nothing consumed it; once that buffer filled up, backpressure stalled the earlier stages and the pipeline never finished.
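For reference, here is a minimal sketch of what the fixed version could look like. It keeps func and func2 from the question and turns func3 into a plain async function that only consumes its input. The `processed` counter and the return value of `main` are my own additions for illustration, not part of the original code.

```typescript
import { pipeline } from "stream/promises"

// Source stage: yields the numbers 0..199.
async function* func() {
  for (let i = 0; i < 200; i++) {
    yield i
  }
}

// Transform stage: passes every value through unchanged.
async function* func2(iterator: AsyncIterable<number>) {
  for await (const i of iterator) {
    yield i
  }
}

async function main(): Promise<number> {
  // Illustrative counter (not in the original code) to confirm
  // how many items reached the last stage.
  let processed = 0

  // Destination stage: a plain async function, NOT a generator.
  // It consumes each value and yields nothing, so nothing is left
  // sitting in an output buffer waiting for a reader.
  async function func3(iterator: AsyncIterable<number>) {
    for await (const i of iterator) {
      console.log(`func3 ${i}`)
      processed++
    }
  }

  await pipeline(func, func2, func3)
  return processed
}

main().then((count) => console.log(`processed ${count} items`))
```

If the values are still needed after the last stage, one option is to collect them inside func3 (into an array, for example) instead of yielding them, since nothing reads from the end of a pipeline created through stream/promises.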