Skip to content

KAFKA-20289: Fix DescribeConfigs to correctly resolve broker synonyms for group configs#21855

Open
majialoong wants to merge 6 commits intoapache:trunkfrom
majialoong:KAFKA-20289
Open

KAFKA-20289: Fix DescribeConfigs to correctly resolve broker synonyms for group configs#21855
majialoong wants to merge 6 commits intoapache:trunkfrom
majialoong:KAFKA-20289

Conversation

@majialoong
Copy link
Copy Markdown
Contributor

createGroupConfigEntry used the group config name (e.g. consumer.session.timeout.ms) directly
to look up broker synonyms, which could produce incorrect synonym chains.

This change prepends GROUP_PREFIX to derive the correct broker config name
(e.g. group.consumer.session.timeout.ms) for synonym lookup, and validates with
configKeys().containsKey() to fall back to GroupConfig defaults for configs without
broker-level counterparts (e.g. share.auto.offset.reset).

It also reads current broker config values via KafkaConfig.get() instead of startup-frozen
defaults, so that dynamically updated broker configs are correctly reflected.

@github-actions github-actions bot added triage PRs from the community core Kafka Broker group-coordinator labels Mar 23, 2026
@majialoong
Copy link
Copy Markdown
Contributor Author

Below is the output of kafka-configs.sh --describe for a consumer group after the fix:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --describe --entity-name my-group --all                                                                                                                
All configs for group my-group are:
  consumer.assignment.interval.ms=1555 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.assignment.interval.ms=1555, DYNAMIC_BROKER_CONFIG:group.consumer.assignment.interval.ms=1333, STATIC_BROKER_CONFIG:group.consumer.assignment.interval.ms=1111, DEFAULT_CONFIG:group.consumer.assignment.interval.ms=1000}
  consumer.assignor.offload.enable=true sensitive=false synonyms={DEFAULT_CONFIG:group.consumer.assignor.offload.enable=true}
  consumer.heartbeat.interval.ms=5555 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.heartbeat.interval.ms=5555, DEFAULT_CONFIG:group.consumer.heartbeat.interval.ms=5000}
  consumer.session.timeout.ms=46000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=46000, STATIC_BROKER_CONFIG:group.consumer.session.timeout.ms=47000, DEFAULT_CONFIG:group.consumer.session.timeout.ms=45000}
  share.assignment.interval.ms=1999 sensitive=false synonyms={STATIC_BROKER_CONFIG:group.share.assignment.interval.ms=1999, DEFAULT_CONFIG:group.share.assignment.interval.ms=1000}
  share.assignor.offload.enable=true sensitive=false synonyms={DEFAULT_CONFIG:group.share.assignor.offload.enable=true}
  share.auto.offset.reset=earliest sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:share.auto.offset.reset=earliest, DEFAULT_CONFIG:share.auto.offset.reset=latest}
  share.delivery.count.limit=5 sensitive=false synonyms={DEFAULT_CONFIG:group.share.delivery.count.limit=5}
  share.heartbeat.interval.ms=5000 sensitive=false synonyms={DEFAULT_CONFIG:group.share.heartbeat.interval.ms=5000}
  share.isolation.level=read_uncommitted sensitive=false synonyms={DEFAULT_CONFIG:share.isolation.level=read_uncommitted}
  share.partition.max.record.locks=2000 sensitive=false synonyms={DEFAULT_CONFIG:group.share.partition.max.record.locks=2000}
  share.record.lock.duration.ms=30000 sensitive=false synonyms={DEFAULT_CONFIG:group.share.record.lock.duration.ms=30000}
  share.renew.acknowledge.enable=true sensitive=false synonyms={DEFAULT_CONFIG:share.renew.acknowledge.enable=true}
  share.session.timeout.ms=45000 sensitive=false synonyms={DEFAULT_CONFIG:group.share.session.timeout.ms=45000}
  streams.assignment.interval.ms=1000 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.assignment.interval.ms=1000}
  streams.assignor.offload.enable=true sensitive=false synonyms={DEFAULT_CONFIG:group.streams.assignor.offload.enable=true}
  streams.heartbeat.interval.ms=5000 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.heartbeat.interval.ms=5000}
  streams.initial.rebalance.delay.ms=3000 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.initial.rebalance.delay.ms=3000}
  streams.num.standby.replicas=0 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.num.standby.replicas=0}
  streams.session.timeout.ms=45000 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.session.timeout.ms=45000}
  streams.task.offset.interval.ms=60000 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.task.offset.interval.ms=60000}

@dajac dajac self-requested a review March 23, 2026 18:10
@dajac
Copy link
Copy Markdown
Member

dajac commented Mar 23, 2026

@majialoong Thanks for the patch! Could you please attach the output before the patch too?

STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG
);

public static final String GROUP_PREFIX = "group.";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit concerned about using an implicit string prefix to connect to server-level configurations. It can be error-prone and less flexible. Could we follow the pattern of ALL_TOPIC_CONFIG_SYNONYMS and introduce something like ALL_GROUP_CONFIG_SYNONYMS

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. Both approaches are valid.

