Skip to content

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void testInternalConfigsDoNotReturnForDescribeConfigs(ClusterInstance clu

// test for case ConfigResource.Type == GROUP
Config groupConfig = configResourceMap.get(groupResource);
assertNotContainsAnyInternalConfig(groupConfig, GroupConfig.configDef().configKeys());
assertNotContainsAnyInternalConfig(groupConfig, GroupConfig.CONFIG_DEF.configKeys());

// test for case ConfigResource.Type == CLIENT_METRICS
Config clientMetricsConfig = configResourceMap.get(clientMetricsResource);
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ object ConfigCommand extends Logging {
"For entity-type '" + ClientType + "': " + QuotaConfig.userAndClientQuotaConfigs().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + IpType + "': " + QuotaConfig.ipConfigs.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ClientMetricsType + "': " + ClientMetricsConfigs.configDef().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + GroupType + "': " + GroupConfig.configDef().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + GroupType + "': " + GroupConfig.CONFIG_DEF.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
s"Entity types '$UserType' and '$ClientType' may be specified together to update config for clients of a specific user.")
.withRequiredArg
.ofType(classOf[String])
Expand Down
18 changes: 13 additions & 5 deletions core/src/main/scala/kafka/server/ConfigHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
throw new InvalidRequestException("Group name must not be empty")
} else {
val groupProps = configRepository.groupConfig(group)
val groupConfig = GroupConfig.fromProps(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig), groupProps)
val groupConfig = GroupConfig.fromProps(config.extractGroupConfigMap, groupProps)
createResponseConfig(resource, groupConfig, createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, includeDocumentation)(_, _))
}

Expand Down Expand Up @@ -161,15 +161,23 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo

