diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java index 3ae5638423cbd..526d0d227e2ca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java @@ -30,7 +30,6 @@ import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.DescribeProducersRequest; import org.apache.kafka.common.requests.DescribeProducersResponse; -import org.apache.kafka.common.utils.CollectionUtils; import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; @@ -89,15 +88,17 @@ public DescribeProducersRequest.Builder buildBatchedRequest( Set topicPartitions ) { DescribeProducersRequestData request = new DescribeProducersRequestData(); - DescribeProducersRequest.Builder builder = new DescribeProducersRequest.Builder(request); - CollectionUtils.groupPartitionsByTopic( - topicPartitions, - builder::addTopic, - (topicRequest, partitionId) -> topicRequest.partitionIndexes().add(partitionId) - ); + for (TopicPartition tp : topicPartitions) { + DescribeProducersRequestData.TopicRequest topicRequest = request.topics().find(tp.topic()); + if (topicRequest == null) { + topicRequest = new DescribeProducersRequestData.TopicRequest().setName(tp.topic()); + request.topics().add(topicRequest); + } + topicRequest.partitionIndexes().add(tp.partition()); + } - return builder; + return new DescribeProducersRequest.Builder(request); } private void handlePartitionError( diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java index c03a6c5bee005..742c8a276f03d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java @@ -32,7 +32,6 @@ import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.requests.ListOffsetsResponse; -import org.apache.kafka.common.utils.CollectionUtils; import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; @@ -80,17 +79,15 @@ public AdminApiLookupStrategy lookupStrategy() { @Override ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set keys) { - Map topicsByName = CollectionUtils.groupPartitionsByTopic( - keys, - topicName -> new ListOffsetsTopic().setName(topicName), - (listOffsetsTopic, partitionId) -> { - TopicPartition topicPartition = new TopicPartition(listOffsetsTopic.name(), partitionId); - long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition); - listOffsetsTopic.partitions().add( - new ListOffsetsPartition() - .setPartitionIndex(partitionId) - .setTimestamp(offsetTimestamp)); - }); + Map topicsByName = new HashMap<>(); + for (TopicPartition topicPartition : keys) { + ListOffsetsTopic topic = topicsByName.computeIfAbsent( + topicPartition.topic(), t -> new ListOffsetsTopic().setName(t)); + long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition); + topic.partitions().add(new ListOffsetsPartition() + .setPartitionIndex(topicPartition.partition()) + .setTimestamp(offsetTimestamp)); + } boolean supportsMaxTimestamp = keys .stream() .anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.MAX_TIMESTAMP); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java index c7560ca389893..4e3cc855fed62 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java @@ -23,11 +23,11 @@ import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; -import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -230,7 +230,11 @@ protected MemberData memberData(Subscription subscription) { static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) { Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1); List topicAssignments = new ArrayList<>(); - for (Map.Entry> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) { + var partitionsByTopic = new HashMap>(); + for (TopicPartition tp : memberData.partitions) { + partitionsByTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition()); + } + for (Map.Entry> topicEntry : partitionsByTopic.entrySet()) { Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT); topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey()); topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray()); @@ -272,4 +276,5 @@ private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer) Optional generation = struct.hasField(GENERATION_KEY_NAME) ? Optional.of(struct.getInt(GENERATION_KEY_NAME)) : Optional.empty(); return new MemberData(partitions, generation); } + } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java index e17cfd1746d4b..7874833e27ae1 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java +++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java @@ -25,8 +25,6 @@ import javax.security.auth.callback.Callback; -import static org.apache.kafka.common.utils.CollectionUtils.subtractMap; - /** * A {@code Callback} for use by the {@code SaslServer} implementation when it * needs to validate the SASL extensions for the OAUTHBEARER mechanism @@ -87,7 +85,14 @@ public Map invalidExtensions() { * @return An immutable {@link Map} consisting of the extensions that have neither been validated nor invalidated */ public Map ignoredExtensions() { - return Collections.unmodifiableMap(subtractMap(subtractMap(inputExtensions.map(), invalidExtensions), validatedExtensions)); + Map ignored = new HashMap<>(); + for (Map.Entry entry : inputExtensions.map().entrySet()) { + String key = entry.getKey(); + if (!invalidExtensions.containsKey(key) && !validatedExtensions.containsKey(key)) { + ignored.put(key, entry.getValue()); + } + } + return Collections.unmodifiableMap(ignored); } /** diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java deleted file mode 100644 index c4d427a7070b1..0000000000000 --- a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.utils; - -import org.apache.kafka.common.TopicPartition; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.BiConsumer; -import java.util.function.Function; -import java.util.stream.Collectors; - -public final class CollectionUtils { - - private CollectionUtils() {} - - /** - * Given two maps (A, B), returns all the key-value pairs in A whose keys are not contained in B - */ - public static Map subtractMap(Map minuend, Map subtrahend) { - return minuend.entrySet().stream() - .filter(entry -> !subtrahend.containsKey(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - - /** - * group data by topic - * - * @param data Data to be partitioned - * @param Partition data type - * @return partitioned data - */ - public static Map> groupPartitionDataByTopic(Map data) { - Map> dataByTopic = new HashMap<>(); - for (Map.Entry entry : data.entrySet()) { - String topic = entry.getKey().topic(); - int partition = entry.getKey().partition(); - Map topicData = dataByTopic.computeIfAbsent(topic, t -> new HashMap<>()); - topicData.put(partition, entry.getValue()); - } - return dataByTopic; - } - - /** - * Group a list of partitions by the topic name. - * - * @param partitions The partitions to collect - * @return partitions per topic - */ - public static Map> groupPartitionsByTopic(Collection partitions) { - return groupPartitionsByTopic( - partitions, - topic -> new ArrayList<>(), - List::add - ); - } - - /** - * Group a collection of partitions by topic - * - * @return The map used to group the partitions - */ - public static Map groupPartitionsByTopic( - Collection partitions, - Function buildGroup, - BiConsumer addToGroup - ) { - Map dataByTopic = new HashMap<>(); - for (TopicPartition tp : partitions) { - String topic = tp.topic(); - T topicData = dataByTopic.computeIfAbsent(topic, buildGroup); - addToGroup.accept(topicData, tp.partition()); - } - return dataByTopic; - } -} diff --git a/clients/src/main/resources/common/message/DescribeProducersRequest.json b/clients/src/main/resources/common/message/DescribeProducersRequest.json index b7889ef1f1e1c..20f531c7db914 100644 --- a/clients/src/main/resources/common/message/DescribeProducersRequest.json +++ b/clients/src/main/resources/common/message/DescribeProducersRequest.json @@ -23,7 +23,7 @@ "fields": [ { "name": "Topics", "type": "[]TopicRequest", "versions": "0+", "about": "The topics to list producers for.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "about": "The topic name." }, { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+", "about": "The indexes of the partitions to list producers for." } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java index 214d5fe47e913..3a2ab50624918 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java @@ -33,7 +33,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.DescribeProducersRequest; import org.apache.kafka.common.requests.DescribeProducersResponse; -import org.apache.kafka.common.utils.CollectionUtils; import org.apache.kafka.common.utils.LogContext; import org.junit.jupiter.api.Test; @@ -50,7 +49,6 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; -import static java.util.Collections.singletonMap; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -120,7 +118,7 @@ public void testBuildRequest() { int brokerId = 3; DescribeProducersRequest.Builder request = handler.buildBatchedRequest(brokerId, topicPartitions); - List topics = request.data.topics(); + DescribeProducersRequestData.TopicRequestCollection topics = request.data.topics(); assertEquals(Set.of("foo", "bar"), topics.stream() .map(DescribeProducersRequestData.TopicRequest::name) @@ -195,9 +193,7 @@ public void testCompletedResult() { DescribeProducersHandler handler = newHandler(options); PartitionResponse partitionResponse = sampleProducerState(topicPartition); - DescribeProducersResponse response = describeProducersResponse( - singletonMap(topicPartition, partitionResponse) - ); + DescribeProducersResponse response = describeProducersResponse(topicPartition, partitionResponse); Node node = new Node(3, "host", 1); ApiResult result = @@ -252,7 +248,7 @@ private DescribeProducersResponse buildResponseWithError( PartitionResponse partitionResponse = new PartitionResponse() .setPartitionIndex(topicPartition.partition()) .setErrorCode(error.code()); - return describeProducersResponse(singletonMap(topicPartition, partitionResponse)); + return describeProducersResponse(topicPartition, partitionResponse); } private PartitionResponse sampleProducerState(TopicPartition topicPartition) { @@ -304,27 +300,16 @@ private void assertMatchingProducers( } private DescribeProducersResponse describeProducersResponse( - Map partitionResponses + TopicPartition partition, PartitionResponse partitionResponse ) { DescribeProducersResponseData response = new DescribeProducersResponseData(); - Map> partitionResponsesByTopic = - CollectionUtils.groupPartitionDataByTopic(partitionResponses); - - for (Map.Entry> topicEntry : partitionResponsesByTopic.entrySet()) { - String topic = topicEntry.getKey(); - Map topicPartitionResponses = topicEntry.getValue(); - TopicResponse topicResponse = new TopicResponse().setName(topic); - response.topics().add(topicResponse); + TopicResponse topicResponse = new TopicResponse().setName(partition.topic()); + int partitionId = partition.partition(); + topicResponse.partitions().add(partitionResponse.setPartitionIndex(partitionId)); - for (Map.Entry partitionEntry : topicPartitionResponses.entrySet()) { - Integer partitionId = partitionEntry.getKey(); - PartitionResponse partitionResponse = partitionEntry.getValue(); - topicResponse.partitions().add(partitionResponse.setPartitionIndex(partitionId)); - } - } + response.topics().add(topicResponse); return new DescribeProducersResponse(response); } - } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java index c225c291e8fed..b2a04eaec81d1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.test.api.Flaky; -import org.apache.kafka.common.utils.CollectionUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -368,7 +367,11 @@ public void testAssignorWithOldVersionSubscriptions() { private Subscription buildSubscriptionWithOldSchema(List topics, List partitions, int consumerIndex) { Struct struct = new Struct(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0); List topicAssignments = new ArrayList<>(); - for (Map.Entry> topicEntry : CollectionUtils.groupPartitionsByTopic(partitions).entrySet()) { + Map> partitionsByTopic = new HashMap<>(); + for (TopicPartition tp : partitions) { + partitionsByTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition()); + } + for (Map.Entry> topicEntry : partitionsByTopic.entrySet()) { Struct topicAssignment = new Struct(StickyAssignor.TOPIC_ASSIGNMENT); topicAssignment.set(StickyAssignor.TOPIC_KEY_NAME, topicEntry.getKey()); topicAssignment.set(StickyAssignor.PARTITIONS_KEY_NAME, topicEntry.getValue().toArray()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java index 811325645937e..c7b0c909477a3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.test.api.Flaky; -import org.apache.kafka.common.utils.CollectionUtils; import org.apache.kafka.common.utils.Utils; import org.junit.jupiter.api.BeforeEach; @@ -1522,15 +1521,16 @@ protected void verifyValidityAndBalance(Map subscriptions, if (Math.abs(len - otherLen) <= 1) continue; - Map> map = CollectionUtils.groupPartitionsByTopic(partitions); - Map> otherMap = CollectionUtils.groupPartitionsByTopic(otherPartitions); - int moreLoaded = len > otherLen ? i : j; int lessLoaded = len > otherLen ? j : i; + Set otherTopics = otherPartitions.stream() + .map(TopicPartition::topic) + .collect(Collectors.toSet()); + // If there's any overlap in the subscribed topics, we should have been able to balance partitions - for (String topic: map.keySet()) { - assertFalse(otherMap.containsKey(topic), + for (TopicPartition tp : partitions) { + assertFalse(otherTopics.contains(tp.topic()), "Error: Some partitions can be moved from c" + moreLoaded + " to c" + lessLoaded + " to achieve a better balance" + "\nc" + i + " has " + len + " partitions, and c" + j + " has " + otherLen + " partitions." + "\nSubscriptions: " + subscriptions + diff --git a/clients/src/test/java/org/apache/kafka/common/utils/CollectionUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/CollectionUtilsTest.java deleted file mode 100644 index 7c2a9d66e3a2d..0000000000000 --- a/clients/src/test/java/org/apache/kafka/common/utils/CollectionUtilsTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.utils; - -import org.junit.jupiter.api.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.apache.kafka.common.utils.CollectionUtils.subtractMap; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotSame; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class CollectionUtilsTest { - - @Test - public void testSubtractMapRemovesSecondMapsKeys() { - Map mainMap = new HashMap<>(); - mainMap.put("one", "1"); - mainMap.put("two", "2"); - mainMap.put("three", "3"); - Map secondaryMap = new HashMap<>(); - secondaryMap.put("one", "4"); - secondaryMap.put("two", "5"); - - Map newMap = subtractMap(mainMap, secondaryMap); - - assertEquals(3, mainMap.size()); // original map should not be modified - assertEquals(1, newMap.size()); - assertTrue(newMap.containsKey("three")); - assertEquals("3", newMap.get("three")); - } - - @Test - public void testSubtractMapDoesntRemoveAnythingWhenEmptyMap() { - Map mainMap = new HashMap<>(); - mainMap.put("one", "1"); - mainMap.put("two", "2"); - mainMap.put("three", "3"); - Map secondaryMap = new HashMap<>(); - - Map newMap = subtractMap(mainMap, secondaryMap); - - assertEquals(3, newMap.size()); - assertEquals("1", newMap.get("one")); - assertEquals("2", newMap.get("two")); - assertEquals("3", newMap.get("three")); - assertNotSame(newMap, mainMap); - } -} diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index e2ac0e1433c04..3c4e424590da2 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -648,11 +648,11 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { private def describeProducersRequest: DescribeProducersRequest = new DescribeProducersRequest.Builder( new DescribeProducersRequestData() - .setTopics(java.util.List.of( + .setTopics(new DescribeProducersRequestData.TopicRequestCollection(java.util.List.of( new DescribeProducersRequestData.TopicRequest() .setName(tp.topic) .setPartitionIndexes(java.util.List.of(Int.box(tp.partition))) - )) + ).iterator())) ).build() private def describeTransactionsRequest: DescribeTransactionsRequest = new DescribeTransactionsRequest.Builder( diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index c5974e7e3179b..a8277531bbc6c 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -10496,7 +10496,7 @@ class KafkaApisTest extends Logging { val tp4 = new TopicPartition("invalid;topic", 1) val authorizer: Authorizer = mock(classOf[Authorizer]) - val data = new DescribeProducersRequestData().setTopics(util.List.of( + val data = new DescribeProducersRequestData().setTopics(new DescribeProducersRequestData.TopicRequestCollection(util.List.of( new DescribeProducersRequestData.TopicRequest() .setName(tp1.topic) .setPartitionIndexes(util.List.of(Int.box(tp1.partition))), @@ -10509,7 +10509,7 @@ class KafkaApisTest extends Logging { new DescribeProducersRequestData.TopicRequest() .setName(tp4.topic) .setPartitionIndexes(util.List.of(Int.box(tp4.partition))) - )) + ).iterator())) def buildExpectedActions(topic: String): util.List[Action] = { val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index df8683fe65fab..ac35fb8f6ad47 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -652,9 +652,9 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.DESCRIBE_PRODUCERS => new DescribeProducersRequest.Builder(new DescribeProducersRequestData() - .setTopics(util.List.of(new DescribeProducersRequestData.TopicRequest() + .setTopics(new DescribeProducersRequestData.TopicRequestCollection(util.List.of(new DescribeProducersRequestData.TopicRequest() .setName("test-topic") - .setPartitionIndexes(util.List.of[Integer](1, 2, 3))))) + .setPartitionIndexes(util.List.of[Integer](1, 2, 3))).iterator()))) case ApiKeys.BROKER_REGISTRATION => new BrokerRegistrationRequest.Builder(new BrokerRegistrationRequestData())