diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index bc65b09a7efc5..c86fe67ec1fb8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -187,7 +186,7 @@ public class CheckpointCoordinator { private final long alignedCheckpointTimeout; /** Actor that receives status updates from the execution graph this coordinator works for. */ - private JobStatusListener jobStatusListener; + private CheckpointCoordinatorDeActivator deActivator; /** * The current periodic trigger. Used to deduplicate concurrently scheduled checkpoints if any. @@ -2146,18 +2145,24 @@ private void restoreStateToCoordinators( // job status listener that schedules / cancels periodic checkpoints // ------------------------------------------------------------------------ - public JobStatusListener createActivatorDeactivator(boolean allTasksOutputNonBlocking) { + public CheckpointCoordinatorDeActivator createActivatorDeactivator( + boolean allTasksOutputNonBlocking) { synchronized (lock) { if (shutdown) { throw new IllegalArgumentException("Checkpoint coordinator is shut down"); } - if (jobStatusListener == null) { - jobStatusListener = - new CheckpointCoordinatorDeActivator(this, allTasksOutputNonBlocking); + if (deActivator == null) { + if (allTasksOutputNonBlocking) { + // checkpoint scheduler should only be activated in allTasksOutputNonBlocking + // case (FLIP-331) + deActivator = new ExecutionTrackingCheckpointCoordinatorDeActivator(this); + } else { + deActivator = CheckpointCoordinatorDeActivator.alwaysStopping(this); + } } - return jobStatusListener; + return deActivator; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java index 830ba4cf58e56..b6aa69bf220f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java @@ -18,35 +18,23 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener; import org.apache.flink.runtime.executiongraph.JobStatusListener; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** * This actor listens to changes in the JobStatus and activates or deactivates the periodic * checkpoint scheduler. */ -public class CheckpointCoordinatorDeActivator implements JobStatusListener { - - private final CheckpointCoordinator coordinator; - private final boolean allTasksOutputNonBlocking; +public interface CheckpointCoordinatorDeActivator + extends JobStatusListener, ExecutionStateUpdateListener { - public CheckpointCoordinatorDeActivator( - CheckpointCoordinator coordinator, boolean allTasksOutputNonBlocking) { - this.coordinator = checkNotNull(coordinator); - this.allTasksOutputNonBlocking = allTasksOutputNonBlocking; + static CheckpointCoordinatorDeActivator alwaysStopping(CheckpointCoordinator coordinator) { + return (jobId, newJobStatus, timestamp) -> coordinator.stopCheckpointScheduler(); } @Override - public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) { - if (newJobStatus == JobStatus.RUNNING && allTasksOutputNonBlocking) { - // start the checkpoint scheduler if there is no blocking edge - coordinator.startCheckpointScheduler(); - } else { - // anything else should stop the trigger for now - coordinator.stopCheckpointScheduler(); - } - } + default void onStateUpdate( + ExecutionAttemptID execution, ExecutionState previousState, ExecutionState newState) {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionTrackingCheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionTrackingCheckpointCoordinatorDeActivator.java new file mode 100644 index 0000000000000..dfac2761ad51e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionTrackingCheckpointCoordinatorDeActivator.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link CheckpointCoordinatorDeActivator} that tracks {@link Execution} states in addition to + * {@link JobStatus} to avoid triggering checkpoints unnecessarily. + */ +public class ExecutionTrackingCheckpointCoordinatorDeActivator + implements CheckpointCoordinatorDeActivator { + private static final Logger LOG = + LoggerFactory.getLogger(ExecutionTrackingCheckpointCoordinatorDeActivator.class); + + private final CheckpointCoordinator coordinator; + private final Set pendingExecutions; + @Nullable private JobStatus jobStatus; + private boolean started; + + public ExecutionTrackingCheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) { + this.coordinator = checkNotNull(coordinator); + this.pendingExecutions = new HashSet<>(); + this.started = false; + } + + @Override + public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) { + this.jobStatus = newJobStatus; + maybeStartOrStopCheckpointScheduler(); + } + + @Override + public void onStateUpdate( + ExecutionAttemptID execution, ExecutionState previousState, ExecutionState newState) { + LOG.debug( + "Received state update for execution {} from {} to {}", + execution, + previousState, + newState); + if (newState == ExecutionState.RUNNING || newState == ExecutionState.FINISHED) { + pendingExecutions.remove(execution); + } else { + pendingExecutions.add(execution); + } + maybeStartOrStopCheckpointScheduler(); + } + + private void maybeStartOrStopCheckpointScheduler() { + if (jobStatus == JobStatus.RUNNING && allTasksRunning()) { + if (!started) { + LOG.info("Starting checkpoint scheduler"); + started = true; + coordinator.startCheckpointScheduler(); + } else { + LOG.info( + "Checkpoint scheduler is already started, ignoring status change (some task has finished?)"); + } + } else if (started) { + LOG.info("Stopping checkpoint scheduler, current job status: {}", jobStatus); + started = false; + coordinator.stopCheckpointScheduler(); + } else { + LOG.debug( + "Not Starting checkpoint scheduler, job status: {}, pending executions: {}", + jobStatus, + pendingExecutions); + } + } + + private boolean allTasksRunning() { + return pendingExecutions.isEmpty(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index 1f6fa2420f836..587abac9e67bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator; import org.apache.flink.runtime.checkpoint.CheckpointFailureManager; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator; @@ -288,7 +289,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG private final ShuffleMaster shuffleMaster; private final ExecutionDeploymentListener executionDeploymentListener; - private final ExecutionStateUpdateListener executionStateUpdateListener; + private final List executionStateUpdateListeners; private final EdgeManager edgeManager; @@ -386,7 +387,8 @@ public DefaultExecutionGraph( this::createResultPartitionId, partitionTracker); this.executionDeploymentListener = executionDeploymentListener; - this.executionStateUpdateListener = executionStateUpdateListener; + this.executionStateUpdateListeners = new ArrayList<>(); + this.executionStateUpdateListeners.add(executionStateUpdateListener); this.initialAttemptCounts = initialAttemptCounts; @@ -549,8 +551,10 @@ public void failJobDueToTaskFailure( boolean allTasksOutputNonBlocking = tasks.values().stream() .noneMatch(vertex -> vertex.getJobVertex().isAnyOutputBlocking()); - registerJobStatusListener( - checkpointCoordinator.createActivatorDeactivator(allTasksOutputNonBlocking)); + CheckpointCoordinatorDeActivator activatorDeactivator = + checkpointCoordinator.createActivatorDeactivator(allTasksOutputNonBlocking); + registerJobStatusListener(activatorDeactivator); + registerExecutionStateUpdateListener(activatorDeactivator); } this.stateBackendName = checkpointStateBackend.getName(); @@ -1657,6 +1661,12 @@ public void registerJobStatusListener(JobStatusListener listener) { } } + public void registerExecutionStateUpdateListener(ExecutionStateUpdateListener listener) { + if (listener != null) { + executionStateUpdateListeners.add(listener); + } + } + private void notifyJobStatusChange( JobStatus oldState, JobStatus newState, @Nullable Throwable cause) { if (jobStatusListeners.size() > 0) { @@ -1710,8 +1720,8 @@ public void notifyExecutionChange( final Execution execution, ExecutionState previousState, final ExecutionState newExecutionState) { - executionStateUpdateListener.onStateUpdate( - execution.getAttemptId(), previousState, newExecutionState); + executionStateUpdateListeners.forEach( + l -> l.onStateUpdate(execution.getAttemptId(), previousState, newExecutionState)); } private void assertRunningInJobMasterMainThread() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionTrackingCheckpointCoordinatorDeActivatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionTrackingCheckpointCoordinatorDeActivatorTest.java new file mode 100644 index 0000000000000..70fb7a1525c9e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionTrackingCheckpointCoordinatorDeActivatorTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl; +import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo; +import org.apache.flink.runtime.testtasks.NoOpInvokable; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** {@link ExecutionTrackingCheckpointCoordinatorDeActivator} test. */ +class ExecutionTrackingCheckpointCoordinatorDeActivatorTest { + + private ManuallyTriggeredScheduledExecutorService executor; + private CheckpointCoordinator checkpointCoordinator; + private ExecutionTrackingCheckpointCoordinatorDeActivator activator; + private ExecutionAttemptID execution1; + private ExecutionAttemptID execution2; + private JobID jobId; + + @BeforeEach + void init() throws Exception { + executor = new ManuallyTriggeredScheduledExecutorService(); + checkpointCoordinator = + new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder() + .build(executor); + activator = new ExecutionTrackingCheckpointCoordinatorDeActivator(checkpointCoordinator); + execution1 = createExecution(executor).getAttemptId(); + execution2 = createExecution(executor).getAttemptId(); + jobId = new JobID(); + assertFalse(checkpointCoordinator.isPeriodicCheckpointingStarted()); + } + + @AfterEach + void destroy() throws Exception { + checkpointCoordinator.shutdown(); + executor.shutdownNow(); + } + + @Test + void testBasicLifecycle() { + activator.onStateUpdate(execution1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING); + activator.onStateUpdate(execution2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING); + assertFalse(checkpointCoordinator.isPeriodicCheckpointingStarted()); + + activator.jobStatusChanges(jobId, JobStatus.RUNNING, 0); + activator.onStateUpdate(execution1, ExecutionState.DEPLOYING, ExecutionState.RUNNING); + activator.onStateUpdate(execution2, ExecutionState.DEPLOYING, ExecutionState.RUNNING); + assertTrue(checkpointCoordinator.isPeriodicCheckpointingStarted()); + + activator.jobStatusChanges(jobId, JobStatus.FINISHED, 0); + assertFalse(checkpointCoordinator.isPeriodicCheckpointingStarted()); + } + + @Test + void testTaskFailure() { + activator.onStateUpdate(execution1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING); + activator.onStateUpdate(execution2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING); + assertFalse(checkpointCoordinator.isPeriodicCheckpointingStarted()); + + activator.jobStatusChanges(jobId, JobStatus.RUNNING, 0); + activator.onStateUpdate(execution1, ExecutionState.DEPLOYING, ExecutionState.RUNNING); + activator.onStateUpdate(execution2, ExecutionState.DEPLOYING, ExecutionState.RUNNING); + assertTrue(checkpointCoordinator.isPeriodicCheckpointingStarted()); + + activator.onStateUpdate(execution1, ExecutionState.RUNNING, ExecutionState.FAILED); + assertFalse(checkpointCoordinator.isPeriodicCheckpointingStarted()); + } + + @Test + void testJobRestart() { + activator.onStateUpdate(execution1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING); + activator.onStateUpdate(execution2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING); + assertFalse(checkpointCoordinator.isPeriodicCheckpointingStarted()); + + activator.jobStatusChanges(jobId, JobStatus.RUNNING, 0); + activator.onStateUpdate(execution1, ExecutionState.DEPLOYING, ExecutionState.RUNNING); + activator.onStateUpdate(execution2, ExecutionState.DEPLOYING, ExecutionState.RUNNING); + assertTrue(checkpointCoordinator.isPeriodicCheckpointingStarted()); + + for (JobStatus jobStatus : + new JobStatus[] {JobStatus.FAILED, JobStatus.RESTARTING, JobStatus.INITIALIZING}) { + activator.jobStatusChanges(jobId, jobStatus, 0); + assertFalse(checkpointCoordinator.isPeriodicCheckpointingStarted()); + } + activator.jobStatusChanges(jobId, JobStatus.RUNNING, 0); + assertTrue(checkpointCoordinator.isPeriodicCheckpointingStarted()); + } + + @Test + void testJobFinish() { + assertFalse(checkpointCoordinator.isPeriodicCheckpointingStarted()); + activator.jobStatusChanges(jobId, JobStatus.RUNNING, 0); + assertTrue(checkpointCoordinator.isPeriodicCheckpointingStarted()); + activator.jobStatusChanges(jobId, JobStatus.FINISHED, 0); + assertFalse(checkpointCoordinator.isPeriodicCheckpointingStarted()); + } + + private static Execution createExecution(ManuallyTriggeredScheduledExecutorService executor) + throws JobException, JobExecutionException { + DefaultExecutionGraph eg = TestingDefaultExecutionGraphBuilder.newBuilder().build(executor); + JobVertex jv = ExecutionGraphTestUtils.createJobVertex("task1", 1, NoOpInvokable.class); + ExecutionJobVertex ejv = + new ExecutionJobVertex( + eg, + jv, + new DefaultVertexParallelismInfo(1, 1, ignored -> Optional.empty()), + new CoordinatorStoreImpl(), + UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup()); + + ExecutionVertex evv = + new ExecutionVertex( + ejv, 0, new IntermediateResult[0], Duration.ofSeconds(3), 0, 0, 0); + return new Execution(executor, evv, 0, 0, Duration.ofSeconds(3)); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointStatsTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointStatsTracker.java new file mode 100644 index 0000000000000..9f42c5b159eb2 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointStatsTracker.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +/** Testing implementation of {@link CheckpointStatsTracker}. */ +public class TestingCheckpointStatsTracker implements CheckpointStatsTracker { + public final AtomicInteger numFailedCheckpoints = new AtomicInteger(); + public final AtomicInteger numPendingCheckpoints = new AtomicInteger(); + private static final PendingCheckpointStats PENDING_CHECKPOINT_STATS = + new PendingCheckpointStats( + 0L, + 0L, + CheckpointProperties.forCheckpoint( + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + Collections.singletonMap(new JobVertexID(), 1)); + + @Override + public void reportFailedCheckpointsWithoutInProgress() { + numFailedCheckpoints.incrementAndGet(); + } + + @Override + public CheckpointStatsSnapshot createSnapshot() { + return new CheckpointStatsSnapshot( + new CheckpointStatsCounts(), + CompletedCheckpointStatsSummarySnapshot.empty(), + new CheckpointStatsHistory(10), + null); + } + + @Override + public void reportFailedCheckpoint(FailedCheckpointStats failed) { + numFailedCheckpoints.incrementAndGet(); + } + + @Override + public void reportRestoredCheckpoint( + long checkpointID, + CheckpointProperties properties, + String externalPath, + long stateSize) {} + + @Override + public void reportCompletedCheckpoint(CompletedCheckpointStats completed) {} + + @Override + public PendingCheckpointStats getPendingCheckpointStats(long checkpointId) { + return PENDING_CHECKPOINT_STATS; + } + + @Override + public void reportIncompleteStats( + long checkpointId, ExecutionAttemptID attemptId, CheckpointMetrics metrics) {} + + @Override + public void reportInitializationStarted( + Set toInitialize, long initializationStartTs) {} + + @Override + public void reportInitializationMetrics( + ExecutionAttemptID executionAttemptId, + SubTaskInitializationMetrics initializationMetrics) {} + + @Override + public PendingCheckpointStats reportPendingCheckpoint( + long checkpointId, + long triggerTimestamp, + CheckpointProperties props, + Map vertexToDop) { + numPendingCheckpoints.incrementAndGet(); + return PENDING_CHECKPOINT_STATS; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTestUtils.java index e6a58a540bacf..6e5688327ad72 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTestUtils.java @@ -21,6 +21,7 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable; import java.util.Arrays; +import java.util.function.Function; /** Utilities for creating {@link JobGraph JobGraphs} for testing purposes. */ public class JobGraphTestUtils { @@ -38,8 +39,15 @@ public static JobGraph singleNoOpJobGraph() { } public static JobGraph streamingJobGraph(JobVertex... jobVertices) { - return JobGraphBuilder.newStreamingJobGraphBuilder() - .addJobVertices(Arrays.asList(jobVertices)) + return streamingJobGraph(Function.identity(), jobVertices); + } + + public static JobGraph streamingJobGraph( + Function transformJgb, JobVertex... jobVertices) { + return transformJgb + .apply( + JobGraphBuilder.newStreamingJobGraphBuilder() + .addJobVertices(Arrays.asList(jobVertices))) .build(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 290a064487201..d5c312c4dfe07 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.TestingCheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -970,6 +971,86 @@ void testJobStatusListenerNotifiedOfJobStatusChanges() throws Exception { assertThat(unexpectedJobStatusNotification.isDone()).isFalse(); } + @Test + void testCheckpointsStartAfterTaskRunning() throws Exception { + int checkpointInterval = 10; + final JobGraph jobGraph = + streamingJobGraph( + jg -> + jg.setJobCheckpointingSettings( + new JobCheckpointingSettings( + CheckpointCoordinatorConfiguration.builder() + .setCheckpointInterval(checkpointInterval) + .setCheckpointTimeout(Long.MAX_VALUE) + .build(), + null)), + createNoOpVertex(1)); + + final DefaultDeclarativeSlotPool declarativeSlotPool = + createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor); + + final Configuration configuration = new Configuration(); + configuration.set( + JobManagerOptions.SCHEDULER_SUBMISSION_RESOURCE_WAIT_TIMEOUT, + Duration.ofMillis(1L)); + + TestingCheckpointStatsTracker checkpointStatsTracker = new TestingCheckpointStatsTracker(); + CompletableFuture jobRunningFuture = new CompletableFuture<>(); + scheduler = + new AdaptiveSchedulerBuilder( + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) + .setJobMasterConfiguration(configuration) + .setDeclarativeSlotPool(declarativeSlotPool) + .setCheckpointStatsTrackerFactory((ign0, ign1) -> checkpointStatsTracker) + .setJobStatusListener( + (jobId, newJobStatus, timestamp) -> { + if (newJobStatus == JobStatus.RUNNING) { + jobRunningFuture.complete(null); + } else if (newJobStatus == JobStatus.FAILED) { + jobRunningFuture.completeExceptionally( + new RuntimeException("job has failed")); + } + }) + .build(); + + final SubmissionBufferingTaskManagerGateway taskManagerGateway = + new SubmissionBufferingTaskManagerGateway(1); + runInMainThread( + () -> { + scheduler.startScheduling(); + offerSlots( + declarativeSlotPool, + createSlotOffersForResourceRequirements( + ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), + taskManagerGateway); + }); + jobRunningFuture.join(); + ExecutionAttemptID task = taskManagerGateway.submittedTasks.take().getExecutionAttemptId(); + runInMainThread( + () -> + scheduler.updateTaskExecutionState( + new TaskExecutionState(task, ExecutionState.INITIALIZING))); + // allow some time to trigger checkpoint (and fail because not all tasks are running) + Thread.sleep(checkpointInterval * 10); + assertThat(checkpointStatsTracker.numPendingCheckpoints.get()) + .describedAs("checkpoints shouldn't be triggered until all tasks are running") + .isEqualTo(0); + assertThat(checkpointStatsTracker.numFailedCheckpoints.get()) + .describedAs( + "checkpoints shouldn't be triggered and fail until all tasks are running") + .isEqualTo(0); + // allow triggering checkpoints + runInMainThread( + () -> + scheduler.updateTaskExecutionState( + new TaskExecutionState(task, ExecutionState.RUNNING))); + while (checkpointStatsTracker.numPendingCheckpoints.get() == 0) { + Thread.sleep(50); + } + } + @Test void testCloseShutsDownCheckpointingComponents() throws Exception { final CompletableFuture completedCheckpointStoreShutdownFuture =