Since the DataStream API's iteration support (`IterativeStream`, created via `DataStream#iterate()`) has been deprecated, Flink ML's iteration library is recommended as the alternative.
Firstly, is there any simple, non-ML toy example that we can run using the new iterations support? I checked the documentation but could not find any concrete example.
Secondly, I am trying to implement the following example from the `IterativeStream` documentation using Flink ML's iteration (`Iterations.iterateUnboundedStreams`), but I'm having trouble wrapping my head around it:
```java
DataStream<Long> someIntegers = env.generateSequence(0, 1000);
IterativeStream<Long> iteration = someIntegers.iterate();
// Subtract 1 from every record that enters the loop.
DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
    @Override public Long map(Long value) throws Exception { return value - 1; }
});
// Records still greater than zero are fed back to the head of the iteration.
DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override public boolean filter(Long value) throws Exception { return value > 0; }
});
iteration.closeWith(stillGreaterThanZero);
// Records that reached zero (or below) leave the loop.
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override public boolean filter(Long value) throws Exception { return value <= 0; }
});
```
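To make the loop's behavior concrete, here is a plain-Java sketch (no Flink involved; `MinusOneTrace` and `trace` are hypothetical names) that follows a single record through the deprecated example. Each pass applies the map step, a result above zero is fed back by `closeWith`, and the first result at or below zero leaves through `lessThanZero`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java trace (no Flink) of one record through the deprecated loop.
class MinusOneTrace {

    /** Returns every value the map step produces for one input; the last one leaves via lessThanZero. */
    static List<Long> trace(long start) {
        List<Long> produced = new ArrayList<>();
        long value = start;
        while (true) {
            long minusOne = value - 1;      // iteration.map(value -> value - 1)
            produced.add(minusOne);
            if (minusOne <= 0) {            // lessThanZero: the record exits the loop
                return produced;
            }
            value = minusOne;               // stillGreaterThanZero: fed back via closeWith
        }
    }

    public static void main(String[] args) {
        System.out.println(trace(3));       // [2, 1, 0]
        System.out.println(trace(0));       // [-1]
    }
}
```

An input `v >= 1` makes `v` passes and exits as `0`, while input `0` exits as `-1` after one pass. For `generateSequence(0, 1000)`, `lessThanZero` therefore ends up with one `-1` and 1000 zeros, which is a handy reference for checking any port of the example.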
To construct an iteration, users are required to provide `DataStreamList initVariableStreams`, `DataStreamList dataStreams` and `IterationBody body`.
I'm assuming `DataStream<Long> someIntegers` corresponds to `initVariableStreams`, `DataStream<Long> stillGreaterThanZero` to `newModelUpdate` (the feedback stream) and `DataStream<Long> lessThanZero` to `modelOutput` (the output stream).
However, what do I set dataStreams to?
More generally, can this example be replicated at all with the new Flink ML iterations support?
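The three-argument contract and the mapping proposed above can be modeled without Flink. The sketch below is plain Java; `ToyIterations`, `Result`, `iterate` and `minusOneExample` are hypothetical stand-ins, not Flink ML's API. It mirrors the roles of `initVariableStreams`, `dataStreams` and `IterationBody`: the body receives the current variables plus the unchanged data input and returns a feedback part (the role of the first argument of `IterationBodyResult`) and an output part. The loop stops once nothing is fed back, which is assumed here to approximate how an unbounded iteration over bounded inputs ends once no records remain in flight. The body never reads the data argument, so a placeholder is passed for it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical, Flink-free model of the iterateUnboundedStreams contract.
final class ToyIterations {

    /** Hypothetical stand-in for IterationBodyResult: feedback goes back in, output leaves. */
    static final class Result<T> {
        final List<T> feedback;
        final List<T> output;

        Result(List<T> feedback, List<T> output) {
            this.feedback = feedback;
            this.output = output;
        }
    }

    /**
     * Runs the body round by round: round n+1 sees the feedback of round n as its variables,
     * the data list is passed unchanged to every round, and outputs are collected until
     * no feedback remains.
     */
    static <T> List<T> iterate(List<T> initVariables, List<T> data,
                               BiFunction<List<T>, List<T>, Result<T>> body) {
        List<T> collected = new ArrayList<>();
        List<T> variables = initVariables;
        while (!variables.isEmpty()) {
            Result<T> r = body.apply(variables, data);
            collected.addAll(r.output);
            variables = r.feedback;
        }
        return collected;
    }

    /** The "subtract 1" example using the mapping proposed in the question. */
    static List<Long> minusOneExample(long from, long to) {
        List<Long> someIntegers = new ArrayList<>();          // plays initVariableStreams
        for (long v = from; v <= to; v++) {
            someIntegers.add(v);                               // inclusive range, like generateSequence
        }
        List<Long> dummyData = List.of(0L);                    // plays dataStreams; never read
        return iterate(someIntegers, dummyData, (variables, data) -> {
            List<Long> stillGreaterThanZero = new ArrayList<>(); // feedback (newModelUpdate)
            List<Long> lessThanZero = new ArrayList<>();         // output (modelOutput)
            for (long value : variables) {
                long minusOne = value - 1;
                if (minusOne > 0) {
                    stillGreaterThanZero.add(minusOne);
                } else {
                    lessThanZero.add(minusOne);
                }
            }
            return new Result<>(stillGreaterThanZero, lessThanZero);
        });
    }
}
```

`ToyIterations.minusOneExample(0, 1000)` returns 1001 values: a single `-1` (from input `0`) followed by 1000 zeros, the same values the deprecated example emits on `lessThanZero`. Processing whole rounds as lists is a simplification; a real streaming iteration interleaves records from different rounds, so only the set of emitted values, not their order, should be compared.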
According to my understanding, `dataStreams` (`dataset` in the documentation example) carries inputs that the iteration body reads but does not update, while `initVariableStreams` provides the initial values that are updated and fed back in each round. In the "subtract 1" example there is no such extra input, so `dataStreams` can be set to any dummy stream. Here's the above iterations example from the deprecated API converted to run on Flink ML's iterations: