Say I have a flink job processing a data flow like 1, 2, control_flag, 3... When control_flag is met, the job should be stopped with savepoint and the following messages 3... should neither be processed or dropped. When centern actions are taken outside the flink and the job is restarted from savepoint, the job should go on process the following messages. However, if the job hangs with a sleeping loop inside the process operator to prevent the following messages to be processed, it can not be stopped with savepoint using flink api. So how do I stop the job at the position of control_flag and let the job to be restarted with the position next to it?
Related Questions in APACHE-FLINK
- Fine grained resource mangement and heap memory in flink task slot
- Does parallel flink tasks affect each other if they are unioned at the end?
- I am facing issue with ParquetFileWriting n hdfs in flink where parquet file size is around 382 KB . I want the parquet file in MB
- Apache Flink (AWS) does not recognize saved temporary function
- Flink 1.19 error Cannot determine simple type name "com"
- Unsupported options found for 'hudi'
- Flink 1.18 register custom API endpoint handler
- Flink Stuck on Broadcast
- Blunder about RichCoFlatMapFunction in flink 1.17.2 according to the official leanring guide
- Is there a way to store & retrieve a window's state in flink
- puzzled with flink window state
- Flink 1.15.2 OOM issue due to RocksDB
- How to create custom metrics with labels (python SDK + Flink Runner)
- flink-rpc-akka-loader - Security Vulnerability Issues
- I am new to Apache Flink and getting error FileNotFoundError: [WinError 2] at in_streaming_mode() The system cannot find the file specified
Related Questions in SAVEPOINTS
- Is there a way to store & retrieve a window's state in flink
- Apache Flink S3 ListBucket API calls
- MySQLi deletes savepoints (and only savepoints) upon transaction error?
- Is there any chance of `ROLLBACK TO SAVEPOINT` erasing data in PostgreSQL?
- How can i recover job by savepoint with multi-job run by executeAsync in application mode (flink 1.18)
- Identify illegal commit in oracle procedure
- Flink checkpointing issue: "org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job"
- How to stop a flink job at specified position
- "Cannot map checkpoint/savepoint state for operator" when using fromChangelogStream
- SqlClient equivalent of "BEGIN TRY...END TRY"
- Is it possible to rollback only one command (not whole transaction) in SQL?
- Flink 1.14 : Why can't resotore savepoint got error Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job
- How to know if I have started transaction control in MySQL already or not?
- Flink savepoint not saving the valuestates
- In SQL, what happens if you try to rollback to a savepoint from another user?
Popular Questions
- How do I undo the most recent local commits in Git?
- How can I remove a specific item from an array in JavaScript?
- How do I delete a Git branch locally and remotely?
- Find all files containing a specific text (string) on Linux?
- How do I revert a Git repository to a previous commit?
- How do I create an HTML button that acts like a link?
- How do I check out a remote Git branch?
- How do I force "git pull" to overwrite local files?
- How do I list all files of a directory?
- How to check whether a string contains a substring in JavaScript?
- How do I redirect to another webpage?
- How can I iterate over rows in a Pandas DataFrame?
- How do I convert a String to an int in Java?
- Does Python have a string 'contains' substring method?
- How do I check if a string contains a specific word?
Popular Tags
Trending Questions
- UIImageView Frame Doesn't Reflect Constraints
- Is it possible to use adb commands to click on a view by finding its ID?
- How to create a new web character symbol recognizable by html/javascript?
- Why isn't my CSS3 animation smooth in Google Chrome (but very smooth on other browsers)?
- Heap Gives Page Fault
- Connect ffmpeg to Visual Studio 2008
- Both Object- and ValueAnimator jumps when Duration is set above API LvL 24
- How to avoid default initialization of objects in std::vector?
- second argument of the command line arguments in a format other than char** argv or char* argv[]
- How to improve efficiency of algorithm which generates next lexicographic permutation?
- Navigating to the another actvity app getting crash in android
- How to read the particular message format in android and store in sqlite database?
- Resetting inventory status after order is cancelled
- Efficiently compute powers of X in SSE/AVX
- Insert into an external database using ajax and php : POST 500 (Internal Server Error)
Some suggestions can be found here.
There are a few possible ways that it can be done, but I think since You want to keep state between the runs, the best idea would be to have an operator that :
If the flag
stop_executionis false, processes data and outputs that for the downstream operators.If the flag
stop_executionis true, it adds the data it receives to list state.If it receives the
control_flagit emits side output meaning that job should be stopped.Now it's up to You to listen to the side output, this can be either external service that reads data from Kafka and executes correct REST calls to stop given job or anything else You want.