-
Notifications
You must be signed in to change notification settings - Fork 15.1k
KAFKA-20289: Fix DescribeConfigs to correctly resolve broker synonyms for group configs #21855
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
001916d
b39fe33
03c9bf2
8162ded
e0b1c99
04dfa51
9a7bfd0
f443050
9936230
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -157,7 +157,7 @@ public final class GroupConfig extends AbstractConfig { | |
|
|
||
| public final boolean shareRenewAcknowledgeEnable; | ||
|
|
||
| private static final ConfigDef CONFIG = new ConfigDef() | ||
| public static final ConfigDef CONFIG_DEF = new ConfigDef() | ||
| .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, | ||
| INT, | ||
| GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, | ||
|
|
@@ -281,8 +281,41 @@ public final class GroupConfig extends AbstractConfig { | |
| MEDIUM, | ||
| GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC); | ||
|
|
||
| /** | ||
| * Mapping from GroupConfig name to its broker-level synonym config name. | ||
| * {@code Optional.empty()} indicates that the config has no broker-level synonym. | ||
| */ | ||
| public static final Map<String, Optional<String>> ALL_GROUP_CONFIG_SYNONYMS = Map.ofEntries( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't we use
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good suggestion. I checked, but |
||
| // Consumer group configs | ||
| Map.entry(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)), | ||
| Map.entry(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)), | ||
| Map.entry(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)), | ||
| Map.entry(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)), | ||
|
|
||
| // Share group configs | ||
| Map.entry(SHARE_SESSION_TIMEOUT_MS_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)), | ||
| Map.entry(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)), | ||
| Map.entry(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, Optional.of(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG)), | ||
| Map.entry(SHARE_DELIVERY_COUNT_LIMIT_CONFIG, Optional.of(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG)), | ||
| Map.entry(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, Optional.of(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG)), | ||
| Map.entry(SHARE_AUTO_OFFSET_RESET_CONFIG, Optional.empty()), | ||
| Map.entry(SHARE_ISOLATION_LEVEL_CONFIG, Optional.empty()), | ||
| Map.entry(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, Optional.empty()), | ||
| Map.entry(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)), | ||
| Map.entry(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)), | ||
|
|
||
| // Streams group configs | ||
| Map.entry(STREAMS_SESSION_TIMEOUT_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG)), | ||
| Map.entry(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)), | ||
| Map.entry(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG)), | ||
| Map.entry(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)), | ||
| Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)), | ||
| Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)), | ||
| Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG)) | ||
| ); | ||
|
|
||
| public GroupConfig(Map<?, ?> props) { | ||
| super(CONFIG, props, false); | ||
| super(CONFIG_DEF, props, false); | ||
| this.consumerSessionTimeoutMs = getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG); | ||
| this.consumerHeartbeatIntervalMs = getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG); | ||
| // These have to be optionals because their default group coordinator configs are dynamic, | ||
|
|
@@ -327,16 +360,12 @@ public GroupConfig(Map<?, ?> props) { | |
| this.shareRenewAcknowledgeEnable = getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG); | ||
| } | ||
|
|
||
| public static ConfigDef configDef() { | ||
| return CONFIG; | ||
| } | ||
|
|
||
| public static Optional<Type> configType(String configName) { | ||
| return Optional.ofNullable(CONFIG.configKeys().get(configName)).map(c -> c.type); | ||
| return Optional.ofNullable(CONFIG_DEF.configKeys().get(configName)).map(c -> c.type); | ||
| } | ||
|
|
||
| public static Set<String> configNames() { | ||
| return CONFIG.names(); | ||
| return CONFIG_DEF.names(); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -356,7 +385,7 @@ public static void validateNames(Map<String, ?> props) { | |
| */ | ||
| @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"}) | ||
| private static void validateValues(Map<String, Object> unparsedMap, GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) { | ||
| Map<String, Object> valueMaps = CONFIG.parse(unparsedMap); | ||
| Map<String, Object> valueMaps = CONFIG_DEF.parse(unparsedMap); | ||
| int consumerHeartbeatInterval = (Integer) valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG); | ||
| int consumerSessionTimeout = (Integer) valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG); | ||
| int consumerAssignmentIntervalMs = (Integer) valueMaps.get(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG); | ||
|
|
@@ -899,6 +928,6 @@ public boolean shareRenewAcknowledgeEnable() { | |
| } | ||
|
|
||
| public static void main(String[] args) { | ||
| System.out.println(CONFIG.toHtml(4, config -> "groupconfigs_" + config)); | ||
| System.out.println(CONFIG_DEF.toHtml(4, config -> "groupconfigs_" + config)); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we apply this fallback to topic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Applying the fallback to topic also makes sense. I’ll follow up with a separate JIRA.