Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,19 @@ public final class ExecutorConfig {
+ "executor will at most allow 10 partitions move out of a broker and 10 partitions move into a broker at any "
+ "given point. This is to avoid overwhelming the cluster by inter-broker partition movements.";

/**
* <code>num.concurrent.empty.partition.movements.per.broker</code>
*/
public static final String NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER_CONFIG =
"num.concurrent.empty.partition.movements.per.broker";
public static final int DEFAULT_NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER = 100;
public static final String NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER_DOC = "The maximum number of empty "
+ "(zero-byte) partitions the executor will move to or out of a broker at the same time. Since empty partition "
+ "moves transfer no data and create no I/O pressure, a much higher concurrency than "
+ NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG + " is safe. This limit is applied on top of (not instead "
+ "of) the normal partition movement concurrency -- zero-byte moves that fit within the normal limit use normal "
+ "slots first, and only overflow into this extended limit.";

/**
* <code>num.concurrent.intra.broker.partition.movements</code>
*/
Expand Down Expand Up @@ -706,6 +719,12 @@ public static ConfigDef define(ConfigDef configDef) {
atLeast(1),
ConfigDef.Importance.MEDIUM,
NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_DOC)
.define(NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER_CONFIG,
ConfigDef.Type.INT,
DEFAULT_NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER,
atLeast(1),
ConfigDef.Importance.MEDIUM,
NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER_DOC)
.define(NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG,
ConfigDef.Type.INT,
DEFAULT_NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public synchronized List<ExecutionTask> getInterBrokerReplicaMovementTasks() {
ConcurrencyType.INTER_BROKER_REPLICA);
return _executionTaskPlanner.getInterBrokerReplicaMovementTasks(
brokersReadyForReplicaMovement, _inProgressPartitionsForInterBrokerMovement,
_executionConcurrencyManager.maxClusterInterBrokerPartitionMovements());
_executionConcurrencyManager.maxClusterInterBrokerPartitionMovements(),
_executionConcurrencyManager.emptyPartitionMovementConcurrency());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,16 +343,26 @@ public List<ExecutionTask> getLeadershipMovementTasks(ExecutionConcurrencyManage
* controller does not allow updating the ongoing replica reassignment for a partition
* whose replica is being reassigned.
* @param maxInterBrokerPartitionMovements Maximum cap for number of partitions to move at any time
* @param maxEmptyPartitionMovementsPerBroker Maximum concurrent empty (zero-byte) partition moves per broker,
* used as an extended limit beyond the normal readyBrokers slots.
* @return A list of movements that is executable for the ready brokers.
*/
public List<ExecutionTask> getInterBrokerReplicaMovementTasks(Map<Integer, Integer> readyBrokers,
Set<TopicPartition> inProgressPartitions,
int maxInterBrokerPartitionMovements) {
int maxInterBrokerPartitionMovements,
int maxEmptyPartitionMovementsPerBroker) {
LOG.trace("Getting inter-broker replica movement tasks for brokers with concurrency {}", readyBrokers);
List<ExecutionTask> executableReplicaMovements = new ArrayList<>();
SortedSet<Integer> interPartMoveBrokerIds = new TreeSet<>(_interPartMoveBrokerComparator);
List<Integer> interPartMoveBrokerIdsList = new ArrayList<>(_interPartMoveTasksByBrokerId.keySet().size());

// Separate slot tracking for zero-byte (empty) partition moves. These slots are used only after
// normal readyBrokers slots are exhausted, allowing empty partition moves to proceed at higher concurrency.
Map<Integer, Integer> readyBrokersEmptyPartitions = new HashMap<>();
for (Map.Entry<Integer, Integer> entry : readyBrokers.entrySet()) {
readyBrokersEmptyPartitions.put(entry.getKey(), maxEmptyPartitionMovementsPerBroker);
}

/*
* The algorithm avoids unfair situation where the available movement slots of a broker is completely taken
* by another broker. It checks the proposals in a round-robin manner that makes sure each ready broker gets
Expand Down Expand Up @@ -403,33 +413,39 @@ public List<ExecutionTask> getInterBrokerReplicaMovementTasks(Map<Integer, Integ
continue;
}
TopicPartition tp = task.proposal().topicPartition();
// Check if the proposal is executable.
if (isExecutableProposal(task.proposal(), readyBrokers)
&& !inProgressPartitions.contains(tp)
&& !partitionsInvolved.contains(tp)) {
if (inProgressPartitions.contains(tp) || partitionsInvolved.contains(tp)) {
continue;
}

boolean isZeroByte = task.proposal().dataToMoveInMB() == 0;
// Try normal slots first; for zero-byte tasks that don't fit normal slots, try the extended limit.
boolean executable = isExecutableProposal(task.proposal(), readyBrokers);
boolean useEmptySlots = false;
if (!executable && isZeroByte) {
executable = isExecutableProposal(task.proposal(), readyBrokersEmptyPartitions);
useEmptySlots = executable;
}

if (executable) {
partitionsInvolved.add(tp);
executableReplicaMovements.add(task);
// Record the brokers as involved in this round and stop involving them again in this round.
brokerInvolved.add(sourceBroker);
brokerInvolved.addAll(destinationBrokers);
// The first task of each involved broker might have changed.
// Let's remove the brokers before the tasks change, then add them again later by comparing their new first tasks.
interPartMoveBrokerIds.remove(sourceBroker);
interPartMoveBrokerIds.removeAll(destinationBrokers);
// Remove the proposal from the execution plan.
removeInterBrokerReplicaActionForExecution(task);
interPartMoveBrokerIds.add(sourceBroker);
interPartMoveBrokerIds.addAll(destinationBrokers);
// Decrement the slots for both source and destination brokers
readyBrokers.put(sourceBroker, readyBrokers.get(sourceBroker) - 1);

Map<Integer, Integer> slotsToDecrement = useEmptySlots ? readyBrokersEmptyPartitions : readyBrokers;
slotsToDecrement.put(sourceBroker, slotsToDecrement.get(sourceBroker) - 1);
for (int broker : destinationBrokers) {
readyBrokers.put(broker, readyBrokers.get(broker) - 1);
slotsToDecrement.put(broker, slotsToDecrement.get(broker) - 1);
}
// Mark proposal added to true so we will have another round of check.
newTaskAdded = true;
numInProgressPartitions++;
LOG.debug("Found ready task {} for broker {}. Broker concurrency state: {}", task, brokerId, readyBrokers);
// We can stop the check for proposals for this broker because we have found a proposal.
LOG.debug("Found ready task {} for broker {}. Broker concurrency state: {}, empty partition state: {}",
task, brokerId, readyBrokers, readyBrokersEmptyPartitions);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class Executor {
private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
private static final Logger OPERATION_LOG = LoggerFactory.getLogger(OPERATION_LOGGER);
private static final long EXECUTION_PROGRESS_CHECK_INTERVAL_ADJUSTING_MS = 1000;
static final long ZERO_BYTE_MOVE_INITIAL_CHECK_DELAY_MS = 1000;
// The execution progress is controlled by the ExecutionTaskManager.
private final ExecutionTaskManager _executionTaskManager;
private final MetadataClient _metadataClient;
Expand Down Expand Up @@ -1827,8 +1828,26 @@ private List<ExecutionTask> waitForInterBrokerReplicaTasksToFinish(AlterPartitio
}

boolean retry;
boolean isFirstIteration = true;
do {
Cluster cluster = getClusterForExecutionProgressCheck();
Cluster cluster;
if (isFirstIteration) {
isFirstIteration = false;
long totalDataInMB = 0;
for (ExecutionTask task : inExecutionTasks()) {
totalDataInMB += task.proposal().dataToMoveInMB();
}
if (totalDataInMB == 0 && !inExecutionTasks().isEmpty()) {
LOG.debug("All in-execution tasks are zero-byte moves, using reduced initial delay of {}ms.",
ZERO_BYTE_MOVE_INITIAL_CHECK_DELAY_MS);
Thread.sleep(ZERO_BYTE_MOVE_INITIAL_CHECK_DELAY_MS);
cluster = _metadataClient.refreshMetadata().cluster();
} else {
cluster = getClusterForExecutionProgressCheck();
}
} else {
cluster = getClusterForExecutionProgressCheck();
}
List<ExecutionTask> deadInterBrokerReplicaTasks = new ArrayList<>();
List<ExecutionTask> stoppedInterBrokerReplicaTasks = new ArrayList<>();
List<ExecutionTask> slowTasksToReport = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public class ExecutionConcurrencyManager {
// The total allowed movement concurrency for all types in the cluster. This value cannot be overridden at runtime.
private final int _clusterMovementConcurrency;

// The allowed per-broker concurrency for empty (zero-byte) partition movements.
private final int _defaultEmptyPartitionMovementConcurrency;

// The allowed inter-broker partition movement concurrency for each broker.
private final int _defaultInterBrokerPartitionMovementConcurrency;
private Integer _requestedInterBrokerPartitionMovementConcurrency;
Expand Down Expand Up @@ -74,6 +77,8 @@ public class ExecutionConcurrencyManager {

public ExecutionConcurrencyManager(KafkaCruiseControlConfig config) {
_clusterMovementConcurrency = config.getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG);
_defaultEmptyPartitionMovementConcurrency = config.getInt(
ExecutorConfig.NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER_CONFIG);
_defaultInterBrokerPartitionMovementConcurrency = config.getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG);
_defaultIntraBrokerPartitionMovementConcurrency = config.getInt(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG);
_defaultClusterLeadershipMovementConcurrency = config.getInt(ExecutorConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG);
Expand Down Expand Up @@ -284,6 +289,13 @@ public synchronized int maxClusterLeadershipMovements() {
: _requestedClusterLeadershipMovementConcurrency;
}

/**
* @return The per-broker concurrency allowed for empty (zero-byte) partition movements.
*/
public int emptyPartitionMovementConcurrency() {
return _defaultEmptyPartitionMovementConcurrency;
}

/**
* Get the concurrency of each broker without per-broker throttling. This concurrency is simply the overall allowed concurrency divided
* by broker count.
Expand Down
Loading
Loading