Flink - How to apply an aggregate function to all records resulting from JOIN and EXPLODE operations on one original record in streaming mode?


I'm working with data coming from Kafka in real time. Processing a single input record produces several rows (due to the JOIN and EXPLODE operations). From all of these rows I then need to select only one: the one with the highest value of a certain field.

The question is how to collect all the rows derived from one source record into a window and apply an aggregate function to them. In Spark Structured Streaming I managed to solve a similar problem using foreachBatch, but I'm stuck in Flink.

The closest I got to the desired result was with Top-N, but it does not work in append mode, and I'm not entirely sure how to write its output to the target storage. Am I even going in the right direction?
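For what it's worth, stripped of the streaming machinery, what a Top-N query with rank 1 computes is just a grouped arg-max per source record. A minimal plain-Java sketch of that selection (the Row shape and the field names unique_id / start_time are assumptions for illustration, not Flink API):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopPerSource {
    // Hypothetical stand-in for the joined/exploded rows: each row carries
    // the id of the source record it came from and the field to maximise.
    public record Row(String uniqueId, long startTime, String payload) {}

    // For every source record (uniqueId), keep only the row with the
    // highest startTime -- the same selection Top-N with rank 1 performs.
    public static Map<String, Row> selectMax(List<Row> rows) {
        return rows.stream().collect(Collectors.toMap(
                Row::uniqueId,
                r -> r,
                (a, b) -> a.startTime() >= b.startTime() ? a : b));
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("a", 10, "x"),
                new Row("a", 30, "y"),
                new Row("b", 20, "z"));
        System.out.println(selectMax(rows));
    }
}
```

In a stream, the hard part is not this selection but knowing when a group is complete, which is exactly what the windowing question below is about.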

I'm new to Flink, all answers are appreciated.


Update:

OK, I tried using session windows with an aggregate function to find the maximum:

// Convert the joined table back to a DataStream of Rows
DataStream<Row> joined = tableEnv.toDataStream(tableEnv.from("joined_msip"));

// Group the exploded rows by the id of their source record, close each
// group after 1 s of inactivity, then keep the row with the max start time
DataStream<Row> filtered = joined
                .keyBy((KeySelector<Row, Object>) value -> value.getField("unique_id"))
                .window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(1000)))
                .aggregate(new MaxStartTimeAggregate());

It works, but I am forced to choose a session gap, which adds latency compared to simply collecting exactly the records spawned by the original record.

Well, this does not look like the best solution, so the question remains.
