[FLINK-38940] Start checkpointing when all tasks are running and job is running #27441
base: master
| <td><h5>execution.checkpointing.pause-checkpoints-if-tasks-not-running</h5></td> | ||
| <td style="word-wrap: break-word;">false</td> | ||
| <td>Boolean</td> | ||
| <td>When enabled, checkpoints will be paused as long as some tasks are not running (and not finished), even if the job status is running. This help to prevent an additional delay on job startup</td> |
I am struggling with these sentences.
- nit: it would be simpler to say "if any tasks" rather than "as long as some tasks".
- What does it mean to say a task is not running, but has a status of running? I think we need to talk about non-blocking output, perhaps with more words outside of the config definition on how to recognise non-blocking jobs, as this is the subset of jobs we are affecting with this option.
- nit: "This help" -> "This helps"
- The text says "prevent an additional delay on job startup", but the logic does not seem startup-specific. Can we get into any other non-startup scenarios where this might be helpful?
- Can we guide the user as to when they should consider using this option, and any downsides to using it?
We discussed this PR with @1996fanrui offline and agreed to remove the option because the new behavior is the only desirable one (until FLIP-457). After FLIP-457, this logic will change.
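The behavior being discussed (start checkpointing only once the job is running and every task is running or finished) can be sketched in isolation. This is a minimal illustration under stated assumptions, not Flink's implementation: `TaskState`, `Scheduler`, and `ExecutionTrackingGate` are hypothetical names invented for this example, and the sketch assumes all task IDs are known up front.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only; these are not Flink classes.
enum TaskState { CREATED, DEPLOYING, RUNNING, FINISHED }

class Scheduler {
    boolean started = false;
    void start() { started = true; }
    void stop() { started = false; }
}

class ExecutionTrackingGate {
    private final Scheduler scheduler;
    private final Map<String, TaskState> tasks = new HashMap<>();
    private boolean jobRunning = false;

    // Assumption: every task is registered at construction time.
    ExecutionTrackingGate(Scheduler scheduler, List<String> taskIds) {
        this.scheduler = scheduler;
        for (String id : taskIds) {
            tasks.put(id, TaskState.CREATED);
        }
    }

    void onJobRunning(boolean running) {
        jobRunning = running;
        update();
    }

    void onTaskState(String taskId, TaskState state) {
        tasks.put(taskId, state);
        update();
    }

    // Start only when the job is running and no task is still starting up;
    // in every other situation keep the scheduler stopped.
    private void update() {
        boolean allRunningOrFinished = tasks.values().stream()
                .allMatch(s -> s == TaskState.RUNNING || s == TaskState.FINISHED);
        if (jobRunning && allRunningOrFinished) {
            scheduler.start();
        } else {
            scheduler.stop();
        }
    }
}
```

In this sketch a job-level "running" signal alone is not enough: the scheduler stays stopped until the last task reports `RUNNING` or `FINISHED`.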
| this.coordinator = checkNotNull(coordinator); | ||
| this.allTasksOutputNonBlocking = allTasksOutputNonBlocking; | ||
| return (jobId, newJobStatus, timestamp) -> { | ||
| if (newJobStatus == JobStatus.RUNNING && allTasksOutputNonBlocking) { |
I am curious: should we be checking the new config option here? I thought this was the condition under which we wanted to pause checkpointing. Am I missing something?
The check is in CheckpointCoordinator.createActivatorDeactivator, but as mentioned above, I'm going to remove this option.
| new CheckpointCoordinatorDeActivator(this, allTasksOutputNonBlocking); | ||
| if (deActivator == null) { | ||
| if (pauseCheckpointsIfTasksNotRunning && allTasksOutputNonBlocking) { | ||
| deActivator = new ExecutionTrackingCheckpointCoordinatorDeActivator(this); |
Maybe add a comment to indicate why this if body results in a pause.
2012346 to bb34b02
| static CheckpointCoordinatorDeActivator forJobStatus( | ||
| CheckpointCoordinator coordinator, boolean allTasksOutputNonBlocking) { | ||
| this.coordinator = checkNotNull(coordinator); | ||
| this.allTasksOutputNonBlocking = allTasksOutputNonBlocking; | ||
| return (jobId, newJobStatus, timestamp) -> { | ||
| if (newJobStatus == JobStatus.RUNNING && allTasksOutputNonBlocking) { | ||
| // start the checkpoint scheduler if there is no blocking edge | ||
| coordinator.startCheckpointScheduler(); | ||
| } else { | ||
| // anything else should stop the trigger for now | ||
| coordinator.stopCheckpointScheduler(); | ||
| } |
forJobStatus is only called once, and allTasksOutputNonBlocking is false, so forJobStatus can be simplified.
Right, I've simplified the code, PTAL.
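For readers following this thread, the job-status-based activator from the diff above can be sketched as a standalone example. This is an illustration only, not the code in the PR: `JobState`, `CheckpointScheduler`, `JobStateListener`, `RecordingScheduler`, and `Activators` are hypothetical stand-ins for the Flink types, and the listener takes a single argument rather than the `(jobId, newJobStatus, timestamp)` triple shown in the diff.

```java
// Hypothetical stand-ins for illustration only; these are not Flink classes.
enum JobState { CREATED, RUNNING, FAILING, FINISHED }

interface CheckpointScheduler {
    void start();
    void stop();
}

interface JobStateListener {
    void onChange(JobState newState);
}

// Simple implementation that records whether the scheduler is active.
class RecordingScheduler implements CheckpointScheduler {
    boolean started = false;
    @Override public void start() { started = true; }
    @Override public void stop() { started = false; }
}

final class Activators {
    private Activators() {}

    // Mirrors the shape of the factory in the diff: start the scheduler only when
    // the job is RUNNING and there is no blocking edge; stop it otherwise.
    static JobStateListener forJobStatus(
            CheckpointScheduler scheduler, boolean allTasksOutputNonBlocking) {
        return newState -> {
            if (newState == JobState.RUNNING && allTasksOutputNonBlocking) {
                scheduler.start();
            } else {
                scheduler.stop();
            }
        };
    }
}
```

With `allTasksOutputNonBlocking` fixed to `false`, the first branch can never be taken, which is why a call site that always passes `false` leaves room for simplification.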
No description provided.