From 87149b5ea8cf7af7899b3715959df38a3dedacc4 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Sat, 21 Mar 2026 12:57:43 +0800 Subject: [PATCH 01/11] KAFKA-20297: Remove groupPartitionsByTopic/groupPartitionDataByTopic from CollectionUtils Replace all callers with idiomatic alternatives: - DescribeProducersHandler: add mapKey:true to DescribeProducersRequest.json and use find() on TopicRequestCollection - ListOffsetsHandler: use HashMap+computeIfAbsent Also fix Scala test callers affected by the DescribeProducersRequest mapKey change. --- .../internals/DescribeProducersHandler.java | 17 ++++++++------- .../admin/internals/ListOffsetsHandler.java | 21 ++++++++----------- .../message/DescribeProducersRequest.json | 2 +- .../DescribeProducersHandlerTest.java | 8 ++++--- .../kafka/api/AuthorizerIntegrationTest.scala | 4 ++-- .../unit/kafka/server/KafkaApisTest.scala | 4 ++-- .../unit/kafka/server/RequestQuotaTest.scala | 4 ++-- 7 files changed, 30 insertions(+), 30 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java index 3ae5638423cbd..526d0d227e2ca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java @@ -30,7 +30,6 @@ import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.DescribeProducersRequest; import org.apache.kafka.common.requests.DescribeProducersResponse; -import org.apache.kafka.common.utils.CollectionUtils; import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; @@ -89,15 +88,17 @@ public DescribeProducersRequest.Builder buildBatchedRequest( Set topicPartitions ) { DescribeProducersRequestData request = new DescribeProducersRequestData(); - DescribeProducersRequest.Builder builder = new DescribeProducersRequest.Builder(request); - CollectionUtils.groupPartitionsByTopic( - topicPartitions, - builder::addTopic, - (topicRequest, partitionId) -> topicRequest.partitionIndexes().add(partitionId) - ); + for (TopicPartition tp : topicPartitions) { + DescribeProducersRequestData.TopicRequest topicRequest = request.topics().find(tp.topic()); + if (topicRequest == null) { + topicRequest = new DescribeProducersRequestData.TopicRequest().setName(tp.topic()); + request.topics().add(topicRequest); + } + topicRequest.partitionIndexes().add(tp.partition()); + } - return builder; + return new DescribeProducersRequest.Builder(request); } private void handlePartitionError( diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java index c03a6c5bee005..742c8a276f03d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java @@ -32,7 +32,6 @@ import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.requests.ListOffsetsResponse; -import org.apache.kafka.common.utils.CollectionUtils; import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; @@ -80,17 +79,15 @@ public AdminApiLookupStrategy lookupStrategy() { @Override ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set keys) { - Map topicsByName = CollectionUtils.groupPartitionsByTopic( - keys, - topicName -> new ListOffsetsTopic().setName(topicName), - (listOffsetsTopic, partitionId) -> { - TopicPartition topicPartition = new TopicPartition(listOffsetsTopic.name(), partitionId); - long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition); - listOffsetsTopic.partitions().add( - new ListOffsetsPartition() - .setPartitionIndex(partitionId) - .setTimestamp(offsetTimestamp)); - }); + Map topicsByName = new HashMap<>(); + for (TopicPartition topicPartition : keys) { + ListOffsetsTopic topic = topicsByName.computeIfAbsent( + topicPartition.topic(), t -> new ListOffsetsTopic().setName(t)); + long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition); + topic.partitions().add(new ListOffsetsPartition() + .setPartitionIndex(topicPartition.partition()) + .setTimestamp(offsetTimestamp)); + } boolean supportsMaxTimestamp = keys .stream() .anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.MAX_TIMESTAMP); diff --git a/clients/src/main/resources/common/message/DescribeProducersRequest.json b/clients/src/main/resources/common/message/DescribeProducersRequest.json index b7889ef1f1e1c..20f531c7db914 100644 --- a/clients/src/main/resources/common/message/DescribeProducersRequest.json +++ b/clients/src/main/resources/common/message/DescribeProducersRequest.json @@ -23,7 +23,7 @@ "fields": [ { "name": "Topics", "type": "[]TopicRequest", "versions": "0+", "about": "The topics to list producers for.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "about": "The topic name." }, { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+", "about": "The indexes of the partitions to list producers for." } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java index 214d5fe47e913..a58db4182b629 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java @@ -33,7 +33,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.DescribeProducersRequest; import org.apache.kafka.common.requests.DescribeProducersResponse; -import org.apache.kafka.common.utils.CollectionUtils; import org.apache.kafka.common.utils.LogContext; import org.junit.jupiter.api.Test; @@ -120,7 +119,7 @@ public void testBuildRequest() { int brokerId = 3; DescribeProducersRequest.Builder request = handler.buildBatchedRequest(brokerId, topicPartitions); - List topics = request.data.topics(); + DescribeProducersRequestData.TopicRequestCollection topics = request.data.topics(); assertEquals(Set.of("foo", "bar"), topics.stream() .map(DescribeProducersRequestData.TopicRequest::name) @@ -308,7 +307,10 @@ private DescribeProducersResponse describeProducersResponse( ) { DescribeProducersResponseData response = new DescribeProducersResponseData(); Map> partitionResponsesByTopic = - CollectionUtils.groupPartitionDataByTopic(partitionResponses); + partitionResponses.entrySet().stream() + .collect(Collectors.groupingBy( + e -> e.getKey().topic(), + Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue))); for (Map.Entry> topicEntry : partitionResponsesByTopic.entrySet()) { String topic = topicEntry.getKey(); diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index e2ac0e1433c04..3c4e424590da2 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -648,11 +648,11 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { private def describeProducersRequest: DescribeProducersRequest = new DescribeProducersRequest.Builder( new DescribeProducersRequestData() - .setTopics(java.util.List.of( + .setTopics(new DescribeProducersRequestData.TopicRequestCollection(java.util.List.of( new DescribeProducersRequestData.TopicRequest() .setName(tp.topic) .setPartitionIndexes(java.util.List.of(Int.box(tp.partition))) - )) + ).iterator())) ).build() private def describeTransactionsRequest: DescribeTransactionsRequest = new DescribeTransactionsRequest.Builder( diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index c5974e7e3179b..a8277531bbc6c 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -10496,7 +10496,7 @@ class KafkaApisTest extends Logging { val tp4 = new TopicPartition("invalid;topic", 1) val authorizer: Authorizer = mock(classOf[Authorizer]) - val data = new DescribeProducersRequestData().setTopics(util.List.of( + val data = new DescribeProducersRequestData().setTopics(new DescribeProducersRequestData.TopicRequestCollection(util.List.of( new DescribeProducersRequestData.TopicRequest() .setName(tp1.topic) .setPartitionIndexes(util.List.of(Int.box(tp1.partition))), @@ -10509,7 +10509,7 @@ class KafkaApisTest extends Logging { new DescribeProducersRequestData.TopicRequest() .setName(tp4.topic) .setPartitionIndexes(util.List.of(Int.box(tp4.partition))) - )) + ).iterator())) def buildExpectedActions(topic: String): util.List[Action] = { val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index df8683fe65fab..ac35fb8f6ad47 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -652,9 +652,9 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.DESCRIBE_PRODUCERS => new DescribeProducersRequest.Builder(new DescribeProducersRequestData() - .setTopics(util.List.of(new DescribeProducersRequestData.TopicRequest() + .setTopics(new DescribeProducersRequestData.TopicRequestCollection(util.List.of(new DescribeProducersRequestData.TopicRequest() .setName("test-topic") - .setPartitionIndexes(util.List.of[Integer](1, 2, 3))))) + .setPartitionIndexes(util.List.of[Integer](1, 2, 3))).iterator()))) case ApiKeys.BROKER_REGISTRATION => new BrokerRegistrationRequest.Builder(new BrokerRegistrationRequestData()) From 3c488a26f72fb70ac7664d6aeef26243a7ca3217 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Sat, 21 Mar 2026 12:57:53 +0800 Subject: [PATCH 02/11] KAFKA-20297: Inline subtractMap and delete CollectionUtils Inline subtractMap as a private helper in OAuthBearerExtensionsValidatorCallback, then delete the now-empty CollectionUtils class and its test. --- ...AuthBearerExtensionsValidatorCallback.java | 9 +- .../kafka/common/utils/CollectionUtils.java | 93 ------------------- .../common/utils/CollectionUtilsTest.java | 65 ------------- 3 files changed, 7 insertions(+), 160 deletions(-) delete mode 100644 clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java delete mode 100644 clients/src/test/java/org/apache/kafka/common/utils/CollectionUtilsTest.java diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java index e17cfd1746d4b..d7659bf7cdf1c 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java +++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java @@ -22,11 +22,10 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import javax.security.auth.callback.Callback; -import static org.apache.kafka.common.utils.CollectionUtils.subtractMap; - /** * A {@code Callback} for use by the {@code SaslServer} implementation when it * needs to validate the SASL extensions for the OAUTHBEARER mechanism @@ -90,6 +89,12 @@ public Map ignoredExtensions() { return Collections.unmodifiableMap(subtractMap(subtractMap(inputExtensions.map(), invalidExtensions), validatedExtensions)); } + private static Map subtractMap(Map minuend, Map subtrahend) { + return minuend.entrySet().stream() + .filter(entry -> !subtrahend.containsKey(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + /** * Validates a specific extension in the original {@code inputExtensions} map * @param extensionName - the name of the extension which was validated diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java deleted file mode 100644 index c4d427a7070b1..0000000000000 --- a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.utils; - -import org.apache.kafka.common.TopicPartition; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.BiConsumer; -import java.util.function.Function; -import java.util.stream.Collectors; - -public final class CollectionUtils { - - private CollectionUtils() {} - - /** - * Given two maps (A, B), returns all the key-value pairs in A whose keys are not contained in B - */ - public static Map subtractMap(Map minuend, Map subtrahend) { - return minuend.entrySet().stream() - .filter(entry -> !subtrahend.containsKey(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - - /** - * group data by topic - * - * @param data Data to be partitioned - * @param Partition data type - * @return partitioned data - */ - public static Map> groupPartitionDataByTopic(Map data) { - Map> dataByTopic = new HashMap<>(); - for (Map.Entry entry : data.entrySet()) { - String topic = entry.getKey().topic(); - int partition = entry.getKey().partition(); - Map topicData = dataByTopic.computeIfAbsent(topic, t -> new HashMap<>()); - topicData.put(partition, entry.getValue()); - } - return dataByTopic; - } - - /** - * Group a list of partitions by the topic name. - * - * @param partitions The partitions to collect - * @return partitions per topic - */ - public static Map> groupPartitionsByTopic(Collection partitions) { - return groupPartitionsByTopic( - partitions, - topic -> new ArrayList<>(), - List::add - ); - } - - /** - * Group a collection of partitions by topic - * - * @return The map used to group the partitions - */ - public static Map groupPartitionsByTopic( - Collection partitions, - Function buildGroup, - BiConsumer addToGroup - ) { - Map dataByTopic = new HashMap<>(); - for (TopicPartition tp : partitions) { - String topic = tp.topic(); - T topicData = dataByTopic.computeIfAbsent(topic, buildGroup); - addToGroup.accept(topicData, tp.partition()); - } - return dataByTopic; - } -} diff --git a/clients/src/test/java/org/apache/kafka/common/utils/CollectionUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/CollectionUtilsTest.java deleted file mode 100644 index 7c2a9d66e3a2d..0000000000000 --- a/clients/src/test/java/org/apache/kafka/common/utils/CollectionUtilsTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.utils; - -import org.junit.jupiter.api.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.apache.kafka.common.utils.CollectionUtils.subtractMap; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotSame; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class CollectionUtilsTest { - - @Test - public void testSubtractMapRemovesSecondMapsKeys() { - Map mainMap = new HashMap<>(); - mainMap.put("one", "1"); - mainMap.put("two", "2"); - mainMap.put("three", "3"); - Map secondaryMap = new HashMap<>(); - secondaryMap.put("one", "4"); - secondaryMap.put("two", "5"); - - Map newMap = subtractMap(mainMap, secondaryMap); - - assertEquals(3, mainMap.size()); // original map should not be modified - assertEquals(1, newMap.size()); - assertTrue(newMap.containsKey("three")); - assertEquals("3", newMap.get("three")); - } - - @Test - public void testSubtractMapDoesntRemoveAnythingWhenEmptyMap() { - Map mainMap = new HashMap<>(); - mainMap.put("one", "1"); - mainMap.put("two", "2"); - mainMap.put("three", "3"); - Map secondaryMap = new HashMap<>(); - - Map newMap = subtractMap(mainMap, secondaryMap); - - assertEquals(3, newMap.size()); - assertEquals("1", newMap.get("one")); - assertEquals("2", newMap.get("two")); - assertEquals("3", newMap.get("three")); - assertNotSame(newMap, mainMap); - } -} From 538c8979dd64d827fb7cadab316e4fac05c6df41 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Sat, 21 Mar 2026 12:57:58 +0800 Subject: [PATCH 03/11] KAFKA-20297: Simplify StickyAssignor partition grouping with helper method Extract repeated stream-based grouping logic into a private groupPartitionsByTopic helper in StickyAssignor and AbstractStickyAssignorTest. --- .../apache/kafka/clients/consumer/StickyAssignor.java | 11 +++++++++-- .../kafka/clients/consumer/StickyAssignorTest.java | 7 +++++-- .../internals/AbstractStickyAssignorTest.java | 11 ++++++++--- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java index c7560ca389893..87b558303a029 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java @@ -23,15 +23,16 @@ import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; -import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; /** *

The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either: @@ -230,7 +231,8 @@ protected MemberData memberData(Subscription subscription) { static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) { Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1); List topicAssignments = new ArrayList<>(); - for (Map.Entry> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) { + Map> partitionsByTopic = groupPartitionsByTopic(memberData.partitions); + for (Map.Entry> topicEntry : partitionsByTopic.entrySet()) { Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT); topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey()); topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray()); @@ -272,4 +274,9 @@ private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer) Optional generation = struct.hasField(GENERATION_KEY_NAME) ? Optional.of(struct.getInt(GENERATION_KEY_NAME)) : Optional.empty(); return new MemberData(partitions, generation); } + + private static Map> groupPartitionsByTopic(Collection partitions) { + return partitions.stream() + .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.mapping(TopicPartition::partition, Collectors.toList()))); + } } \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java index c225c291e8fed..b2a04eaec81d1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.test.api.Flaky; -import org.apache.kafka.common.utils.CollectionUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -368,7 +367,11 @@ public void testAssignorWithOldVersionSubscriptions() { private Subscription buildSubscriptionWithOldSchema(List topics, List partitions, int consumerIndex) { Struct struct = new Struct(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0); List topicAssignments = new ArrayList<>(); - for (Map.Entry> topicEntry : CollectionUtils.groupPartitionsByTopic(partitions).entrySet()) { + Map> partitionsByTopic = new HashMap<>(); + for (TopicPartition tp : partitions) { + partitionsByTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition()); + } + for (Map.Entry> topicEntry : partitionsByTopic.entrySet()) { Struct topicAssignment = new Struct(StickyAssignor.TOPIC_ASSIGNMENT); topicAssignment.set(StickyAssignor.TOPIC_KEY_NAME, topicEntry.getKey()); topicAssignment.set(StickyAssignor.PARTITIONS_KEY_NAME, topicEntry.getValue().toArray()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java index 811325645937e..16c57575e7952 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.test.api.Flaky; -import org.apache.kafka.common.utils.CollectionUtils; import org.apache.kafka.common.utils.Utils; import org.junit.jupiter.api.BeforeEach; @@ -36,6 +35,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -1428,6 +1428,11 @@ private String pad(int num, int digits) { return "0".repeat(Math.max(0, digits - iDigits)) + num; } + private static Map> groupPartitionsByTopic(Collection partitions) { + return partitions.stream() + .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.mapping(TopicPartition::partition, Collectors.toList()))); + } + protected static List topics(String... topics) { return Arrays.asList(topics); } @@ -1522,8 +1527,8 @@ protected void verifyValidityAndBalance(Map subscriptions, if (Math.abs(len - otherLen) <= 1) continue; - Map> map = CollectionUtils.groupPartitionsByTopic(partitions); - Map> otherMap = CollectionUtils.groupPartitionsByTopic(otherPartitions); + Map> map = groupPartitionsByTopic(partitions); + Map> otherMap = groupPartitionsByTopic(otherPartitions); int moreLoaded = len > otherLen ? i : j; int lessLoaded = len > otherLen ? j : i; From 53ed6fea056de3eb730be12be516c0143f1cfa56 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Sat, 21 Mar 2026 13:16:49 +0800 Subject: [PATCH 04/11] KAFKA-20297: Extract groupPartitionDataByTopic helper in DescribeProducersHandlerTest --- .../internals/DescribeProducersHandlerTest.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java index a58db4182b629..9e49a36066218 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java @@ -307,10 +307,7 @@ private DescribeProducersResponse describeProducersResponse( ) { DescribeProducersResponseData response = new DescribeProducersResponseData(); Map> partitionResponsesByTopic = - partitionResponses.entrySet().stream() - .collect(Collectors.groupingBy( - e -> e.getKey().topic(), - Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue))); + groupPartitionDataByTopic(partitionResponses); for (Map.Entry> topicEntry : partitionResponsesByTopic.entrySet()) { String topic = topicEntry.getKey(); @@ -329,4 +326,12 @@ private DescribeProducersResponse describeProducersResponse( return new DescribeProducersResponse(response); } + private static Map> groupPartitionDataByTopic( + Map partitionResponses + ) { + return partitionResponses.entrySet().stream() + .collect(Collectors.groupingBy( + e -> e.getKey().topic(), + Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue))); + } } From f465e645a3110bf9597b6c0279892bce14739403 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Tue, 24 Mar 2026 19:53:53 +0800 Subject: [PATCH 05/11] refactor: simplify input of describeProducersResponse to single TopicPartition and PartitionResponse --- .../DescribeProducersHandlerTest.java | 27 +++++-------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java index 9e49a36066218..32f2b05fd94f4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java @@ -49,7 +49,6 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; -import static java.util.Collections.singletonMap; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -194,9 +193,7 @@ public void testCompletedResult() { DescribeProducersHandler handler = newHandler(options); PartitionResponse partitionResponse = sampleProducerState(topicPartition); - DescribeProducersResponse response = describeProducersResponse( - singletonMap(topicPartition, partitionResponse) - ); + DescribeProducersResponse response = describeProducersResponse(topicPartition, partitionResponse); Node node = new Node(3, "host", 1); ApiResult result = @@ -251,7 +248,7 @@ private DescribeProducersResponse buildResponseWithError( PartitionResponse partitionResponse = new PartitionResponse() .setPartitionIndex(topicPartition.partition()) .setErrorCode(error.code()); - return describeProducersResponse(singletonMap(topicPartition, partitionResponse)); + return describeProducersResponse(topicPartition, partitionResponse); } private PartitionResponse sampleProducerState(TopicPartition topicPartition) { @@ -303,25 +300,15 @@ private void assertMatchingProducers( } private DescribeProducersResponse describeProducersResponse( - Map partitionResponses + TopicPartition partition, PartitionResponse partitionResponse ) { DescribeProducersResponseData response = new DescribeProducersResponseData(); - Map> partitionResponsesByTopic = - groupPartitionDataByTopic(partitionResponses); - for (Map.Entry> topicEntry : partitionResponsesByTopic.entrySet()) { - String topic = topicEntry.getKey(); - Map topicPartitionResponses = topicEntry.getValue(); + TopicResponse topicResponse = new TopicResponse().setName(partition.topic()); + int partitionId = partition.partition(); + topicResponse.partitions().add(partitionResponse.setPartitionIndex(partitionId)); - TopicResponse topicResponse = new TopicResponse().setName(topic); - response.topics().add(topicResponse); - - for (Map.Entry partitionEntry : topicPartitionResponses.entrySet()) { - Integer partitionId = partitionEntry.getKey(); - PartitionResponse partitionResponse = partitionEntry.getValue(); - topicResponse.partitions().add(partitionResponse.setPartitionIndex(partitionId)); - } - } + response.topics().add(topicResponse); return new DescribeProducersResponse(response); } From dcf6bc5dd5ac8fedf84325c6128eea908e3181b0 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Wed, 25 Mar 2026 11:30:16 +0800 Subject: [PATCH 06/11] refactor: inline groupPartitionsByTopic in StickyAssignor --- .../org/apache/kafka/clients/consumer/StickyAssignor.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java index 87b558303a029..b63b853e42bfb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java @@ -26,7 +26,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -231,7 +230,8 @@ protected MemberData memberData(Subscription subscription) { static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) { Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1); List topicAssignments = new ArrayList<>(); - Map> partitionsByTopic = groupPartitionsByTopic(memberData.partitions); + Map> partitionsByTopic = memberData.partitions.stream() + .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.mapping(TopicPartition::partition, Collectors.toList()))); for (Map.Entry> topicEntry : partitionsByTopic.entrySet()) { Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT); topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey()); @@ -275,8 +275,4 @@ private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer) return new MemberData(partitions, generation); } - private static Map> groupPartitionsByTopic(Collection partitions) { - return partitions.stream() - .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.mapping(TopicPartition::partition, Collectors.toList()))); - } } \ No newline at end of file From 73e27eef63142853531b1262ad8977768c1360bd Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Thu, 26 Mar 2026 07:31:00 +0800 Subject: [PATCH 07/11] refactor: simplify ignoredExtensions with for-loop in OAuthBearerExtensionsValidatorCallback --- .../OAuthBearerExtensionsValidatorCallback.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java index d7659bf7cdf1c..d4b55120e8b12 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java +++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java @@ -22,8 +22,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.stream.Collectors; - import javax.security.auth.callback.Callback; /** @@ -86,13 +84,14 @@ public Map invalidExtensions() { * @return An immutable {@link Map} consisting of the extensions that have neither been validated nor invalidated */ public Map ignoredExtensions() { - return Collections.unmodifiableMap(subtractMap(subtractMap(inputExtensions.map(), invalidExtensions), validatedExtensions)); - } - - private static Map subtractMap(Map minuend, Map subtrahend) { - return minuend.entrySet().stream() - .filter(entry -> !subtrahend.containsKey(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Map ignored = new HashMap<>(); + for (Map.Entry entry : inputExtensions.map().entrySet()) { + String key = entry.getKey(); + if (!invalidExtensions.containsKey(key) && !validatedExtensions.containsKey(key)) { + ignored.put(key, entry.getValue()); + } + } + return Collections.unmodifiableMap(ignored); } /** From 678d200391020b38cd320070da13df86230ab35f Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Thu, 26 Mar 2026 08:07:14 +0800 Subject: [PATCH 08/11] refactor: simplify partition-topic overlap check in AbstractStickyAssignorTest --- .../internals/AbstractStickyAssignorTest.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java index 16c57575e7952..c7b0c909477a3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java @@ -35,7 +35,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -1428,11 +1427,6 @@ private String pad(int num, int digits) { return "0".repeat(Math.max(0, digits - iDigits)) + num; } - private static Map> groupPartitionsByTopic(Collection partitions) { - return partitions.stream() - .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.mapping(TopicPartition::partition, Collectors.toList()))); - } - protected static List topics(String... topics) { return Arrays.asList(topics); } @@ -1527,15 +1521,16 @@ protected void verifyValidityAndBalance(Map subscriptions, if (Math.abs(len - otherLen) <= 1) continue; - Map> map = groupPartitionsByTopic(partitions); - Map> otherMap = groupPartitionsByTopic(otherPartitions); - int moreLoaded = len > otherLen ? i : j; int lessLoaded = len > otherLen ? j : i; + Set otherTopics = otherPartitions.stream() + .map(TopicPartition::topic) + .collect(Collectors.toSet()); + // If there's any overlap in the subscribed topics, we should have been able to balance partitions - for (String topic: map.keySet()) { - assertFalse(otherMap.containsKey(topic), + for (TopicPartition tp : partitions) { + assertFalse(otherTopics.contains(tp.topic()), "Error: Some partitions can be moved from c" + moreLoaded + " to c" + lessLoaded + " to achieve a better balance" + "\nc" + i + " has " + len + " partitions, and c" + j + " has " + otherLen + " partitions." + "\nSubscriptions: " + subscriptions + From 07ae5c0a1f698f18fa02b562295319b114feb827 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Thu, 26 Mar 2026 08:09:23 +0800 Subject: [PATCH 09/11] refactor: remove unused groupPartitionDataByTopic in DescribeProducersHandlerTest --- .../admin/internals/DescribeProducersHandlerTest.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java index 32f2b05fd94f4..3a2ab50624918 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java @@ -312,13 +312,4 @@ private DescribeProducersResponse describeProducersResponse( return new DescribeProducersResponse(response); } - - private static Map> groupPartitionDataByTopic( - Map partitionResponses - ) { - return partitionResponses.entrySet().stream() - .collect(Collectors.groupingBy( - e -> e.getKey().topic(), - Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue))); - } } From ceb7923575d043fa71ad9f250e7a6d6c7df08506 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Thu, 26 Mar 2026 08:13:49 +0800 Subject: [PATCH 10/11] style: fix import group blank line in OAuthBearerExtensionsValidatorCallback --- .../oauthbearer/OAuthBearerExtensionsValidatorCallback.java | 1 + 1 file changed, 1 insertion(+) diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java index d4b55120e8b12..7874833e27ae1 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java +++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; + import javax.security.auth.callback.Callback; /** From 41640f91f558393d78c33d98181957b4bd584027 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Sun, 29 Mar 2026 15:23:28 +0800 Subject: [PATCH 11/11] refactor: replace stream with for-loop in StickyAssignor.serializeTopicPartitionAssignment --- .../org/apache/kafka/clients/consumer/StickyAssignor.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java index b63b853e42bfb..4e3cc855fed62 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java @@ -27,11 +27,11 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; /** *

The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either: @@ -230,8 +230,10 @@ protected MemberData memberData(Subscription subscription) { static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) { Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1); List topicAssignments = new ArrayList<>(); - Map> partitionsByTopic = memberData.partitions.stream() - .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.mapping(TopicPartition::partition, Collectors.toList()))); + var partitionsByTopic = new HashMap>(); + for (TopicPartition tp : memberData.partitions) { + partitionsByTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition()); + } for (Map.Entry> topicEntry : partitionsByTopic.entrySet()) { Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT); topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());