How to merge two forks back into one stream and have it consumed in highlandjs?


When using highlandjs to create a stream, I know I can consume it like so:

import * as high from "highland";

console.log("Single stream:");

const stream0 = high([1, 2, 3, 4, 5]);
stream0.each(console.log); // each number will be printed

Now I want to fork the stream into two, do something on both of them, and then merge the two forks back together into one stream. (Or to phrase it differently: I want to partition the stream into two, yet the results of both partitions should end up in the same response stream.)

Yet I am unable to make it work. The example is highly contrived but showcases my problem:

import * as high from "highland";

const stream = high([1, 2, 3, 4, 5]);

const fork1 = stream.fork().filter(i => i % 2 === 0);
const fork2 = stream.fork().filter(i => i % 2 !== 0);

const result = fork1.concat(fork2);

result.each(console.log); // doesn't do anything

The result stream stays paused. I think it has to do with the source only starting once all of its forks have started processing the input stream, yet I am unsure how to start it.
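
To make that suspicion concrete, here is a toy model of shared back-pressure in plain TypeScript. It is only a sketch and not Highland's implementation: `ToyFork`, `isPulling` and `pumpSource` are hypothetical names made up for illustration. It assumes the documented behaviour that a forked source only emits while all of its forks are being read, and that concat does not read its second stream until the first one has ended.

```typescript
// Toy model only: this is NOT how Highland is implemented internally.
interface ToyFork {
  name: string;
  isPulling: boolean; // true while a consumer is actively reading this fork
  received: number[];
}

// Emit values only while every fork is pulling (shared back-pressure).
// Returns how many values the source managed to emit.
function pumpSource(source: number[], forks: ToyFork[]): number {
  let emitted = 0;
  for (const value of source) {
    if (!forks.every((f) => f.isPulling)) {
      break; // at least one fork is idle, so the source stalls
    }
    forks.forEach((f) => f.received.push(value));
    emitted += 1;
  }
  return emitted;
}

// concat-like consumption: fork2 is not read until fork1 has ended,
// but fork1 cannot end because the source is waiting for fork2.
const concatStyle: ToyFork[] = [
  { name: "fork1", isPulling: true, received: [] },
  { name: "fork2", isPulling: false, received: [] },
];
const emittedWithConcat = pumpSource([1, 2, 3, 4, 5], concatStyle);

// merge-like consumption: both forks are read at the same time.
const mergeStyle: ToyFork[] = [
  { name: "fork1", isPulling: true, received: [] },
  { name: "fork2", isPulling: true, received: [] },
];
const emittedWithMerge = pumpSource([1, 2, 3, 4, 5], mergeStyle);

console.log(emittedWithConcat, emittedWithMerge); // prints: 0 5
```

In this model the concat-style setup never emits anything, which matches the stalled stream described above, while reading both forks at once lets all five values through.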


There are 2 answers

k0pernikus On

I managed to get it working by resuming the source stream after defining the forks and the concatenation:

const stream = high([1, 2, 3, 4, 5]);

const fork1 = stream.fork().filter(i => i % 2 === 0);
const fork2 = stream.fork().filter(i => i % 2 !== 0);
const result = fork1.concat(fork2);

stream.resume(); // starts the source stream!

result.each(console.log); // now prints each value

Now it will print each value (grouped by fork rather than in source order, which is to be expected with concat):

2
4
1
3
5
amsross On

I would use merge for this:

const high = require('highland')

const stream = high([1, 2, 3, 4, 5])

const fork1 = stream.fork().filter(i => i % 2 === 0)
const fork2 = stream.fork().filter(i => i % 2 !== 0)

high([fork1, fork2]).merge().each(console.log) // 1 2 3 4 5