private def createGroupConfigEntry(groupConfig: GroupConfig, groupProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = {
val allNames = brokerSynonyms(name)
val configEntryType = GroupConfig.configType(name).toScala
val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull)
val allSynonyms = {
val list = configSynonyms(name, allNames, isSensitive)
val brokerConfigName = GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.get(name)
val list = if (brokerConfigName != null && brokerConfigName.isPresent)
configSynonyms(brokerConfigName.get, brokerSynonyms(brokerConfigName.get), isSensitive)
else
// No broker synonym, fall back to GroupConfig defaults
Option(GroupConfig.CONFIG_DEF.defaultValues().get(name))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we apply this fallback to topic?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applying the fallback to topic also makes sense. I’ll follow up with a separate JIRA.

.map(v => List(new DescribeConfigsResponseData.DescribeConfigsSynonym()
.setName(name)
.setValue(if (isSensitive) null else ConfigDef.convertToString(v, configEntryType.orNull))
.setSource(ConfigSource.DEFAULT_CONFIG.id)))
.getOrElse(List.empty)
if (!groupProps.containsKey(name))
new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString)
.setSource(ConfigSource.DEFAULT_CONFIG.id) +: list
list
else
new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString)
.setSource(ConfigSource.GROUP_CONFIG.id) +: list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public final class GroupConfig extends AbstractConfig {

public final boolean shareRenewAcknowledgeEnable;

private static final ConfigDef CONFIG = new ConfigDef()
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
INT,
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
Expand Down Expand Up @@ -281,8 +281,41 @@ public final class GroupConfig extends AbstractConfig {
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC);

/**
* Mapping from GroupConfig name to its broker-level synonym config name.
* {@code Optional.empty()} indicates that the config has no broker-level synonym.
*/
public static final Map<String, Optional<String>> ALL_GROUP_CONFIG_SYNONYMS = Map.ofEntries(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we use Map.of?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion. I checked, but Map.of() only supports up to 10 key-value pairs. Since ALL_GROUP_CONFIG_SYNONYMS has more than 10 entries, Map.ofEntries() is the appropriate option here.

// Consumer group configs
Map.entry(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
Map.entry(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
Map.entry(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
Map.entry(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),

// Share group configs
Map.entry(SHARE_SESSION_TIMEOUT_MS_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
Map.entry(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
Map.entry(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, Optional.of(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG)),
Map.entry(SHARE_DELIVERY_COUNT_LIMIT_CONFIG, Optional.of(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG)),
Map.entry(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, Optional.of(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG)),
Map.entry(SHARE_AUTO_OFFSET_RESET_CONFIG, Optional.empty()),
Map.entry(SHARE_ISOLATION_LEVEL_CONFIG, Optional.empty()),
Map.entry(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, Optional.empty()),
Map.entry(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
Map.entry(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),

// Streams group configs
Map.entry(STREAMS_SESSION_TIMEOUT_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
Map.entry(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
Map.entry(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG)),
Map.entry(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)),
Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG))
);

public GroupConfig(Map<?, ?> props) {
super(CONFIG, props, false);
super(CONFIG_DEF, props, false);
this.consumerSessionTimeoutMs = getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
this.consumerHeartbeatIntervalMs = getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
// These have to be optionals because their default group coordinator configs are dynamic,
Expand Down Expand Up @@ -327,16 +360,12 @@ public GroupConfig(Map<?, ?> props) {
this.shareRenewAcknowledgeEnable = getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
}

public static ConfigDef configDef() {
return CONFIG;
}

public static Optional<Type> configType(String configName) {
return Optional.ofNullable(CONFIG.configKeys().get(configName)).map(c -> c.type);
return Optional.ofNullable(CONFIG_DEF.configKeys().get(configName)).map(c -> c.type);
}

public static Set<String> configNames() {
return CONFIG.names();
return CONFIG_DEF.names();
}

/**
Expand All @@ -356,7 +385,7 @@ public static void validateNames(Map<String, ?> props) {
*/
@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
private static void validateValues(Map<String, Object> unparsedMap, GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
Map<String, Object> valueMaps = CONFIG.parse(unparsedMap);
Map<String, Object> valueMaps = CONFIG_DEF.parse(unparsedMap);
int consumerHeartbeatInterval = (Integer) valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
int consumerSessionTimeout = (Integer) valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
int consumerAssignmentIntervalMs = (Integer) valueMaps.get(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG);
Expand Down Expand Up @@ -899,6 +928,6 @@ public boolean shareRenewAcknowledgeEnable() {
}

public static void main(String[] args) {
System.out.println(CONFIG.toHtml(4, config -> "groupconfigs_" + config));
System.out.println(CONFIG_DEF.toHtml(4, config -> "groupconfigs_" + config));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import org.junit.jupiter.params.provider.MethodSource;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Stream;

import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT;
Expand Down Expand Up @@ -637,6 +639,34 @@ public void testEvaluateMinBoundedValueBelowMinIsCapped(
assertEquals(expectedMin, result.get(key));
}

@Test
public void testAllGroupConfigSynonyms() {
// Every GroupConfig entry should have an entry in ALL_GROUP_CONFIG_SYNONYMS.
for (String groupConfigName : GroupConfig.CONFIG_DEF.names()) {
assertTrue(GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.containsKey(groupConfigName),
"GroupConfig entry '" + groupConfigName + "' is not in ALL_GROUP_CONFIG_SYNONYMS. " +
"Add it with Optional.of(brokerConfigName) or Optional.empty() if it has no broker synonym.");
}

// Every key in ALL_GROUP_CONFIG_SYNONYMS should be a valid GroupConfig entry.
for (String key : GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.keySet()) {
assertTrue(GroupConfig.CONFIG_DEF.names().contains(key),
"ALL_GROUP_CONFIG_SYNONYMS contains '" + key + "' which is not a valid GroupConfig entry.");
}

// Every present synonym mapping should point to a valid broker config.
Set<String> brokerConfigNames = new HashSet<>();
brokerConfigNames.addAll(GroupCoordinatorConfig.CONFIG_DEF.names());
brokerConfigNames.addAll(ShareGroupConfig.CONFIG_DEF.names());

for (Map.Entry<String, Optional<String>> entry : GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.entrySet()) {
entry.getValue().ifPresent(brokerConfigName ->
assertTrue(brokerConfigNames.contains(brokerConfigName),
"ALL_GROUP_CONFIG_SYNONYMS maps '" + entry.getKey() + "' to '" +
brokerConfigName + "' but this broker config does not exist."));
}
}

private Map<String, String> createValidGroupConfig() {
Map<String, String> props = new HashMap<>();
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "45000");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,4 +627,12 @@ public Long logRetentionTimeMillis() {

return millis < 0 ? Long.valueOf(-1) : millis;
}

public Map<String, Object> extractGroupConfigMap() {
Map<String, Object> defaults = new HashMap<>();
GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.forEach((groupConfigName, brokerConfigName) ->
brokerConfigName.ifPresent(name -> defaults.put(groupConfigName, get(name)))
);
return defaults;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ public void testNullStatusOnKraftCommandAlterGroup() {
"--describe"));
message = captureStandardOut(run(command));
assertTrue(message.contains("Dynamic configs for group group are:"));
assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000}"));
assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000, DEFAULT_CONFIG:group.consumer.session.timeout.ms=45000}"));

command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "groups",
"--entity-name", "group",
"--describe"));
message = captureStandardOut(run(command));
assertTrue(message.contains("Dynamic configs for group group are:"));
assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000}"));
assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000, DEFAULT_CONFIG:group.consumer.session.timeout.ms=45000}"));
}

@ClusterTest(serverProperties = {
Expand Down Expand Up @@ -293,10 +293,10 @@ public void testDescribeStreamsGroupConfigs() {
"--describe", "--all"));
String message = captureStandardOut(run(command));

assertTrue(message.contains("streams.heartbeat.interval.ms=5000 sensitive=false synonyms={DEFAULT_CONFIG:streams.heartbeat.interval.ms=5000}"));
assertTrue(message.contains("streams.num.standby.replicas=0 sensitive=false synonyms={DEFAULT_CONFIG:streams.num.standby.replicas=0}"));
assertTrue(message.contains("streams.session.timeout.ms=45000 sensitive=false synonyms={DEFAULT_CONFIG:streams.session.timeout.ms=45000}"));
assertTrue(message.contains("streams.task.offset.interval.ms=60000 sensitive=false synonyms={DEFAULT_CONFIG:streams.task.offset.interval.ms=60000}"));
assertTrue(message.contains("streams.heartbeat.interval.ms=5000 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.heartbeat.interval.ms=5000}"));
assertTrue(message.contains("streams.num.standby.replicas=0 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.num.standby.replicas=0}"));
assertTrue(message.contains("streams.session.timeout.ms=45000 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.session.timeout.ms=45000}"));
assertTrue(message.contains("streams.task.offset.interval.ms=60000 sensitive=false synonyms={DEFAULT_CONFIG:group.streams.task.offset.interval.ms=60000}"));
}

@ClusterTest
Expand Down
Loading