-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-38939][runtime] Pause sources until the 1st checkpoint to prioritize processing recovered records #27440
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
…eived This allows to prioritize processing of recovered records (when recovering from an unaligned checkpoint)
…g for a checkpoint
| <td><h5>pipeline.sources.pause-until-first-checkpoint</h5></td> | ||
| <td style="word-wrap: break-word;">true</td> | ||
| <td>Boolean</td> | ||
| <td>Don't pull any data from sources until the first checkpoint is triggered. This might be helpful in reducing recovery times. Incompatible 0 value for execution.checkpointing.interval-during-backlog</td> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest expanding on "This might be helpful in reducing recovery times." to be specific about the scenarios that this would be helpful in. Are there scenarios it would not be helpful in? If there are no downsides to this change - then do we need a config option?
I am not sure what Incompatible 0 value for execution.checkpointing.interval-during-backlog means .
| && checkpointConfig.isPauseSourcesUntilFirstCheckpoint()) { | ||
| throw new IllegalArgumentException( | ||
| "Pausing sources until first checkpoint is incompatible with disabling checkpoints during backlog processing. " | ||
| + "Please consult " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "Please consult ...." -> "Please review and choose whether you require + CheckpointingOptions.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT.key()
+ " or"
+ CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG.key());
1996fanrui
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @rkhachatryan , thanks for the PR, I have several questions about this approach.
Pause sources until the 1st checkpoint to prioritize processing recovered records
Don't pull any data from sources until the first checkpoint is triggered.
If so, the source does not work even if all recovered buffers are consumed, right?
Let me understand the existing issue and the current approach:
- The task will be switched from INITIALIZATION to RUNNING once all recovered input buffers and output buffers are consumed.
- The recovered buffer of some input channels are fully consumed, and there are some new buffers is coming. The recovered buffer of rest of channels are not fully consumed.
Issue: If task starts consume new buffers before all recovered buffer are consumed, it will be switched to running later.
IIUC,, the purpose of pause source is avoid new buffers are generated. Is it correct?
If so, I do not think it works perfect since new buffers can be generated from the recovered buffers of upstream task. Of course, pause source could avoid new buffers from outside of flink during recovery.
Blocking channels whose recovered buffers are fully consumed maybe more fine-grained that pausing source, it allows task consumes recovered buffers before new buffers, as well as the upstream tasks and source are not blocked as early as possible.
Also, FLIP-547 part 4.6 will introduce fine-grained blocking mechanism. Not sure whether pausing source is still needed if new mechanism will be introduced in the near future?
Looking forward to your opinion, thanks
|
Yes @1996fanrui, you're right. The purpose of this change is to prevent new input records from delaying the switch of the downstream tasks to RUNNING. In a sense, this is a lightweight alternative to FLIP-547. |
1996fanrui
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @rkhachatryan for the comment!
Sounds make sense for considering this approach as a lightweight alternative first. I only left one comment, please take a look when you are available, thanks
| public static final ConfigOption<Boolean> PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT = | ||
| key("pipeline.sources.pause-until-first-checkpoint") | ||
| .booleanType() | ||
| .defaultValue(true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default value should be false since it has a regression when before the first checkpoint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, changed to false.
No description provided.