Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
Expand Down Expand Up @@ -187,7 +186,7 @@ public class CheckpointCoordinator {
private final long alignedCheckpointTimeout;

/** Actor that receives status updates from the execution graph this coordinator works for. */
private JobStatusListener jobStatusListener;
private CheckpointCoordinatorDeActivator deActivator;

/**
* The current periodic trigger. Used to deduplicate concurrently scheduled checkpoints if any.
Expand Down Expand Up @@ -2146,18 +2145,24 @@ private void restoreStateToCoordinators(
// job status listener that schedules / cancels periodic checkpoints
// ------------------------------------------------------------------------

public JobStatusListener createActivatorDeactivator(boolean allTasksOutputNonBlocking) {
public CheckpointCoordinatorDeActivator createActivatorDeactivator(
boolean allTasksOutputNonBlocking) {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}

if (jobStatusListener == null) {
jobStatusListener =
new CheckpointCoordinatorDeActivator(this, allTasksOutputNonBlocking);
if (deActivator == null) {
if (allTasksOutputNonBlocking) {
// checkpoint scheduler should only be activated in allTasksOutputNonBlocking
// case (FLIP-331)
deActivator = new ExecutionTrackingCheckpointCoordinatorDeActivator(this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a comment to indicate why this if body results in a pause.

} else {
deActivator = CheckpointCoordinatorDeActivator.alwaysStopping(this);
}
}

return jobStatusListener;
return deActivator;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,23 @@

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
import org.apache.flink.runtime.executiongraph.JobStatusListener;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* This actor listens to changes in the JobStatus and activates or deactivates the periodic
* checkpoint scheduler.
*/
public class CheckpointCoordinatorDeActivator implements JobStatusListener {

private final CheckpointCoordinator coordinator;
private final boolean allTasksOutputNonBlocking;
public interface CheckpointCoordinatorDeActivator
extends JobStatusListener, ExecutionStateUpdateListener {

public CheckpointCoordinatorDeActivator(
CheckpointCoordinator coordinator, boolean allTasksOutputNonBlocking) {
this.coordinator = checkNotNull(coordinator);
this.allTasksOutputNonBlocking = allTasksOutputNonBlocking;
static CheckpointCoordinatorDeActivator alwaysStopping(CheckpointCoordinator coordinator) {
return (jobId, newJobStatus, timestamp) -> coordinator.stopCheckpointScheduler();
}

@Override
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
if (newJobStatus == JobStatus.RUNNING && allTasksOutputNonBlocking) {
// start the checkpoint scheduler if there is no blocking edge
coordinator.startCheckpointScheduler();
} else {
// anything else should stop the trigger for now
coordinator.stopCheckpointScheduler();
}
}
default void onStateUpdate(
ExecutionAttemptID execution, ExecutionState previousState, ExecutionState newState) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.HashSet;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* {@link CheckpointCoordinatorDeActivator} that tracks {@link Execution} states in addition to
* {@link JobStatus} to avoid triggering checkpoints unnecessarily.
*/
public class ExecutionTrackingCheckpointCoordinatorDeActivator
implements CheckpointCoordinatorDeActivator {
private static final Logger LOG =
LoggerFactory.getLogger(ExecutionTrackingCheckpointCoordinatorDeActivator.class);

private final CheckpointCoordinator coordinator;
private final Set<ExecutionAttemptID> pendingExecutions;
@Nullable private JobStatus jobStatus;
private boolean started;

public ExecutionTrackingCheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
this.coordinator = checkNotNull(coordinator);
this.pendingExecutions = new HashSet<>();
this.started = false;
}

@Override
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
this.jobStatus = newJobStatus;
maybeStartOrStopCheckpointScheduler();
}

@Override
public void onStateUpdate(
ExecutionAttemptID execution, ExecutionState previousState, ExecutionState newState) {
LOG.debug(
"Received state update for execution {} from {} to {}",
execution,
previousState,
newState);
if (newState == ExecutionState.RUNNING || newState == ExecutionState.FINISHED) {
pendingExecutions.remove(execution);
} else {
pendingExecutions.add(execution);
}
maybeStartOrStopCheckpointScheduler();
}

private void maybeStartOrStopCheckpointScheduler() {
if (jobStatus == JobStatus.RUNNING && allTasksRunning()) {
if (!started) {
LOG.info("Starting checkpoint scheduler");
started = true;
coordinator.startCheckpointScheduler();
} else {
LOG.info(
"Checkpoint scheduler is already started, ignoring status change (some task has finished?)");
}
} else if (started) {
LOG.info("Stopping checkpoint scheduler, current job status: {}", jobStatus);
started = false;
coordinator.stopCheckpointScheduler();
} else {
LOG.debug(
"Not Starting checkpoint scheduler, job status: {}, pending executions: {}",
jobStatus,
pendingExecutions);
}
}

private boolean allTasksRunning() {
return pendingExecutions.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator;
import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
Expand Down Expand Up @@ -288,7 +289,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
private final ShuffleMaster<?> shuffleMaster;

private final ExecutionDeploymentListener executionDeploymentListener;
private final ExecutionStateUpdateListener executionStateUpdateListener;
private final List<ExecutionStateUpdateListener> executionStateUpdateListeners;

private final EdgeManager edgeManager;

Expand Down Expand Up @@ -386,7 +387,8 @@ public DefaultExecutionGraph(
this::createResultPartitionId, partitionTracker);

this.executionDeploymentListener = executionDeploymentListener;
this.executionStateUpdateListener = executionStateUpdateListener;
this.executionStateUpdateListeners = new ArrayList<>();
this.executionStateUpdateListeners.add(executionStateUpdateListener);

this.initialAttemptCounts = initialAttemptCounts;

Expand Down Expand Up @@ -549,8 +551,10 @@ public void failJobDueToTaskFailure(
boolean allTasksOutputNonBlocking =
tasks.values().stream()
.noneMatch(vertex -> vertex.getJobVertex().isAnyOutputBlocking());
registerJobStatusListener(
checkpointCoordinator.createActivatorDeactivator(allTasksOutputNonBlocking));
CheckpointCoordinatorDeActivator activatorDeactivator =
checkpointCoordinator.createActivatorDeactivator(allTasksOutputNonBlocking);
registerJobStatusListener(activatorDeactivator);
registerExecutionStateUpdateListener(activatorDeactivator);
}

this.stateBackendName = checkpointStateBackend.getName();
Expand Down Expand Up @@ -1657,6 +1661,12 @@ public void registerJobStatusListener(JobStatusListener listener) {
}
}

public void registerExecutionStateUpdateListener(ExecutionStateUpdateListener listener) {
if (listener != null) {
executionStateUpdateListeners.add(listener);
}
}

private void notifyJobStatusChange(
JobStatus oldState, JobStatus newState, @Nullable Throwable cause) {
if (jobStatusListeners.size() > 0) {
Expand Down Expand Up @@ -1710,8 +1720,8 @@ public void notifyExecutionChange(
final Execution execution,
ExecutionState previousState,
final ExecutionState newExecutionState) {
executionStateUpdateListener.onStateUpdate(
execution.getAttemptId(), previousState, newExecutionState);
executionStateUpdateListeners.forEach(
l -> l.onStateUpdate(execution.getAttemptId(), previousState, newExecutionState));
}

private void assertRunningInJobMasterMainThread() {
Expand Down
Loading