Skip to content

Conversation

@rkhachatryan
Copy link
Contributor

No description provided.

@flinkbot
Copy link
Collaborator

flinkbot commented Jan 19, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@rkhachatryan rkhachatryan marked this pull request as ready for review January 19, 2026 12:54
<td><h5>pipeline.sources.pause-until-first-checkpoint</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Don't pull any data from sources until the first checkpoint is triggered. This might be helpful in reducing recovery times. Incompatible 0 value for execution.checkpointing.interval-during-backlog</td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest expanding on "This might be helpful in reducing recovery times." to be specific about the scenarios that this would be helpful in. Are there scenarios it would not be helpful in? If there are no downsides to this change - then do we need a config option?

I am not sure what Incompatible 0 value for execution.checkpointing.interval-during-backlog means .

&& checkpointConfig.isPauseSourcesUntilFirstCheckpoint()) {
throw new IllegalArgumentException(
"Pausing sources until first checkpoint is incompatible with disabling checkpoints during backlog processing. "
+ "Please consult "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "Please consult ...." -> "Please review and choose whether you require + CheckpointingOptions.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT.key()
+ " or"
+ CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG.key());

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Jan 20, 2026
Copy link
Member

@1996fanrui 1996fanrui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @rkhachatryan , thanks for the PR, I have several questions about this approach.

Pause sources until the 1st checkpoint to prioritize processing recovered records
Don't pull any data from sources until the first checkpoint is triggered.

If so, the source does not work even if all recovered buffers are consumed, right?


Let me understand the existing issue and the current approach:

  1. The task will be switched from INITIALIZATION to RUNNING once all recovered input buffers and output buffers are consumed.
  2. The recovered buffer of some input channels are fully consumed, and there are some new buffers is coming. The recovered buffer of rest of channels are not fully consumed.

Issue: If task starts consume new buffers before all recovered buffer are consumed, it will be switched to running later.

IIUC,, the purpose of pause source is avoid new buffers are generated. Is it correct?

If so, I do not think it works perfect since new buffers can be generated from the recovered buffers of upstream task. Of course, pause source could avoid new buffers from outside of flink during recovery.

Blocking channels whose recovered buffers are fully consumed maybe more fine-grained that pausing source, it allows task consumes recovered buffers before new buffers, as well as the upstream tasks and source are not blocked as early as possible.

Also, FLIP-547 part 4.6 will introduce fine-grained blocking mechanism. Not sure whether pausing source is still needed if new mechanism will be introduced in the near future?

Looking forward to your opinion, thanks

@rkhachatryan
Copy link
Contributor Author

Yes @1996fanrui, you're right.

The purpose of this change is to prevent new input records from delaying the switch of the downstream tasks to RUNNING.
This doesn't help in every case; an example where it is useful is two JOINed sources, where one of the channels has a lot of checkpointed buffers; and the other one is fast in producing new data (from source).

In a sense, this is a lightweight alternative to FLIP-547.
To my understanding the timeline of implementing and stabilizing FLIP-547 is relatively long (PCMIIW), so this feature still makes sense.

Copy link
Member

@1996fanrui 1996fanrui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rkhachatryan for the comment!

Sounds make sense for considering this approach as a lightweight alternative first. I only left one comment, please take a look when you are available, thanks

public static final ConfigOption<Boolean> PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT =
key("pipeline.sources.pause-until-first-checkpoint")
.booleanType()
.defaultValue(true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value should be false since it has a regression when before the first checkpoint.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, changed to false.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants