diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DynamicGroupConfigIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DynamicGroupConfigIntegrationTest.java new file mode 100644 index 0000000000000..e10edbd37a949 --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DynamicGroupConfigIntegrationTest.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.coordinator.group.GroupConfig; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; + +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class DynamicGroupConfigIntegrationTest { + + @ClusterTest(types = {Type.KRAFT}) + public void testDescribeGroupConfigSynonymsWithBrokerSynonym(ClusterInstance cluster) throws Exception { + try (Admin admin = cluster.admin()) { + String group = "synonym-test-group"; + ConfigResource groupResource = new ConfigResource(ConfigResource.Type.GROUP, group); + ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "0"); + ConfigResource brokerDefaultResource = new ConfigResource(ConfigResource.Type.BROKER, ""); + + // Verify default config only. + // Expected synonym chain: DEFAULT_CONFIG + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + String.valueOf(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT), + ConfigEntry.ConfigSource.DEFAULT_CONFIG, + List.of( + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + + // Set per-broker dynamic config. + // Expected synonym chain: DYNAMIC_BROKER_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(brokerResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "1500"), + AlterConfigOp.OpType.SET) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "1500", + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG, + List.of( + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + + // Set dynamic default broker config; per-broker config still takes precedence. + // Expected synonym chain: DYNAMIC_BROKER_CONFIG -> DYNAMIC_DEFAULT_BROKER_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(brokerDefaultResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "2000"), + AlterConfigOp.OpType.SET) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "1500", + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG, + List.of( + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + + // Set group override; it takes precedence over all broker configs. + // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> DYNAMIC_BROKER_CONFIG + // -> DYNAMIC_DEFAULT_BROKER_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(groupResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "3000"), + AlterConfigOp.OpType.SET) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "3000", + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, + List.of( + Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + } + } + + @ClusterTest(types = {Type.KRAFT}) + public void testDescribeGroupConfigSynonymsWithoutBrokerSynonym(ClusterInstance cluster) throws Exception { + try (Admin admin = cluster.admin()) { + String group = "synonym-no-broker-test-group"; + ConfigResource groupResource = new ConfigResource(ConfigResource.Type.GROUP, group); + + // Verify default config for a config with no broker synonym. + // Expected synonym chain: DEFAULT_CONFIG + assertGroupConfig( + admin, + groupResource, + GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, + GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT, + ConfigEntry.ConfigSource.DEFAULT_CONFIG, + List.of( + Map.entry(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + + // Set group override; synonyms use group config name since there is no broker synonym. + // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(groupResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest"), + AlterConfigOp.OpType.SET) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, + "earliest", + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, + List.of( + Map.entry(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG), + Map.entry(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + } + } + + @ClusterTest(types = {Type.KRAFT}, + serverProperties = { + @ClusterConfigProperty( + key = GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "2000") + }) + public void testDescribeGroupConfigSynonymsWithStaticBrokerConfig(ClusterInstance cluster) throws Exception { + try (Admin admin = cluster.admin()) { + String group = "synonym-static-test-group"; + ConfigResource groupResource = new ConfigResource(ConfigResource.Type.GROUP, group); + ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "0"); + ConfigResource brokerDefaultResource = new ConfigResource(ConfigResource.Type.BROKER, ""); + + // Verify static broker config is reflected in synonyms. + // Expected synonym chain: STATIC_BROKER_CONFIG -> DEFAULT_CONFIG + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "2000", + ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG, + List.of( + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + + // Set group override; it takes precedence over static broker config. + // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> STATIC_BROKER_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(groupResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "3000"), + AlterConfigOp.OpType.SET) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "3000", + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, + List.of( + Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + + // Add dynamic default broker config. + // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> DYNAMIC_DEFAULT_BROKER_CONFIG + // -> STATIC_BROKER_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(brokerDefaultResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "4000"), + AlterConfigOp.OpType.SET) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "3000", + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, + List.of( + Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG))); + + // Add per-broker dynamic config to complete the full 5-layer synonym chain. + // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> DYNAMIC_BROKER_CONFIG -> DYNAMIC_DEFAULT_BROKER_CONFIG + // -> STATIC_BROKER_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(brokerResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "5000"), + AlterConfigOp.OpType.SET) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "3000", + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, + List.of( + Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + + // Delete group override; value falls back to per-broker dynamic config. + // Expected synonym chain: DYNAMIC_BROKER_CONFIG -> DYNAMIC_DEFAULT_BROKER_CONFIG + // -> STATIC_BROKER_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(groupResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, ""), + AlterConfigOp.OpType.DELETE) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "5000", + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG, + List.of( + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG))); + } + } + + private void assertGroupConfig( + Admin admin, + ConfigResource groupResource, + String configKey, + String expectedValue, + ConfigEntry.ConfigSource expectedSource, + List> expectedSynonyms + ) throws Exception { + DescribeConfigsOptions options = new DescribeConfigsOptions().includeSynonyms(true); + ConfigEntry entry = admin.describeConfigs(List.of(groupResource), options) + .all().get().get(groupResource).get(configKey); + assertEquals(expectedValue, entry.value()); + assertEquals(expectedSource, entry.source()); + assertEquals(expectedSynonyms, entry.synonyms().stream() + .map(s -> Map.entry(s.name(), s.source())) + .toList()); + } +} diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java index d880449d4eec0..6f35f7ec30757 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java @@ -155,7 +155,7 @@ public void testInternalConfigsDoNotReturnForDescribeConfigs(ClusterInstance clu // test for case ConfigResource.Type == GROUP Config groupConfig = configResourceMap.get(groupResource); - assertNotContainsAnyInternalConfig(groupConfig, GroupConfig.configDef().configKeys()); + assertNotContainsAnyInternalConfig(groupConfig, GroupConfig.CONFIG_DEF.configKeys()); // test for case ConfigResource.Type == CLIENT_METRICS Config clientMetricsConfig = configResourceMap.get(clientMetricsResource); diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 8cc46dadb0646..0166a64e67f61 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -580,7 +580,7 @@ object ConfigCommand extends Logging { "For entity-type '" + ClientType + "': " + QuotaConfig.userAndClientQuotaConfigs().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) + "For entity-type '" + IpType + "': " + QuotaConfig.ipConfigs.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) + "For entity-type '" + ClientMetricsType + "': " + ClientMetricsConfigs.configDef().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) + - "For entity-type '" + GroupType + "': " + GroupConfig.configDef().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) + + "For entity-type '" + GroupType + "': " + GroupConfig.CONFIG_DEF.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) + s"Entity types '$UserType' and '$ClientType' may be specified together to update config for clients of a specific user.") .withRequiredArg .ofType(classOf[String]) diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala b/core/src/main/scala/kafka/server/ConfigHelper.scala index 5c32c14eb2df9..8a28be7a2e7fa 100644 --- a/core/src/main/scala/kafka/server/ConfigHelper.scala +++ b/core/src/main/scala/kafka/server/ConfigHelper.scala @@ -133,7 +133,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo throw new InvalidRequestException("Group name must not be empty") } else { val groupProps = configRepository.groupConfig(group) - val groupConfig = GroupConfig.fromProps(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig), groupProps) + val groupConfig = GroupConfig.fromProps(config.extractGroupConfigMap, groupProps) createResponseConfig(resource, groupConfig, createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, includeDocumentation)(_, _)) } @@ -161,15 +161,23 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo private def createGroupConfigEntry(groupConfig: GroupConfig, groupProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean) (name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = { - val allNames = brokerSynonyms(name) val configEntryType = GroupConfig.configType(name).toScala val isSensitive = KafkaConfig.maybeSensitive(configEntryType) val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull) val allSynonyms = { - val list = configSynonyms(name, allNames, isSensitive) + val brokerConfigName = GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.get(name) + val list = if (brokerConfigName != null && brokerConfigName.isPresent) + configSynonyms(brokerConfigName.get, brokerSynonyms(brokerConfigName.get), isSensitive) + else + // No broker synonym, fall back to GroupConfig defaults + Option(GroupConfig.CONFIG_DEF.defaultValues().get(name)) + .map(v => List(new DescribeConfigsResponseData.DescribeConfigsSynonym() + .setName(name) + .setValue(if (isSensitive) null else ConfigDef.convertToString(v, configEntryType.orNull)) + .setSource(ConfigSource.DEFAULT_CONFIG.id))) + .getOrElse(List.empty) if (!groupProps.containsKey(name)) - new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString) - .setSource(ConfigSource.DEFAULT_CONFIG.id) +: list + list else new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString) .setSource(ConfigSource.GROUP_CONFIG.id) +: list diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java index 8b1ce751ef068..354f3e8cba4d4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java @@ -157,7 +157,7 @@ public final class GroupConfig extends AbstractConfig { public final boolean shareRenewAcknowledgeEnable; - private static final ConfigDef CONFIG = new ConfigDef() + public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, @@ -281,8 +281,41 @@ public final class GroupConfig extends AbstractConfig { MEDIUM, GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC); + /** + * Mapping from GroupConfig name to its broker-level synonym config name. + * {@code Optional.empty()} indicates that the config has no broker-level synonym. + */ + public static final Map> ALL_GROUP_CONFIG_SYNONYMS = Map.ofEntries( + // Consumer group configs + Map.entry(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)), + Map.entry(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)), + Map.entry(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)), + Map.entry(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)), + + // Share group configs + Map.entry(SHARE_SESSION_TIMEOUT_MS_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)), + Map.entry(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)), + Map.entry(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, Optional.of(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG)), + Map.entry(SHARE_DELIVERY_COUNT_LIMIT_CONFIG, Optional.of(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG)), + Map.entry(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, Optional.of(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG)), + Map.entry(SHARE_AUTO_OFFSET_RESET_CONFIG, Optional.empty()), + Map.entry(SHARE_ISOLATION_LEVEL_CONFIG, Optional.empty()), + Map.entry(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, Optional.empty()), + Map.entry(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)), + Map.entry(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)), + + // Streams group configs + Map.entry(STREAMS_SESSION_TIMEOUT_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG)), + Map.entry(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)), + Map.entry(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG)), + Map.entry(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)), + Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)), + Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)), + Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG)) + ); + public GroupConfig(Map props) { - super(CONFIG, props, false); + super(CONFIG_DEF, props, false); this.consumerSessionTimeoutMs = getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG); this.consumerHeartbeatIntervalMs = getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG); // These have to be optionals because their default group coordinator configs are dynamic, @@ -327,16 +360,12 @@ public GroupConfig(Map props) { this.shareRenewAcknowledgeEnable = getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG); } - public static ConfigDef configDef() { - return CONFIG; - } - public static Optional configType(String configName) { - return Optional.ofNullable(CONFIG.configKeys().get(configName)).map(c -> c.type); + return Optional.ofNullable(CONFIG_DEF.configKeys().get(configName)).map(c -> c.type); } public static Set configNames() { - return CONFIG.names(); + return CONFIG_DEF.names(); } /** @@ -356,7 +385,7 @@ public static void validateNames(Map props) { */ @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"}) private static void validateValues(Map unparsedMap, GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) { - Map valueMaps = CONFIG.parse(unparsedMap); + Map valueMaps = CONFIG_DEF.parse(unparsedMap); int consumerHeartbeatInterval = (Integer) valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG); int consumerSessionTimeout = (Integer) valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG); int consumerAssignmentIntervalMs = (Integer) valueMaps.get(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG); @@ -899,6 +928,6 @@ public boolean shareRenewAcknowledgeEnable() { } public static void main(String[] args) { - System.out.println(CONFIG.toHtml(4, config -> "groupconfigs_" + config)); + System.out.println(CONFIG_DEF.toHtml(4, config -> "groupconfigs_" + config)); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java index 139aa4ab3ced5..d1468b1eba1dc 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java @@ -28,9 +28,11 @@ import org.junit.jupiter.params.provider.MethodSource; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.stream.Stream; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT; @@ -637,6 +639,34 @@ public void testEvaluateMinBoundedValueBelowMinIsCapped( assertEquals(expectedMin, result.get(key)); } + @Test + public void testAllGroupConfigSynonyms() { + // Every GroupConfig entry should have an entry in ALL_GROUP_CONFIG_SYNONYMS. + for (String groupConfigName : GroupConfig.CONFIG_DEF.names()) { + assertTrue(GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.containsKey(groupConfigName), + "GroupConfig entry '" + groupConfigName + "' is not in ALL_GROUP_CONFIG_SYNONYMS. " + + "Add it with Optional.of(brokerConfigName) or Optional.empty() if it has no broker synonym."); + } + + // Every key in ALL_GROUP_CONFIG_SYNONYMS should be a valid GroupConfig entry. + for (String key : GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.keySet()) { + assertTrue(GroupConfig.CONFIG_DEF.names().contains(key), + "ALL_GROUP_CONFIG_SYNONYMS contains '" + key + "' which is not a valid GroupConfig entry."); + } + + // Every present synonym mapping should point to a valid broker config. + Set brokerConfigNames = new HashSet<>(); + brokerConfigNames.addAll(GroupCoordinatorConfig.CONFIG_DEF.names()); + brokerConfigNames.addAll(ShareGroupConfig.CONFIG_DEF.names()); + + for (Map.Entry> entry : GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.entrySet()) { + entry.getValue().ifPresent(brokerConfigName -> + assertTrue(brokerConfigNames.contains(brokerConfigName), + "ALL_GROUP_CONFIG_SYNONYMS maps '" + entry.getKey() + "' to '" + + brokerConfigName + "' but this broker config does not exist.")); + } + } + private Map createValidGroupConfig() { Map props = new HashMap<>(); props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "45000"); diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java index 903e24a78eba3..d129fdcd65b08 100644 --- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java @@ -627,4 +627,12 @@ public Long logRetentionTimeMillis() { return millis < 0 ? Long.valueOf(-1) : millis; } + + public Map extractGroupConfigMap() { + Map defaults = new HashMap<>(); + GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.forEach((groupConfigName, brokerConfigName) -> + brokerConfigName.ifPresent(name -> defaults.put(groupConfigName, get(name))) + ); + return defaults; + } } diff --git a/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java index 301d9e1d6bfec..1c3976750ddeb 100644 --- a/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java +++ b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java @@ -65,7 +65,7 @@ public DefaultSupportedConfigChecker() { ConfigResource.Type.TOPIC, new SetContainsPredicate(new HashSet<>(LogConfig.configNames())), ConfigResource.Type.BROKER, ignore -> true, ConfigResource.Type.CLIENT_METRICS, new SetContainsPredicate(ClientMetricsConfigs.configDef().names()), - ConfigResource.Type.GROUP, new SetContainsPredicate(GroupConfig.configDef().names()) + ConfigResource.Type.GROUP, new SetContainsPredicate(GroupConfig.CONFIG_DEF.names()) ); } diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java index ccc356820d1ae..9cb8ef7f3b12d 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java @@ -137,7 +137,7 @@ public void testNullStatusOnKraftCommandAlterGroup() { "--describe")); message = captureStandardOut(run(command)); assertTrue(message.contains("Dynamic configs for group group are:")); - assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000}")); + assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000, DEFAULT_CONFIG:group.consumer.session.timeout.ms=45000}")); command = Stream.concat(quorumArgs(), Stream.of( "--entity-type", "groups", @@ -145,7 +145,7 @@ public void testNullStatusOnKraftCommandAlterGroup() { "--describe")); message = captureStandardOut(run(command)); assertTrue(message.contains("Dynamic configs for group group are:")); - assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000}")); + assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000, DEFAULT_CONFIG:group.consumer.session.timeout.ms=45000}")); } @ClusterTest(serverProperties = { @@ -293,10 +293,10 @@ public void testDescribeStreamsGroupConfigs() { "--describe", "--all")); String message = captureStandardOut(run(command)); - assertTrue(message.contains("streams.heartbeat.interval.ms=5000 sensitive=false synonyms={DEFAULT_CONFIG:streams.heartbeat.interval.ms=5000}")); - assertTrue(message.contains("streams.num.standby.replicas=0 sensitive=false synonyms={DEFAULT_CONFIG:streams.num.standby.replicas=0}")); - assertTrue(message.contains("streams.session.timeout.ms=45000 sensitive=false synonyms={DEFAULT_CONFIG:streams.session.timeout.ms=45000}")); - assertTrue(message.contains("streams.task.offset.interval.ms=60000 sensitive=false synonyms={DEFAULT_CONFIG:streams.task.offset.interval.ms=60000}")); + assertTrue(message.contains("streams.heartbeat.interval.ms=5000 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.heartbeat.interval.ms=5000}")); + assertTrue(message.contains("streams.num.standby.replicas=0 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.num.standby.replicas=0}")); + assertTrue(message.contains("streams.session.timeout.ms=45000 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.session.timeout.ms=45000}")); + assertTrue(message.contains("streams.task.offset.interval.ms=60000 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.task.offset.interval.ms=60000}")); } @ClusterTest