I chose the prefix approach because Group configs have a uniform naming convention — every broker config name is "group." + name, with no exceptions. Given the current convention, future group configs would likely follow the same naming rule as well, so new configs wouldn't require maintaining a separate mapping table.

Topic needs ALL_TOPIC_CONFIG_SYNONYMS because its naming is irregular — different prefixes (log., log.cleaner.), different names (flush.messages → log.flush.interval.messages), and unit conversions. Group configs don't have this complexity.

I'm also open to introducing ALL_GROUP_CONFIG_SYNONYMS if we think the explicitness of a mapping table outweighs the cost of maintaining it for each new group config.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose the prefix approach because Group configs have a uniform naming convention — every broker config name is "group." + name, with no exceptions. Given the current convention, future group configs would likely follow the same naming rule as well, so new configs wouldn't require maintaining a separate mapping table.

My primary reservation is that predicating this logic on the group. prefix introduces a tacit constraint that is highly susceptible to oversight by future developers. For example, in #21801 (comment), there was an attempt to add "group.streams.max.num.warmup.replicas" and "streams.max.warmup.replicas", which directly contravenes the exact mapping convention assumed in this PR.

I'm amenable to adopting the group. prefix approach, but would you mind adding a unit test to codify this consistency? we could leverage reflection to cross-reference all configs between GroupConfig and GroupCoordinatorConfig, coorborating that the mapping always holds.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion. I've added testGroupConfigBrokerSynonymNamingConvention in GroupCoordinatorConfigTest.

It iterates all GroupConfig entries and verifies each has a corresponding broker config named GROUP_PREFIX + groupConfigName. Entries without broker-level synonyms are maintained in an explicit exclusion list, and a reverse check ensures the exclusion list stays accurate.

Any deviation from the naming convention will fail CI with a clear message.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am also a bit on the fence here. I think that having an explicit mapping is better and less error prone. We could have ALL_GROUP_CONFIG_SYNONYMS, as suggested by @chia7712, in GroupConfigs. I would also add a test to ensure that all configs have an entry in the Map to ensure that future configs are considered.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ll introduce an explicit mapping for this and add a test to ensure all group configs are covered.

@github-actions github-actions bot removed the triage PRs from the community label Mar 24, 2026
@majialoong
Copy link
Copy Markdown
Contributor Author

@majialoong Thanks for the patch! Could you please attach the output before the patch too?

@dajac Thanks for the review. Here are the before/after comparisons for kafka-configs.sh --describe across different scenarios, using consumer.assignment.interval.ms as the example config:

  1. Invalid broker param in server.properties

Adding consumer.assignment.interval.ms=1111 (missing group. prefix) to server.properties.

Before:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
  consumer.assignment.interval.ms=1000 sensitive=false synonyms={DEFAULT_CONFIG:consumer.assignment.interval.ms=1000, STATIC_BROKER_CONFIG:consumer.assignment.interval.ms=1111}
  ...

After:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
  consumer.assignment.interval.ms=1000 sensitive=false synonyms={DEFAULT_CONFIG:group.consumer.assignment.interval.ms=1000}
  ...

The invalid config in server.properties no longer produces a spurious STATIC_BROKER_CONFIG synonym.


  1. Valid broker param in server.properties

Adding group.consumer.assignment.interval.ms=1222 to server.properties.

Before:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
  consumer.assignment.interval.ms=1000 sensitive=false synonyms={DEFAULT_CONFIG:consumer.assignment.interval.ms=1000}
  ...

After:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
  consumer.assignment.interval.ms=1222 sensitive=false synonyms={STATIC_BROKER_CONFIG:group.consumer.assignment.interval.ms=1222, DEFAULT_CONFIG:group.consumer.assignment.interval.ms=1000}
  ...

The broker override is now correctly reflected in both the displayed value and the synonym chain.


  1. Group-level override only

No server.properties changes. Setting group config via:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type groups --entity-name test-group --add-config consumer.assignment.interval.ms=1333

Before:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe -all
All configs for group test-group are:
  consumer.assignment.interval.ms=1333 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.assignment.interval.ms=1333}
  ...

After:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
  consumer.assignment.interval.ms=1333 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.assignment.interval.ms=1333, DEFAULT_CONFIG:group.consumer.assignment.interval.ms=1000}
  ...

The DEFAULT_CONFIG synonym with the correct broker config name is now present.


  1. Valid broker param + group-level override

Adding group.consumer.assignment.interval.ms=1222 to server.properties,

then setting group config via:
bin/kafka-configs.sh --alter --entity-type groups --entity-name test-group --add-config consumer.assignment.interval.ms=1333

Before:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
  consumer.assignment.interval.ms=1333 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.assignment.interval.ms=1333}
  ...

After:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
  consumer.assignment.interval.ms=1333 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.assignment.interval.ms=1333, STATIC_BROKER_CONFIG:group.consumer.assignment.interval.ms=1222, DEFAULT_CONFIG:group.consumer.assignment.interval.ms=1000}
  ...

