From ad196a853af91b004db790a0ac285d53fec2d152 Mon Sep 17 00:00:00 2001 From: see-quick Date: Mon, 23 Mar 2026 13:27:40 +0100 Subject: [PATCH 1/2] MINOR: Replace Collections factory methods with Java 11+ equivalents in group-coordinator and part of clients Signed-off-by: see-quick --- .../kafka/common/config/AbstractConfig.java | 12 ++++----- .../apache/kafka/common/config/ConfigDef.java | 26 +++++++++---------- .../internals/BrokerSecurityConfigs.java | 5 ++-- .../provider/DirectoryConfigProvider.java | 3 +-- .../errors/TopicAuthorizationException.java | 3 +-- .../common/requests/AbstractResponse.java | 3 +-- .../common/requests/AddRaftVoterResponse.java | 5 ++-- .../requests/AllocateProducerIdsResponse.java | 3 +-- .../requests/AlterPartitionRequest.java | 3 +-- .../AssignReplicasToDirsResponse.java | 3 +-- .../requests/BeginQuorumEpochRequest.java | 6 ++--- .../ConsumerGroupHeartbeatResponse.java | 3 +-- .../ControllerRegistrationResponse.java | 3 +-- .../common/requests/DeleteTopicsRequest.java | 3 +-- .../requests/DescribeGroupsResponse.java | 3 +-- .../requests/DescribeQuorumRequest.java | 5 ++-- .../requests/DescribeQuorumResponse.java | 10 +++---- .../DescribeTopicPartitionsRequest.java | 3 +-- .../common/requests/ElectLeadersRequest.java | 3 +-- .../requests/EndQuorumEpochRequest.java | 5 ++-- .../kafka/common/requests/FetchRequest.java | 5 ++-- .../kafka/common/requests/FetchResponse.java | 3 +-- .../requests/FindCoordinatorRequest.java | 6 ++--- .../requests/FindCoordinatorResponse.java | 5 ++-- .../common/requests/JoinGroupRequest.java | 4 +-- .../common/requests/LeaveGroupRequest.java | 5 ++-- .../common/requests/ListGroupsRequest.java | 3 +-- .../common/requests/ListOffsetsResponse.java | 3 +-- .../common/requests/MetadataRequest.java | 7 +++-- .../common/requests/OffsetFetchRequest.java | 3 +-- .../kafka/common/requests/ProduceRequest.java | 6 ++--- .../common/requests/ProduceResponse.java | 9 +++---- .../requests/RemoveRaftVoterResponse.java | 5 ++-- .../common/requests/ShareFetchResponse.java | 3 +-- .../requests/ShareGroupHeartbeatResponse.java | 3 +-- .../StreamsGroupHeartbeatResponse.java | 2 +- .../requests/UpdateFeaturesRequest.java | 4 +-- .../requests/UpdateRaftVoterResponse.java | 5 ++-- .../kafka/common/requests/VoteRequest.java | 6 ++--- .../group/GroupCoordinatorService.java | 2 +- .../group/GroupMetadataManager.java | 2 +- .../group/streams/StreamsGroup.java | 4 +-- .../group/streams/StreamsGroupMember.java | 2 +- 43 files changed, 89 insertions(+), 118 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index e271cd99c4cd0..39d015ec82cd0 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -133,7 +133,7 @@ public AbstractConfig(ConfigDef definition, Map originals, Map * @param originals the configuration properties plus any optional config provider properties; may not be null */ public AbstractConfig(ConfigDef definition, Map originals) { - this(definition, originals, Collections.emptyMap(), true); + this(definition, originals, Map.of(), true); } /** @@ -146,7 +146,7 @@ public AbstractConfig(ConfigDef definition, Map originals) { * @param doLog whether the configurations should be logged */ public AbstractConfig(ConfigDef definition, Map originals, boolean doLog) { - this(definition, originals, Collections.emptyMap(), doLog); + this(definition, originals, Map.of(), doLog); } /** @@ -168,7 +168,7 @@ protected Map preProcessParsedConfig(Map parsedV * @return a map of updates that should be applied to the configuration (will be validated to prevent bad updates) */ protected Map postProcessParsedConfig(Map parsedValues) { - return Collections.emptyMap(); + return Map.of(); } protected Object get(String key) { @@ -430,7 +430,7 @@ private T getConfiguredInstance(Object klass, Class t, Map T getConfiguredInstance(String key, Class t) { - return getConfiguredInstance(key, t, Collections.emptyMap()); + return getConfiguredInstance(key, t, Map.of()); } /** @@ -458,7 +458,7 @@ public T getConfiguredInstance(String key, Class t, Map c * @return The list of configured instances */ public List getConfiguredInstances(String key, Class t) { - return getConfiguredInstances(key, t, Collections.emptyMap()); + return getConfiguredInstances(key, t, Map.of()); } /** @@ -606,7 +606,7 @@ private Map instantiateConfigProviders( final String configProviders = indirectConfigs.get(CONFIG_PROVIDERS_CONFIG); if (configProviders == null || configProviders.isEmpty()) { - return Collections.emptyMap(); + return Map.of(); } Map providerMap = new HashMap<>(); diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index dd1e4898eaf99..efe7f0a1ecc1b 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -218,7 +218,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Validator v */ public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, String group, int orderInGroup, Width width, String displayName, Recommender recommender) { - return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, Collections.emptyList(), recommender); + return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, List.of(), recommender); } /** @@ -237,7 +237,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Validator v */ public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, String group, int orderInGroup, Width width, String displayName) { - return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, Collections.emptyList()); + return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, List.of()); } /** @@ -295,7 +295,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Importance */ public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation, String group, int orderInGroup, Width width, String displayName, Recommender recommender) { - return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, Collections.emptyList(), recommender); + return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, List.of(), recommender); } /** @@ -313,7 +313,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Importance */ public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation, String group, int orderInGroup, Width width, String displayName) { - return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, Collections.emptyList()); + return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, List.of()); } /** @@ -368,7 +368,7 @@ public ConfigDef define(String name, Type type, Importance importance, String do */ public ConfigDef define(String name, Type type, Importance importance, String documentation, String group, int orderInGroup, Width width, String displayName, Recommender recommender) { - return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, Collections.emptyList(), recommender); + return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, List.of(), recommender); } /** @@ -385,7 +385,7 @@ public ConfigDef define(String name, Type type, Importance importance, String do */ public ConfigDef define(String name, Type type, Importance importance, String documentation, String group, int orderInGroup, Width width, String displayName) { - return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, Collections.emptyList()); + return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, List.of()); } /** @@ -427,7 +427,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Importance */ public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation, String alternativeString) { return define(name, type, defaultValue, null, importance, documentation, null, -1, Width.NONE, - name, Collections.emptyList(), null, alternativeString); + name, List.of(), null, alternativeString); } /** @@ -452,7 +452,7 @@ public ConfigDef define(String name, Type type, Importance importance, String do * @return This ConfigDef so you can chain calls */ public ConfigDef defineInternal(final String name, final Type type, final Object defaultValue, final Importance importance) { - return define(new ConfigKey(name, type, defaultValue, null, importance, "", "", -1, Width.NONE, name, Collections.emptyList(), null, true, null)); + return define(new ConfigKey(name, type, defaultValue, null, importance, "", "", -1, Width.NONE, name, List.of(), null, true, null)); } /** @@ -467,7 +467,7 @@ public ConfigDef defineInternal(final String name, final Type type, final Object * @return This ConfigDef so you can chain calls */ public ConfigDef defineInternal(final String name, final Type type, final Object defaultValue, final Validator validator, final Importance importance, final String documentation) { - return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, "", -1, Width.NONE, name, Collections.emptyList(), null, true, null)); + return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, "", -1, Width.NONE, name, List.of(), null, true, null)); } /** @@ -768,7 +768,7 @@ else if (value instanceof String) return value; else if (value instanceof String) if (trimmed.isEmpty()) - return Collections.emptyList(); + return List.of(); else return Arrays.asList(COMMA_WITH_WHITESPACE.split(trimmed, -1)); else @@ -1445,7 +1445,7 @@ static String niceTimeUnits(long millis) { } public String toHtmlTable() { - return toHtmlTable(Collections.emptyMap()); + return toHtmlTable(Map.of()); } private void addHeader(StringBuilder builder, String headerName) { @@ -1700,7 +1700,7 @@ public boolean visible(String name, Map parsedConfig) { } public String toHtml() { - return toHtml(Collections.emptyMap()); + return toHtml(Map.of()); } /** @@ -1709,7 +1709,7 @@ public String toHtml() { * @param idGenerator A function for computing the HTML id attribute in the generated HTML from a given config name. */ public String toHtml(int headerDepth, Function idGenerator) { - return toHtml(headerDepth, idGenerator, Collections.emptyMap()); + return toHtml(headerDepth, idGenerator, Map.of()); } /** diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java index c5289676439c4..339acab0a22c2 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder; import org.apache.kafka.common.utils.Utils; -import java.util.Collections; import java.util.List; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; @@ -61,7 +60,7 @@ public class BrokerSecurityConfigs { " configuration."; public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG = "sasl.kerberos.principal.to.local.rules"; - public static final List DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList(DEFAULT_SSL_PRINCIPAL_MAPPING_RULES); + public static final List DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = List.of(DEFAULT_SSL_PRINCIPAL_MAPPING_RULES); public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A list of rules for mapping from principal " + "names to short names (typically operating system usernames). The rules are evaluated in order and the " + "first rule that matches a principal name is used to map it to a short name. Any later rules in the list are " + @@ -95,7 +94,7 @@ public class BrokerSecurityConfigs { + ""; public static final String SASL_ENABLED_MECHANISMS_CONFIG = "sasl.enabled.mechanisms"; - public static final List DEFAULT_SASL_ENABLED_MECHANISMS = Collections.singletonList(SaslConfigs.GSSAPI_MECHANISM); + public static final List DEFAULT_SASL_ENABLED_MECHANISMS = List.of(SaslConfigs.GSSAPI_MECHANISM); public static final String SASL_ENABLED_MECHANISMS_DOC = "The list of SASL mechanisms enabled in the Kafka server. " + "The list may contain any mechanism for which a security provider is available. " + "Only GSSAPI is enabled by default."; diff --git a/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java index 3e0fa5ed77258..aecd29be6ff09 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java +++ b/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java @@ -32,7 +32,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static java.util.Collections.emptyMap; /** * An implementation of {@link ConfigProvider} based on a directory of files. @@ -87,7 +86,7 @@ private ConfigData get(String path, Predicate fileFilter) { throw new IllegalStateException("The provider has not been configured yet."); } - Map map = emptyMap(); + Map map = Map.of(); if (path != null && !path.isEmpty()) { Path dir = allowedPaths.parseUntrustedPath(path); diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java index e2235f804e170..0b4b210e1f615 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.errors; -import java.util.Collections; import java.util.Set; public class TopicAuthorizationException extends AuthorizationException { @@ -32,7 +31,7 @@ public TopicAuthorizationException(Set unauthorizedTopics) { } public TopicAuthorizationException(String message) { - this(message, Collections.emptySet()); + this(message, Set.of()); } /** diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index bc313078d7424..7d96d1a17317c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -26,7 +26,6 @@ import java.nio.ByteBuffer; import java.util.Collection; -import java.util.Collections; import java.util.EnumMap; import java.util.Map; import java.util.stream.Collectors; @@ -65,7 +64,7 @@ final ByteBufferAccessor serialize(short version) { public abstract Map errorCounts(); protected static Map errorCounts(Errors error) { - return Collections.singletonMap(error, 1); + return Map.of(error, 1); } protected static Map errorCounts(Stream errors) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java index 52a0cb05feb76..1efaed61b9946 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; -import java.util.Collections; import java.util.Map; public class AddRaftVoterResponse extends AbstractResponse { @@ -51,9 +50,9 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) { @Override public Map errorCounts() { if (data.errorCode() != Errors.NONE.code()) { - return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + return Map.of(Errors.forCode(data.errorCode()), 1); } else { - return Collections.emptyMap(); + return Map.of(); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java index 4c47651193188..2575e05abb59d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; -import java.util.Collections; import java.util.Map; public class AllocateProducerIdsResponse extends AbstractResponse { @@ -47,7 +46,7 @@ public AllocateProducerIdsResponseData data() { */ @Override public Map errorCounts() { - return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + return Map.of(Errors.forCode(data.errorCode()), 1); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java index 2d181f4876629..d6a0eccb611f3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.Readable; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -90,7 +89,7 @@ public AlterPartitionRequest build(short version) { newIsr.add(brokerState.brokerId()) ); partitionData.setNewIsr(newIsr); - partitionData.setNewIsrWithEpochs(Collections.emptyList()); + partitionData.setNewIsrWithEpochs(List.of()); } }) ); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsResponse.java index 84f86d058ec65..d785e807dfc44 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsResponse.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; -import java.util.Collections; import java.util.Map; public class AssignReplicasToDirsResponse extends AbstractResponse { @@ -40,7 +39,7 @@ public AssignReplicasToDirsResponseData data() { @Override public Map errorCounts() { - return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + return Map.of(Errors.forCode(data.errorCode()), 1); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java index f3375da9c4171..dffa3049dd88e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java @@ -22,8 +22,8 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; +import java.util.List; -import java.util.Collections; public class BeginQuorumEpochRequest extends AbstractRequest { public static class Builder extends AbstractRequest.Builder { @@ -75,10 +75,10 @@ public static BeginQuorumEpochRequestData singletonRequest( ) { return new BeginQuorumEpochRequestData() .setClusterId(clusterId) - .setTopics(Collections.singletonList( + .setTopics(List.of( new BeginQuorumEpochRequestData.TopicData() .setTopicName(topicPartition.topic()) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new BeginQuorumEpochRequestData.PartitionData() .setPartitionIndex(topicPartition.partition()) .setLeaderEpoch(leaderEpoch) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java index dbbabdcaccdef..ebc9c46beb39d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.Readable; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -59,7 +58,7 @@ public ConsumerGroupHeartbeatResponseData data() { @Override public Map errorCounts() { - return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + return Map.of(Errors.forCode(data.errorCode()), 1); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationResponse.java index 9cc53db15a9a4..d6462f1f501ef 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationResponse.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; -import java.util.Collections; import java.util.Map; public class ControllerRegistrationResponse extends AbstractResponse { @@ -50,7 +49,7 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) { @Override public Map errorCounts() { - return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + return Map.of(Errors.forCode(data.errorCode()), 1); } public static ControllerRegistrationResponse parse(Readable readable, short version) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java index b90f853211ddc..dac9f21c169d7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.Readable; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -104,7 +103,7 @@ public int numberOfTopics() { public List topicIds() { if (version() >= 6) return data.topics().stream().map(DeleteTopicState::topicId).collect(Collectors.toList()); - return Collections.emptyList(); + return List.of(); } public List topics() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java index 4d59aee8758ab..b97526d0dd5f4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.Readable; import org.apache.kafka.common.utils.Utils; -import java.util.Collections; import java.util.EnumMap; import java.util.List; import java.util.Map; @@ -106,7 +105,7 @@ public static DescribedGroup groupMetadata( public static DescribedGroup groupError(String groupId, Errors error) { return groupMetadata(groupId, error, DescribeGroupsResponse.UNKNOWN_STATE, DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE, - DescribeGroupsResponse.UNKNOWN_PROTOCOL, Collections.emptyList(), AUTHORIZED_OPERATIONS_OMITTED); + DescribeGroupsResponse.UNKNOWN_PROTOCOL, List.of(), AUTHORIZED_OPERATIONS_OMITTED); } public static DescribedGroup groupError(String groupId, Errors error, String errorMessage) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java index e5f6b00f9fce4..569d58f115967 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.Readable; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -61,10 +60,10 @@ public static DescribeQuorumRequest parse(Readable readable, short version) { public static DescribeQuorumRequestData singletonRequest(TopicPartition topicPartition) { return new DescribeQuorumRequestData() - .setTopics(Collections.singletonList( + .setTopics(List.of( new DescribeQuorumRequestData.TopicData() .setTopicName(topicPartition.topic()) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new DescribeQuorumRequestData.PartitionData() .setPartitionIndex(topicPartition.partition())) ))); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java index c3b33d48052cd..bbc936a7846e1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java @@ -22,8 +22,8 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; -import java.util.Collections; import java.util.EnumMap; +import java.util.List; import java.util.Map; /** @@ -79,9 +79,9 @@ public static DescribeQuorumResponseData singletonErrorResponse( Errors error ) { return new DescribeQuorumResponseData() - .setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData() + .setTopics(List.of(new DescribeQuorumResponseData.TopicData() .setTopicName(topicPartition.topic()) - .setPartitions(Collections.singletonList(new DescribeQuorumResponseData.PartitionData() + .setPartitions(List.of(new DescribeQuorumResponseData.PartitionData() .setPartitionIndex(topicPartition.partition()) .setErrorCode(error.code()) .setErrorMessage(error.message()))))); @@ -94,9 +94,9 @@ public static DescribeQuorumResponseData singletonResponse( DescribeQuorumResponseData.NodeCollection nodes ) { DescribeQuorumResponseData res = new DescribeQuorumResponseData() - .setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData() + .setTopics(List.of(new DescribeQuorumResponseData.TopicData() .setTopicName(topicPartition.topic()) - .setPartitions(Collections.singletonList(partitionData + .setPartitions(List.of(partitionData .setPartitionIndex(topicPartition.partition()))))); if (nodes != null) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java index f65ef91db5410..0054bd8a8f011 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; -import java.util.Collections; import java.util.List; public class DescribeTopicPartitionsRequest extends AbstractRequest { @@ -83,7 +82,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { .setName(topic.name()) .setErrorCode(error.code()) .setIsInternal(false) - .setPartitions(Collections.emptyList()) + .setPartitions(List.of()) ); } responseData.setThrottleTimeMs(throttleTimeMs); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java index 8ed9cb676717f..c48098ccd4b9c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java @@ -30,7 +30,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -92,7 +91,7 @@ private ElectLeadersRequestData toRequestData(short version) { public Set topicPartitions() { if (this.data.topicPartitions() == null) { - return Collections.emptySet(); + return Set.of(); } return this.data.topicPartitions().stream() .flatMap(topicPartition -> topicPartition.partitions().stream() diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java index de45ca457cb2b..e6b9c2a84868a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; -import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -84,10 +83,10 @@ public static EndQuorumEpochRequestData singletonRequest(TopicPartition topicPar List preferredSuccessors) { return new EndQuorumEpochRequestData() .setClusterId(clusterId) - .setTopics(Collections.singletonList( + .setTopics(List.of( new EndQuorumEpochRequestData.TopicData() .setTopicName(topicPartition.topic()) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new EndQuorumEpochRequestData.PartitionData() .setPartitionIndex(topicPartition.partition()) .setLeaderEpoch(leaderEpoch) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 762acd0938703..8bce0c775e270 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -30,7 +30,6 @@ import org.apache.kafka.common.record.internal.RecordBatch; import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -159,8 +158,8 @@ public static class Builder extends AbstractRequest.Builder { private IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED; private int maxBytes = DEFAULT_RESPONSE_MAX_BYTES; private FetchMetadata metadata = FetchMetadata.LEGACY; - private List removed = Collections.emptyList(); - private List replaced = Collections.emptyList(); + private List removed = List.of(); + private List replaced = List.of(); private String rackId = ""; public static Builder forConsumer(short maxVersion, int maxWait, int minBytes, Map fetchData) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index d489906c7352e..eee7daee2ae37 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.record.internal.Records; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumMap; import java.util.Iterator; import java.util.LinkedHashMap; @@ -166,7 +165,7 @@ public static int sizeOf(short version, FetchResponseData.PartitionData>> partIterator) { // Since the throttleTimeMs and metadata field sizes are constant and fixed, we can // use arbitrary values here without affecting the result. - FetchResponseData data = toMessage(Errors.NONE, 0, INVALID_SESSION_ID, partIterator, Collections.emptyList()); + FetchResponseData data = toMessage(Errors.NONE, 0, INVALID_SESSION_ID, partIterator, List.of()); ObjectSerializationCache cache = new ObjectSerializationCache(); return 4 + data.size(cache, version); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java index b14fae5cfac91..d63efe082e17e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java @@ -24,8 +24,8 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; +import java.util.List; -import java.util.Collections; public class FindCoordinatorRequest extends AbstractRequest { @@ -52,10 +52,10 @@ public FindCoordinatorRequest build(short version) { "because we require features supported only in " + MIN_BATCHED_VERSION + " or later."); if (batchedKeys == 1) { data.setKey(data.coordinatorKeys().get(0)); - data.setCoordinatorKeys(Collections.emptyList()); + data.setCoordinatorKeys(List.of()); } } else if (batchedKeys == 0 && data.key() != null) { - data.setCoordinatorKeys(Collections.singletonList(data.key())); + data.setCoordinatorKeys(List.of(data.key())); data.setKey(""); // default value } return new FindCoordinatorRequest(data, version); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java index 5bd08934b62c7..fb997b05d6113 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.Readable; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumMap; import java.util.List; import java.util.Map; @@ -132,7 +131,7 @@ public List coordinators() { .setNodeId(data.nodeId()) .setHost(data.host()) .setPort(data.port()); - return Collections.singletonList(coordinator); + return List.of(coordinator); } } @@ -148,7 +147,7 @@ public static FindCoordinatorResponse prepareOldResponse(Errors error, Node node public static FindCoordinatorResponse prepareResponse(Errors error, String key, Node node) { FindCoordinatorResponseData data = new FindCoordinatorResponseData(); - data.setCoordinators(Collections.singletonList( + data.setCoordinators(List.of( prepareCoordinatorResponse(error, key, node) )); return new FindCoordinatorResponse(data); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index 9d75d383bab13..0c8b827875d08 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -24,8 +24,8 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; +import java.util.List; -import java.util.Collections; public class JoinGroupRequest extends AbstractRequest { @@ -197,7 +197,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { .setProtocolName(UNKNOWN_PROTOCOL_NAME) .setLeader(UNKNOWN_MEMBER_ID) .setMemberId(UNKNOWN_MEMBER_ID) - .setMembers(Collections.emptyList()); + .setMembers(List.of()); if (version() >= 7) data.setProtocolName(null); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java index 2dd69afab9812..4a97c05037b9e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.common.protocol.Readable; -import java.util.Collections; import java.util.List; public class LeaveGroupRequest extends AbstractRequest { @@ -97,14 +96,14 @@ public LeaveGroupRequestData normalizedData() { } else { return new LeaveGroupRequestData() .setGroupId(data.groupId()) - .setMembers(Collections.singletonList( + .setMembers(List.of( new MemberIdentity().setMemberId(data.memberId()))); } } public List members() { // Before version 3, leave group request is still in single mode - return version() <= 2 ? Collections.singletonList( + return version() <= 2 ? List.of( new MemberIdentity() .setMemberId(data.memberId())) : data.members(); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java index 84f7cc2a72d69..ccdf356aff8ee 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; -import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -86,7 +85,7 @@ public ListGroupsRequest(ListGroupsRequestData data, short version) { @Override public ListGroupsResponse getErrorResponse(int throttleTimeMs, Throwable e) { ListGroupsResponseData listGroupsResponseData = new ListGroupsResponseData(). - setGroups(Collections.emptyList()). + setGroups(List.of()). setErrorCode(Errors.forException(e).code()); if (version() >= 1) { listGroupsResponseData.setThrottleTimeMs(throttleTimeMs); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java index a1dc329054c3e..7cab5d4bb5047 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.Readable; import org.apache.kafka.common.record.internal.RecordBatch; -import java.util.Collections; import java.util.EnumMap; import java.util.List; import java.util.Map; @@ -105,7 +104,7 @@ public boolean shouldClientThrottle(short version) { public static ListOffsetsTopicResponse singletonListOffsetsTopicResponse(TopicPartition tp, Errors error, long timestamp, long offset, int epoch) { return new ListOffsetsTopicResponse() .setName(tp.topic()) - .setPartitions(Collections.singletonList(new ListOffsetsPartitionResponse() + .setPartitions(List.of(new ListOffsetsPartitionResponse() .setPartitionIndex(tp.partition()) .setErrorCode(error.code()) .setTimestamp(timestamp) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index d3dcabfb4f9c5..392bf2a9105e3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.protocol.Readable; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -173,7 +172,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { .setTopicId(topic.topicId()) .setErrorCode(error.code()) .setIsInternal(false) - .setPartitions(Collections.emptyList())); + .setPartitions(List.of())); } } @@ -200,9 +199,9 @@ public List topics() { public List topicIds() { if (isAllTopics()) - return Collections.emptyList(); + return List.of(); else if (version() < 10) - return Collections.emptyList(); + return List.of(); else return data.topics() .stream() diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java index 9b8eed7cb3b5c..3cb9588c4b78e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -228,7 +227,7 @@ public List groups() { ); } - return Collections.singletonList(group); + return List.of(group); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index e733fa9eda43c..9b922d06606d1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -30,9 +30,9 @@ import org.apache.kafka.common.record.internal.Records; import org.apache.kafka.common.utils.Utils; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -174,7 +174,7 @@ public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) { } tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse() .setIndex(tpId.partition()) - .setRecordErrors(Collections.emptyList()) + .setRecordErrors(List.of()) .setBaseOffset(INVALID_OFFSET) .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) .setLogStartOffset(INVALID_OFFSET) @@ -187,7 +187,7 @@ public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) { @Override public Map errorCounts(Throwable e) { Errors error = Errors.forException(e); - return Collections.singletonMap(error, partitionSizes().size()); + return Map.of(error, partitionSizes().size()); } public short acks() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index df3bff0c32f5d..9d7817e490290 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.Readable; import org.apache.kafka.common.record.internal.RecordBatch; -import java.util.Collections; import java.util.EnumMap; import java.util.List; import java.util.Map; @@ -74,7 +73,7 @@ public ProduceResponse(ProduceResponseData produceResponseData) { */ @Deprecated public ProduceResponse(Map responses) { - this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList()); + this(responses, DEFAULT_THROTTLE_TIME, List.of()); } /** @@ -85,7 +84,7 @@ public ProduceResponse(Map responses) { */ @Deprecated public ProduceResponse(Map responses, int throttleTimeMs) { - this(toData(responses, throttleTimeMs, Collections.emptyList())); + this(toData(responses, throttleTimeMs, List.of())); } /** @@ -170,11 +169,11 @@ public PartitionResponse(Errors error) { } public PartitionResponse(Errors error, String errorMessage) { - this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, INVALID_OFFSET, Collections.emptyList(), errorMessage); + this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, INVALID_OFFSET, List.of(), errorMessage); } public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset) { - this(error, baseOffset, logAppendTime, logStartOffset, Collections.emptyList(), null); + this(error, baseOffset, logAppendTime, logStartOffset, List.of(), null); } public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List recordErrors) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java index 271cfde8cffa9..852f7bcc33d7e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; -import java.util.Collections; import java.util.Map; public class RemoveRaftVoterResponse extends AbstractResponse { @@ -51,9 +50,9 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) { @Override public Map errorCounts() { if (data.errorCode() != Errors.NONE.code()) { - return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + return Map.of(Errors.forCode(data.errorCode()), 1); } else { - return Collections.emptyMap(); + return Map.of(); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java index 64251a0c5fda7..fe725fa9dac97 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.record.internal.Records; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumMap; import java.util.Iterator; import java.util.LinkedHashMap; @@ -139,7 +138,7 @@ public static int sizeOf(short version, Iterator> partIterator) { // Since the throttleTimeMs and metadata field sizes are constant and fixed, we can // use arbitrary values here without affecting the result. - ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator, Collections.emptyList(), 0); + ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator, List.of(), 0); ObjectSerializationCache cache = new ObjectSerializationCache(); return 4 + data.size(cache, version); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java index 5d8426e2650ff..6df71cbb5c154 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.Readable; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -56,7 +55,7 @@ public ShareGroupHeartbeatResponseData data() { @Override public Map errorCounts() { - return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + return Map.of(Errors.forCode(data.errorCode()), 1); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java index 87c10a98d3712..68f2a460c4f1b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java @@ -59,7 +59,7 @@ public StreamsGroupHeartbeatResponseData data() { @Override public Map errorCounts() { - return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + return Map.of(Errors.forCode(data.errorCode()), 1); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java index 35b4cce20953e..8a4f7bd7c2797 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.protocol.Readable; import java.util.Collection; -import java.util.Collections; +import java.util.Set; import java.util.stream.Collectors; public class UpdateFeaturesRequest extends AbstractRequest { @@ -105,7 +105,7 @@ public Collection featureUpdates() { public UpdateFeaturesResponse getErrorResponse(int throttleTimeMs, Throwable e) { return UpdateFeaturesResponse.createWithErrors( ApiError.fromThrowable(e), - Collections.emptySet(), + Set.of(), throttleTimeMs ); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java index f52157234fa44..8d5420c9eb00e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; -import java.util.Collections; import java.util.Map; public class UpdateRaftVoterResponse extends AbstractResponse { @@ -51,9 +50,9 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) { @Override public Map errorCounts() { if (data.errorCode() != Errors.NONE.code()) { - return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + return Map.of(Errors.forCode(data.errorCode()), 1); } else { - return Collections.emptyMap(); + return Map.of(); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java index 732175583671c..2d10a4489db40 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java @@ -22,8 +22,8 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; +import java.util.List; -import java.util.Collections; public class VoteRequest extends AbstractRequest { @@ -77,10 +77,10 @@ public static VoteRequestData singletonRequest(TopicPartition topicPartition, boolean preVote) { return new VoteRequestData() .setClusterId(clusterId) - .setTopics(Collections.singletonList( + .setTopics(List.of( new VoteRequestData.TopicData() .setTopicName(topicPartition.topic()) - .setPartitions(Collections.singletonList( + .setPartitions(List.of( new VoteRequestData.PartitionData() .setPartitionIndex(topicPartition.partition()) .setReplicaEpoch(replicaEpoch) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 3d32e98f8b49b..2081d5bd5f4af 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -766,7 +766,7 @@ private AlterShareGroupOffsetsResponseData buildErrorResponse(AlterShareGroupOff Map partitionErrors = Optional.ofNullable(topicPartitionErrorsMap) .map(map -> map.get(topic.topicId())) - .orElse(Collections.emptyMap()); + .orElse(Map.of()); PartitionErrorData error = partitionErrors.get(partition.partitionIndex()); if (error == null) { partitionData = partition.duplicate(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 09714ab28b85a..837caddcb0362 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2166,7 +2166,7 @@ private CoordinatorResult stream response.setEndpointInformationEpoch(group.endpointInformationEpoch()); } - Map internalTopicsToBeCreated = Collections.emptyMap(); + Map internalTopicsToBeCreated = Map.of(); if (updatedConfiguredTopology.topicConfigurationException().isPresent()) { TopicConfigurationException exception = updatedConfiguredTopology.topicConfigurationException().get(); internalTopicsToBeCreated = updatedConfiguredTopology.internalTopicsToBeCreated(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index b1a5b026654d7..a74871871a6ef 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -1275,11 +1275,11 @@ private CommitPartitionValidator createAssignmentEpochValidator( // Search for the partition in assigned tasks, then in tasks pending revocation Integer assignmentEpoch = assignedTasks.activeTasksWithEpochs() - .getOrDefault(subtopologyId, Collections.emptyMap()) + .getOrDefault(subtopologyId, Map.of()) .get(partitionId); if (assignmentEpoch == null) { assignmentEpoch = tasksPendingRevocation.activeTasksWithEpochs() - .getOrDefault(subtopologyId, Collections.emptyMap()) + .getOrDefault(subtopologyId, Map.of()) .get(partitionId); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java index 887472132a6c1..a43431ce47ef5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java @@ -287,7 +287,7 @@ public static Builder withDefaults(String memberId) { .setClientId("") .setClientHost("") .setProcessId("") - .setClientTags(Collections.emptyMap()) + .setClientTags(Map.of()) .setState(MemberState.STABLE) .setMemberEpoch(0) .setPreviousMemberEpoch(0) From 7469934983de228a7023b0e00a7b444632de0fe4 Mon Sep 17 00:00:00 2001 From: see-quick Date: Fri, 27 Mar 2026 10:32:21 +0100 Subject: [PATCH 2/2] spotless Signed-off-by: see-quick --- .../apache/kafka/common/requests/BeginQuorumEpochRequest.java | 1 + .../org/apache/kafka/common/requests/FindCoordinatorRequest.java | 1 + .../java/org/apache/kafka/common/requests/JoinGroupRequest.java | 1 + .../main/java/org/apache/kafka/common/requests/VoteRequest.java | 1 + 4 files changed, 4 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java index dffa3049dd88e..8e7b816a0659b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; + import java.util.List; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java index d63efe082e17e..cda50c1732fed 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; + import java.util.List; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index 0c8b827875d08..30649b27303b6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; + import java.util.List; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java index 2d10a4489db40..91b1cafebc7df 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; + import java.util.List;