diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java
index 99a2b4cd87..cc8833fb25 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java
@@ -148,6 +148,19 @@ public final class ExecutorConfig {
+ "executor will at most allow 10 partitions move out of a broker and 10 partitions move into a broker at any "
+ "given point. This is to avoid overwhelming the cluster by inter-broker partition movements.";
+ /**
+ * num.concurrent.empty.partition.movements.per.broker
+ */
+ public static final String NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER_CONFIG =
+ "num.concurrent.empty.partition.movements.per.broker";
+ public static final int DEFAULT_NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER = 100;
+ public static final String NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER_DOC = "The maximum number of empty "
+ + "(zero-byte) partitions the executor will move to or out of a broker at the same time. Since empty partition "
+ + "moves transfer no data and create no I/O pressure, a much higher concurrency than "
+ + NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG + " is safe. This limit is applied on top of (not instead "
+ + "of) the normal partition movement concurrency -- zero-byte moves that fit within the normal limit use normal "
+ + "slots first, and only overflow into this extended limit.";
+
/**
* num.concurrent.intra.broker.partition.movements
*/
@@ -706,6 +719,12 @@ public static ConfigDef define(ConfigDef configDef) {
atLeast(1),
ConfigDef.Importance.MEDIUM,
NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_DOC)
+ .define(NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER_CONFIG,
+ ConfigDef.Type.INT,
+ DEFAULT_NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER,
+ atLeast(1),
+ ConfigDef.Importance.MEDIUM,
+ NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER_DOC)
.define(NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG,
ConfigDef.Type.INT,
DEFAULT_NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS,
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java
index dda14e9a41..40da5f0041 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java
@@ -79,7 +79,8 @@ public synchronized List getInterBrokerReplicaMovementTasks() {
ConcurrencyType.INTER_BROKER_REPLICA);
return _executionTaskPlanner.getInterBrokerReplicaMovementTasks(
brokersReadyForReplicaMovement, _inProgressPartitionsForInterBrokerMovement,
- _executionConcurrencyManager.maxClusterInterBrokerPartitionMovements());
+ _executionConcurrencyManager.maxClusterInterBrokerPartitionMovements(),
+ _executionConcurrencyManager.emptyPartitionMovementConcurrency());
}
/**
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java
index 2f3f671c08..674dca2cd0 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java
@@ -343,16 +343,26 @@ public List getLeadershipMovementTasks(ExecutionConcurrencyManage
* controller does not allow updating the ongoing replica reassignment for a partition
* whose replica is being reassigned.
* @param maxInterBrokerPartitionMovements Maximum cap for number of partitions to move at any time
+ * @param maxEmptyPartitionMovementsPerBroker Maximum concurrent empty (zero-byte) partition moves per broker,
+ * used as an extended limit beyond the normal readyBrokers slots.
* @return A list of movements that is executable for the ready brokers.
*/
public List getInterBrokerReplicaMovementTasks(Map readyBrokers,
Set inProgressPartitions,
- int maxInterBrokerPartitionMovements) {
+ int maxInterBrokerPartitionMovements,
+ int maxEmptyPartitionMovementsPerBroker) {
LOG.trace("Getting inter-broker replica movement tasks for brokers with concurrency {}", readyBrokers);
List executableReplicaMovements = new ArrayList<>();
SortedSet interPartMoveBrokerIds = new TreeSet<>(_interPartMoveBrokerComparator);
List interPartMoveBrokerIdsList = new ArrayList<>(_interPartMoveTasksByBrokerId.keySet().size());
+ // Separate slot tracking for zero-byte (empty) partition moves. These slots are used only after
+ // normal readyBrokers slots are exhausted, allowing empty partition moves to proceed at higher concurrency.
+ Map readyBrokersEmptyPartitions = new HashMap<>();
+ for (Map.Entry entry : readyBrokers.entrySet()) {
+ readyBrokersEmptyPartitions.put(entry.getKey(), maxEmptyPartitionMovementsPerBroker);
+ }
+
/*
* The algorithm avoids unfair situation where the available movement slots of a broker is completely taken
* by another broker. It checks the proposals in a round-robin manner that makes sure each ready broker gets
@@ -403,33 +413,39 @@ public List getInterBrokerReplicaMovementTasks(Map slotsToDecrement = useEmptySlots ? readyBrokersEmptyPartitions : readyBrokers;
+ slotsToDecrement.put(sourceBroker, slotsToDecrement.get(sourceBroker) - 1);
for (int broker : destinationBrokers) {
- readyBrokers.put(broker, readyBrokers.get(broker) - 1);
+ slotsToDecrement.put(broker, slotsToDecrement.get(broker) - 1);
}
- // Mark proposal added to true so we will have another round of check.
newTaskAdded = true;
numInProgressPartitions++;
- LOG.debug("Found ready task {} for broker {}. Broker concurrency state: {}", task, brokerId, readyBrokers);
- // We can stop the check for proposals for this broker because we have found a proposal.
+ LOG.debug("Found ready task {} for broker {}. Broker concurrency state: {}, empty partition state: {}",
+ task, brokerId, readyBrokers, readyBrokersEmptyPartitions);
break;
}
}
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
index 22f210f0a5..7ca037ac6d 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
@@ -85,6 +85,7 @@ public class Executor {
private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
private static final Logger OPERATION_LOG = LoggerFactory.getLogger(OPERATION_LOGGER);
private static final long EXECUTION_PROGRESS_CHECK_INTERVAL_ADJUSTING_MS = 1000;
+ static final long ZERO_BYTE_MOVE_INITIAL_CHECK_DELAY_MS = 1000;
// The execution progress is controlled by the ExecutionTaskManager.
private final ExecutionTaskManager _executionTaskManager;
private final MetadataClient _metadataClient;
@@ -1827,8 +1828,26 @@ private List waitForInterBrokerReplicaTasksToFinish(AlterPartitio
}
boolean retry;
+ boolean isFirstIteration = true;
do {
- Cluster cluster = getClusterForExecutionProgressCheck();
+ Cluster cluster;
+ if (isFirstIteration) {
+ isFirstIteration = false;
+ long totalDataInMB = 0;
+ for (ExecutionTask task : inExecutionTasks()) {
+ totalDataInMB += task.proposal().dataToMoveInMB();
+ }
+ if (totalDataInMB == 0 && !inExecutionTasks().isEmpty()) {
+ LOG.debug("All in-execution tasks are zero-byte moves, using reduced initial delay of {}ms.",
+ ZERO_BYTE_MOVE_INITIAL_CHECK_DELAY_MS);
+ Thread.sleep(ZERO_BYTE_MOVE_INITIAL_CHECK_DELAY_MS);
+ cluster = _metadataClient.refreshMetadata().cluster();
+ } else {
+ cluster = getClusterForExecutionProgressCheck();
+ }
+ } else {
+ cluster = getClusterForExecutionProgressCheck();
+ }
List deadInterBrokerReplicaTasks = new ArrayList<>();
List stoppedInterBrokerReplicaTasks = new ArrayList<>();
List slowTasksToReport = new ArrayList<>();
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/concurrency/ExecutionConcurrencyManager.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/concurrency/ExecutionConcurrencyManager.java
index bf8170ce88..9ee9592018 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/concurrency/ExecutionConcurrencyManager.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/concurrency/ExecutionConcurrencyManager.java
@@ -37,6 +37,9 @@ public class ExecutionConcurrencyManager {
// The total allowed movement concurrency for all types in the cluster. This value cannot be overridden at runtime.
private final int _clusterMovementConcurrency;
+ // The allowed per-broker concurrency for empty (zero-byte) partition movements.
+ private final int _defaultEmptyPartitionMovementConcurrency;
+
// The allowed inter-broker partition movement concurrency for each broker.
private final int _defaultInterBrokerPartitionMovementConcurrency;
private Integer _requestedInterBrokerPartitionMovementConcurrency;
@@ -74,6 +77,8 @@ public class ExecutionConcurrencyManager {
public ExecutionConcurrencyManager(KafkaCruiseControlConfig config) {
_clusterMovementConcurrency = config.getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG);
+ _defaultEmptyPartitionMovementConcurrency = config.getInt(
+ ExecutorConfig.NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER_CONFIG);
_defaultInterBrokerPartitionMovementConcurrency = config.getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG);
_defaultIntraBrokerPartitionMovementConcurrency = config.getInt(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG);
_defaultClusterLeadershipMovementConcurrency = config.getInt(ExecutorConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG);
@@ -284,6 +289,13 @@ public synchronized int maxClusterLeadershipMovements() {
: _requestedClusterLeadershipMovementConcurrency;
}
+ /**
+ * @return The per-broker concurrency allowed for empty (zero-byte) partition movements.
+ */
+ public int emptyPartitionMovementConcurrency() {
+ return _defaultEmptyPartitionMovementConcurrency;
+ }
+
/**
* Get the concurrency of each broker without per-broker throttling. This concurrency is simply the overall allowed concurrency divided
* by broker count.
diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java
index 3a3a4c24dd..18245453fe 100644
--- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java
+++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java
@@ -98,6 +98,7 @@ public class ExecutionTaskPlannerTest {
new Node(4, "null", -1),
new Node(5, "null", -1));
private final int _defaultPartitionsMaxCap = ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_PARTITION_MOVEMENTS_CONFIG;
+ private final int _defaultEmptyPartitionConcurrency = ExecutorConfig.DEFAULT_NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER;
@Test
public void testGetLeaderMovementTasks() {
@@ -265,28 +266,32 @@ public void testGetInterBrokerPartitionMovementTasks() {
basePlanner.addExecutionProposals(proposals, strategyOptions, null);
List partitionMovementTasks = basePlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(),
- _defaultPartitionsMaxCap);
+ _defaultPartitionsMaxCap,
+ _defaultEmptyPartitionConcurrency);
assertEquals("First task", _partitionMovement0, partitionMovementTasks.get(0).proposal());
assertEquals("Second task", _partitionMovement2, partitionMovementTasks.get(1).proposal());
assertEquals("Third task", _partitionMovement1, partitionMovementTasks.get(2).proposal());
postponeUrpPlanner.addExecutionProposals(proposals, strategyOptions, null);
partitionMovementTasks = postponeUrpPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(),
- _defaultPartitionsMaxCap);
+ _defaultPartitionsMaxCap,
+ _defaultEmptyPartitionConcurrency);
assertEquals("First task", _partitionMovement1, partitionMovementTasks.get(0).proposal());
assertEquals("Second task", _partitionMovement3, partitionMovementTasks.get(1).proposal());
assertEquals("Third task", _partitionMovement0, partitionMovementTasks.get(2).proposal());
prioritizeLargeMovementPlanner.addExecutionProposals(proposals, strategyOptions, null);
partitionMovementTasks = prioritizeLargeMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(),
- _defaultPartitionsMaxCap);
+ _defaultPartitionsMaxCap,
+ _defaultEmptyPartitionConcurrency);
assertEquals("First task", _partitionMovement1, partitionMovementTasks.get(0).proposal());
assertEquals("Second task", _partitionMovement3, partitionMovementTasks.get(1).proposal());
assertEquals("Third task", _partitionMovement2, partitionMovementTasks.get(2).proposal());
prioritizeSmallMovementPlanner.addExecutionProposals(proposals, strategyOptions, null);
partitionMovementTasks = prioritizeSmallMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(),
- _defaultPartitionsMaxCap);
+ _defaultPartitionsMaxCap,
+ _defaultEmptyPartitionConcurrency);
assertEquals("First task", _partitionMovement0, partitionMovementTasks.get(0).proposal());
assertEquals("Second task", _partitionMovement2, partitionMovementTasks.get(1).proposal());
assertEquals("Third task", _partitionMovement3, partitionMovementTasks.get(2).proposal());
@@ -294,7 +299,8 @@ public void testGetInterBrokerPartitionMovementTasks() {
smallUrpMovementPlanner.addExecutionProposals(proposals, strategyOptions, null);
partitionMovementTasks = smallUrpMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(),
- _defaultPartitionsMaxCap);
+ _defaultPartitionsMaxCap,
+ _defaultEmptyPartitionConcurrency);
assertEquals("First task", _partitionMovement3, partitionMovementTasks.get(0).proposal());
assertEquals("Second task", _partitionMovement1, partitionMovementTasks.get(1).proposal());
assertEquals("Third task", _partitionMovement0, partitionMovementTasks.get(2).proposal());
@@ -302,7 +308,8 @@ public void testGetInterBrokerPartitionMovementTasks() {
contradictingMovementPlanner.addExecutionProposals(proposals, strategyOptions, null);
partitionMovementTasks = contradictingMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(),
- _defaultPartitionsMaxCap);
+ _defaultPartitionsMaxCap,
+ _defaultEmptyPartitionConcurrency);
assertEquals("First task", _partitionMovement3, partitionMovementTasks.get(0).proposal());
assertEquals("Second task", _partitionMovement1, partitionMovementTasks.get(1).proposal());
assertEquals("Third task", _partitionMovement0, partitionMovementTasks.get(2).proposal());
@@ -310,7 +317,8 @@ public void testGetInterBrokerPartitionMovementTasks() {
prioritizeMinIsrMovementPlanner.addExecutionProposals(proposals, strategyOptions, null);
partitionMovementTasks = prioritizeMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(),
- _defaultPartitionsMaxCap);
+ _defaultPartitionsMaxCap,
+ _defaultEmptyPartitionConcurrency);
assertEquals("First task", _partitionMovement0, partitionMovementTasks.get(0).proposal());
assertEquals("Second task", _partitionMovement2, partitionMovementTasks.get(1).proposal());
assertEquals("Third task", _partitionMovement1, partitionMovementTasks.get(2).proposal());
@@ -354,7 +362,8 @@ public void testGetInterBrokerPartitionMovementWithMinIsrTasks() {
readyBrokers.put(5, 6);
prioritizeOneAboveMinIsrMovementPlanner.addExecutionProposals(proposals, strategyOptions, null);
List partitionMovementTasks
- = prioritizeOneAboveMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap);
+ = prioritizeOneAboveMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(),
+ _defaultPartitionsMaxCap, _defaultEmptyPartitionConcurrency);
assertEquals("First task", _rf4PartitionMovement2, partitionMovementTasks.get(0).proposal());
assertEquals("Second task", _rf4PartitionMovement3, partitionMovementTasks.get(1).proposal());
assertEquals("Third task", _rf4PartitionMovement1, partitionMovementTasks.get(2).proposal());
@@ -387,25 +396,32 @@ public void testDynamicConfigReplicaMovementStrategy() {
readyBrokers.put(3, 8);
planner.addExecutionProposals(proposals, strategyOptions, null);
List partitionMovementTasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(),
- _defaultPartitionsMaxCap);
+ _defaultPartitionsMaxCap,
+ _defaultEmptyPartitionConcurrency);
assertEquals("First task", _partitionMovement0, partitionMovementTasks.get(0).proposal());
assertEquals("Second task", _partitionMovement2, partitionMovementTasks.get(1).proposal());
assertEquals("Third task", _partitionMovement1, partitionMovementTasks.get(2).proposal());
planner.addExecutionProposals(proposals, strategyOptions, new PostponeUrpReplicaMovementStrategy());
- partitionMovementTasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap);
+ partitionMovementTasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(),
+ _defaultPartitionsMaxCap,
+ _defaultEmptyPartitionConcurrency);
assertEquals("First task", _partitionMovement1, partitionMovementTasks.get(0).proposal());
assertEquals("Second task", _partitionMovement3, partitionMovementTasks.get(1).proposal());
assertEquals("Third task", _partitionMovement0, partitionMovementTasks.get(2).proposal());
planner.addExecutionProposals(proposals, strategyOptions, new PrioritizeLargeReplicaMovementStrategy());
- partitionMovementTasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap);
+ partitionMovementTasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(),
+ _defaultPartitionsMaxCap,
+ _defaultEmptyPartitionConcurrency);
assertEquals("First task", _partitionMovement1, partitionMovementTasks.get(0).proposal());
assertEquals("Second task", _partitionMovement3, partitionMovementTasks.get(1).proposal());
assertEquals("Third task", _partitionMovement2, partitionMovementTasks.get(2).proposal());
planner.addExecutionProposals(proposals, strategyOptions, new PrioritizeSmallReplicaMovementStrategy());
- partitionMovementTasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap);
+ partitionMovementTasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(),
+ _defaultPartitionsMaxCap,
+ _defaultEmptyPartitionConcurrency);
assertEquals("First task", _partitionMovement0, partitionMovementTasks.get(0).proposal());
assertEquals("Second task", _partitionMovement2, partitionMovementTasks.get(1).proposal());
assertEquals("Third task", _partitionMovement3, partitionMovementTasks.get(2).proposal());
@@ -474,6 +490,123 @@ public void testGetIntraBrokerPartitionMovementTasks() {
EasyMock.verify(mockAdminClient);
}
+ @Test
+ public void testZeroByteTasksExceedNormalConcurrency() {
+ // Create 10 zero-byte proposals all moving from broker 0 to broker 1.
+ List proposals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ proposals.add(new ExecutionProposal(new TopicPartition(TOPIC1, i), 0, _r0,
+ Arrays.asList(_r0, _r2), Arrays.asList(_r2, _r1)));
+ }
+
+ Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
+ ExecutionTaskPlanner planner = new ExecutionTaskPlanner(null, new KafkaCruiseControlConfig(props));
+
+ Set partitions = new HashSet<>();
+ for (ExecutionProposal p : proposals) {
+ partitions.add(generatePartitionInfo(p, false));
+ }
+ Cluster cluster = new Cluster(null, _expectedNodes, partitions, Collections.emptySet(), Collections.emptySet());
+ StrategyOptions strategyOptions = new StrategyOptions.Builder(cluster).build();
+
+ planner.addExecutionProposals(proposals, strategyOptions, null);
+
+ // Normal concurrency is 2 per broker -- too low for 10 proposals.
+ Map readyBrokers = new HashMap<>();
+ readyBrokers.put(0, 2);
+ readyBrokers.put(1, 2);
+ readyBrokers.put(2, 2);
+
+ // With empty-partition concurrency of 50, all 10 zero-byte tasks should be scheduled.
+ List tasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(),
+ _defaultPartitionsMaxCap, 50);
+ assertEquals("All 10 zero-byte tasks should be scheduled", 10, tasks.size());
+ }
+
+ @Test
+ public void testMixedZeroByteAndDataBearingTasks() {
+ // Data-bearing proposals added first so they consume the normal slot,
+ // then zero-byte proposals which should overflow into empty-partition slots.
+ List proposals = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ proposals.add(new ExecutionProposal(new TopicPartition(TOPIC1, i), 100, _r0,
+ Arrays.asList(_r0, _r2), Arrays.asList(_r2, _r1)));
+ }
+ for (int i = 3; i < 6; i++) {
+ proposals.add(new ExecutionProposal(new TopicPartition(TOPIC1, i), 0, _r0,
+ Arrays.asList(_r0, _r2), Arrays.asList(_r2, _r1)));
+ }
+
+ Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
+ ExecutionTaskPlanner planner = new ExecutionTaskPlanner(null, new KafkaCruiseControlConfig(props));
+
+ Set partitions = new HashSet<>();
+ for (ExecutionProposal p : proposals) {
+ partitions.add(generatePartitionInfo(p, false));
+ }
+ Cluster cluster = new Cluster(null, _expectedNodes, partitions, Collections.emptySet(), Collections.emptySet());
+ StrategyOptions strategyOptions = new StrategyOptions.Builder(cluster).build();
+
+ planner.addExecutionProposals(proposals, strategyOptions, null);
+
+ // Normal concurrency of 1 per broker.
+ Map readyBrokers = new HashMap<>();
+ readyBrokers.put(0, 1);
+ readyBrokers.put(1, 1);
+ readyBrokers.put(2, 1);
+
+ List tasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(),
+ _defaultPartitionsMaxCap, 50);
+
+ int dataBearingCount = 0;
+ int zeroByteCount = 0;
+ for (ExecutionTask t : tasks) {
+ if (t.proposal().dataToMoveInMB() == 0) {
+ zeroByteCount++;
+ } else {
+ dataBearingCount++;
+ }
+ }
+ // The first data-bearing proposal consumes the single normal slot per broker.
+ // After that, remaining data-bearing tasks can't be scheduled (no normal slots, not zero-byte).
+ // All 3 zero-byte tasks are then scheduled via the empty-partition slots.
+ assertEquals("Only 1 data-bearing task should be scheduled with normal concurrency 1", 1, dataBearingCount);
+ assertEquals("All 3 zero-byte tasks should be scheduled via empty partition slots", 3, zeroByteCount);
+ }
+
+ @Test
+ public void testClusterWideCapRespectedForZeroByteTasks() {
+ // Create 20 zero-byte proposals.
+ List proposals = new ArrayList<>();
+ for (int i = 0; i < 20; i++) {
+ proposals.add(new ExecutionProposal(new TopicPartition(TOPIC1, i), 0, _r0,
+ Arrays.asList(_r0, _r2), Arrays.asList(_r2, _r1)));
+ }
+
+ Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
+ ExecutionTaskPlanner planner = new ExecutionTaskPlanner(null, new KafkaCruiseControlConfig(props));
+
+ Set partitions = new HashSet<>();
+ for (ExecutionProposal p : proposals) {
+ partitions.add(generatePartitionInfo(p, false));
+ }
+ Cluster cluster = new Cluster(null, _expectedNodes, partitions, Collections.emptySet(), Collections.emptySet());
+ StrategyOptions strategyOptions = new StrategyOptions.Builder(cluster).build();
+
+ planner.addExecutionProposals(proposals, strategyOptions, null);
+
+ Map readyBrokers = new HashMap<>();
+ readyBrokers.put(0, 2);
+ readyBrokers.put(1, 2);
+ readyBrokers.put(2, 2);
+
+ // Set cluster-wide cap to 5 -- should stop at 5 even though empty concurrency allows more.
+ int clusterWideCap = 5;
+ List tasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(),
+ clusterWideCap, 100);
+ assertEquals("Cluster-wide cap should be respected", clusterWideCap, tasks.size());
+ }
+
@Test
public void testClear() {
List proposals = new ArrayList<>();
diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java
index 29cbb4ac0d..e92857a91b 100644
--- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java
+++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java
@@ -839,6 +839,32 @@ private void executeAndVerifyProposals(KafkaZkClient kafkaZkClient,
assertFalse("Concurrency manager is not reset after execution", executor.isConcurrencyManagerInitialized());
}
+ @Test
+ public void testZeroByteMoveExecution() throws InterruptedException, OngoingExecutionException {
+ KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(),
+ "ExecutorTestMetricGroup",
+ "ZeroByteMoveExecution",
+ false,
+ _zkClientConfig);
+ try {
+ Map topicDescriptions = createTopics(0);
+ int initialLeader0 = topicDescriptions.get(TOPIC0).partitions().get(0).leader().id();
+ int targetBroker = initialLeader0 == 0 ? 1 : 0;
+
+ // Zero-byte inter-broker move: exercises the reduced-delay first-iteration path
+ // in waitForInterBrokerReplicaTasksToFinish.
+ ExecutionProposal proposal =
+ new ExecutionProposal(TP0, 0, new ReplicaPlacementInfo(initialLeader0),
+ Collections.singletonList(new ReplicaPlacementInfo(initialLeader0)),
+ Collections.singletonList(new ReplicaPlacementInfo(targetBroker)));
+
+ executeAndVerifyProposals(kafkaZkClient, Collections.singletonList(proposal),
+ Collections.singletonList(proposal), false, null, false, true);
+ } finally {
+ KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient);
+ }
+ }
+
private Properties getExecutorProperties() {
Properties props = new Properties();
String capacityConfigFile = Objects.requireNonNull(