KAFKA-20289: Fix DescribeConfigs to correctly resolve broker synonyms for group configs #21855
majialoong wants to merge 6 commits into apache:trunk from
Conversation
Below is the output of

@majialoong Thanks for the patch! Could you please attach the output before the patch too?
```java
public static final String GROUP_PREFIX = "group.";
```
I'm a bit concerned about using an implicit string prefix to connect to server-level configurations. It can be error-prone and less flexible. Could we follow the pattern of ALL_TOPIC_CONFIG_SYNONYMS and introduce something like ALL_GROUP_CONFIG_SYNONYMS?
Thanks for the review. Both approaches are valid.
I chose the prefix approach because Group configs have a uniform naming convention — every broker config name is "group." + name, with no exceptions. Given the current convention, future group configs would likely follow the same naming rule as well, so new configs wouldn't require maintaining a separate mapping table.
Topic needs ALL_TOPIC_CONFIG_SYNONYMS because its naming is irregular — different prefixes (log., log.cleaner.), different names (flush.messages → log.flush.interval.messages), and unit conversions. Group configs don't have this complexity.
I'm also open to introducing ALL_GROUP_CONFIG_SYNONYMS if we think the explicitness of a mapping table outweighs the cost of maintaining it for each new group config.
> I chose the prefix approach because Group configs have a uniform naming convention — every broker config name is "group." + name, with no exceptions. Given the current convention, future group configs would likely follow the same naming rule as well, so new configs wouldn't require maintaining a separate mapping table.
My primary reservation is that predicating this logic on the group. prefix introduces a tacit constraint that is highly susceptible to oversight by future developers. For example, in #21801 (comment), there was an attempt to add "group.streams.max.num.warmup.replicas" and "streams.max.warmup.replicas", which directly contravenes the exact mapping convention assumed in this PR.
I'm amenable to adopting the group. prefix approach, but would you mind adding a unit test to codify this consistency? We could leverage reflection to cross-reference all configs between GroupConfig and GroupCoordinatorConfig, corroborating that the mapping always holds.
Thanks for the suggestion. I've added testGroupConfigBrokerSynonymNamingConvention in GroupCoordinatorConfigTest.
It iterates all GroupConfig entries and verifies each has a corresponding broker config named GROUP_PREFIX + groupConfigName. Entries without broker-level synonyms are maintained in an explicit exclusion list, and a reverse check ensures the exclusion list stays accurate.
Any deviation from the naming convention will fail CI with a clear message.
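For illustration, here is a self-contained sketch of such a convention check. The config sets and exclusion list below are hypothetical stand-ins for the real GroupConfig and GroupCoordinatorConfig definitions, which the actual test would enumerate via their config defs or reflection:

```java
import java.util.Set;

public class GroupConfigNamingConventionSketch {
    static final String GROUP_PREFIX = "group.";

    // Hypothetical stand-ins for the real config definitions; names are
    // illustrative only, not the complete lists from the codebase.
    static final Set<String> GROUP_CONFIG_NAMES = Set.of(
        "consumer.session.timeout.ms",
        "consumer.heartbeat.interval.ms",
        "share.auto.offset.reset");
    static final Set<String> BROKER_CONFIG_NAMES = Set.of(
        "group.consumer.session.timeout.ms",
        "group.consumer.heartbeat.interval.ms");
    // Explicit exclusion list: group configs with no broker-level synonym.
    static final Set<String> NO_BROKER_SYNONYM = Set.of("share.auto.offset.reset");

    // True when every group config either maps to GROUP_PREFIX + name on the
    // broker side or is explicitly excluded, and (reverse check) every
    // exclusion entry is a real group config.
    static boolean conventionHolds() {
        for (String name : GROUP_CONFIG_NAMES) {
            boolean hasSynonym = BROKER_CONFIG_NAMES.contains(GROUP_PREFIX + name);
            boolean excluded = NO_BROKER_SYNONYM.contains(name);
            if (hasSynonym == excluded) {
                return false; // either missing synonym or stale exclusion entry
            }
        }
        return GROUP_CONFIG_NAMES.containsAll(NO_BROKER_SYNONYM);
    }

    public static void main(String[] args) {
        if (!conventionHolds()) {
            throw new AssertionError("group config naming convention violated");
        }
        System.out.println("naming convention holds");
    }
}
```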
I am also a bit on the fence here. I think that having an explicit mapping is better and less error prone. We could have ALL_GROUP_CONFIG_SYNONYMS, as suggested by @chia7712, in GroupConfigs. I would also add a test to ensure that all configs have an entry in the Map to ensure that future configs are considered.
I’ll introduce an explicit mapping for this and add a test to ensure all group configs are covered.
@dajac Thanks for the review. Here are the before/after comparisons for
Adding

Before:
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
consumer.assignment.interval.ms=1000 sensitive=false synonyms={DEFAULT_CONFIG:consumer.assignment.interval.ms=1000, STATIC_BROKER_CONFIG:consumer.assignment.interval.ms=1111}
...
```

After:
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
consumer.assignment.interval.ms=1000 sensitive=false synonyms={DEFAULT_CONFIG:group.consumer.assignment.interval.ms=1000}
...
```

The invalid config in
Adding

Before:
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
consumer.assignment.interval.ms=1000 sensitive=false synonyms={DEFAULT_CONFIG:consumer.assignment.interval.ms=1000}
...
```

After:
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
consumer.assignment.interval.ms=1222 sensitive=false synonyms={STATIC_BROKER_CONFIG:group.consumer.assignment.interval.ms=1222, DEFAULT_CONFIG:group.consumer.assignment.interval.ms=1000}
...
```

The broker override is now correctly reflected in both the displayed value and the synonym chain.
No

Before:
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
consumer.assignment.interval.ms=1333 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.assignment.interval.ms=1333}
...
```

After:
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
consumer.assignment.interval.ms=1333 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.assignment.interval.ms=1333, DEFAULT_CONFIG:group.consumer.assignment.interval.ms=1000}
...
```

The
Adding then setting group config via:

Before:
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
consumer.assignment.interval.ms=1333 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.assignment.interval.ms=1333}
...
```

After:
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
consumer.assignment.interval.ms=1333 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.assignment.interval.ms=1333, STATIC_BROKER_CONFIG:group.consumer.assignment.interval.ms=1222, DEFAULT_CONFIG:group.consumer.assignment.interval.ms=1000}
...
```

The full synonym chain
Adding then:

Before:
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
consumer.assignment.interval.ms=1333 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.assignment.interval.ms=1333}
...
```

After:
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
consumer.assignment.interval.ms=1333 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.assignment.interval.ms=1333, DYNAMIC_DEFAULT_BROKER_CONFIG:group.consumer.assignment.interval.ms=1444, STATIC_BROKER_CONFIG:group.consumer.assignment.interval.ms=1222, DEFAULT_CONFIG:group.consumer.assignment.interval.ms=1000}
...
```

The complete synonym chain
Setting group config via:

Before:
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
...
share.auto.offset.reset=latest sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:share.auto.offset.reset=latest}
...
```

After:
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
...
share.auto.offset.reset=latest sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:share.auto.offset.reset=latest, DEFAULT_CONFIG:share.auto.offset.reset=latest}
...
```

Configs without a broker-level synonym correctly fall back to GroupConfig defaults.
@majialoong Thanks for all the cases! Do we have test cases for each of them?
```scala
val allSynonyms = {
  val list = configSynonyms(name, allNames, isSensitive)
  val brokerConfigName = GroupCoordinatorConfig.GROUP_PREFIX + name
  val list = if (KafkaConfig.configDef.configKeys().containsKey(brokerConfigName))
```
nit: Should we just remove configDef() and make CONFIG public?
Just to confirm, is it GroupConfig.configDef() you'd like removed (making CONFIG public)?
```scala
else
  // No broker synonym, fall back to GroupConfig defaults
  Option(GroupConfig.configDef().defaultValues().get(name))
    .map(v => List(new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name)
```
nit: Should we put .setName on new line too?
```scala
val allSynonyms = {
  val list = configSynonyms(name, allNames, isSensitive)
  val brokerConfigName = GroupCoordinatorConfig.GROUP_PREFIX + name
  val list = if (KafkaConfig.configDef.configKeys().containsKey(brokerConfigName))
```
While we are here, would it be possible to not return internal configs? We don't have any at the moment, but it would be good to cover it as we may have one coming soon.
As for internal configs, createResponseConfig already uses AbstractConfig.nonInternalValues() to filter them out, so future internal group configs would be excluded automatically.
```scala
/**
 * Build the current effective group config defaults by reading from KafkaConfig.
 */
private def currentGroupConfigDefaults: java.util.Map[String, Object] = {
```
I wonder if we should rather place this one in KafkaConfig. What do you think?
Makes sense. I'll move it to KafkaConfig and name it extractGroupConfigMap to be consistent with extractLogConfigMap.
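As a rough sketch of what such an extraction could look like, assuming the same shape as the log-config analogue (the method body, parameter, and example keys below are illustrative assumptions, not the actual KafkaConfig code):

```java
import java.util.HashMap;
import java.util.Map;

public class ExtractGroupConfigMapSketch {
    static final String GROUP_PREFIX = "group.";

    // Hypothetical: derive group-level defaults from current broker values by
    // selecting "group."-prefixed keys and stripping the prefix, analogous to
    // how extractLogConfigMap derives topic-level defaults.
    static Map<String, Object> extractGroupConfigMap(Map<String, Object> brokerValues) {
        Map<String, Object> groupDefaults = new HashMap<>();
        brokerValues.forEach((name, value) -> {
            if (name.startsWith(GROUP_PREFIX)) {
                groupDefaults.put(name.substring(GROUP_PREFIX.length()), value);
            }
        });
        return groupDefaults;
    }

    public static void main(String[] args) {
        Map<String, Object> out = extractGroupConfigMap(Map.of(
            "group.consumer.session.timeout.ms", 45000,
            "num.network.threads", 3));
        // Only the prefixed key survives, with the prefix stripped.
        if (!out.equals(Map.of("consumer.session.timeout.ms", 45000))) {
            throw new AssertionError("unexpected result: " + out);
        }
        System.out.println("ok");
    }
}
```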
@dajac The current
Sounds good. I wonder whether we could add integration tests too. What do you think?
```java
GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG,
GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG
);
```
I would suggest a different approach. Should we make ALL_GROUP_CONFIG_SYNONYMS a Map<String, Optional<String>> and ensure that all configs have an entry in the map? Those three could use Optional.empty() to signal that they have no synonyms.
Agreed, I'll update it.
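A minimal sketch of that shape, with illustrative entries only (the real map would enumerate every GroupConfig name, and a completeness test would enforce that no name is missing):

```java
import java.util.Map;
import java.util.Optional;

public class GroupConfigSynonymsSketch {
    // Hypothetical shape of ALL_GROUP_CONFIG_SYNONYMS: every group config has
    // an entry; Optional.empty() marks configs with no broker-level synonym.
    static final Map<String, Optional<String>> ALL_GROUP_CONFIG_SYNONYMS = Map.of(
        "consumer.session.timeout.ms", Optional.of("group.consumer.session.timeout.ms"),
        "consumer.heartbeat.interval.ms", Optional.of("group.consumer.heartbeat.interval.ms"),
        "share.auto.offset.reset", Optional.empty());

    static Optional<String> brokerSynonym(String groupConfigName) {
        // An absent key would mean a config was forgotten when it was added;
        // a test iterating all group configs can enforce map completeness.
        return ALL_GROUP_CONFIG_SYNONYMS.getOrDefault(groupConfigName, Optional.empty());
    }

    public static void main(String[] args) {
        if (!brokerSynonym("consumer.session.timeout.ms")
                .equals(Optional.of("group.consumer.session.timeout.ms"))
                || brokerSynonym("share.auto.offset.reset").isPresent()) {
            throw new AssertionError("unexpected synonym mapping");
        }
        System.out.println("ok");
    }
}
```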
```scala
@Test
def testDescribeConfigsGroupSynonyms(): Unit = {
```
Do we cover all the cases from your comment? It may be worth splitting it into smaller test cases.
We could also add them as integration tests. What do you think?
Makes sense. I'll split it into smaller test cases for each scenario and add corresponding integration tests as well.
```scala
def extractGroupConfigMap: java.util.Map[String, Object] = {
```
Could you please move this to AbstractKafkaConfig? We are trying to phase out KafkaConfig, so it would be better to avoid adding new code to it.
```scala
configSynonyms(brokerConfigName, brokerSynonyms(brokerConfigName), isSensitive)
else
  // No broker synonym, fall back to GroupConfig defaults
  Option(GroupConfig.CONFIG_DEF.defaultValues().get(name))
```
Should we apply this fallback to topic?
`createGroupConfigEntry` used the group config name (e.g. `consumer.session.timeout.ms`) directly to look up broker synonyms, which could produce incorrect synonym chains.

This change prepends `GROUP_PREFIX` to derive the correct broker config name (e.g. `group.consumer.session.timeout.ms`) for synonym lookup, and validates with `configKeys().containsKey()` to fall back to `GroupConfig` defaults for configs without broker-level counterparts (e.g. `share.auto.offset.reset`).

It also reads current broker config values via `KafkaConfig.get()` instead of startup-frozen defaults, so that dynamically updated broker configs are correctly reflected.
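To make the resolution order concrete, here is a self-contained sketch of the fallback logic described above. The names, source labels, and string-based chain are illustrative assumptions; the real code builds `DescribeConfigsSynonym` objects from `KafkaConfig` rather than strings:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class GroupSynonymChainSketch {
    static final String GROUP_PREFIX = "group.";

    // Hypothetical inputs: an optional dynamic group-level override, the
    // current broker-level values (keyed by "group."-prefixed names), and the
    // GroupConfig default for the config being described.
    static List<String> synonymChain(String name,
                                     Optional<String> dynamicGroupValue,
                                     Map<String, String> brokerValues,
                                     String groupDefault) {
        List<String> chain = new ArrayList<>();
        // Highest precedence: the dynamic group config itself.
        dynamicGroupValue.ifPresent(v ->
            chain.add("DYNAMIC_GROUP_CONFIG:" + name + "=" + v));
        String brokerName = GROUP_PREFIX + name;
        if (brokerValues.containsKey(brokerName)) {
            // Broker synonym exists: report it under the prefixed name.
            chain.add("STATIC_BROKER_CONFIG:" + brokerName + "=" + brokerValues.get(brokerName));
            chain.add("DEFAULT_CONFIG:" + brokerName + "=" + groupDefault);
        } else {
            // No broker synonym: fall back to the GroupConfig default.
            chain.add("DEFAULT_CONFIG:" + name + "=" + groupDefault);
        }
        return chain;
    }

    public static void main(String[] args) {
        List<String> chain = synonymChain(
            "consumer.assignment.interval.ms",
            Optional.of("1333"),
            Map.of("group.consumer.assignment.interval.ms", "1222"),
            "1000");
        if (!chain.get(0).startsWith("DYNAMIC_GROUP_CONFIG:")) {
            throw new AssertionError("dynamic override must come first");
        }
        System.out.println(chain);
    }
}
```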