The full synonym chain DYNAMIC_GROUP_CONFIG → STATIC_BROKER_CONFIG → DEFAULT_CONFIG is now displayed correctly.


  1. Valid broker param + group-level override + dynamic broker config

Adding group.consumer.assignment.interval.ms=1222 to server.properties,

then:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-default --add-config group.consumer.assignment.interval.ms=1444

bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type groups --entity-name test-group --add-config consumer.assignment.interval.ms=1333

Before:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
  consumer.assignment.interval.ms=1333 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.assignment.interval.ms=1333}
  ...

After:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
  consumer.assignment.interval.ms=1333 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.assignment.interval.ms=1333, DYNAMIC_DEFAULT_BROKER_CONFIG:group.consumer.assignment.interval.ms=1444, STATIC_BROKER_CONFIG:group.consumer.assignment.interval.ms=1222, DEFAULT_CONFIG:group.consumer.assignment.interval.ms=1000}
  ...

The complete synonym chain DYNAMIC_GROUP_CONFIG → DYNAMIC_DEFAULT_BROKER_CONFIG → STATIC_BROKER_CONFIG → DEFAULT_CONFIG is now displayed correctly.


  1. Config without broker synonym (share.auto.offset.reset)

Setting group config via:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type groups --entity-name test-group --add-config share.auto.offset.reset=latest

Before:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
  ...
  share.auto.offset.reset=latest sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:share.auto.offset.reset=latest}
  ...

After:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type groups --entity-name test-group --describe --all
All configs for group test-group are:
  ...
  share.auto.offset.reset=latest sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:share.auto.offset.reset=latest, DEFAULT_CONFIG:share.auto.offset.reset=latest}
  ...

Configs without a broker-level synonym correctly fall back to GroupConfig defaults.

@github-actions github-actions bot added the tools label Mar 24, 2026
@dajac
Copy link
Copy Markdown
Member

dajac commented Mar 27, 2026

@majialoong Thanks for all the cases! Do we have test cases for each of them?

val allSynonyms = {
val list = configSynonyms(name, allNames, isSensitive)
val brokerConfigName = GroupCoordinatorConfig.GROUP_PREFIX + name
val list = if (KafkaConfig.configDef.configKeys().containsKey(brokerConfigName))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we just remove configDef() and make CONFIG public?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm, is it GroupConfig.configDef() you'd like removed (making CONFIG public)?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

else
// No broker synonym, fall back to GroupConfig defaults
Option(GroupConfig.configDef().defaultValues().get(name))
.map(v => List(new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we put .setName on new line too?

val allSynonyms = {
val list = configSynonyms(name, allNames, isSensitive)
val brokerConfigName = GroupCoordinatorConfig.GROUP_PREFIX + name
val list = if (KafkaConfig.configDef.configKeys().containsKey(brokerConfigName))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we are here, would it be possible to not return internal configs? We don't have any at the moment but it would be good to cover it as may have one coming soon.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for internal configs, createResponseConfig already uses AbstractConfig.nonInternalValues() to filter them out, so future internal group configs would be excluded automatically.

/**
* Build the current effective group config defaults by reading from KafkaConfig.
*/
private def currentGroupConfigDefaults: java.util.Map[String, Object] = {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should rather place this one in KafkaConfig. What do you think?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I'll move it to KafkaConfig and name it extractGroupConfigMap to be consistent with extractLogConfigMap.

@majialoong
Copy link
Copy Markdown
Contributor Author

@majialoong Thanks for all the cases! Do we have test cases for each of them?

@dajac The current testDescribeConfigsGroupSynonyms covers some scenarios. I'll add more test cases for the remaining ones and split them into separate methods so each test focuses on a single synonym chain pattern.

@dajac
Copy link
Copy Markdown
Member

dajac commented Mar 27, 2026

@majialoong Thanks for all the cases! Do we have test cases for each of them?

@dajac The current testDescribeConfigsGroupSynonyms covers some scenarios. I'll add more test cases for the remaining ones and split them into separate methods so each test focuses on a single synonym chain pattern.

Sounds good. I wonder whether we could add integration tests too. What do you think?

GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG,
GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG
);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest a different approach. Should we make ALL_GROUP_CONFIG_SYNONYMS a Map<String, Optional<String>> and ensure that all configs have an entry in the map? Those three could use Optional.empty() to signal that they have no synonyms.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I'll update it.

}

@Test
def testDescribeConfigsGroupSynonyms(): Unit = {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we cover all the cases from your comment? It may be worth splitting it into smaller test cases.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also add them as integration tests. What do you think?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I'll split it into smaller test cases for each scenario and add corresponding integration tests as well.

logProps
}

def extractGroupConfigMap: java.util.Map[String, Object] = {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please move this to AbstractKafkaConfig? We are trying to phase out KafkaConfig, so it would be better to avoid adding new code to it

configSynonyms(brokerConfigName, brokerSynonyms(brokerConfigName), isSensitive)
else
// No broker synonym, fall back to GroupConfig defaults
Option(GroupConfig.CONFIG_DEF.defaultValues().get(name))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we apply this fallback to topic?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants