From 001916dbc49cab6f17ffa11b8c5c63e1e0917114 Mon Sep 17 00:00:00 2001 From: majialong Date: Mon, 23 Mar 2026 00:18:49 +0800 Subject: [PATCH 1/9] KAFKA-20289: Fix DescribeConfigs to correctly resolve broker synonyms for group configs --- .../scala/kafka/server/ConfigHelper.scala | 32 +++++-- .../unit/kafka/server/KafkaApisTest.scala | 96 +++++++++++++++++++ .../group/GroupCoordinatorConfig.java | 2 + 3 files changed, 124 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala b/core/src/main/scala/kafka/server/ConfigHelper.scala index 5c32c14eb2df9..33fc184c4bebd 100644 --- a/core/src/main/scala/kafka/server/ConfigHelper.scala +++ b/core/src/main/scala/kafka/server/ConfigHelper.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.requests.{ApiError, DescribeConfigsRequest, Descr import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource import org.apache.kafka.common.resource.Resource.CLUSTER_NAME import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC} -import org.apache.kafka.coordinator.group.GroupConfig +import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.metadata.{ConfigRepository, MetadataCache} import org.apache.kafka.server.ConfigHelperUtils.createResponseConfig import org.apache.kafka.server.config.{DynamicBrokerConfig, ServerTopicConfigSynonyms} @@ -133,7 +133,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo throw new InvalidRequestException("Group name must not be empty") } else { val groupProps = configRepository.groupConfig(group) - val groupConfig = GroupConfig.fromProps(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig), groupProps) + val groupConfig = GroupConfig.fromProps(currentGroupConfigDefaults, groupProps) createResponseConfig(resource, groupConfig, createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, includeDocumentation)(_, _)) } @@ -161,15 +161,22 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo private def createGroupConfigEntry(groupConfig: GroupConfig, groupProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean) (name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = { - val allNames = brokerSynonyms(name) val configEntryType = GroupConfig.configType(name).toScala val isSensitive = KafkaConfig.maybeSensitive(configEntryType) val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull) val allSynonyms = { - val list = configSynonyms(name, allNames, isSensitive) + val brokerConfigName = GroupCoordinatorConfig.GROUP_PREFIX + name + val list = if (KafkaConfig.configDef.configKeys().containsKey(brokerConfigName)) + configSynonyms(brokerConfigName, brokerSynonyms(brokerConfigName), isSensitive) + else + // No broker synonym, fall back to GroupConfig defaults + Option(GroupConfig.configDef().defaultValues().get(name)) + .map(v => List(new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name) + .setValue(if (isSensitive) null else ConfigDef.convertToString(v, configEntryType.orNull)) + .setSource(ConfigSource.DEFAULT_CONFIG.id))) + .getOrElse(List.empty) if (!groupProps.containsKey(name)) - new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString) - .setSource(ConfigSource.DEFAULT_CONFIG.id) +: list + list else new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString) .setSource(ConfigSource.GROUP_CONFIG.id) +: list @@ -304,4 +311,17 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo throw new InvalidRequestException(s"Broker id must be an integer, but it is: $resourceName") } } + + /** + * Build the current effective group config defaults by reading from KafkaConfig. + */ + private def currentGroupConfigDefaults: java.util.Map[String, Object] = { + val defaults = new java.util.HashMap[String, Object]() + GroupConfig.configDef().configKeys().forEach { (name, _) => + val brokerConfigName = GroupCoordinatorConfig.GROUP_PREFIX + name + if (KafkaConfig.configDef.configKeys().containsKey(brokerConfigName)) + defaults.put(name, config.get(brokerConfigName)) + } + defaults + } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 663aaefe8392f..f5bb867df2caa 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -399,6 +399,102 @@ class KafkaApisTest extends Logging { assertEquals(cgConfigs.size, configs.size) } + @Test + def testDescribeConfigsGroupSynonyms(): Unit = { + val authorizer: Authorizer = mock(classOf[Authorizer]) + val operation = AclOperation.DESCRIBE_CONFIGS + val resourceType = ResourceType.GROUP + val consumerGroupId = "consumer_group_1" + val requestHeader = + new RequestHeader(ApiKeys.DESCRIBE_CONFIGS, ApiKeys.DESCRIBE_CONFIGS.latestVersion, clientId, 0) + val expectedActions = util.List.of( + new Action(operation, new ResourcePattern(resourceType, consumerGroupId, PatternType.LITERAL), + 1, true, true) + ) + + when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(expectedActions))) + .thenReturn(util.List.of(AuthorizationResult.ALLOWED)) + + val configRepository: ConfigRepository = mock(classOf[ConfigRepository]) + + val cgConfigs = new Properties() + cgConfigs.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString) + cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT) + + when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs) + + // Override broker config for consumer session timeout. + val customSessionTimeoutMs = "55000" + val overrideProperties = Map( + GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG -> customSessionTimeoutMs + ) + + val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData() + .setIncludeSynonyms(true) + .setResources(util.List.of(new DescribeConfigsRequestData.DescribeConfigsResource() + .setResourceName(consumerGroupId) + .setResourceType(ConfigResource.Type.GROUP.id)))) + .build(requestHeader.apiVersion) + val request = buildRequest(describeConfigsRequest, + requestHeader = Option(requestHeader)) + when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), + any[Long])).thenReturn(0) + + createKafkaApis(authorizer = Some(authorizer), configRepository = configRepository, + overrideProperties = overrideProperties).handleDescribeConfigsRequest(request) + + val response = verifyNoThrottling[DescribeConfigsResponse](request) + // Verify that authorize is only called once + verify(authorizer, times(1)).authorize(any(), any()) + val results = response.data.results + assertEquals(1, results.size) + val describeConfigsResult = results.get(0) + + assertEquals(ConfigResource.Type.GROUP.id, describeConfigsResult.resourceType) + assertEquals(consumerGroupId, describeConfigsResult.resourceName) + val configs = describeConfigsResult.configs + assertEquals(GroupConfig.configDef().names().size, configs.size) + + val configMap = configs.asScala.map(c => c.name -> c).toMap + + // Not in cgConfigs, has broker override. + val sessionTimeoutConfig = configMap(CONSUMER_SESSION_TIMEOUT_MS_CONFIG) + assertEquals(customSessionTimeoutMs, sessionTimeoutConfig.value) + assertEquals(DescribeConfigsResponse.ConfigSource.STATIC_BROKER_CONFIG.id, sessionTimeoutConfig.configSource) + val sessionTimeoutSynonyms = sessionTimeoutConfig.synonyms.asScala + // Synonyms: STATIC_BROKER_CONFIG -> DEFAULT_CONFIG. + assertEquals(2, sessionTimeoutSynonyms.size) + assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutSynonyms.head.name) + assertEquals(customSessionTimeoutMs, sessionTimeoutSynonyms.head.value) + assertEquals(DescribeConfigsResponse.ConfigSource.STATIC_BROKER_CONFIG.id, sessionTimeoutSynonyms.head.source) + assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutSynonyms(1).name) + assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString, sessionTimeoutSynonyms(1).value) + assertEquals(DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG.id, sessionTimeoutSynonyms(1).source) + + // No broker synonym, set in cgConfigs. + val shareAutoOffsetResetConfig = configMap(SHARE_AUTO_OFFSET_RESET_CONFIG) + assertEquals(GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT, shareAutoOffsetResetConfig.value) + val shareAutoOffsetResetSynonyms = shareAutoOffsetResetConfig.synonyms.asScala + // Synonyms: GROUP_CONFIG -> DEFAULT_CONFIG. + assertEquals(2, shareAutoOffsetResetSynonyms.size) + assertEquals(SHARE_AUTO_OFFSET_RESET_CONFIG, shareAutoOffsetResetSynonyms.head.name) + assertEquals(DescribeConfigsResponse.ConfigSource.GROUP_CONFIG.id, shareAutoOffsetResetSynonyms.head.source) + assertEquals(SHARE_AUTO_OFFSET_RESET_CONFIG, shareAutoOffsetResetSynonyms(1).name) + assertEquals(GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT, shareAutoOffsetResetSynonyms(1).value) + assertEquals(DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG.id, shareAutoOffsetResetSynonyms(1).source) + + // Has broker synonym, set in cgConfigs, no broker override. + val heartbeatConfig = configMap(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG) + val heartbeatSynonyms = heartbeatConfig.synonyms.asScala + // Synonyms: GROUP_CONFIG -> DEFAULT_CONFIG. + assertEquals(2, heartbeatSynonyms.size) + assertEquals(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatSynonyms.head.name) + assertEquals(DescribeConfigsResponse.ConfigSource.GROUP_CONFIG.id, heartbeatSynonyms.head.source) + assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatSynonyms(1).name) + assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString, heartbeatSynonyms(1).value) + assertEquals(DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG.id, heartbeatSynonyms(1).source) + } + @Test def testAlterConfigsClientMetrics(): Unit = { val subscriptionName = "client_metric_subscription_1" diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 1da3ca214125c..2037862f9b3ec 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -392,6 +392,8 @@ public class GroupCoordinatorConfig { STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG ); + + public static final String GROUP_PREFIX = "group."; public static final ConfigDef CONFIG_DEF = new ConfigDef() // Group coordinator configs From b39fe33501d040ca7dc4d54f90562f34cd1e36b8 Mon Sep 17 00:00:00 2001 From: majialong Date: Tue, 24 Mar 2026 22:29:11 +0800 Subject: [PATCH 2/9] Fix test --- .../kafka/tools/ConfigCommandIntegrationTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java index ccc356820d1ae..9cb8ef7f3b12d 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java @@ -137,7 +137,7 @@ public void testNullStatusOnKraftCommandAlterGroup() { "--describe")); message = captureStandardOut(run(command)); assertTrue(message.contains("Dynamic configs for group group are:")); - assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000}")); + assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000, DEFAULT_CONFIG:group.consumer.session.timeout.ms=45000}")); command = Stream.concat(quorumArgs(), Stream.of( "--entity-type", "groups", @@ -145,7 +145,7 @@ public void testNullStatusOnKraftCommandAlterGroup() { "--describe")); message = captureStandardOut(run(command)); assertTrue(message.contains("Dynamic configs for group group are:")); - assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000}")); + assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000, DEFAULT_CONFIG:group.consumer.session.timeout.ms=45000}")); } @ClusterTest(serverProperties = { @@ -293,10 +293,10 @@ public void testDescribeStreamsGroupConfigs() { "--describe", "--all")); String message = captureStandardOut(run(command)); - assertTrue(message.contains("streams.heartbeat.interval.ms=5000 sensitive=false synonyms={DEFAULT_CONFIG:streams.heartbeat.interval.ms=5000}")); - assertTrue(message.contains("streams.num.standby.replicas=0 sensitive=false synonyms={DEFAULT_CONFIG:streams.num.standby.replicas=0}")); - assertTrue(message.contains("streams.session.timeout.ms=45000 sensitive=false synonyms={DEFAULT_CONFIG:streams.session.timeout.ms=45000}")); - assertTrue(message.contains("streams.task.offset.interval.ms=60000 sensitive=false synonyms={DEFAULT_CONFIG:streams.task.offset.interval.ms=60000}")); + assertTrue(message.contains("streams.heartbeat.interval.ms=5000 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.heartbeat.interval.ms=5000}")); + assertTrue(message.contains("streams.num.standby.replicas=0 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.num.standby.replicas=0}")); + assertTrue(message.contains("streams.session.timeout.ms=45000 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.session.timeout.ms=45000}")); + assertTrue(message.contains("streams.task.offset.interval.ms=60000 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.task.offset.interval.ms=60000}")); } @ClusterTest From 03c9bf2e6d0bc8243407fedd99b9f8184989a971 Mon Sep 17 00:00:00 2001 From: majialong Date: Thu, 26 Mar 2026 00:48:09 +0800 Subject: [PATCH 3/9] Add naming convention test for group config broker synonyms --- .../group/GroupCoordinatorConfigTest.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 418b57706fc4f..c7de622dced38 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -30,19 +30,24 @@ import org.apache.kafka.coordinator.group.assignor.RangeAssignor; import org.apache.kafka.coordinator.group.assignor.SimpleAssignor; import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig; import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.OptionalInt; +import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class GroupCoordinatorConfigTest { @@ -708,6 +713,47 @@ public void testStreamsGroupMinTaskOffsetIntervalCustomValue() { assertEquals(20000, config.streamsGroupMinTaskOffsetIntervalMs()); } + @Test + public void testGroupConfigBrokerSynonymNamingConvention() { + // GroupConfig entries that have no broker-level synonym. + Set groupConfigsWithoutBrokerSynonym = Set.of( + GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, + GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, + GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG + ); + + // Combine broker CONFIG_DEFs since broker configs are spread across multiple classes. + // If a new config class is introduced, add its CONFIG_DEF here. + Set brokerConfigNames = new HashSet<>(); + brokerConfigNames.addAll(GroupCoordinatorConfig.CONFIG_DEF.names()); + brokerConfigNames.addAll(ShareGroupConfig.CONFIG_DEF.names()); + + // For each GroupConfig entry that has a broker synonym, the broker config name + // must be GROUP_PREFIX + groupConfigName. + for (String groupConfigName : GroupConfig.configDef().names()) { + if (groupConfigsWithoutBrokerSynonym.contains(groupConfigName)) { + continue; + } + String expectedBrokerName = GroupCoordinatorConfig.GROUP_PREFIX + groupConfigName; + assertTrue(brokerConfigNames.contains(expectedBrokerName), + "GroupConfig entry '" + groupConfigName + "' is not in the exclusion list, " + + "but expected broker config '" + expectedBrokerName + "' was not found. " + + "Either add the broker config following the naming convention, or add this " + + "entry to groupConfigsWithoutBrokerSynonym."); + } + + // Verify exclusion list entries truly have no broker-level synonym. + for (String excluded : groupConfigsWithoutBrokerSynonym) { + assertTrue(GroupConfig.configDef().names().contains(excluded), + "'" + excluded + "' is in the exclusion list but not in GroupConfig.configDef(). " + + "Remove it from groupConfigsWithoutBrokerSynonym."); + String brokerName = GroupCoordinatorConfig.GROUP_PREFIX + excluded; + assertFalse(brokerConfigNames.contains(brokerName), + "'" + excluded + "' is in the exclusion list but broker config '" + + brokerName + "' exists. Remove it from groupConfigsWithoutBrokerSynonym."); + } + } + public static GroupCoordinatorConfig createConfig(Map configs) { return new GroupCoordinatorConfig(new AbstractConfig( GroupCoordinatorConfig.CONFIG_DEF, From e0b1c99ef75eb6bbe34472e8d8f214f97ce0b935 Mon Sep 17 00:00:00 2001 From: majialong Date: Sat, 28 Mar 2026 02:04:29 +0800 Subject: [PATCH 4/9] Address review comments --- .../scala/kafka/server/ConfigHelper.scala | 24 +++------ .../main/scala/kafka/server/KafkaConfig.scala | 10 +++- .../kafka/coordinator/group/GroupConfig.java | 29 +++++++++++ .../group/GroupCoordinatorConfig.java | 2 - .../coordinator/group/GroupConfigTest.java | 52 +++++++++++++++++++ .../group/GroupCoordinatorConfigTest.java | 46 ---------------- 6 files changed, 96 insertions(+), 67 deletions(-) diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala b/core/src/main/scala/kafka/server/ConfigHelper.scala index 33fc184c4bebd..971569fb9ebc3 100644 --- a/core/src/main/scala/kafka/server/ConfigHelper.scala +++ b/core/src/main/scala/kafka/server/ConfigHelper.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.requests.{ApiError, DescribeConfigsRequest, Descr import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource import org.apache.kafka.common.resource.Resource.CLUSTER_NAME import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC} -import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} +import org.apache.kafka.coordinator.group.GroupConfig import org.apache.kafka.metadata.{ConfigRepository, MetadataCache} import org.apache.kafka.server.ConfigHelperUtils.createResponseConfig import org.apache.kafka.server.config.{DynamicBrokerConfig, ServerTopicConfigSynonyms} @@ -133,7 +133,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo throw new InvalidRequestException("Group name must not be empty") } else { val groupProps = configRepository.groupConfig(group) - val groupConfig = GroupConfig.fromProps(currentGroupConfigDefaults, groupProps) + val groupConfig = GroupConfig.fromProps(config.extractGroupConfigMap, groupProps) createResponseConfig(resource, groupConfig, createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, includeDocumentation)(_, _)) } @@ -165,13 +165,14 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo val isSensitive = KafkaConfig.maybeSensitive(configEntryType) val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull) val allSynonyms = { - val brokerConfigName = GroupCoordinatorConfig.GROUP_PREFIX + name - val list = if (KafkaConfig.configDef.configKeys().containsKey(brokerConfigName)) + val brokerConfigName = GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.get(name) + val list = if (brokerConfigName != null) configSynonyms(brokerConfigName, brokerSynonyms(brokerConfigName), isSensitive) else // No broker synonym, fall back to GroupConfig defaults Option(GroupConfig.configDef().defaultValues().get(name)) - .map(v => List(new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name) + .map(v => List(new DescribeConfigsResponseData.DescribeConfigsSynonym() + .setName(name) .setValue(if (isSensitive) null else ConfigDef.convertToString(v, configEntryType.orNull)) .setSource(ConfigSource.DEFAULT_CONFIG.id))) .getOrElse(List.empty) @@ -311,17 +312,4 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo throw new InvalidRequestException(s"Broker id must be an integer, but it is: $resourceName") } } - - /** - * Build the current effective group config defaults by reading from KafkaConfig. - */ - private def currentGroupConfigDefaults: java.util.Map[String, Object] = { - val defaults = new java.util.HashMap[String, Object]() - GroupConfig.configDef().configKeys().forEach { (name, _) => - val brokerConfigName = GroupCoordinatorConfig.GROUP_PREFIX + name - if (KafkaConfig.configDef.configKeys().containsKey(brokerConfigName)) - defaults.put(name, config.get(brokerConfigName)) - } - defaults - } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index c5075d5bd9949..55e8c4bdb5737 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig -import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.coordinator.share.ShareCoordinatorConfig import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig} @@ -607,4 +607,12 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long) logProps } + + def extractGroupConfigMap: java.util.Map[String, Object] = { + val defaults = new java.util.HashMap[String, Object]() + GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.forEach { (groupConfigName, brokerConfigName) => + defaults.put(groupConfigName, get(brokerConfigName)) + } + defaults + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java index 8b1ce751ef068..458e9d7df9762 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java @@ -281,6 +281,35 @@ public final class GroupConfig extends AbstractConfig { MEDIUM, GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC); + /** + * Mapping from GroupConfig name to its broker-level synonym config name. + */ + public static final Map ALL_GROUP_CONFIG_SYNONYMS = Map.ofEntries( + // Consumer group configs + Map.entry(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG), + Map.entry(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG), + Map.entry(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG), + Map.entry(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG), + + // Share group configs + Map.entry(SHARE_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG), + Map.entry(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG), + Map.entry(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG), + Map.entry(SHARE_DELIVERY_COUNT_LIMIT_CONFIG, ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG), + Map.entry(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG), + Map.entry(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG), + Map.entry(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG), + + // Streams group configs + Map.entry(STREAMS_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG), + Map.entry(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG), + Map.entry(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG), + Map.entry(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG), + Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG), + Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG), + Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG) + ); + public GroupConfig(Map props) { super(CONFIG, props, false); this.consumerSessionTimeoutMs = getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 2037862f9b3ec..1da3ca214125c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -392,8 +392,6 @@ public class GroupCoordinatorConfig { STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG ); - - public static final String GROUP_PREFIX = "group."; public static final ConfigDef CONFIG_DEF = new ConfigDef() // Group coordinator configs diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java index 139aa4ab3ced5..7627813e58783 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java @@ -28,9 +28,11 @@ import org.junit.jupiter.params.provider.MethodSource; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.stream.Stream; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT; @@ -58,6 +60,7 @@ import static org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -637,6 +640,55 @@ public void testEvaluateMinBoundedValueBelowMinIsCapped( assertEquals(expectedMin, result.get(key)); } + @Test + public void testAllGroupConfigSynonyms() { + // GroupConfig entries with no broker-level synonym. + Set groupConfigsWithoutBrokerSynonym = Set.of( + GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, + GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, + GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG + ); + + // Every GroupConfig entry must be in ALL_GROUP_CONFIG_SYNONYMS or in the exclusion list. + for (String groupConfigName : GroupConfig.configDef().names()) { + if (groupConfigsWithoutBrokerSynonym.contains(groupConfigName)) { + assertFalse(GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.containsKey(groupConfigName), + "Config '" + groupConfigName + "' should not be in both the exclusion list " + + "and ALL_GROUP_CONFIG_SYNONYMS."); + continue; + } + assertTrue(GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.containsKey(groupConfigName), + "GroupConfig entry '" + groupConfigName + "' is not in ALL_GROUP_CONFIG_SYNONYMS " + + "and not in the exclusion list. Add it to ALL_GROUP_CONFIG_SYNONYMS or " + + "to groupConfigsWithoutBrokerSynonym."); + } + + // Every key in ALL_GROUP_CONFIG_SYNONYMS must be a valid GroupConfig entry. + for (String key : GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.keySet()) { + assertTrue(GroupConfig.configDef().names().contains(key), + "ALL_GROUP_CONFIG_SYNONYMS contains key '" + key + "' which is not a valid " + + "GroupConfig entry. Remove it or fix the typo."); + } + + // Every entry in the exclusion list must be a valid GroupConfig entry. + for (String excluded : groupConfigsWithoutBrokerSynonym) { + assertTrue(GroupConfig.configDef().names().contains(excluded), + "Exclusion list contains '" + excluded + "' which is not a valid GroupConfig " + + "entry. Remove it or fix the typo."); + } + + // Every synonym value must point to a valid broker config. + Set brokerConfigNames = new HashSet<>(); + brokerConfigNames.addAll(GroupCoordinatorConfig.CONFIG_DEF.names()); + brokerConfigNames.addAll(ShareGroupConfig.CONFIG_DEF.names()); + + for (Map.Entry entry : GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.entrySet()) { + assertTrue(brokerConfigNames.contains(entry.getValue()), + "ALL_GROUP_CONFIG_SYNONYMS maps '" + entry.getKey() + "' to '" + + entry.getValue() + "' but this broker config does not exist."); + } + } + private Map createValidGroupConfig() { Map props = new HashMap<>(); props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "45000"); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 7d45c7f59a48a..a1eff74238f0f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -30,24 +30,19 @@ import org.apache.kafka.coordinator.group.assignor.RangeAssignor; import org.apache.kafka.coordinator.group.assignor.SimpleAssignor; import org.apache.kafka.coordinator.group.assignor.UniformAssignor; -import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig; import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.OptionalInt; -import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; public class GroupCoordinatorConfigTest { @@ -729,47 +724,6 @@ public void testStreamsGroupMinTaskOffsetIntervalCustomValue() { assertEquals(20000, config.streamsGroupMinTaskOffsetIntervalMs()); } - @Test - public void testGroupConfigBrokerSynonymNamingConvention() { - // GroupConfig entries that have no broker-level synonym. - Set groupConfigsWithoutBrokerSynonym = Set.of( - GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, - GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, - GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG - ); - - // Combine broker CONFIG_DEFs since broker configs are spread across multiple classes. - // If a new config class is introduced, add its CONFIG_DEF here. - Set brokerConfigNames = new HashSet<>(); - brokerConfigNames.addAll(GroupCoordinatorConfig.CONFIG_DEF.names()); - brokerConfigNames.addAll(ShareGroupConfig.CONFIG_DEF.names()); - - // For each GroupConfig entry that has a broker synonym, the broker config name - // must be GROUP_PREFIX + groupConfigName. - for (String groupConfigName : GroupConfig.configDef().names()) { - if (groupConfigsWithoutBrokerSynonym.contains(groupConfigName)) { - continue; - } - String expectedBrokerName = GroupCoordinatorConfig.GROUP_PREFIX + groupConfigName; - assertTrue(brokerConfigNames.contains(expectedBrokerName), - "GroupConfig entry '" + groupConfigName + "' is not in the exclusion list, " + - "but expected broker config '" + expectedBrokerName + "' was not found. " + - "Either add the broker config following the naming convention, or add this " + - "entry to groupConfigsWithoutBrokerSynonym."); - } - - // Verify exclusion list entries truly have no broker-level synonym. - for (String excluded : groupConfigsWithoutBrokerSynonym) { - assertTrue(GroupConfig.configDef().names().contains(excluded), - "'" + excluded + "' is in the exclusion list but not in GroupConfig.configDef(). " + - "Remove it from groupConfigsWithoutBrokerSynonym."); - String brokerName = GroupCoordinatorConfig.GROUP_PREFIX + excluded; - assertFalse(brokerConfigNames.contains(brokerName), - "'" + excluded + "' is in the exclusion list but broker config '" + - brokerName + "' exists. Remove it from groupConfigsWithoutBrokerSynonym."); - } - } - public static GroupCoordinatorConfig createConfig(Map configs) { return new GroupCoordinatorConfig(new AbstractConfig( GroupCoordinatorConfig.CONFIG_DEF, From 04dfa51bd90c99ae3756448d02a810303caf3475 Mon Sep 17 00:00:00 2001 From: majialong Date: Sun, 29 Mar 2026 22:37:43 +0800 Subject: [PATCH 5/9] Rename CONFIG to CONFIG_DEF and make public --- .../clients/admin/StaticBrokerConfigTest.java | 2 +- .../main/scala/kafka/admin/ConfigCommand.scala | 2 +- .../main/scala/kafka/server/ConfigHelper.scala | 2 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 2 +- .../kafka/coordinator/group/GroupConfig.java | 16 ++++++---------- .../kafka/coordinator/group/GroupConfigTest.java | 6 +++--- 6 files changed, 13 insertions(+), 17 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java index d880449d4eec0..6f35f7ec30757 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java @@ -155,7 +155,7 @@ public void testInternalConfigsDoNotReturnForDescribeConfigs(ClusterInstance clu // test for case ConfigResource.Type == GROUP Config groupConfig = configResourceMap.get(groupResource); - assertNotContainsAnyInternalConfig(groupConfig, GroupConfig.configDef().configKeys()); + assertNotContainsAnyInternalConfig(groupConfig, GroupConfig.CONFIG_DEF.configKeys()); // test for case ConfigResource.Type == CLIENT_METRICS Config clientMetricsConfig = configResourceMap.get(clientMetricsResource); diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 8cc46dadb0646..0166a64e67f61 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -580,7 +580,7 @@ object ConfigCommand extends Logging { "For entity-type '" + ClientType + "': " + QuotaConfig.userAndClientQuotaConfigs().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) + "For entity-type '" + IpType + "': " + QuotaConfig.ipConfigs.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) + "For entity-type '" + ClientMetricsType + "': " + ClientMetricsConfigs.configDef().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) + - "For entity-type '" + GroupType + "': " + GroupConfig.configDef().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) + + "For entity-type '" + GroupType + "': " + GroupConfig.CONFIG_DEF.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) + s"Entity types '$UserType' and '$ClientType' may be specified together to update config for clients of a specific user.") .withRequiredArg .ofType(classOf[String]) diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala b/core/src/main/scala/kafka/server/ConfigHelper.scala index 971569fb9ebc3..2f171138d72a4 100644 --- a/core/src/main/scala/kafka/server/ConfigHelper.scala +++ b/core/src/main/scala/kafka/server/ConfigHelper.scala @@ -170,7 +170,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo configSynonyms(brokerConfigName, brokerSynonyms(brokerConfigName), isSensitive) else // No broker synonym, fall back to GroupConfig defaults - Option(GroupConfig.configDef().defaultValues().get(name)) + Option(GroupConfig.CONFIG_DEF.defaultValues().get(name)) .map(v => List(new DescribeConfigsResponseData.DescribeConfigsSynonym() .setName(name) .setValue(if (isSensitive) null else ConfigDef.convertToString(v, configEntryType.orNull)) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index a2322c5e97d29..1d0a857dd5ea0 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -459,7 +459,7 @@ class KafkaApisTest extends Logging { assertEquals(ConfigResource.Type.GROUP.id, describeConfigsResult.resourceType) assertEquals(consumerGroupId, describeConfigsResult.resourceName) val configs = describeConfigsResult.configs - assertEquals(GroupConfig.configDef().names().size, configs.size) + assertEquals(GroupConfig.CONFIG_DEF.names().size, configs.size) val configMap = configs.asScala.map(c => c.name -> c).toMap diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java index 458e9d7df9762..61031b606c822 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java @@ -157,7 +157,7 @@ public final class GroupConfig extends AbstractConfig { public final boolean shareRenewAcknowledgeEnable; - private static final ConfigDef CONFIG = new ConfigDef() + public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, @@ -311,7 +311,7 @@ public final class GroupConfig extends AbstractConfig { ); public GroupConfig(Map props) { - super(CONFIG, props, false); + super(CONFIG_DEF, props, false); this.consumerSessionTimeoutMs = getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG); this.consumerHeartbeatIntervalMs = getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG); // These have to be optionals because their default group coordinator configs are dynamic, @@ -356,16 +356,12 @@ public GroupConfig(Map props) { this.shareRenewAcknowledgeEnable = getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG); } - public static ConfigDef configDef() { - return CONFIG; - } - public static Optional configType(String configName) { - return Optional.ofNullable(CONFIG.configKeys().get(configName)).map(c -> c.type); + return Optional.ofNullable(CONFIG_DEF.configKeys().get(configName)).map(c -> c.type); } public static Set configNames() { - return CONFIG.names(); + return CONFIG_DEF.names(); } /** @@ -385,7 +381,7 @@ public static void validateNames(Map props) { */ @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"}) private static void validateValues(Map unparsedMap, GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) { - Map valueMaps = CONFIG.parse(unparsedMap); + Map valueMaps = CONFIG_DEF.parse(unparsedMap); int consumerHeartbeatInterval = (Integer) valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG); int consumerSessionTimeout = (Integer) valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG); int consumerAssignmentIntervalMs = (Integer) valueMaps.get(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG); @@ -928,6 +924,6 @@ public boolean shareRenewAcknowledgeEnable() { } public static void main(String[] args) { - System.out.println(CONFIG.toHtml(4, config -> "groupconfigs_" + config)); + System.out.println(CONFIG_DEF.toHtml(4, config -> "groupconfigs_" + config)); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java index 7627813e58783..44f720faafa47 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java @@ -650,7 +650,7 @@ public void testAllGroupConfigSynonyms() { ); // Every GroupConfig entry must be in ALL_GROUP_CONFIG_SYNONYMS or in the exclusion list. - for (String groupConfigName : GroupConfig.configDef().names()) { + for (String groupConfigName : GroupConfig.CONFIG_DEF.names()) { if (groupConfigsWithoutBrokerSynonym.contains(groupConfigName)) { assertFalse(GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.containsKey(groupConfigName), "Config '" + groupConfigName + "' should not be in both the exclusion list " + @@ -665,14 +665,14 @@ public void testAllGroupConfigSynonyms() { // Every key in ALL_GROUP_CONFIG_SYNONYMS must be a valid GroupConfig entry. for (String key : GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.keySet()) { - assertTrue(GroupConfig.configDef().names().contains(key), + assertTrue(GroupConfig.CONFIG_DEF.names().contains(key), "ALL_GROUP_CONFIG_SYNONYMS contains key '" + key + "' which is not a valid " + "GroupConfig entry. Remove it or fix the typo."); } // Every entry in the exclusion list must be a valid GroupConfig entry. for (String excluded : groupConfigsWithoutBrokerSynonym) { - assertTrue(GroupConfig.configDef().names().contains(excluded), + assertTrue(GroupConfig.CONFIG_DEF.names().contains(excluded), "Exclusion list contains '" + excluded + "' which is not a valid GroupConfig " + "entry. Remove it or fix the typo."); } From 9a7bfd0ded5956e6c71007f443c6e796642d11f3 Mon Sep 17 00:00:00 2001 From: majialong Date: Mon, 30 Mar 2026 22:24:58 +0800 Subject: [PATCH 6/9] Use Optional in ALL_GROUP_CONFIG_SYNONYMS and add integration tests --- .../scala/kafka/server/ConfigHelper.scala | 4 +- .../main/scala/kafka/server/KafkaConfig.scala | 10 +- .../kafka/api/BaseAdminIntegrationTest.scala | 7 + .../api/PlaintextAdminIntegrationTest.scala | 215 ++++++++++++++++++ .../kafka/coordinator/group/GroupConfig.java | 42 ++-- .../coordinator/group/GroupConfigTest.java | 44 +--- .../server/config/AbstractKafkaConfig.java | 8 + 7 files changed, 267 insertions(+), 63 deletions(-) diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala b/core/src/main/scala/kafka/server/ConfigHelper.scala index 2f171138d72a4..8a28be7a2e7fa 100644 --- a/core/src/main/scala/kafka/server/ConfigHelper.scala +++ b/core/src/main/scala/kafka/server/ConfigHelper.scala @@ -166,8 +166,8 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull) val allSynonyms = { val brokerConfigName = GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.get(name) - val list = if (brokerConfigName != null) - configSynonyms(brokerConfigName, brokerSynonyms(brokerConfigName), isSensitive) + val list = if (brokerConfigName != null && brokerConfigName.isPresent) + configSynonyms(brokerConfigName.get, brokerSynonyms(brokerConfigName.get), isSensitive) else // No broker synonym, fall back to GroupConfig defaults Option(GroupConfig.CONFIG_DEF.defaultValues().get(name)) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 55e8c4bdb5737..c5075d5bd9949 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig -import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.share.ShareCoordinatorConfig import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig} @@ -607,12 +607,4 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long) logProps } - - def extractGroupConfigMap: java.util.Map[String, Object] = { - val defaults = new java.util.HashMap[String, Object]() - GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.forEach { (groupConfigName, brokerConfigName) => - defaults.put(groupConfigName, get(brokerConfigName)) - } - defaults - } } diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index 16dec9dc00800..ed3ac0d203099 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -202,6 +202,13 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg config.setProperty(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG, "123") }) } + // For testDescribeGroupConfigSynonymsWithStaticBrokerConfig, set a static broker config so + // that we can verify the synonym chain includes STATIC_BROKER_CONFIG. + if (testInfo.getTestMethod.toString.contains("testDescribeGroupConfigSynonymsWithStaticBrokerConfig")) { + configs.foreach(config => { + config.setProperty(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "2000") + }) + } configs.foreach { config => config.setProperty(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, "true") config.setProperty(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0") diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index a9c23a0bc0865..4e440766ebded 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -1203,6 +1203,221 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(50000, brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs) } + @Test + def testDescribeGroupConfigSynonymsWithBrokerSynonym(): Unit = { + client = createAdminClient + val group = "synonym-test-group" + val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group) + val brokerDefaultResource = new ConfigResource(ConfigResource.Type.BROKER, "") + val describeOptions = new DescribeConfigsOptions().includeSynonyms(true) + + def synonymsList(entry: ConfigEntry): List[(String, ConfigSource)] = + entry.synonyms.asScala.map(s => (s.name, s.source)).toList + + // Verify default config only, no dynamic broker config, no group override. + // Expected chain: DEFAULT_CONFIG + var configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) + var assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) + assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT.toString, assignInterval.value) + assertEquals(ConfigSource.DEFAULT_CONFIG, assignInterval.source) + assertEquals(List( + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) + ), synonymsList(assignInterval)) + + // Set per-broker dynamic config for all brokers. + // Expected chain: DYNAMIC_BROKER_CONFIG → DEFAULT_CONFIG + val perBrokerProps = new Properties + perBrokerProps.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "1500") + TestUtils.incrementalAlterConfigs(brokers, client, perBrokerProps, perBrokerConfig = true) + .all.get(15, TimeUnit.SECONDS) + ensureConsistentKRaftMetadata() + + configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) + assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) + assertEquals("1500", assignInterval.value) + assertEquals(ConfigSource.DYNAMIC_BROKER_CONFIG, assignInterval.source) + assertEquals(List( + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_BROKER_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) + ), synonymsList(assignInterval)) + + // Set dynamic default broker config, verify per-broker config still takes precedence. + // Expected chain: DYNAMIC_BROKER_CONFIG → DYNAMIC_DEFAULT_BROKER_CONFIG → DEFAULT_CONFIG + val dynamicAlterOps = util.List.of( + new AlterConfigOp(new ConfigEntry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "2000"), AlterConfigOp.OpType.SET) + ) + client.incrementalAlterConfigs(util.Map.of(brokerDefaultResource, dynamicAlterOps)).all.get(15, TimeUnit.SECONDS) + ensureConsistentKRaftMetadata() + + configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) + assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) + assertEquals("1500", assignInterval.value) + assertEquals(ConfigSource.DYNAMIC_BROKER_CONFIG, assignInterval.source) + assertEquals(List( + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_BROKER_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) + ), synonymsList(assignInterval)) + + // Set group override, verify it takes precedence over all broker configs. + // Expected chain: DYNAMIC_GROUP_CONFIG → DYNAMIC_BROKER_CONFIG → DYNAMIC_DEFAULT_BROKER_CONFIG → DEFAULT_CONFIG + val groupAlterOps = util.List.of( + new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "3000"), AlterConfigOp.OpType.SET) + ) + client.incrementalAlterConfigs(util.Map.of(groupResource, groupAlterOps)).all.get(15, TimeUnit.SECONDS) + ensureConsistentKRaftMetadata() + + configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) + assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) + assertEquals("3000", assignInterval.value) + assertEquals(ConfigSource.DYNAMIC_GROUP_CONFIG, assignInterval.source) + assertEquals(List( + (GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_GROUP_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_BROKER_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) + ), synonymsList(assignInterval)) + } + + @Test + def testDescribeGroupConfigSynonymsWithoutBrokerSynonym(): Unit = { + client = createAdminClient + val group = "synonym-no-broker-test-group" + val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group) + val describeOptions = new DescribeConfigsOptions().includeSynonyms(true) + + def synonymsList(entry: ConfigEntry): List[(String, ConfigSource)] = + entry.synonyms.asScala.map(s => (s.name, s.source)).toList + + // Verify default config only, no group override, config has no broker synonym. + // Expected chain: DEFAULT_CONFIG + var configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) + var shareAutoOffsetReset = configs.get(groupResource).get(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG) + assertEquals(GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT, shareAutoOffsetReset.value) + assertEquals(ConfigSource.DEFAULT_CONFIG, shareAutoOffsetReset.source) + assertEquals(List( + (GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, ConfigSource.DEFAULT_CONFIG) + ), synonymsList(shareAutoOffsetReset)) + + // Set group override, verify synonyms use group config name (no broker synonym). + // Expected chain: DYNAMIC_GROUP_CONFIG → DEFAULT_CONFIG + val groupAlterOps = util.List.of( + new AlterConfigOp(new ConfigEntry(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest"), AlterConfigOp.OpType.SET) + ) + client.incrementalAlterConfigs(util.Map.of(groupResource, groupAlterOps)).all.get(15, TimeUnit.SECONDS) + ensureConsistentKRaftMetadata() + + configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) + shareAutoOffsetReset = configs.get(groupResource).get(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG) + assertEquals("earliest", shareAutoOffsetReset.value) + assertEquals(ConfigSource.DYNAMIC_GROUP_CONFIG, shareAutoOffsetReset.source) + assertEquals(List( + (GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, ConfigSource.DYNAMIC_GROUP_CONFIG), + (GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, ConfigSource.DEFAULT_CONFIG) + ), synonymsList(shareAutoOffsetReset)) + } + + @Test + def testDescribeGroupConfigSynonymsWithStaticBrokerConfig(): Unit = { + // This test requires group.consumer.assignment.interval.ms=2000 in static broker config, + // configured via BaseAdminIntegrationTest.modifyConfigs. + client = createAdminClient + val group = "synonym-static-test-group" + val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group) + val brokerDefaultResource = new ConfigResource(ConfigResource.Type.BROKER, "") + val describeOptions = new DescribeConfigsOptions().includeSynonyms(true) + + def synonymsList(entry: ConfigEntry): List[(String, ConfigSource)] = + entry.synonyms.asScala.map(s => (s.name, s.source)).toList + + // Verify static broker config is reflected in synonyms. + // Expected chain: STATIC_BROKER_CONFIG → DEFAULT_CONFIG + var configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) + var assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) + assertEquals("2000", assignInterval.value) + assertEquals(ConfigSource.STATIC_BROKER_CONFIG, assignInterval.source) + assertEquals(List( + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.STATIC_BROKER_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) + ), synonymsList(assignInterval)) + + // Set group override, verify it takes precedence over static broker config. + // Expected chain: DYNAMIC_GROUP_CONFIG → STATIC_BROKER_CONFIG → DEFAULT_CONFIG + val groupAlterOps = util.List.of( + new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "3000"), AlterConfigOp.OpType.SET) + ) + client.incrementalAlterConfigs(util.Map.of(groupResource, groupAlterOps)).all.get(15, TimeUnit.SECONDS) + ensureConsistentKRaftMetadata() + + configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) + assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) + assertEquals("3000", assignInterval.value) + assertEquals(ConfigSource.DYNAMIC_GROUP_CONFIG, assignInterval.source) + assertEquals(List( + (GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_GROUP_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.STATIC_BROKER_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) + ), synonymsList(assignInterval)) + + // Add dynamic default broker config. + // Expected chain: DYNAMIC_GROUP_CONFIG → DYNAMIC_DEFAULT_BROKER_CONFIG → STATIC_BROKER_CONFIG → DEFAULT_CONFIG + val dynamicAlterOps = util.List.of( + new AlterConfigOp(new ConfigEntry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "4000"), AlterConfigOp.OpType.SET) + ) + client.incrementalAlterConfigs(util.Map.of(brokerDefaultResource, dynamicAlterOps)).all.get(15, TimeUnit.SECONDS) + ensureConsistentKRaftMetadata() + + configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) + assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) + assertEquals("3000", assignInterval.value) + assertEquals(ConfigSource.DYNAMIC_GROUP_CONFIG, assignInterval.source) + assertEquals(List( + (GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_GROUP_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.STATIC_BROKER_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) + ), synonymsList(assignInterval)) + + // Add per-broker dynamic config, verify full synonym chain with all five layers. + // Expected chain: DYNAMIC_GROUP_CONFIG → DYNAMIC_BROKER_CONFIG → DYNAMIC_DEFAULT_BROKER_CONFIG → STATIC_BROKER_CONFIG → DEFAULT_CONFIG + val perBrokerProps = new Properties + perBrokerProps.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "5000") + TestUtils.incrementalAlterConfigs(brokers, client, perBrokerProps, perBrokerConfig = true) + .all.get(15, TimeUnit.SECONDS) + ensureConsistentKRaftMetadata() + + configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) + assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) + assertEquals("3000", assignInterval.value) + assertEquals(ConfigSource.DYNAMIC_GROUP_CONFIG, assignInterval.source) + assertEquals(List( + (GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_GROUP_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_BROKER_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.STATIC_BROKER_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) + ), synonymsList(assignInterval)) + + // Delete group override, verify synonyms fall back to per-broker dynamic config. + // Expected chain: DYNAMIC_BROKER_CONFIG → DYNAMIC_DEFAULT_BROKER_CONFIG → STATIC_BROKER_CONFIG → DEFAULT_CONFIG + val groupDeleteOps = util.List.of( + new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, ""), AlterConfigOp.OpType.DELETE) + ) + client.incrementalAlterConfigs(util.Map.of(groupResource, groupDeleteOps)).all.get(15, TimeUnit.SECONDS) + ensureConsistentKRaftMetadata() + + configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) + assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) + assertEquals("5000", assignInterval.value) + assertEquals(ConfigSource.DYNAMIC_BROKER_CONFIG, assignInterval.source) + assertEquals(List( + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_BROKER_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.STATIC_BROKER_CONFIG), + (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) + ), synonymsList(assignInterval)) + } + @Test def testCreatePartitions(): Unit = { client = createAdminClient diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java index 61031b606c822..354f3e8cba4d4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java @@ -283,31 +283,35 @@ public final class GroupConfig extends AbstractConfig { /** * Mapping from GroupConfig name to its broker-level synonym config name. + * {@code Optional.empty()} indicates that the config has no broker-level synonym. */ - public static final Map ALL_GROUP_CONFIG_SYNONYMS = Map.ofEntries( + public static final Map> ALL_GROUP_CONFIG_SYNONYMS = Map.ofEntries( // Consumer group configs - Map.entry(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG), - Map.entry(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG), - Map.entry(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG), - Map.entry(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG), + Map.entry(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)), + Map.entry(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)), + Map.entry(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)), + Map.entry(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)), // Share group configs - Map.entry(SHARE_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG), - Map.entry(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG), - Map.entry(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG), - Map.entry(SHARE_DELIVERY_COUNT_LIMIT_CONFIG, ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG), - Map.entry(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG), - Map.entry(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG), - Map.entry(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG), + Map.entry(SHARE_SESSION_TIMEOUT_MS_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)), + Map.entry(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)), + Map.entry(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, Optional.of(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG)), + Map.entry(SHARE_DELIVERY_COUNT_LIMIT_CONFIG, Optional.of(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG)), + Map.entry(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, Optional.of(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG)), + Map.entry(SHARE_AUTO_OFFSET_RESET_CONFIG, Optional.empty()), + Map.entry(SHARE_ISOLATION_LEVEL_CONFIG, Optional.empty()), + Map.entry(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, Optional.empty()), + Map.entry(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)), + Map.entry(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)), // Streams group configs - Map.entry(STREAMS_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG), - Map.entry(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG), - Map.entry(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG), - Map.entry(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG), - Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG), - Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG), - Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG) + Map.entry(STREAMS_SESSION_TIMEOUT_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG)), + Map.entry(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)), + Map.entry(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG)), + Map.entry(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)), + Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)), + Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)), + Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG)) ); public GroupConfig(Map props) { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java index 44f720faafa47..d1468b1eba1dc 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java @@ -60,7 +60,6 @@ import static org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -642,50 +641,29 @@ public void testEvaluateMinBoundedValueBelowMinIsCapped( @Test public void testAllGroupConfigSynonyms() { - // GroupConfig entries with no broker-level synonym. - Set groupConfigsWithoutBrokerSynonym = Set.of( - GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, - GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, - GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG - ); - - // Every GroupConfig entry must be in ALL_GROUP_CONFIG_SYNONYMS or in the exclusion list. + // Every GroupConfig entry should have an entry in ALL_GROUP_CONFIG_SYNONYMS. for (String groupConfigName : GroupConfig.CONFIG_DEF.names()) { - if (groupConfigsWithoutBrokerSynonym.contains(groupConfigName)) { - assertFalse(GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.containsKey(groupConfigName), - "Config '" + groupConfigName + "' should not be in both the exclusion list " + - "and ALL_GROUP_CONFIG_SYNONYMS."); - continue; - } assertTrue(GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.containsKey(groupConfigName), - "GroupConfig entry '" + groupConfigName + "' is not in ALL_GROUP_CONFIG_SYNONYMS " + - "and not in the exclusion list. Add it to ALL_GROUP_CONFIG_SYNONYMS or " + - "to groupConfigsWithoutBrokerSynonym."); + "GroupConfig entry '" + groupConfigName + "' is not in ALL_GROUP_CONFIG_SYNONYMS. " + + "Add it with Optional.of(brokerConfigName) or Optional.empty() if it has no broker synonym."); } - // Every key in ALL_GROUP_CONFIG_SYNONYMS must be a valid GroupConfig entry. + // Every key in ALL_GROUP_CONFIG_SYNONYMS should be a valid GroupConfig entry. for (String key : GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.keySet()) { assertTrue(GroupConfig.CONFIG_DEF.names().contains(key), - "ALL_GROUP_CONFIG_SYNONYMS contains key '" + key + "' which is not a valid " + - "GroupConfig entry. Remove it or fix the typo."); - } - - // Every entry in the exclusion list must be a valid GroupConfig entry. - for (String excluded : groupConfigsWithoutBrokerSynonym) { - assertTrue(GroupConfig.CONFIG_DEF.names().contains(excluded), - "Exclusion list contains '" + excluded + "' which is not a valid GroupConfig " + - "entry. Remove it or fix the typo."); + "ALL_GROUP_CONFIG_SYNONYMS contains '" + key + "' which is not a valid GroupConfig entry."); } - // Every synonym value must point to a valid broker config. + // Every present synonym mapping should point to a valid broker config. Set brokerConfigNames = new HashSet<>(); brokerConfigNames.addAll(GroupCoordinatorConfig.CONFIG_DEF.names()); brokerConfigNames.addAll(ShareGroupConfig.CONFIG_DEF.names()); - for (Map.Entry entry : GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.entrySet()) { - assertTrue(brokerConfigNames.contains(entry.getValue()), - "ALL_GROUP_CONFIG_SYNONYMS maps '" + entry.getKey() + "' to '" + - entry.getValue() + "' but this broker config does not exist."); + for (Map.Entry> entry : GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.entrySet()) { + entry.getValue().ifPresent(brokerConfigName -> + assertTrue(brokerConfigNames.contains(brokerConfigName), + "ALL_GROUP_CONFIG_SYNONYMS maps '" + entry.getKey() + "' to '" + + brokerConfigName + "' but this broker config does not exist.")); } } diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java index 903e24a78eba3..d129fdcd65b08 100644 --- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java @@ -627,4 +627,12 @@ public Long logRetentionTimeMillis() { return millis < 0 ? Long.valueOf(-1) : millis; } + + public Map extractGroupConfigMap() { + Map defaults = new HashMap<>(); + GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.forEach((groupConfigName, brokerConfigName) -> + brokerConfigName.ifPresent(name -> defaults.put(groupConfigName, get(name))) + ); + return defaults; + } } From f44305072c85e9b27d72b10b6c9ab98e5135ee9f Mon Sep 17 00:00:00 2001 From: majialong Date: Mon, 30 Mar 2026 22:34:02 +0800 Subject: [PATCH 7/9] Remove redundant test --- .../unit/kafka/server/KafkaApisTest.scala | 96 ------------------- 1 file changed, 96 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 1d0a857dd5ea0..c5974e7e3179b 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -405,102 +405,6 @@ class KafkaApisTest extends Logging { assertEquals(cgConfigs.size, configs.size) } - @Test - def testDescribeConfigsGroupSynonyms(): Unit = { - val authorizer: Authorizer = mock(classOf[Authorizer]) - val operation = AclOperation.DESCRIBE_CONFIGS - val resourceType = ResourceType.GROUP - val consumerGroupId = "consumer_group_1" - val requestHeader = - new RequestHeader(ApiKeys.DESCRIBE_CONFIGS, ApiKeys.DESCRIBE_CONFIGS.latestVersion, clientId, 0) - val expectedActions = util.List.of( - new Action(operation, new ResourcePattern(resourceType, consumerGroupId, PatternType.LITERAL), - 1, true, true) - ) - - when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(expectedActions))) - .thenReturn(util.List.of(AuthorizationResult.ALLOWED)) - - val configRepository: ConfigRepository = mock(classOf[ConfigRepository]) - - val cgConfigs = new Properties() - cgConfigs.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString) - cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT) - - when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs) - - // Override broker config for consumer session timeout. - val customSessionTimeoutMs = "55000" - val overrideProperties = Map( - GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG -> customSessionTimeoutMs - ) - - val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData() - .setIncludeSynonyms(true) - .setResources(util.List.of(new DescribeConfigsRequestData.DescribeConfigsResource() - .setResourceName(consumerGroupId) - .setResourceType(ConfigResource.Type.GROUP.id)))) - .build(requestHeader.apiVersion) - val request = buildRequest(describeConfigsRequest, - requestHeader = Option(requestHeader)) - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - any[Long])).thenReturn(0) - - createKafkaApis(authorizer = Some(authorizer), configRepository = configRepository, - overrideProperties = overrideProperties).handleDescribeConfigsRequest(request) - - val response = verifyNoThrottling[DescribeConfigsResponse](request) - // Verify that authorize is only called once - verify(authorizer, times(1)).authorize(any(), any()) - val results = response.data.results - assertEquals(1, results.size) - val describeConfigsResult = results.get(0) - - assertEquals(ConfigResource.Type.GROUP.id, describeConfigsResult.resourceType) - assertEquals(consumerGroupId, describeConfigsResult.resourceName) - val configs = describeConfigsResult.configs - assertEquals(GroupConfig.CONFIG_DEF.names().size, configs.size) - - val configMap = configs.asScala.map(c => c.name -> c).toMap - - // Not in cgConfigs, has broker override. - val sessionTimeoutConfig = configMap(CONSUMER_SESSION_TIMEOUT_MS_CONFIG) - assertEquals(customSessionTimeoutMs, sessionTimeoutConfig.value) - assertEquals(DescribeConfigsResponse.ConfigSource.STATIC_BROKER_CONFIG.id, sessionTimeoutConfig.configSource) - val sessionTimeoutSynonyms = sessionTimeoutConfig.synonyms.asScala - // Synonyms: STATIC_BROKER_CONFIG -> DEFAULT_CONFIG. - assertEquals(2, sessionTimeoutSynonyms.size) - assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutSynonyms.head.name) - assertEquals(customSessionTimeoutMs, sessionTimeoutSynonyms.head.value) - assertEquals(DescribeConfigsResponse.ConfigSource.STATIC_BROKER_CONFIG.id, sessionTimeoutSynonyms.head.source) - assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutSynonyms(1).name) - assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString, sessionTimeoutSynonyms(1).value) - assertEquals(DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG.id, sessionTimeoutSynonyms(1).source) - - // No broker synonym, set in cgConfigs. - val shareAutoOffsetResetConfig = configMap(SHARE_AUTO_OFFSET_RESET_CONFIG) - assertEquals(GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT, shareAutoOffsetResetConfig.value) - val shareAutoOffsetResetSynonyms = shareAutoOffsetResetConfig.synonyms.asScala - // Synonyms: GROUP_CONFIG -> DEFAULT_CONFIG. - assertEquals(2, shareAutoOffsetResetSynonyms.size) - assertEquals(SHARE_AUTO_OFFSET_RESET_CONFIG, shareAutoOffsetResetSynonyms.head.name) - assertEquals(DescribeConfigsResponse.ConfigSource.GROUP_CONFIG.id, shareAutoOffsetResetSynonyms.head.source) - assertEquals(SHARE_AUTO_OFFSET_RESET_CONFIG, shareAutoOffsetResetSynonyms(1).name) - assertEquals(GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT, shareAutoOffsetResetSynonyms(1).value) - assertEquals(DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG.id, shareAutoOffsetResetSynonyms(1).source) - - // Has broker synonym, set in cgConfigs, no broker override. - val heartbeatConfig = configMap(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG) - val heartbeatSynonyms = heartbeatConfig.synonyms.asScala - // Synonyms: GROUP_CONFIG -> DEFAULT_CONFIG. - assertEquals(2, heartbeatSynonyms.size) - assertEquals(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatSynonyms.head.name) - assertEquals(DescribeConfigsResponse.ConfigSource.GROUP_CONFIG.id, heartbeatSynonyms.head.source) - assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatSynonyms(1).name) - assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString, heartbeatSynonyms(1).value) - assertEquals(DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG.id, heartbeatSynonyms(1).source) - } - @Test def testAlterConfigsClientMetrics(): Unit = { val subscriptionName = "client_metric_subscription_1" From 9936230d06e0844b73b29329a84e70e8936d0612 Mon Sep 17 00:00:00 2001 From: majialong Date: Tue, 31 Mar 2026 01:59:17 +0800 Subject: [PATCH 8/9] Add DynamicGroupConfigIntegrationTest --- .../DynamicGroupConfigIntegrationTest.java | 324 ++++++++++++++++++ .../kafka/api/BaseAdminIntegrationTest.scala | 7 - .../api/PlaintextAdminIntegrationTest.scala | 215 ------------ 3 files changed, 324 insertions(+), 222 deletions(-) create mode 100644 clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DynamicGroupConfigIntegrationTest.java diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DynamicGroupConfigIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DynamicGroupConfigIntegrationTest.java new file mode 100644 index 0000000000000..e10edbd37a949 --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DynamicGroupConfigIntegrationTest.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.coordinator.group.GroupConfig; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; + +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class DynamicGroupConfigIntegrationTest { + + @ClusterTest(types = {Type.KRAFT}) + public void testDescribeGroupConfigSynonymsWithBrokerSynonym(ClusterInstance cluster) throws Exception { + try (Admin admin = cluster.admin()) { + String group = "synonym-test-group"; + ConfigResource groupResource = new ConfigResource(ConfigResource.Type.GROUP, group); + ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "0"); + ConfigResource brokerDefaultResource = new ConfigResource(ConfigResource.Type.BROKER, ""); + + // Verify default config only. + // Expected synonym chain: DEFAULT_CONFIG + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + String.valueOf(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT), + ConfigEntry.ConfigSource.DEFAULT_CONFIG, + List.of( + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + + // Set per-broker dynamic config. + // Expected synonym chain: DYNAMIC_BROKER_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(brokerResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "1500"), + AlterConfigOp.OpType.SET) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "1500", + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG, + List.of( + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + + // Set dynamic default broker config; per-broker config still takes precedence. + // Expected synonym chain: DYNAMIC_BROKER_CONFIG -> DYNAMIC_DEFAULT_BROKER_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(brokerDefaultResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "2000"), + AlterConfigOp.OpType.SET) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "1500", + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG, + List.of( + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + + // Set group override; it takes precedence over all broker configs. + // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> DYNAMIC_BROKER_CONFIG + // -> DYNAMIC_DEFAULT_BROKER_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(groupResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "3000"), + AlterConfigOp.OpType.SET) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "3000", + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, + List.of( + Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + } + } + + @ClusterTest(types = {Type.KRAFT}) + public void testDescribeGroupConfigSynonymsWithoutBrokerSynonym(ClusterInstance cluster) throws Exception { + try (Admin admin = cluster.admin()) { + String group = "synonym-no-broker-test-group"; + ConfigResource groupResource = new ConfigResource(ConfigResource.Type.GROUP, group); + + // Verify default config for a config with no broker synonym. + // Expected synonym chain: DEFAULT_CONFIG + assertGroupConfig( + admin, + groupResource, + GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, + GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT, + ConfigEntry.ConfigSource.DEFAULT_CONFIG, + List.of( + Map.entry(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + + // Set group override; synonyms use group config name since there is no broker synonym. + // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(groupResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest"), + AlterConfigOp.OpType.SET) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, + "earliest", + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, + List.of( + Map.entry(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG), + Map.entry(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + } + } + + @ClusterTest(types = {Type.KRAFT}, + serverProperties = { + @ClusterConfigProperty( + key = GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "2000") + }) + public void testDescribeGroupConfigSynonymsWithStaticBrokerConfig(ClusterInstance cluster) throws Exception { + try (Admin admin = cluster.admin()) { + String group = "synonym-static-test-group"; + ConfigResource groupResource = new ConfigResource(ConfigResource.Type.GROUP, group); + ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "0"); + ConfigResource brokerDefaultResource = new ConfigResource(ConfigResource.Type.BROKER, ""); + + // Verify static broker config is reflected in synonyms. + // Expected synonym chain: STATIC_BROKER_CONFIG -> DEFAULT_CONFIG + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "2000", + ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG, + List.of( + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + + // Set group override; it takes precedence over static broker config. + // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> STATIC_BROKER_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(groupResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "3000"), + AlterConfigOp.OpType.SET) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "3000", + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, + List.of( + Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + + // Add dynamic default broker config. + // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> DYNAMIC_DEFAULT_BROKER_CONFIG + // -> STATIC_BROKER_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(brokerDefaultResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "4000"), + AlterConfigOp.OpType.SET) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "3000", + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, + List.of( + Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG))); + + // Add per-broker dynamic config to complete the full 5-layer synonym chain. + // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> DYNAMIC_BROKER_CONFIG -> DYNAMIC_DEFAULT_BROKER_CONFIG + // -> STATIC_BROKER_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(brokerResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "5000"), + AlterConfigOp.OpType.SET) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "3000", + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, + List.of( + Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG)) + ); + + // Delete group override; value falls back to per-broker dynamic config. + // Expected synonym chain: DYNAMIC_BROKER_CONFIG -> DYNAMIC_DEFAULT_BROKER_CONFIG + // -> STATIC_BROKER_CONFIG -> DEFAULT_CONFIG + admin.incrementalAlterConfigs(Map.of(groupResource, List.of( + new AlterConfigOp(new ConfigEntry( + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, ""), + AlterConfigOp.OpType.DELETE) + ))).all().get(); + cluster.ensureConsistentMetadata(); + + assertGroupConfig( + admin, + groupResource, + GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, + "5000", + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG, + List.of( + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG), + Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, + ConfigEntry.ConfigSource.DEFAULT_CONFIG))); + } + } + + private void assertGroupConfig( + Admin admin, + ConfigResource groupResource, + String configKey, + String expectedValue, + ConfigEntry.ConfigSource expectedSource, + List> expectedSynonyms + ) throws Exception { + DescribeConfigsOptions options = new DescribeConfigsOptions().includeSynonyms(true); + ConfigEntry entry = admin.describeConfigs(List.of(groupResource), options) + .all().get().get(groupResource).get(configKey); + assertEquals(expectedValue, entry.value()); + assertEquals(expectedSource, entry.source()); + assertEquals(expectedSynonyms, entry.synonyms().stream() + .map(s -> Map.entry(s.name(), s.source())) + .toList()); + } +} diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index ed3ac0d203099..16dec9dc00800 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -202,13 +202,6 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg config.setProperty(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG, "123") }) } - // For testDescribeGroupConfigSynonymsWithStaticBrokerConfig, set a static broker config so - // that we can verify the synonym chain includes STATIC_BROKER_CONFIG. - if (testInfo.getTestMethod.toString.contains("testDescribeGroupConfigSynonymsWithStaticBrokerConfig")) { - configs.foreach(config => { - config.setProperty(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "2000") - }) - } configs.foreach { config => config.setProperty(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, "true") config.setProperty(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0") diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 4e440766ebded..a9c23a0bc0865 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -1203,221 +1203,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(50000, brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs) } - @Test - def testDescribeGroupConfigSynonymsWithBrokerSynonym(): Unit = { - client = createAdminClient - val group = "synonym-test-group" - val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group) - val brokerDefaultResource = new ConfigResource(ConfigResource.Type.BROKER, "") - val describeOptions = new DescribeConfigsOptions().includeSynonyms(true) - - def synonymsList(entry: ConfigEntry): List[(String, ConfigSource)] = - entry.synonyms.asScala.map(s => (s.name, s.source)).toList - - // Verify default config only, no dynamic broker config, no group override. - // Expected chain: DEFAULT_CONFIG - var configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) - var assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) - assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT.toString, assignInterval.value) - assertEquals(ConfigSource.DEFAULT_CONFIG, assignInterval.source) - assertEquals(List( - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) - ), synonymsList(assignInterval)) - - // Set per-broker dynamic config for all brokers. - // Expected chain: DYNAMIC_BROKER_CONFIG → DEFAULT_CONFIG - val perBrokerProps = new Properties - perBrokerProps.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "1500") - TestUtils.incrementalAlterConfigs(brokers, client, perBrokerProps, perBrokerConfig = true) - .all.get(15, TimeUnit.SECONDS) - ensureConsistentKRaftMetadata() - - configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) - assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) - assertEquals("1500", assignInterval.value) - assertEquals(ConfigSource.DYNAMIC_BROKER_CONFIG, assignInterval.source) - assertEquals(List( - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_BROKER_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) - ), synonymsList(assignInterval)) - - // Set dynamic default broker config, verify per-broker config still takes precedence. - // Expected chain: DYNAMIC_BROKER_CONFIG → DYNAMIC_DEFAULT_BROKER_CONFIG → DEFAULT_CONFIG - val dynamicAlterOps = util.List.of( - new AlterConfigOp(new ConfigEntry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "2000"), AlterConfigOp.OpType.SET) - ) - client.incrementalAlterConfigs(util.Map.of(brokerDefaultResource, dynamicAlterOps)).all.get(15, TimeUnit.SECONDS) - ensureConsistentKRaftMetadata() - - configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) - assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) - assertEquals("1500", assignInterval.value) - assertEquals(ConfigSource.DYNAMIC_BROKER_CONFIG, assignInterval.source) - assertEquals(List( - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_BROKER_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) - ), synonymsList(assignInterval)) - - // Set group override, verify it takes precedence over all broker configs. - // Expected chain: DYNAMIC_GROUP_CONFIG → DYNAMIC_BROKER_CONFIG → DYNAMIC_DEFAULT_BROKER_CONFIG → DEFAULT_CONFIG - val groupAlterOps = util.List.of( - new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "3000"), AlterConfigOp.OpType.SET) - ) - client.incrementalAlterConfigs(util.Map.of(groupResource, groupAlterOps)).all.get(15, TimeUnit.SECONDS) - ensureConsistentKRaftMetadata() - - configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) - assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) - assertEquals("3000", assignInterval.value) - assertEquals(ConfigSource.DYNAMIC_GROUP_CONFIG, assignInterval.source) - assertEquals(List( - (GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_GROUP_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_BROKER_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) - ), synonymsList(assignInterval)) - } - - @Test - def testDescribeGroupConfigSynonymsWithoutBrokerSynonym(): Unit = { - client = createAdminClient - val group = "synonym-no-broker-test-group" - val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group) - val describeOptions = new DescribeConfigsOptions().includeSynonyms(true) - - def synonymsList(entry: ConfigEntry): List[(String, ConfigSource)] = - entry.synonyms.asScala.map(s => (s.name, s.source)).toList - - // Verify default config only, no group override, config has no broker synonym. - // Expected chain: DEFAULT_CONFIG - var configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) - var shareAutoOffsetReset = configs.get(groupResource).get(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG) - assertEquals(GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT, shareAutoOffsetReset.value) - assertEquals(ConfigSource.DEFAULT_CONFIG, shareAutoOffsetReset.source) - assertEquals(List( - (GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, ConfigSource.DEFAULT_CONFIG) - ), synonymsList(shareAutoOffsetReset)) - - // Set group override, verify synonyms use group config name (no broker synonym). - // Expected chain: DYNAMIC_GROUP_CONFIG → DEFAULT_CONFIG - val groupAlterOps = util.List.of( - new AlterConfigOp(new ConfigEntry(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest"), AlterConfigOp.OpType.SET) - ) - client.incrementalAlterConfigs(util.Map.of(groupResource, groupAlterOps)).all.get(15, TimeUnit.SECONDS) - ensureConsistentKRaftMetadata() - - configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) - shareAutoOffsetReset = configs.get(groupResource).get(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG) - assertEquals("earliest", shareAutoOffsetReset.value) - assertEquals(ConfigSource.DYNAMIC_GROUP_CONFIG, shareAutoOffsetReset.source) - assertEquals(List( - (GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, ConfigSource.DYNAMIC_GROUP_CONFIG), - (GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, ConfigSource.DEFAULT_CONFIG) - ), synonymsList(shareAutoOffsetReset)) - } - - @Test - def testDescribeGroupConfigSynonymsWithStaticBrokerConfig(): Unit = { - // This test requires group.consumer.assignment.interval.ms=2000 in static broker config, - // configured via BaseAdminIntegrationTest.modifyConfigs. - client = createAdminClient - val group = "synonym-static-test-group" - val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group) - val brokerDefaultResource = new ConfigResource(ConfigResource.Type.BROKER, "") - val describeOptions = new DescribeConfigsOptions().includeSynonyms(true) - - def synonymsList(entry: ConfigEntry): List[(String, ConfigSource)] = - entry.synonyms.asScala.map(s => (s.name, s.source)).toList - - // Verify static broker config is reflected in synonyms. - // Expected chain: STATIC_BROKER_CONFIG → DEFAULT_CONFIG - var configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) - var assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) - assertEquals("2000", assignInterval.value) - assertEquals(ConfigSource.STATIC_BROKER_CONFIG, assignInterval.source) - assertEquals(List( - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.STATIC_BROKER_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) - ), synonymsList(assignInterval)) - - // Set group override, verify it takes precedence over static broker config. - // Expected chain: DYNAMIC_GROUP_CONFIG → STATIC_BROKER_CONFIG → DEFAULT_CONFIG - val groupAlterOps = util.List.of( - new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "3000"), AlterConfigOp.OpType.SET) - ) - client.incrementalAlterConfigs(util.Map.of(groupResource, groupAlterOps)).all.get(15, TimeUnit.SECONDS) - ensureConsistentKRaftMetadata() - - configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) - assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) - assertEquals("3000", assignInterval.value) - assertEquals(ConfigSource.DYNAMIC_GROUP_CONFIG, assignInterval.source) - assertEquals(List( - (GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_GROUP_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.STATIC_BROKER_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) - ), synonymsList(assignInterval)) - - // Add dynamic default broker config. - // Expected chain: DYNAMIC_GROUP_CONFIG → DYNAMIC_DEFAULT_BROKER_CONFIG → STATIC_BROKER_CONFIG → DEFAULT_CONFIG - val dynamicAlterOps = util.List.of( - new AlterConfigOp(new ConfigEntry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "4000"), AlterConfigOp.OpType.SET) - ) - client.incrementalAlterConfigs(util.Map.of(brokerDefaultResource, dynamicAlterOps)).all.get(15, TimeUnit.SECONDS) - ensureConsistentKRaftMetadata() - - configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) - assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) - assertEquals("3000", assignInterval.value) - assertEquals(ConfigSource.DYNAMIC_GROUP_CONFIG, assignInterval.source) - assertEquals(List( - (GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_GROUP_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.STATIC_BROKER_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) - ), synonymsList(assignInterval)) - - // Add per-broker dynamic config, verify full synonym chain with all five layers. - // Expected chain: DYNAMIC_GROUP_CONFIG → DYNAMIC_BROKER_CONFIG → DYNAMIC_DEFAULT_BROKER_CONFIG → STATIC_BROKER_CONFIG → DEFAULT_CONFIG - val perBrokerProps = new Properties - perBrokerProps.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "5000") - TestUtils.incrementalAlterConfigs(brokers, client, perBrokerProps, perBrokerConfig = true) - .all.get(15, TimeUnit.SECONDS) - ensureConsistentKRaftMetadata() - - configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) - assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) - assertEquals("3000", assignInterval.value) - assertEquals(ConfigSource.DYNAMIC_GROUP_CONFIG, assignInterval.source) - assertEquals(List( - (GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_GROUP_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_BROKER_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.STATIC_BROKER_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) - ), synonymsList(assignInterval)) - - // Delete group override, verify synonyms fall back to per-broker dynamic config. - // Expected chain: DYNAMIC_BROKER_CONFIG → DYNAMIC_DEFAULT_BROKER_CONFIG → STATIC_BROKER_CONFIG → DEFAULT_CONFIG - val groupDeleteOps = util.List.of( - new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, ""), AlterConfigOp.OpType.DELETE) - ) - client.incrementalAlterConfigs(util.Map.of(groupResource, groupDeleteOps)).all.get(15, TimeUnit.SECONDS) - ensureConsistentKRaftMetadata() - - configs = client.describeConfigs(util.List.of(groupResource), describeOptions).all.get(15, TimeUnit.SECONDS) - assignInterval = configs.get(groupResource).get(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) - assertEquals("5000", assignInterval.value) - assertEquals(ConfigSource.DYNAMIC_BROKER_CONFIG, assignInterval.source) - assertEquals(List( - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_BROKER_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.STATIC_BROKER_CONFIG), - (GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, ConfigSource.DEFAULT_CONFIG) - ), synonymsList(assignInterval)) - } - @Test def testCreatePartitions(): Unit = { client = createAdminClient From 82ae65947c79bf3b87fd0e8f641916c3d4b6e425 Mon Sep 17 00:00:00 2001 From: majialong Date: Tue, 31 Mar 2026 02:14:25 +0800 Subject: [PATCH 9/9] Fix merge conflicts --- .../kafka/server/config/DefaultSupportedConfigChecker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java index 301d9e1d6bfec..1c3976750ddeb 100644 --- a/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java +++ b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java @@ -65,7 +65,7 @@ public DefaultSupportedConfigChecker() { ConfigResource.Type.TOPIC, new SetContainsPredicate(new HashSet<>(LogConfig.configNames())), ConfigResource.Type.BROKER, ignore -> true, ConfigResource.Type.CLIENT_METRICS, new SetContainsPredicate(ClientMetricsConfigs.configDef().names()), - ConfigResource.Type.GROUP, new SetContainsPredicate(GroupConfig.configDef().names()) + ConfigResource.Type.GROUP, new SetContainsPredicate(GroupConfig.CONFIG_DEF.names()) ); }