Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.DescribeProducersRequest;
import org.apache.kafka.common.requests.DescribeProducersResponse;
import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.utils.LogContext;

import org.slf4j.Logger;
Expand Down Expand Up @@ -89,15 +88,17 @@ public DescribeProducersRequest.Builder buildBatchedRequest(
Set<TopicPartition> topicPartitions
) {
DescribeProducersRequestData request = new DescribeProducersRequestData();
DescribeProducersRequest.Builder builder = new DescribeProducersRequest.Builder(request);

CollectionUtils.groupPartitionsByTopic(
topicPartitions,
builder::addTopic,
(topicRequest, partitionId) -> topicRequest.partitionIndexes().add(partitionId)
);
for (TopicPartition tp : topicPartitions) {
DescribeProducersRequestData.TopicRequest topicRequest = request.topics().find(tp.topic());
if (topicRequest == null) {
topicRequest = new DescribeProducersRequestData.TopicRequest().setName(tp.topic());
request.topics().add(topicRequest);
}
topicRequest.partitionIndexes().add(tp.partition());
}

return builder;
return new DescribeProducersRequest.Builder(request);
}

private void handlePartitionError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.utils.LogContext;

import org.slf4j.Logger;
Expand Down Expand Up @@ -80,17 +79,15 @@ public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {

@Override
ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) {
Map<String, ListOffsetsTopic> topicsByName = CollectionUtils.groupPartitionsByTopic(
keys,
topicName -> new ListOffsetsTopic().setName(topicName),
(listOffsetsTopic, partitionId) -> {
TopicPartition topicPartition = new TopicPartition(listOffsetsTopic.name(), partitionId);
long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition);
listOffsetsTopic.partitions().add(
new ListOffsetsPartition()
.setPartitionIndex(partitionId)
.setTimestamp(offsetTimestamp));
});
Map<String, ListOffsetsTopic> topicsByName = new HashMap<>();
for (TopicPartition topicPartition : keys) {
ListOffsetsTopic topic = topicsByName.computeIfAbsent(
topicPartition.topic(), t -> new ListOffsetsTopic().setName(t));
long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition);
topic.partitions().add(new ListOffsetsPartition()
.setPartitionIndex(topicPartition.partition())
.setTimestamp(offsetTimestamp));
}
boolean supportsMaxTimestamp = keys
.stream()
.anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.MAX_TIMESTAMP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.utils.CollectionUtils;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -230,7 +230,11 @@ protected MemberData memberData(Subscription subscription) {
static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
List<Struct> topicAssignments = new ArrayList<>();
for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {
var partitionsByTopic = new HashMap<String, List<Integer>>();
for (TopicPartition tp : memberData.partitions) {
partitionsByTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition());
}
for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT);
topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
Expand Down Expand Up @@ -272,4 +276,5 @@ private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer)
Optional<Integer> generation = struct.hasField(GENERATION_KEY_NAME) ? Optional.of(struct.getInt(GENERATION_KEY_NAME)) : Optional.empty();
return new MemberData(partitions, generation);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@

import javax.security.auth.callback.Callback;

import static org.apache.kafka.common.utils.CollectionUtils.subtractMap;

