From 4210480e29017d2ae26a950a85201b2c8f83b96d Mon Sep 17 00:00:00 2001 From: Razvan Dobre Date: Fri, 3 Apr 2026 17:40:39 +0300 Subject: [PATCH 1/2] tmp --- .../config/constants/ExecutorConfig.java | 19 +++ .../executor/ExecutionTaskManager.java | 3 +- .../executor/ExecutionTaskPlanner.java | 44 +++-- .../cruisecontrol/executor/Executor.java | 18 +- .../ExecutionConcurrencyManager.java | 12 ++ .../executor/ExecutionTaskPlannerTest.java | 158 ++++++++++++++++-- 6 files changed, 225 insertions(+), 29 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java index 99a2b4cd87..cc8833fb25 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java @@ -148,6 +148,19 @@ public final class ExecutorConfig { + "executor will at most allow 10 partitions move out of a broker and 10 partitions move into a broker at any " + "given point. This is to avoid overwhelming the cluster by inter-broker partition movements."; + /** + * num.concurrent.empty.partition.movements.per.broker + */ + public static final String NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER_CONFIG = + "num.concurrent.empty.partition.movements.per.broker"; + public static final int DEFAULT_NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER = 100; + public static final String NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER_DOC = "The maximum number of empty " + + "(zero-byte) partitions the executor will move to or out of a broker at the same time. Since empty partition " + + "moves transfer no data and create no I/O pressure, a much higher concurrency than " + + NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG + " is safe. This limit is applied on top of (not instead " + + "of) the normal partition movement concurrency -- zero-byte moves that fit within the normal limit use normal " + + "slots first, and only overflow into this extended limit."; + /** * num.concurrent.intra.broker.partition.movements */ @@ -706,6 +719,12 @@ public static ConfigDef define(ConfigDef configDef) { atLeast(1), ConfigDef.Importance.MEDIUM, NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_DOC) + .define(NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER_CONFIG, + ConfigDef.Type.INT, + DEFAULT_NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER, + atLeast(1), + ConfigDef.Importance.MEDIUM, + NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER_DOC) .define(NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG, ConfigDef.Type.INT, DEFAULT_NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS, diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java index dda14e9a41..40da5f0041 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java @@ -79,7 +79,8 @@ public synchronized List getInterBrokerReplicaMovementTasks() { ConcurrencyType.INTER_BROKER_REPLICA); return _executionTaskPlanner.getInterBrokerReplicaMovementTasks( brokersReadyForReplicaMovement, _inProgressPartitionsForInterBrokerMovement, - _executionConcurrencyManager.maxClusterInterBrokerPartitionMovements()); + _executionConcurrencyManager.maxClusterInterBrokerPartitionMovements(), + _executionConcurrencyManager.emptyPartitionMovementConcurrency()); } /** diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java index 2f3f671c08..0034c8b8ef 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java @@ -347,12 +347,20 @@ public List getLeadershipMovementTasks(ExecutionConcurrencyManage */ public List getInterBrokerReplicaMovementTasks(Map readyBrokers, Set inProgressPartitions, - int maxInterBrokerPartitionMovements) { + int maxInterBrokerPartitionMovements, + int maxEmptyPartitionMovementsPerBroker) { LOG.trace("Getting inter-broker replica movement tasks for brokers with concurrency {}", readyBrokers); List executableReplicaMovements = new ArrayList<>(); SortedSet interPartMoveBrokerIds = new TreeSet<>(_interPartMoveBrokerComparator); List interPartMoveBrokerIdsList = new ArrayList<>(_interPartMoveTasksByBrokerId.keySet().size()); + // Separate slot tracking for zero-byte (empty) partition moves. These slots are used only after + // normal readyBrokers slots are exhausted, allowing empty partition moves to proceed at higher concurrency. + Map readyBrokersEmptyPartitions = new HashMap<>(); + for (Map.Entry entry : readyBrokers.entrySet()) { + readyBrokersEmptyPartitions.put(entry.getKey(), maxEmptyPartitionMovementsPerBroker); + } + /* * The algorithm avoids unfair situation where the available movement slots of a broker is completely taken * by another broker. It checks the proposals in a round-robin manner that makes sure each ready broker gets @@ -403,33 +411,39 @@ public List getInterBrokerReplicaMovementTasks(Map slotsToDecrement = useEmptySlots ? readyBrokersEmptyPartitions : readyBrokers; + slotsToDecrement.put(sourceBroker, slotsToDecrement.get(sourceBroker) - 1); for (int broker : destinationBrokers) { - readyBrokers.put(broker, readyBrokers.get(broker) - 1); + slotsToDecrement.put(broker, slotsToDecrement.get(broker) - 1); } - // Mark proposal added to true so we will have another round of check. newTaskAdded = true; numInProgressPartitions++; - LOG.debug("Found ready task {} for broker {}. Broker concurrency state: {}", task, brokerId, readyBrokers); - // We can stop the check for proposals for this broker because we have found a proposal. + LOG.debug("Found ready task {} for broker {}. Broker concurrency state: {}, empty partition state: {}", + task, brokerId, readyBrokers, readyBrokersEmptyPartitions); break; } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index 22f210f0a5..da885be776 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -1827,8 +1827,24 @@ private List waitForInterBrokerReplicaTasksToFinish(AlterPartitio } boolean retry; + boolean isFirstIteration = true; do { - Cluster cluster = getClusterForExecutionProgressCheck(); + Cluster cluster; + if (isFirstIteration) { + isFirstIteration = false; + long totalDataInMB = 0; + for (ExecutionTask task : inExecutionTasks()) { + totalDataInMB += task.proposal().dataToMoveInMB(); + } + if (totalDataInMB == 0 && !inExecutionTasks().isEmpty()) { + LOG.debug("All in-execution tasks are zero-byte moves, skipping initial sleep."); + cluster = _metadataClient.refreshMetadata().cluster(); + } else { + cluster = getClusterForExecutionProgressCheck(); + } + } else { + cluster = getClusterForExecutionProgressCheck(); + } List deadInterBrokerReplicaTasks = new ArrayList<>(); List stoppedInterBrokerReplicaTasks = new ArrayList<>(); List slowTasksToReport = new ArrayList<>(); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/concurrency/ExecutionConcurrencyManager.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/concurrency/ExecutionConcurrencyManager.java index bf8170ce88..9ee9592018 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/concurrency/ExecutionConcurrencyManager.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/concurrency/ExecutionConcurrencyManager.java @@ -37,6 +37,9 @@ public class ExecutionConcurrencyManager { // The total allowed movement concurrency for all types in the cluster. This value cannot be overridden at runtime. private final int _clusterMovementConcurrency; + // The allowed per-broker concurrency for empty (zero-byte) partition movements. + private final int _defaultEmptyPartitionMovementConcurrency; + // The allowed inter-broker partition movement concurrency for each broker. private final int _defaultInterBrokerPartitionMovementConcurrency; private Integer _requestedInterBrokerPartitionMovementConcurrency; @@ -74,6 +77,8 @@ public class ExecutionConcurrencyManager { public ExecutionConcurrencyManager(KafkaCruiseControlConfig config) { _clusterMovementConcurrency = config.getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG); + _defaultEmptyPartitionMovementConcurrency = config.getInt( + ExecutorConfig.NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER_CONFIG); _defaultInterBrokerPartitionMovementConcurrency = config.getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG); _defaultIntraBrokerPartitionMovementConcurrency = config.getInt(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG); _defaultClusterLeadershipMovementConcurrency = config.getInt(ExecutorConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG); @@ -284,6 +289,13 @@ public synchronized int maxClusterLeadershipMovements() { : _requestedClusterLeadershipMovementConcurrency; } + /** + * @return The per-broker concurrency allowed for empty (zero-byte) partition movements. + */ + public int emptyPartitionMovementConcurrency() { + return _defaultEmptyPartitionMovementConcurrency; + } + /** * Get the concurrency of each broker without per-broker throttling. This concurrency is simply the overall allowed concurrency divided * by broker count. diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java index 3a3a4c24dd..a7a5cca888 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java @@ -98,6 +98,7 @@ public class ExecutionTaskPlannerTest { new Node(4, "null", -1), new Node(5, "null", -1)); private final int _defaultPartitionsMaxCap = ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_PARTITION_MOVEMENTS_CONFIG; + private final int _defaultEmptyPartitionConcurrency = ExecutorConfig.DEFAULT_NUM_CONCURRENT_EMPTY_PARTITION_MOVEMENTS_PER_BROKER; @Test public void testGetLeaderMovementTasks() { @@ -265,28 +266,32 @@ public void testGetInterBrokerPartitionMovementTasks() { basePlanner.addExecutionProposals(proposals, strategyOptions, null); List partitionMovementTasks = basePlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), - _defaultPartitionsMaxCap); + _defaultPartitionsMaxCap, + _defaultEmptyPartitionConcurrency); assertEquals("First task", _partitionMovement0, partitionMovementTasks.get(0).proposal()); assertEquals("Second task", _partitionMovement2, partitionMovementTasks.get(1).proposal()); assertEquals("Third task", _partitionMovement1, partitionMovementTasks.get(2).proposal()); postponeUrpPlanner.addExecutionProposals(proposals, strategyOptions, null); partitionMovementTasks = postponeUrpPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), - _defaultPartitionsMaxCap); + _defaultPartitionsMaxCap, + _defaultEmptyPartitionConcurrency); assertEquals("First task", _partitionMovement1, partitionMovementTasks.get(0).proposal()); assertEquals("Second task", _partitionMovement3, partitionMovementTasks.get(1).proposal()); assertEquals("Third task", _partitionMovement0, partitionMovementTasks.get(2).proposal()); prioritizeLargeMovementPlanner.addExecutionProposals(proposals, strategyOptions, null); partitionMovementTasks = prioritizeLargeMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), - _defaultPartitionsMaxCap); + _defaultPartitionsMaxCap, + _defaultEmptyPartitionConcurrency); assertEquals("First task", _partitionMovement1, partitionMovementTasks.get(0).proposal()); assertEquals("Second task", _partitionMovement3, partitionMovementTasks.get(1).proposal()); assertEquals("Third task", _partitionMovement2, partitionMovementTasks.get(2).proposal()); prioritizeSmallMovementPlanner.addExecutionProposals(proposals, strategyOptions, null); partitionMovementTasks = prioritizeSmallMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), - _defaultPartitionsMaxCap); + _defaultPartitionsMaxCap, + _defaultEmptyPartitionConcurrency); assertEquals("First task", _partitionMovement0, partitionMovementTasks.get(0).proposal()); assertEquals("Second task", _partitionMovement2, partitionMovementTasks.get(1).proposal()); assertEquals("Third task", _partitionMovement3, partitionMovementTasks.get(2).proposal()); @@ -294,7 +299,8 @@ public void testGetInterBrokerPartitionMovementTasks() { smallUrpMovementPlanner.addExecutionProposals(proposals, strategyOptions, null); partitionMovementTasks = smallUrpMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), - _defaultPartitionsMaxCap); + _defaultPartitionsMaxCap, + _defaultEmptyPartitionConcurrency); assertEquals("First task", _partitionMovement3, partitionMovementTasks.get(0).proposal()); assertEquals("Second task", _partitionMovement1, partitionMovementTasks.get(1).proposal()); assertEquals("Third task", _partitionMovement0, partitionMovementTasks.get(2).proposal()); @@ -302,7 +308,8 @@ public void testGetInterBrokerPartitionMovementTasks() { contradictingMovementPlanner.addExecutionProposals(proposals, strategyOptions, null); partitionMovementTasks = contradictingMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), - _defaultPartitionsMaxCap); + _defaultPartitionsMaxCap, + _defaultEmptyPartitionConcurrency); assertEquals("First task", _partitionMovement3, partitionMovementTasks.get(0).proposal()); assertEquals("Second task", _partitionMovement1, partitionMovementTasks.get(1).proposal()); assertEquals("Third task", _partitionMovement0, partitionMovementTasks.get(2).proposal()); @@ -310,7 +317,8 @@ public void testGetInterBrokerPartitionMovementTasks() { prioritizeMinIsrMovementPlanner.addExecutionProposals(proposals, strategyOptions, null); partitionMovementTasks = prioritizeMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), - _defaultPartitionsMaxCap); + _defaultPartitionsMaxCap, + _defaultEmptyPartitionConcurrency); assertEquals("First task", _partitionMovement0, partitionMovementTasks.get(0).proposal()); assertEquals("Second task", _partitionMovement2, partitionMovementTasks.get(1).proposal()); assertEquals("Third task", _partitionMovement1, partitionMovementTasks.get(2).proposal()); @@ -354,7 +362,8 @@ public void testGetInterBrokerPartitionMovementWithMinIsrTasks() { readyBrokers.put(5, 6); prioritizeOneAboveMinIsrMovementPlanner.addExecutionProposals(proposals, strategyOptions, null); List partitionMovementTasks - = prioritizeOneAboveMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap); + = prioritizeOneAboveMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), + _defaultPartitionsMaxCap, _defaultEmptyPartitionConcurrency); assertEquals("First task", _rf4PartitionMovement2, partitionMovementTasks.get(0).proposal()); assertEquals("Second task", _rf4PartitionMovement3, partitionMovementTasks.get(1).proposal()); assertEquals("Third task", _rf4PartitionMovement1, partitionMovementTasks.get(2).proposal()); @@ -387,25 +396,32 @@ public void testDynamicConfigReplicaMovementStrategy() { readyBrokers.put(3, 8); planner.addExecutionProposals(proposals, strategyOptions, null); List partitionMovementTasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), - _defaultPartitionsMaxCap); + _defaultPartitionsMaxCap, + _defaultEmptyPartitionConcurrency); assertEquals("First task", _partitionMovement0, partitionMovementTasks.get(0).proposal()); assertEquals("Second task", _partitionMovement2, partitionMovementTasks.get(1).proposal()); assertEquals("Third task", _partitionMovement1, partitionMovementTasks.get(2).proposal()); planner.addExecutionProposals(proposals, strategyOptions, new PostponeUrpReplicaMovementStrategy()); - partitionMovementTasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap); + partitionMovementTasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), + _defaultPartitionsMaxCap, + _defaultEmptyPartitionConcurrency); assertEquals("First task", _partitionMovement1, partitionMovementTasks.get(0).proposal()); assertEquals("Second task", _partitionMovement3, partitionMovementTasks.get(1).proposal()); assertEquals("Third task", _partitionMovement0, partitionMovementTasks.get(2).proposal()); planner.addExecutionProposals(proposals, strategyOptions, new PrioritizeLargeReplicaMovementStrategy()); - partitionMovementTasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap); + partitionMovementTasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), + _defaultPartitionsMaxCap, + _defaultEmptyPartitionConcurrency); assertEquals("First task", _partitionMovement1, partitionMovementTasks.get(0).proposal()); assertEquals("Second task", _partitionMovement3, partitionMovementTasks.get(1).proposal()); assertEquals("Third task", _partitionMovement2, partitionMovementTasks.get(2).proposal()); planner.addExecutionProposals(proposals, strategyOptions, new PrioritizeSmallReplicaMovementStrategy()); - partitionMovementTasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap); + partitionMovementTasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), + _defaultPartitionsMaxCap, + _defaultEmptyPartitionConcurrency); assertEquals("First task", _partitionMovement0, partitionMovementTasks.get(0).proposal()); assertEquals("Second task", _partitionMovement2, partitionMovementTasks.get(1).proposal()); assertEquals("Third task", _partitionMovement3, partitionMovementTasks.get(2).proposal()); @@ -474,6 +490,124 @@ public void testGetIntraBrokerPartitionMovementTasks() { EasyMock.verify(mockAdminClient); } + @Test + public void testZeroByteTasksExceedNormalConcurrency() { + // Create 10 zero-byte proposals all moving from broker 0 to broker 1. + List proposals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + proposals.add(new ExecutionProposal(new TopicPartition(TOPIC1, i), 0, _r0, + Arrays.asList(_r0, _r2), Arrays.asList(_r2, _r1))); + } + + Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties(); + ExecutionTaskPlanner planner = new ExecutionTaskPlanner(null, new KafkaCruiseControlConfig(props)); + + Set partitions = new HashSet<>(); + for (ExecutionProposal p : proposals) { + partitions.add(generatePartitionInfo(p, false)); + } + Cluster cluster = new Cluster(null, _expectedNodes, partitions, Collections.emptySet(), Collections.emptySet()); + StrategyOptions strategyOptions = new StrategyOptions.Builder(cluster).build(); + + planner.addExecutionProposals(proposals, strategyOptions, null); + + // Normal concurrency is 2 per broker -- too low for 10 proposals. + Map readyBrokers = new HashMap<>(); + readyBrokers.put(0, 2); + readyBrokers.put(1, 2); + readyBrokers.put(2, 2); + + // With empty-partition concurrency of 50, all 10 zero-byte tasks should be scheduled. + List tasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), + _defaultPartitionsMaxCap, 50); + assertEquals("All 10 zero-byte tasks should be scheduled", 10, tasks.size()); + } + + @Test + public void testMixedZeroByteAndDataBearingTasks() { + // 3 zero-byte proposals + 3 data-bearing proposals, all from broker 0 to broker 1. + List proposals = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + proposals.add(new ExecutionProposal(new TopicPartition(TOPIC1, i), 0, _r0, + Arrays.asList(_r0, _r2), Arrays.asList(_r2, _r1))); + } + for (int i = 3; i < 6; i++) { + proposals.add(new ExecutionProposal(new TopicPartition(TOPIC1, i), 100, _r0, + Arrays.asList(_r0, _r2), Arrays.asList(_r2, _r1))); + } + + Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties(); + ExecutionTaskPlanner planner = new ExecutionTaskPlanner(null, new KafkaCruiseControlConfig(props)); + + Set partitions = new HashSet<>(); + for (ExecutionProposal p : proposals) { + partitions.add(generatePartitionInfo(p, false)); + } + Cluster cluster = new Cluster(null, _expectedNodes, partitions, Collections.emptySet(), Collections.emptySet()); + StrategyOptions strategyOptions = new StrategyOptions.Builder(cluster).build(); + + planner.addExecutionProposals(proposals, strategyOptions, null); + + // Normal concurrency of 1 per broker. + Map readyBrokers = new HashMap<>(); + readyBrokers.put(0, 1); + readyBrokers.put(1, 1); + readyBrokers.put(2, 1); + + List tasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), + _defaultPartitionsMaxCap, 50); + + // 1 task (data-bearing or zero-byte) uses the normal slot, then remaining zero-byte tasks use empty slots. + // Due to round-robin, up to 4 tasks total can be scheduled (1 normal + 3 zero-byte via empty slots, or similar). + // At minimum, more than just the normal concurrency (1) should be scheduled. + int dataBearingCount = 0; + int zeroBytCount = 0; + for (ExecutionTask t : tasks) { + if (t.proposal().dataToMoveInMB() == 0) { + zeroBytCount++; + } else { + dataBearingCount++; + } + } + // Normal slots allow 1 task per broker (min of src/dst). That uses 1 normal slot. + // After that, zero-byte tasks can still use the empty-partition slots. + assertEquals("Only 1 data-bearing task should be scheduled with normal concurrency 1", 1, dataBearingCount); + assertEquals("All 3 zero-byte tasks should be scheduled via empty partition slots", 3, zeroBytCount); + } + + @Test + public void testClusterWideCapRespectedForZeroByteTasks() { + // Create 20 zero-byte proposals. + List proposals = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + proposals.add(new ExecutionProposal(new TopicPartition(TOPIC1, i), 0, _r0, + Arrays.asList(_r0, _r2), Arrays.asList(_r2, _r1))); + } + + Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties(); + ExecutionTaskPlanner planner = new ExecutionTaskPlanner(null, new KafkaCruiseControlConfig(props)); + + Set partitions = new HashSet<>(); + for (ExecutionProposal p : proposals) { + partitions.add(generatePartitionInfo(p, false)); + } + Cluster cluster = new Cluster(null, _expectedNodes, partitions, Collections.emptySet(), Collections.emptySet()); + StrategyOptions strategyOptions = new StrategyOptions.Builder(cluster).build(); + + planner.addExecutionProposals(proposals, strategyOptions, null); + + Map readyBrokers = new HashMap<>(); + readyBrokers.put(0, 2); + readyBrokers.put(1, 2); + readyBrokers.put(2, 2); + + // Set cluster-wide cap to 5 -- should stop at 5 even though empty concurrency allows more. + int clusterWideCap = 5; + List tasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), + clusterWideCap, 100); + assertEquals("Cluster-wide cap should be respected", clusterWideCap, tasks.size()); + } + @Test public void testClear() { List proposals = new ArrayList<>(); From d5d519c1436013580c037a21e302b5e9aea9f831 Mon Sep 17 00:00:00 2001 From: Razvan Dobre Date: Fri, 3 Apr 2026 17:48:33 +0300 Subject: [PATCH 2/2] tmp: work in progress - empty partition rebalance optimization Made-with: Cursor --- .../executor/ExecutionTaskPlanner.java | 2 ++ .../cruisecontrol/executor/Executor.java | 5 +++- .../executor/ExecutionTaskPlannerTest.java | 21 +++++++-------- .../cruisecontrol/executor/ExecutorTest.java | 26 +++++++++++++++++++ 4 files changed, 42 insertions(+), 12 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java index 0034c8b8ef..674dca2cd0 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java @@ -343,6 +343,8 @@ public List getLeadershipMovementTasks(ExecutionConcurrencyManage * controller does not allow updating the ongoing replica reassignment for a partition * whose replica is being reassigned. * @param maxInterBrokerPartitionMovements Maximum cap for number of partitions to move at any time + * @param maxEmptyPartitionMovementsPerBroker Maximum concurrent empty (zero-byte) partition moves per broker, + * used as an extended limit beyond the normal readyBrokers slots. * @return A list of movements that is executable for the ready brokers. */ public List getInterBrokerReplicaMovementTasks(Map readyBrokers, diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index da885be776..7ca037ac6d 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -85,6 +85,7 @@ public class Executor { private static final Logger LOG = LoggerFactory.getLogger(Executor.class); private static final Logger OPERATION_LOG = LoggerFactory.getLogger(OPERATION_LOGGER); private static final long EXECUTION_PROGRESS_CHECK_INTERVAL_ADJUSTING_MS = 1000; + static final long ZERO_BYTE_MOVE_INITIAL_CHECK_DELAY_MS = 1000; // The execution progress is controlled by the ExecutionTaskManager. private final ExecutionTaskManager _executionTaskManager; private final MetadataClient _metadataClient; @@ -1837,7 +1838,9 @@ private List waitForInterBrokerReplicaTasksToFinish(AlterPartitio totalDataInMB += task.proposal().dataToMoveInMB(); } if (totalDataInMB == 0 && !inExecutionTasks().isEmpty()) { - LOG.debug("All in-execution tasks are zero-byte moves, skipping initial sleep."); + LOG.debug("All in-execution tasks are zero-byte moves, using reduced initial delay of {}ms.", + ZERO_BYTE_MOVE_INITIAL_CHECK_DELAY_MS); + Thread.sleep(ZERO_BYTE_MOVE_INITIAL_CHECK_DELAY_MS); cluster = _metadataClient.refreshMetadata().cluster(); } else { cluster = getClusterForExecutionProgressCheck(); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java index a7a5cca888..18245453fe 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java @@ -525,14 +525,15 @@ public void testZeroByteTasksExceedNormalConcurrency() { @Test public void testMixedZeroByteAndDataBearingTasks() { - // 3 zero-byte proposals + 3 data-bearing proposals, all from broker 0 to broker 1. + // Data-bearing proposals added first so they consume the normal slot, + // then zero-byte proposals which should overflow into empty-partition slots. List proposals = new ArrayList<>(); for (int i = 0; i < 3; i++) { - proposals.add(new ExecutionProposal(new TopicPartition(TOPIC1, i), 0, _r0, + proposals.add(new ExecutionProposal(new TopicPartition(TOPIC1, i), 100, _r0, Arrays.asList(_r0, _r2), Arrays.asList(_r2, _r1))); } for (int i = 3; i < 6; i++) { - proposals.add(new ExecutionProposal(new TopicPartition(TOPIC1, i), 100, _r0, + proposals.add(new ExecutionProposal(new TopicPartition(TOPIC1, i), 0, _r0, Arrays.asList(_r0, _r2), Arrays.asList(_r2, _r1))); } @@ -557,22 +558,20 @@ public void testMixedZeroByteAndDataBearingTasks() { List tasks = planner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap, 50); - // 1 task (data-bearing or zero-byte) uses the normal slot, then remaining zero-byte tasks use empty slots. - // Due to round-robin, up to 4 tasks total can be scheduled (1 normal + 3 zero-byte via empty slots, or similar). - // At minimum, more than just the normal concurrency (1) should be scheduled. int dataBearingCount = 0; - int zeroBytCount = 0; + int zeroByteCount = 0; for (ExecutionTask t : tasks) { if (t.proposal().dataToMoveInMB() == 0) { - zeroBytCount++; + zeroByteCount++; } else { dataBearingCount++; } } - // Normal slots allow 1 task per broker (min of src/dst). That uses 1 normal slot. - // After that, zero-byte tasks can still use the empty-partition slots. + // The first data-bearing proposal consumes the single normal slot per broker. + // After that, remaining data-bearing tasks can't be scheduled (no normal slots, not zero-byte). + // All 3 zero-byte tasks are then scheduled via the empty-partition slots. assertEquals("Only 1 data-bearing task should be scheduled with normal concurrency 1", 1, dataBearingCount); - assertEquals("All 3 zero-byte tasks should be scheduled via empty partition slots", 3, zeroBytCount); + assertEquals("All 3 zero-byte tasks should be scheduled via empty partition slots", 3, zeroByteCount); } @Test diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java index 29cbb4ac0d..e92857a91b 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java @@ -839,6 +839,32 @@ private void executeAndVerifyProposals(KafkaZkClient kafkaZkClient, assertFalse("Concurrency manager is not reset after execution", executor.isConcurrencyManagerInitialized()); } + @Test + public void testZeroByteMoveExecution() throws InterruptedException, OngoingExecutionException { + KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), + "ExecutorTestMetricGroup", + "ZeroByteMoveExecution", + false, + _zkClientConfig); + try { + Map topicDescriptions = createTopics(0); + int initialLeader0 = topicDescriptions.get(TOPIC0).partitions().get(0).leader().id(); + int targetBroker = initialLeader0 == 0 ? 1 : 0; + + // Zero-byte inter-broker move: exercises the reduced-delay first-iteration path + // in waitForInterBrokerReplicaTasksToFinish. + ExecutionProposal proposal = + new ExecutionProposal(TP0, 0, new ReplicaPlacementInfo(initialLeader0), + Collections.singletonList(new ReplicaPlacementInfo(initialLeader0)), + Collections.singletonList(new ReplicaPlacementInfo(targetBroker))); + + executeAndVerifyProposals(kafkaZkClient, Collections.singletonList(proposal), + Collections.singletonList(proposal), false, null, false, true); + } finally { + KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient); + } + } + private Properties getExecutorProperties() { Properties props = new Properties(); String capacityConfigFile = Objects.requireNonNull(