Best practice for deduplication using a Flink SQL query: read from raw_topic and create a new topic without duplicates (Confluent)


I have a question regarding Flink SQL on Confluent. I have a .NET Windows Service app that produces data to a Kafka topic. The application runs 24 hours a day, and the data will contain a massive number of duplicates because it is produced by multiple different schedulers. Off the top of my head, my logic to deduplicate this data looks like this: [diagram: procedure to deduplicate messages].

Going down this route, I have a couple of questions:

  1. Does this look like best practice? If not, I would greatly appreciate any suggestions you may have.
  2. A Flink SQL statement like
CREATE TABLE...WITH()

automatically creates a Kafka topic, and I need to deduplicate the messages from the raw topic, but how can I run the query persistently? Sorry, I am new to Confluent Kafka and Flink: does a user need to run the deduplication query (ROW_NUMBER() ... PARTITION BY ...) manually, or is there another way for a Flink SQL script to run constantly? (See the sketch at the end of this question.)
  3. The Confluent Flink documentation shows how to deduplicate data from a table, https://docs.confluent.io/cloud/current/flink/reference/queries/deduplication.html:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY column1[, column2...]]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1

But this doesn't delete the duplicates; it just returns each row without its duplicates. I eventually need to consume the data from the clean_topic without duplicates, so I am wondering how I should handle that.
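For context, here is a rough sketch of the table-creation part I mean; clean_topic and the columns (id, payload, event_time) are placeholder names for my actual schema:

-- Rough sketch with placeholder names. My understanding is that on
-- Confluent Cloud for Apache Flink, CREATE TABLE also creates a
-- backing Kafka topic with the same name, here clean_topic.
CREATE TABLE clean_topic (
  id STRING,
  payload STRING,
  event_time TIMESTAMP_LTZ(3)
);

The part I can't figure out is how to keep a deduplication query that fills this table running constantly.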

Any suggestions appreciated.


1 Answer

Answered by Martijn Visser:

You'll need to send these deduplicated results to another table, and consume the results from there. An example deduplication query that I use is:

INSERT INTO latest_page_per_ip
SELECT ip, page_url, $rowtime
FROM (
   SELECT *, $rowtime,
     ROW_NUMBER() OVER (PARTITION BY ip
       ORDER BY $rowtime DESC) AS rownum
   FROM shoe_clickstream)
WHERE rownum = 1;

This query runs a deduplication over my shoe_clickstream table, which maps to my input Kafka topic. It uses $rowtime, which is mapped to my event time. Last but not least, I'm selecting the ip, the page_url and the event time ($rowtime), and I'm storing those in my output table latest_page_per_ip, which is backed by my output Kafka topic. Keep in mind that this table will be a changelog stream, so you potentially have to deal with retractions.
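As a sketch of what the output table could look like (the column names and types here are assumed to match the query above, not taken from the answer): on Confluent Cloud for Apache Flink, declaring a primary key on the table should switch its changelog to upsert mode, which is one way of dealing with those retractions.

-- Sketch of the output table, assuming Confluent Cloud for Apache
-- Flink. The table is backed by a Kafka topic of the same name;
-- declaring a primary key gives the table upsert changelog
-- semantics, so downstream consumers see the latest row per ip
-- instead of separate retract/add message pairs.
CREATE TABLE latest_page_per_ip (
  ip STRING,
  page_url STRING,
  event_time TIMESTAMP_LTZ(3),
  PRIMARY KEY (ip) NOT ENFORCED
);

With an upsert table, a plain Kafka consumer reading the backing topic sees one record per key update, keyed by ip, rather than raw retraction pairs; whether that is good enough depends on how the downstream consumer handles multiple updates per key.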