/**
* A {@code Callback} for use by the {@code SaslServer} implementation when it
* needs to validate the SASL extensions for the OAUTHBEARER mechanism
Expand Down Expand Up @@ -87,7 +85,14 @@ public Map<String, String> invalidExtensions() {
* @return An immutable {@link Map} consisting of the extensions that have neither been validated nor invalidated
*/
public Map<String, String> ignoredExtensions() {
return Collections.unmodifiableMap(subtractMap(subtractMap(inputExtensions.map(), invalidExtensions), validatedExtensions));
Map<String, String> ignored = new HashMap<>();
for (Map.Entry<String, String> entry : inputExtensions.map().entrySet()) {
String key = entry.getKey();
if (!invalidExtensions.containsKey(key) && !validatedExtensions.containsKey(key)) {
ignored.put(key, entry.getValue());
}
}
return Collections.unmodifiableMap(ignored);
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"fields": [
{ "name": "Topics", "type": "[]TopicRequest", "versions": "0+",
"about": "The topics to list producers for.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
"about": "The topic name." },
{ "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
"about": "The indexes of the partitions to list producers for." }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeProducersRequest;
import org.apache.kafka.common.requests.DescribeProducersResponse;
import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.utils.LogContext;

import org.junit.jupiter.api.Test;
Expand All @@ -50,7 +49,6 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -120,7 +118,7 @@ public void testBuildRequest() {
int brokerId = 3;
DescribeProducersRequest.Builder request = handler.buildBatchedRequest(brokerId, topicPartitions);

List<DescribeProducersRequestData.TopicRequest> topics = request.data.topics();
DescribeProducersRequestData.TopicRequestCollection topics = request.data.topics();

assertEquals(Set.of("foo", "bar"), topics.stream()
.map(DescribeProducersRequestData.TopicRequest::name)
Expand Down Expand Up @@ -195,9 +193,7 @@ public void testCompletedResult() {
DescribeProducersHandler handler = newHandler(options);

PartitionResponse partitionResponse = sampleProducerState(topicPartition);
DescribeProducersResponse response = describeProducersResponse(
singletonMap(topicPartition, partitionResponse)
);
DescribeProducersResponse response = describeProducersResponse(topicPartition, partitionResponse);
Node node = new Node(3, "host", 1);

ApiResult<TopicPartition, PartitionProducerState> result =
Expand Down Expand Up @@ -252,7 +248,7 @@ private DescribeProducersResponse buildResponseWithError(
PartitionResponse partitionResponse = new PartitionResponse()
.setPartitionIndex(topicPartition.partition())
.setErrorCode(error.code());
return describeProducersResponse(singletonMap(topicPartition, partitionResponse));
return describeProducersResponse(topicPartition, partitionResponse);
}

private PartitionResponse sampleProducerState(TopicPartition topicPartition) {
Expand Down Expand Up @@ -304,27 +300,16 @@ private void assertMatchingProducers(
}

private DescribeProducersResponse describeProducersResponse(
Map<TopicPartition, PartitionResponse> partitionResponses
TopicPartition partition, PartitionResponse partitionResponse
) {
DescribeProducersResponseData response = new DescribeProducersResponseData();
Map<String, Map<Integer, PartitionResponse>> partitionResponsesByTopic =
CollectionUtils.groupPartitionDataByTopic(partitionResponses);

for (Map.Entry<String, Map<Integer, PartitionResponse>> topicEntry : partitionResponsesByTopic.entrySet()) {
String topic = topicEntry.getKey();
Map<Integer, PartitionResponse> topicPartitionResponses = topicEntry.getValue();

TopicResponse topicResponse = new TopicResponse().setName(topic);
response.topics().add(topicResponse);
TopicResponse topicResponse = new TopicResponse().setName(partition.topic());
int partitionId = partition.partition();
topicResponse.partitions().add(partitionResponse.setPartitionIndex(partitionId));

for (Map.Entry<Integer, PartitionResponse> partitionEntry : topicPartitionResponses.entrySet()) {
Integer partitionId = partitionEntry.getKey();
PartitionResponse partitionResponse = partitionEntry.getValue();
topicResponse.partitions().add(partitionResponse.setPartitionIndex(partitionId));
}
}
response.topics().add(topicResponse);

return new DescribeProducersResponse(response);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.common.utils.CollectionUtils;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -368,7 +367,11 @@ public void testAssignorWithOldVersionSubscriptions() {
private Subscription buildSubscriptionWithOldSchema(List<String> topics, List<TopicPartition> partitions, int consumerIndex) {
Struct struct = new Struct(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0);
List<Struct> topicAssignments = new ArrayList<>();
for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(partitions).entrySet()) {
Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
for (TopicPartition tp : partitions) {
partitionsByTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition());
}
for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
Struct topicAssignment = new Struct(StickyAssignor.TOPIC_ASSIGNMENT);
topicAssignment.set(StickyAssignor.TOPIC_KEY_NAME, topicEntry.getKey());
topicAssignment.set(StickyAssignor.PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.utils.Utils;

import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -1522,15 +1521,16 @@ protected void verifyValidityAndBalance(Map<String, Subscription> subscriptions,
if (Math.abs(len - otherLen) <= 1)
continue;

Map<String, List<Integer>> map = CollectionUtils.groupPartitionsByTopic(partitions);
Map<String, List<Integer>> otherMap = CollectionUtils.groupPartitionsByTopic(otherPartitions);

int moreLoaded = len > otherLen ? i : j;
int lessLoaded = len > otherLen ? j : i;

Set<String> otherTopics = otherPartitions.stream()
.map(TopicPartition::topic)
.collect(Collectors.toSet());

// If there's any overlap in the subscribed topics, we should have been able to balance partitions
for (String topic: map.keySet()) {
assertFalse(otherMap.containsKey(topic),
for (TopicPartition tp : partitions) {
assertFalse(otherTopics.contains(tp.topic()),
"Error: Some partitions can be moved from c" + moreLoaded + " to c" + lessLoaded + " to achieve a better balance" +
"\nc" + i + " has " + len + " partitions, and c" + j + " has " + otherLen + " partitions." +
"\nSubscriptions: " + subscriptions +
Expand Down
Loading
Loading