How to use Iterations provided in Flink ML?

As the DataStream API's iterativeStream method has been deprecated, it has been recommended to use Flink ML's iteration as an alternative.

Firstly, is there any simple, non-ML toy example that we can run using the new iterations support? I checked the documentation but could not find any concrete example.

Secondly, I am trying to implement the following example from the iterativeStream documentation using Flink ML's iteration (Iterations.iterateUnboundedStreams), but I'm having trouble wrapping my head around it:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});

To construct an iteration, users are required to provide DataStreamList initVariableStreams, DataStreamList dataStreams and IterationBody body.

I'm assuming DataStream<Long> someIntegers will be equivalent to initVariableStreams, DataStream<Long> stillGreaterThanZero to newModelUpdate, and DataStream<Long> lessThanZero to modelOutput. However, what do I set dataStreams to?

More generally, can this example be replicated at all using the new Flink ML iterations support?

There is 1 answer

Answer by K.M

As I understand it, initVariableStreams holds the initial values of the variables that are updated on each round through the feedback edge, while dataStreams holds any additional input that the iteration body reads but never updates. The "subtract 1" example needs no such additional input, so dataStreams can be set to any dummy stream.

Here's the above iterations example from the deprecated API converted to run on Flink ML's iterations:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.iteration.DataStreamList;
import org.apache.flink.iteration.IterationBodyResult;
import org.apache.flink.iteration.Iterations;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterationsML {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

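        // Initial values of the variable stream that the loop feeds back into.
        DataStream<Long> initParameters = env.generateSequence(0, 1000);
        // Dummy stream: the iteration body never reads dataStreams.
        DataStream<Long> dataset = env.generateSequence(0, 0);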

        DataStreamList result = Iterations.iterateUnboundedStreams(
            DataStreamList.of(initParameters),
            DataStreamList.of(dataset),
            (variableStreams, dataStreams) -> {

                DataStream<Long> feedback = variableStreams.get(0);
                DataStream<Long> minusOne = feedback.map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) throws Exception {
                        return value - 1;
                    }
                });

                DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
                    @Override
                    public boolean filter(Long value) throws Exception {
                        return (value > 0);
                    }
                });

                DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
                    @Override
                    public boolean filter(Long value) throws Exception {
                        return (value <= 0);
                    }
                });

                return new IterationBodyResult(
                        DataStreamList.of(stillGreaterThanZero),
                        DataStreamList.of(lessThanZero));
            });
        // Print the first output stream (lessThanZero), whose elements are Long.
        result.<Long>get(0).print();

        env.execute("Flink Java API Skeleton");

    }

}
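
As a sanity check on what the loop is meant to compute, here is a minimal plain-Java sketch of the same data flow. It is a model only and uses no Flink classes. The class name SubtractOneModel and the method run are hypothetical names chosen for illustration. A queue stands in for initVariableStreams plus the feedback edge, and a list stands in for the lessThanZero output. It assumes every record is processed exactly once per round and ignores parallelism and ordering, which a real job does not guarantee.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Plain-Java model of the "subtract 1" loop; illustrative only, not Flink code. */
final class SubtractOneModel {

    /**
     * Runs the loop over the inclusive range [from, to] and returns every
     * value that leaves the loop (the role played by lessThanZero).
     */
    static List<Long> run(long from, long to) {
        // Stands in for initVariableStreams plus the feedback edge.
        Deque<Long> variables = new ArrayDeque<>();
        for (long v = from; v <= to; v++) {
            variables.add(v);
        }

        List<Long> outputs = new ArrayList<>();
        while (!variables.isEmpty()) {
            long minusOne = variables.poll() - 1;   // the map step
            if (minusOne > 0) {
                variables.add(minusOne);            // stillGreaterThanZero: fed back
            } else {
                outputs.add(minusOne);              // lessThanZero: emitted
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Long> outputs = run(0, 1000);
        long zeros = outputs.stream().filter(v -> v == 0L).count();
        long negatives = outputs.stream().filter(v -> v < 0L).count();
        System.out.println("emitted=" + outputs.size()
                + " zeros=" + zeros + " negatives=" + negatives);
    }
}
```

In this model, the input 0 leaves the loop as -1 after one pass, and every input from 1 to 1000 circulates until it reaches 0, so main prints emitted=1001 zeros=1000 negatives=1. If the Flink job follows the same semantics, its printed output should contain the same multiset of values, though the order and the subtask prefixes will differ.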