diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientConfigPushIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientConfigPushIntegrationTest.java new file mode 100644 index 0000000000000..fee93d640df1c --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientConfigPushIntegrationTest.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ClientConfigPolicyException; +import org.apache.kafka.common.errors.UnknownConfigProfileException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.server.policy.ClientConfigPolicy; +import org.apache.kafka.server.policy.ClientConfigProfileKeys; +import org.apache.kafka.server.policy.ClientProfile; +import org.apache.kafka.server.policy.ClientPushConfigData; +import org.junit.jupiter.api.BeforeEach; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration tests for the client configuration push handshake (KIP-CFG). + *

+ * Tests the end-to-end flow: + * 1. Client sends GetConfigProfileKeys request + * 2. Broker responds with profile keys and CRC + * 3. Client sends PushConfig request with collected configs + * 4. Broker validates and processes the config + */ +@ClusterTestDefaults( + brokers = 1, + serverProperties = { + @ClusterConfigProperty(key = ServerLogConfigs.CLIENT_CONFIG_POLICY_CLASS_NAME_CONFIG, + value = "org.apache.kafka.clients.ClientConfigPushIntegrationTest$TestClientConfigPolicy"), + @ClusterConfigProperty(key = ServerLogConfigs.CLIENT_CONFIG_MAX_BYTES_CONFIG, value = "1048576") + } +) +public class ClientConfigPushIntegrationTest { + + private final ClusterInstance clusterInstance; + private TestClientConfigPolicy clientConfigPolicy; + + public ClientConfigPushIntegrationTest(ClusterInstance clusterInstance) { + this.clusterInstance = clusterInstance; + } + + @BeforeEach + public void setup() throws InterruptedException { + clusterInstance.waitForReadyBrokers(); + clientConfigPolicy = new TestClientConfigPolicy(); + } + + @ClusterTest + public void testProducerConfigPushHandshake() throws Exception { + Map producerConfig = new java.util.HashMap<>(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "test-producer-config-push"); + producerConfig.put(CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG, true); + + try (Producer producer = new KafkaProducer<>(producerConfig)) { + // Give time for handshake to complete + Thread.sleep(2000); + + // Verify policy was called + assertTrue(clientConfigPolicy.profileKeysCallCount.get() > 0, "profileKeys() should have been called"); + assertTrue(clientConfigPolicy.processCallCount.get() > 0, "process() should have been called"); + + // Verify we received a client profile + assertFalse(clientConfigPolicy.receivedProfiles.isEmpty(), "Should have received client profile"); + + // Verify we received configs + assertFalse(clientConfigPolicy.receivedConfigs.isEmpty(), "Should have received client configs"); + + // Verify configs were collected (should have client.id but not bootstrap.servers) + Map configs = clientConfigPolicy.receivedConfigs.values().iterator().next(); + assertTrue(configs.containsKey(ProducerConfig.CLIENT_ID_CONFIG), + "Should contain client.id config"); + assertFalse(configs.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), + "Should NOT contain bootstrap.servers (sensitive)"); + } + } + + @ClusterTest + public void testConsumerConfigPushHandshake() throws Exception { + Map consumerConfig = new java.util.HashMap<>(); + consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-config-push"); + consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerConfig.put(CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG, true); + + try (Consumer consumer = new KafkaConsumer<>(consumerConfig)) { + // Give time for handshake to complete + Thread.sleep(2000); + + // Verify policy was called + assertTrue(clientConfigPolicy.profileKeysCallCount.get() > 0, "profileKeys() should have been called"); + assertTrue(clientConfigPolicy.processCallCount.get() > 0, "process() should have been called"); + + // Verify we received configs + assertFalse(clientConfigPolicy.receivedConfigs.isEmpty(), "Should have received client configs"); + + Map configs = clientConfigPolicy.receivedConfigs.values().iterator().next(); + assertTrue(configs.containsKey(ConsumerConfig.GROUP_ID_CONFIG), + "Should contain group.id config"); + assertTrue(configs.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), + "Should contain auto.offset.reset config"); + } + } + + @ClusterTest + public void testConfigPushWithEmptyProfile() throws Exception { + // Configure policy to return empty profile (no config keys requested) + clientConfigPolicy.returnEmptyProfile = true; + + Map producerConfig = new java.util.HashMap<>(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerConfig.put(CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG, true); + + try (Producer producer = new KafkaProducer<>(producerConfig)) { + // Give time for handshake to complete + Thread.sleep(2000); + + // profileKeys() should be called + assertTrue(clientConfigPolicy.profileKeysCallCount.get() > 0, "profileKeys() should have been called"); + + // But process() should NOT be called since no configs were requested + assertEquals(0, clientConfigPolicy.processCallCount.get(), "process() should NOT have been called with empty profile"); + } + } + + @ClusterTest + public void testConfigPushDisabled() throws Exception { + Map producerConfig = new java.util.HashMap<>(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + // Disable config push + producerConfig.put(CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG, false); + + try (Producer producer = new KafkaProducer<>(producerConfig)) { + // Give time to ensure no handshake occurs + Thread.sleep(2000); + + // Policy should NOT be called when disabled + assertEquals(0, clientConfigPolicy.profileKeysCallCount.get(), "profileKeys() should NOT be called when disabled"); + assertEquals(0, clientConfigPolicy.processCallCount.get(), "process() should NOT be called when disabled"); + } + } + + @ClusterTest + public void testClientProfileContainsMetadata() throws Exception { + Map producerConfig = new java.util.HashMap<>(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "metadata-test-producer"); + producerConfig.put(CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG, true); + + try (Producer producer = new KafkaProducer<>(producerConfig)) { + Thread.sleep(2000); + + assertFalse(clientConfigPolicy.receivedProfiles.isEmpty(), "Should have received client profile"); + ClientProfile profile = clientConfigPolicy.receivedProfiles.values().iterator().next(); + + assertNotNull(profile.clientInstanceId(), "Client instance ID should not be null"); + assertNotNull(profile.clientSoftwareName(), "Client software name should not be null"); + assertNotNull(profile.clientSoftwareVersion(), "Client software version should not be null"); + assertNotNull(profile.clientMetadata(), "Client metadata should not be null"); + + // Verify software name is populated (e.g., "apache-kafka-java") + assertFalse(profile.clientSoftwareName().isEmpty(), + "Client software name should not be empty"); + } + } + + @ClusterTest + public void testCrcBasedProfileChangeDetection() throws Exception { + Map producerConfig = new java.util.HashMap<>(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerConfig.put(CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG, true); + + try (Producer producer = new KafkaProducer<>(producerConfig)) { + Thread.sleep(2000); + + // Verify CRC was computed and is non-zero + assertTrue(clientConfigPolicy.profileKeysCallCount.get() > 0, "profileKeys() should have been called"); + + // The CRC should be deterministic based on the keys + // (We can't easily verify the exact value here, but we verified it's used) + } + } + + /** + * Test implementation of ClientConfigPolicy for integration tests. + *

+ * This policy: + * - Returns a standard set of config keys for all clients + * - Captures received profiles and configs for verification + * - Supports test scenarios via static flags + */ + public static class TestClientConfigPolicy implements ClientConfigPolicy { + + private final Map receivedProfiles = new ConcurrentHashMap<>(); + private final Map> receivedConfigs = new ConcurrentHashMap<>(); + private final AtomicInteger profileKeysCallCount = new AtomicInteger(0); + private final AtomicInteger processCallCount = new AtomicInteger(0); + private boolean returnEmptyProfile; + private boolean throwUnknownProfileOnKeys; + private boolean rejectNextPush; + + private final SortedSet standardConfigKeys = new TreeSet<>(Set.of( + "client.id", + "request.timeout.ms", + "retry.backoff.ms", + "metadata.max.age.ms", + "send.buffer.bytes", + "receive.buffer.bytes", + "reconnect.backoff.ms", + "reconnect.backoff.max.ms" + )); + + @Override + public void configure(Map configs) { + // No configuration needed for test policy + } + + @Override + public Set reconfigurableConfigs() { + return Collections.emptySet(); + } + + @Override + public void validateReconfiguration(Map configs) { + // No reconfiguration validation needed + } + + @Override + public void reconfigure(Map configs) { + // No reconfiguration needed + } + + @Override + public Optional profileKeys(ClientProfile clientProfile) { + profileKeysCallCount.incrementAndGet(); + + // Store the profile for verification + receivedProfiles.put(clientProfile.clientInstanceId(), clientProfile); + + if (throwUnknownProfileOnKeys) { + throw new UnknownConfigProfileException("Test: Unknown profile"); + } + + if (returnEmptyProfile) { + return Optional.empty(); + } + + // Return standard config keys + long crc = configurationProfileCrc(standardConfigKeys); + return Optional.of(new ClientConfigProfileKeys(standardConfigKeys, crc)); + } + + @Override + public void process(ClientPushConfigData pushConfigData) { + processCallCount.incrementAndGet(); + + // Store configs for verification + receivedConfigs.put( + pushConfigData.clientProfile().clientInstanceId(), + pushConfigData.configs() + ); + + if (rejectNextPush) { + throw new ClientConfigPolicyException("Test: Config validation failed"); + } + + // Validate that we received some configs + if (pushConfigData.configs().isEmpty()) { + throw new ClientConfigPolicyException("No configs received"); + } + } + + @Override + public void close() throws Exception { + // Cleanup if needed + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientConfigsSender.java b/clients/src/main/java/org/apache/kafka/clients/ClientConfigsSender.java new file mode 100644 index 0000000000000..8298d49f38bdb --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ClientConfigsSender.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.GetConfigProfileKeysResponse; +import org.apache.kafka.common.requests.PushConfigResponse; + +import java.util.Optional; + +/** + * Interface for managing the client configuration push handshake with brokers. + *

+ * This is a one-time, best-effort operation performed during client initialization + * to push non-sensitive configuration to the broker for observability purposes. + *

+ * The handshake consists of two steps: + *

    + *
  1. GetConfigProfileKeys - Broker tells client what configs it wants
  2. + *
  3. PushConfig - Client sends the requested configs
  4. + *
+ */ +public interface ClientConfigsSender extends AutoCloseable { + + /** + * Returns true if the config push handshake needs to proceed. + *

+ * Once the handshake is completed or fails, this should return false. + * + * @return true if handshake should continue, false if terminal state reached + */ + boolean shouldAttemptHandshake(); + + /** + * Creates the next request in the handshake flow based on current state. + *

+ * Returns GetConfigProfileKeysRequest if profile keys are needed, or + * PushConfigRequest if ready to push configs. + * + * @return Optional containing the next request builder, or empty if no request needed + */ + Optional> createRequest(); + + /** + * Handle successful GetConfigProfileKeys response. + *

+ * This extracts the profile keys (configuration profile CRC, requested keys, max bytes) + * and prepares for the PushConfig step. + * + * @param response the profile keys response from broker + */ + void handleResponse(GetConfigProfileKeysResponse response); + + /** + * Handle successful PushConfig response. + *

+ * This completes the handshake or handles errors like UNKNOWN_CONFIG_PROFILE. + * + * @param response the push config response from broker + */ + void handleResponse(PushConfigResponse response); + + /** + * Handle get configs subscription request failure. + * + * @param kafkaException the fatal exception. + */ + void handleFailedGetConfigsSubscriptionRequest(KafkaException kafkaException); + + /** + * Handle push configs request failure. + * + * @param kafkaException the fatal exception. + */ + void handleFailedPushConfigsRequest(KafkaException kafkaException); + + /** + * Handle disconnection during the handshake. + *

+ * If a connection is lost during the handshake, mark it as failed. + */ + void handleDisconnect(); + + /** + * Returns the client instance ID. + *

+ * This is a UUID v4 generated by the client before any network activity and is + * shared across KIP-714 (telemetry) and KIP-CFG (config push). + * + * @return the client instance ID + */ + Uuid clientInstanceId(); +} diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index c295713df2370..281dbbb3c1135 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -159,7 +159,8 @@ public static NetworkClient createNetworkClient(AbstractConfig config, int maxInFlightRequestsPerConnection, Metadata metadata, Sensor throttleTimeSensor, - ClientTelemetrySender clientTelemetrySender) { + ClientTelemetrySender clientTelemetrySender, + ClientConfigsSender clientConfigsSender) { return createNetworkClient(config, config.getString(CommonClientConfigs.CLIENT_ID_CONFIG), metrics, @@ -173,7 +174,8 @@ public static NetworkClient createNetworkClient(AbstractConfig config, null, new DefaultHostResolver(), throttleTimeSensor, - clientTelemetrySender); + clientTelemetrySender, + clientConfigsSender); } public static NetworkClient createNetworkClient(AbstractConfig config, @@ -200,6 +202,7 @@ public static NetworkClient createNetworkClient(AbstractConfig config, metadataUpdater, hostResolver, null, + null, null); } @@ -216,7 +219,8 @@ public static NetworkClient createNetworkClient(AbstractConfig config, MetadataUpdater metadataUpdater, HostResolver hostResolver, Sensor throttleTimeSensor, - ClientTelemetrySender clientTelemetrySender) { + ClientTelemetrySender clientTelemetrySender, + ClientConfigsSender clientConfigsSender) { ChannelBuilder channelBuilder = null; Selector selector = null; @@ -247,6 +251,7 @@ public static NetworkClient createNetworkClient(AbstractConfig config, logContext, hostResolver, clientTelemetrySender, + clientConfigsSender, config.getLong(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG), MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG)) ); diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index 08b861673e3d7..da1c173ade9e1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -111,6 +111,9 @@ public class CommonClientConfigs { public static final String ENABLE_METRICS_PUSH_CONFIG = "enable.metrics.push"; public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of client metrics to the cluster, if the cluster has a client metrics subscription which matches this client."; + public static final String ENABLE_CONFIGS_PUSH_CONFIG = "enable.configs.push"; + public static final String ENABLE_CONFIGS_PUSH_DOC = "When set to 'true', the consumer will push its configuration to the broker for observability and troubleshooting."; + public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms"; public static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The window of time a metrics sample is computed over."; @@ -327,4 +330,12 @@ public static Optional telemetryReporter(String clientI telemetryReporter.configure(config.originals(Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId))); return Optional.of(telemetryReporter); } + + public static Optional configsSender(AbstractConfig config) { + if (!config.getBoolean(CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG)) { + return Optional.empty(); + } + + return Optional.of(new DefaultClientConfigsSender(config)); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/ConfigCollector.java b/clients/src/main/java/org/apache/kafka/clients/ConfigCollector.java new file mode 100644 index 0000000000000..8093e9c9a223d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ConfigCollector.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.message.PushConfigRequestData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; + +/** + * Utility class for collecting and filtering client configuration for transmission to brokers. + * This class handles filtering out sensitive configuration (passwords, security settings, etc.) + * and converting configuration to the format required by the PushConfig RPC. + */ +public class ConfigCollector { + private static final Logger log = LoggerFactory.getLogger(ConfigCollector.class); + + /** + * Collect non-sensitive configuration values for transmission to broker. + * + * @param config Client configuration (ConsumerConfig, ProducerConfig, etc.) + * @param requestedKeys Keys requested by broker ("*" for all non-sensitive) + * @param maxBytes Maximum payload size in bytes + * @return List of config entries ready for PushConfigRequest + */ + public static List collectConfigs( + AbstractConfig config, + List requestedKeys, + int maxBytes) { + + List result = new ArrayList<>(); + + // Expand wildcard "*" to all keys + Set keysToInclude = expandKeys(config, requestedKeys); + + // Filter and convert + int currentBytes = 0; + for (String key : keysToInclude) { + if (shouldExclude(key, config)) { + continue; // Skip sensitive configs + } + + Object value = config.values().get(key); + if (value == null) { + continue; + } + + ConfigDef.Type type = config.typeOf(key); + if (type == null) { + continue; // Unknown config + } + + PushConfigRequestData.Config entry = + convertToConfig(key, value, type); + + // Check size limit + int entrySize = estimateSize(entry); + if (currentBytes + entrySize > maxBytes) { + log.warn("Config payload would exceed {} bytes, truncating at {} entries", + maxBytes, result.size()); + break; + } + + result.add(entry); + currentBytes += entrySize; + } + + log.debug("Collected {} config entries ({} bytes)", result.size(), currentBytes); + return result; + } + + /** + * Expand wildcard or specific key list to actual keys to include. + */ + private static Set expandKeys(AbstractConfig config, List requestedKeys) { + Set keysToInclude = new HashSet<>(); + + for (String requestedKey : requestedKeys) { + if ("*".equals(requestedKey)) { + // Wildcard - include all keys from config + keysToInclude.addAll(config.values().keySet()); + } else { + // Specific key requested + keysToInclude.add(requestedKey); + } + } + + return keysToInclude; + } + + /** + * Determine if a config key should be excluded from transmission. + * Excludes passwords, security settings, class names, and other sensitive data. + */ + private static boolean shouldExclude(String key, AbstractConfig config) { + ConfigDef.Type type = config.typeOf(key); + + // 1. Exclude PASSWORD type + if (type == ConfigDef.Type.PASSWORD) { + return true; + } + + // 2. Exclude CLASS type (per KIP requirement) + if (type == ConfigDef.Type.CLASS) { + return true; + } + + // 3. Exclude bootstrap.servers + if ("bootstrap.servers".equals(key)) { + return true; + } + + // 4. Exclude security/auth related keys + String lowerKey = key.toLowerCase(Locale.ROOT); + if (lowerKey.contains("sasl.") || + lowerKey.contains("ssl.") || + lowerKey.contains("security.")) { + return true; + } + + // 5. Exclude keys ending with sensitive suffixes + if (lowerKey.endsWith(".password") || + lowerKey.endsWith(".secret") || + lowerKey.endsWith(".key") || + lowerKey.endsWith(".token")) { + return true; + } + + return false; + } + + /** + * Convert a config entry to the protocol format. + */ + private static PushConfigRequestData.Config convertToConfig( + String key, + Object value, + ConfigDef.Type type) { + + PushConfigRequestData.Config config = new PushConfigRequestData.Config(); + config.setConfigKey(key); + config.setConfigValue(String.valueOf(value)); + config.setConfigType(mapConfigType(type)); + return config; + } + + /** + * Convert ConfigDef.Type to protocol short value (int16). + * Maps to ConfigDef.Type ordinal values. CLASS and PASSWORD types are excluded by filtering. + */ + private static short mapConfigType(ConfigDef.Type type) { + switch (type) { + case BOOLEAN: + return 0; + case STRING: + return 1; + case INT: + return 2; + case SHORT: + return 3; + case LONG: + return 4; + case DOUBLE: + return 5; + case LIST: + return 6; + case CLASS: + return 7; // Should never reach here due to filtering + case PASSWORD: + return 8; // Should never reach here due to filtering + default: + return 1; // Default to STRING + } + } + + /** + * Estimate the size of a config entry in bytes. + * This is a rough estimate for checking against maxBytes limit. + */ + private static int estimateSize(PushConfigRequestData.Config config) { + // Rough estimate: key length + value length + overhead for type and framing + return config.configKey().length() + config.configValue().length() + 10; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/DefaultClientConfigsSender.java b/clients/src/main/java/org/apache/kafka/clients/DefaultClientConfigsSender.java new file mode 100644 index 0000000000000..9b7f2711f78ba --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/DefaultClientConfigsSender.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.message.GetConfigProfileKeysRequestData; +import org.apache.kafka.common.message.GetConfigProfileKeysResponseData; +import org.apache.kafka.common.message.PushConfigRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.GetConfigProfileKeysRequest; +import org.apache.kafka.common.requests.GetConfigProfileKeysResponse; +import org.apache.kafka.common.requests.PushConfigRequest; +import org.apache.kafka.common.requests.PushConfigResponse; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * Default implementation of ClientConfigsSender that manages the config push handshake. + *

+ * This implementation follows a simple state machine: + *

+ *   NOT_STARTED → PROFILE_KEYS_IN_PROGRESS → PUSH_IN_PROGRESS → COMPLETED/FAILED
+ * 
+ */ +public class DefaultClientConfigsSender implements ClientConfigsSender { + + private static final Logger log = LoggerFactory.getLogger(DefaultClientConfigsSender.class); + + @Override + public void close() throws Exception { + + } + + private enum State { + NOT_STARTED, // Initial state, need to send GetConfigProfileKeys + PROFILE_KEYS_IN_PROGRESS, // Waiting for GetConfigProfileKeys response + PUSH_IN_PROGRESS, // Waiting for PushConfig response + COMPLETED, // Successfully pushed config + FAILED // Failed (but client continues) + } + + private final AbstractConfig clientConfig; + private volatile Uuid clientInstanceId = Uuid.ZERO_UUID; + private volatile State state = State.NOT_STARTED; + private volatile long configurationProfileCrc = -1L; + private volatile int configMaxBytes = 0; + private volatile List requestedConfigKeys = new ArrayList<>(); + + public DefaultClientConfigsSender(AbstractConfig clientConfig) { + this.clientConfig = clientConfig; + } + + @Override + public boolean shouldAttemptHandshake() { + return state != State.COMPLETED && state != State.FAILED; + } + + @Override + public synchronized Optional> createRequest() { + switch (state) { + case NOT_STARTED: + log.debug("Creating GetConfigProfileKeys request"); + state = State.PROFILE_KEYS_IN_PROGRESS; + return Optional.of(createGetConfigProfileKeysRequest()); + + case PROFILE_KEYS_IN_PROGRESS: + // Waiting for profile keys response, no new request to send + return Optional.empty(); + + case PUSH_IN_PROGRESS: + // Check if we have subscription details and need to send push request + if (needsPushRequest()) { + PushConfigRequest.Builder builder = createPushConfigRequest(); + if (builder != null) { + // Mark that we've created the push request + requestedConfigKeys.clear(); // Clear to avoid creating duplicate requests + return Optional.of(builder); + } + } + return Optional.empty(); + + default: + // Terminal states (COMPLETED, FAILED) + return Optional.empty(); + } + } + + @Override + public synchronized void handleResponse(GetConfigProfileKeysResponse response) { + if (state != State.PROFILE_KEYS_IN_PROGRESS) { + log.warn("Received GetConfigProfileKeys response in unexpected state: {}", state); + return; + } + + Errors error = response.error(); + if (error != Errors.NONE) { + log.warn("GetConfigProfileKeys request failed with error: {} - {}", + error, response.data().errorMessage()); + state = State.FAILED; + return; + } + + GetConfigProfileKeysResponseData data = response.data(); + + // Store configuration profile CRC + configurationProfileCrc = data.configurationProfileCrc(); + configMaxBytes = data.configMaxBytes(); + + // Extract requested keys (now simple string array, not nested structure) + requestedConfigKeys = new ArrayList<>(data.configKeys()); + + log.debug("Config profile received: crc={}, maxBytes={}, keys={}", + configurationProfileCrc, configMaxBytes, requestedConfigKeys.size()); + + // Transition to next state - PushConfig will be created on next createRequest() call + state = State.PUSH_IN_PROGRESS; + } + + @Override + public synchronized void handleResponse(PushConfigResponse response) { + if (state != State.PUSH_IN_PROGRESS) { + log.warn("Received PushConfig response in unexpected state: {}", state); + return; + } + + Errors error = response.error(); + + if (error == Errors.NONE) { + log.info("Configuration push completed successfully"); + state = State.COMPLETED; + + } else if (error == Errors.INVALID_CONFIG) { + // Log error message from the response + String errorMessage = response.data().errorMessage(); + if (errorMessage != null && !errorMessage.isEmpty()) { + log.error("Configuration push failed: INVALID_CONFIG - {}", errorMessage); + } else { + log.error("Configuration push failed: INVALID_CONFIG (no details provided)"); + } + state = State.FAILED; + + } else if (error == Errors.UNKNOWN_CONFIG_PROFILE) { + log.warn("Configuration profile changed, retrying GetConfigProfileKeys"); + // Reset to retry once + state = State.NOT_STARTED; + configurationProfileCrc = -1L; + requestedConfigKeys.clear(); + + } else if (error == Errors.CONFIG_TOO_LARGE) { + String errorMessage = response.data().errorMessage(); + log.error("Config payload too large, cannot retry: {}", + errorMessage != null ? errorMessage : ""); + state = State.FAILED; + + } else { + String errorMessage = response.data().errorMessage(); + log.warn("PushConfig failed with error: {} - {}", + error, errorMessage != null ? errorMessage : ""); + state = State.FAILED; + } + } + + @Override + public void handleFailedGetConfigsSubscriptionRequest(KafkaException kafkaException) { + + } + + @Override + public void handleFailedPushConfigsRequest(KafkaException kafkaException) { + + } + + @Override + public synchronized void handleDisconnect() { + if (state == State.PROFILE_KEYS_IN_PROGRESS || state == State.PUSH_IN_PROGRESS) { + log.debug("Disconnected during config push handshake"); + state = State.FAILED; + } + } + + @Override + public Uuid clientInstanceId() { + return clientInstanceId; + } + + private GetConfigProfileKeysRequest.Builder createGetConfigProfileKeysRequest() { + // No fields in GetConfigProfileKeysRequest - client profile comes from ApiVersionsRequest context + GetConfigProfileKeysRequestData requestData = new GetConfigProfileKeysRequestData(); + + return new GetConfigProfileKeysRequest.Builder(requestData); + } + + /** + * Creates a PushConfig request with collected configuration. + * This should only be called after receiving a successful GetConfigProfileKeys response. + */ + private PushConfigRequest.Builder createPushConfigRequest() { + log.debug("Collecting and preparing config push"); + + // Collect configs using ConfigCollector + List configs; + try { + configs = ConfigCollector.collectConfigs( + clientConfig, + requestedConfigKeys, + configMaxBytes + ); + } catch (Exception e) { + log.error("Failed to collect configs for push", e); + state = State.FAILED; + return null; + } + + // Build request + PushConfigRequestData requestData = new PushConfigRequestData() + .setConfigurationProfileCrc(configurationProfileCrc) + .setConfigs(configs); + + return new PushConfigRequest.Builder(requestData); + } + + /** + * Checks if we need to send a PushConfig request. + * This is true when we've received profile keys but haven't pushed yet. + */ + synchronized boolean needsPushRequest() { + return state == State.PUSH_IN_PROGRESS && + configurationProfileCrc != -1L && + !requestedConfigKeys.isEmpty(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 692847a8b1553..9d056503f51a9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; +import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelState; import org.apache.kafka.common.network.NetworkReceive; @@ -38,9 +38,11 @@ import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.CorrelationIdMismatchException; +import org.apache.kafka.common.requests.GetConfigProfileKeysResponse; import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.PushConfigResponse; import org.apache.kafka.common.requests.PushTelemetryResponse; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator; @@ -138,6 +140,7 @@ private enum State { private final AtomicReference state; private final TelemetrySender telemetrySender; + private final ConfigsSender configsSender; public NetworkClient(Selectable selector, Metadata metadata, @@ -155,8 +158,9 @@ public NetworkClient(Selectable selector, ApiVersions apiVersions, LogContext logContext, MetadataRecoveryStrategy metadataRecoveryStrategy) { - this(selector, + this(null, metadata, + selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs, @@ -169,7 +173,11 @@ public NetworkClient(Selectable selector, time, discoverBrokerVersions, apiVersions, + null, logContext, + new DefaultHostResolver(), + null, + null, Long.MAX_VALUE, metadataRecoveryStrategy); } @@ -210,6 +218,7 @@ public NetworkClient(Selectable selector, logContext, new DefaultHostResolver(), null, + null, rebootstrapTriggerMs, metadataRecoveryStrategy); } @@ -250,6 +259,7 @@ public NetworkClient(Selectable selector, logContext, new DefaultHostResolver(), null, + null, Long.MAX_VALUE, metadataRecoveryStrategy); } @@ -289,6 +299,7 @@ public NetworkClient(Selectable selector, logContext, new DefaultHostResolver(), null, + null, Long.MAX_VALUE, metadataRecoveryStrategy); } @@ -312,6 +323,7 @@ public NetworkClient(MetadataUpdater metadataUpdater, LogContext logContext, HostResolver hostResolver, ClientTelemetrySender clientTelemetrySender, + ClientConfigsSender clientConfigsSender, long rebootstrapTriggerMs, MetadataRecoveryStrategy metadataRecoveryStrategy) { /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not @@ -344,6 +356,7 @@ public NetworkClient(MetadataUpdater metadataUpdater, this.log = logContext.logger(NetworkClient.class); this.state = new AtomicReference<>(State.ACTIVE); this.telemetrySender = (clientTelemetrySender != null) ? new TelemetrySender(clientTelemetrySender) : null; + this.configsSender = (clientConfigsSender != null) ? new ConfigsSender(clientConfigsSender) : null; this.rebootstrapTriggerMs = rebootstrapTriggerMs; this.metadataRecoveryStrategy = metadataRecoveryStrategy; } @@ -430,6 +443,8 @@ private void cancelInFlightRequests(String nodeId, metadataUpdater.handleFailedRequest(now, Optional.empty()); } else if (isTelemetryApi(request.header.apiKey()) && telemetrySender != null) { telemetrySender.handleFailedRequest(request.header.apiKey(), null); + } else if (isConfigPushApi(request.header.apiKey()) && configsSender != null) { + configsSender.handleFailedRequest(request.header.apiKey(), null); } } } @@ -595,6 +610,8 @@ else if (clientRequest.apiKey() == ApiKeys.METADATA) metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException)); else if (isTelemetryApi(clientRequest.apiKey()) && telemetrySender != null) telemetrySender.handleFailedRequest(clientRequest.apiKey(), unsupportedVersionException); + else if (isConfigPushApi(clientRequest.apiKey()) && configsSender != null) + configsSender.handleFailedRequest(clientRequest.apiKey(), unsupportedVersionException); } } @@ -1015,6 +1032,10 @@ else if (req.isInternalRequest && response instanceof GetTelemetrySubscriptionsR telemetrySender.handleResponse((GetTelemetrySubscriptionsResponse) response); else if (req.isInternalRequest && response instanceof PushTelemetryResponse) telemetrySender.handleResponse((PushTelemetryResponse) response); + else if (req.isInternalRequest && response instanceof GetConfigProfileKeysResponse) + configsSender.handleResponse((GetConfigProfileKeysResponse) response); + else if (req.isInternalRequest && response instanceof PushConfigResponse) + configsSender.handleResponse((PushConfigResponse) response); else responses.add(req.completed(response, now)); } @@ -1035,7 +1056,7 @@ private void handleApiVersionsResponse(List responses, // If not provided, the client falls back to version 0. short maxApiVersion = 0; if (apiVersionsResponse.data().apiKeys().size() > 0) { - ApiVersion apiVersion = apiVersionsResponse.data().apiKeys().find(ApiKeys.API_VERSIONS.id); + ApiVersionsResponseData.ApiVersion apiVersion = apiVersionsResponse.data().apiKeys().find(ApiKeys.API_VERSIONS.id); if (apiVersion != null) { maxApiVersion = apiVersion.maxVersion(); } @@ -1050,6 +1071,7 @@ private void handleApiVersionsResponse(List responses, apiVersionsResponse.data().finalizedFeatures(), apiVersionsResponse.data().finalizedFeaturesEpoch()); apiVersions.update(node, nodeVersionInfo); + this.connectionStates.ready(node); log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, API versions: {}.", node, apiVersionsResponse.data().finalizedFeaturesEpoch(), apiVersionsResponse.data().finalizedFeatures(), @@ -1171,6 +1193,10 @@ private boolean isTelemetryApi(ApiKeys apiKey) { return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY; } + private boolean isConfigPushApi(ApiKeys apiKey) { + return apiKey == ApiKeys.GET_CONFIG_PROFILE_KEYS || apiKey == ApiKeys.PUSH_CONFIG; + } + class DefaultMetadataUpdater implements MetadataUpdater { /* the current cluster metadata */ @@ -1468,6 +1494,40 @@ public void close() { } } + class ConfigsSender { + + private final ClientConfigsSender clientConfigsSender; + + public ConfigsSender(ClientConfigsSender clientConfigsSender) { + this.clientConfigsSender = clientConfigsSender; + } + + public void handleResponse(GetConfigProfileKeysResponse response) { + clientConfigsSender.handleResponse(response); + } + + public void handleResponse(PushConfigResponse response) { + clientConfigsSender.handleResponse(response); + } + + public void handleFailedRequest(ApiKeys apiKey, KafkaException maybeFatalException) { + if (apiKey == ApiKeys.GET_CONFIG_PROFILE_KEYS) + clientConfigsSender.handleFailedGetConfigsSubscriptionRequest(maybeFatalException); + else if (apiKey == ApiKeys.PUSH_CONFIG) + clientConfigsSender.handleFailedPushConfigsRequest(maybeFatalException); + else + throw new IllegalStateException("Invalid api key for failed configs request"); + } + + public void close() { + try { + clientConfigsSender.close(); + } catch (Exception exception) { + log.error("Failed to close client configs sender", exception); + } + } + } + @Override public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder requestBuilder, diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 85f1e87459f01..5f8b57035d1c5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -563,7 +563,8 @@ static KafkaAdminClient createInternal( metadataManager.updater(), (hostResolver == null) ? new DefaultHostResolver() : hostResolver, null, - clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null)); + clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), + null); return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient, timeoutProcessorFactory, logContext, clientTelemetryReporter); } catch (Throwable exc) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 7ad52c2122627..9ef0ec109d389 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -262,6 +262,12 @@ public class ConsumerConfig extends AbstractConfig { public static final String ENABLE_METRICS_PUSH_CONFIG = CommonClientConfigs.ENABLE_METRICS_PUSH_CONFIG; public static final String ENABLE_METRICS_PUSH_DOC = CommonClientConfigs.ENABLE_METRICS_PUSH_DOC; + /** + * enable.configs.push + */ + public static final String ENABLE_CONFIGS_PUSH_CONFIG = CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG; + public static final String ENABLE_CONFIGS_PUSH_DOC = CommonClientConfigs.ENABLE_CONFIGS_PUSH_DOC; + /** * retry.backoff.max.ms */ @@ -701,6 +707,11 @@ public class ConsumerConfig extends AbstractConfig { new ShareAcquireMode.Validator(), Importance.MEDIUM, ConsumerConfig.SHARE_ACQUIRE_MODE_DOC) + .define(ENABLE_CONFIGS_PUSH_CONFIG, + Type.BOOLEAN, + true, + Importance.LOW, + ENABLE_CONFIGS_PUSH_DOC) .define(CONFIG_PROVIDERS_CONFIG, ConfigDef.Type.LIST, List.of(), diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 644613a8dee45..c0754e4cf2831 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientConfigsSender; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.GroupRebalanceConfig; @@ -324,6 +325,7 @@ private StreamsRebalanceListenerInvoker streamsRebalanceListenerInvoker() { private volatile boolean closed = false; // Init value is needed to avoid NPE in case of exception raised in the constructor private Optional clientTelemetryReporter = Optional.empty(); + private Optional clientConfigsSender = Optional.empty(); private final PositionsValidator positionsValidator; private AsyncPollEvent inflightPoll; @@ -399,6 +401,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config, List reporters = CommonClientConfigs.metricsReporters(clientId, config); this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config); this.clientTelemetryReporter.ifPresent(reporters::add); + this.clientConfigsSender = CommonClientConfigs.configsSender(config); this.metrics = createMetrics(config, time, reporters); this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, CONSUMER_METRIC_GROUP); this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics); @@ -439,6 +442,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config, metrics, fetchMetricsManager.throttleTimeSensor(), clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), + clientConfigsSender.orElse(null), backgroundEventHandler, false, asyncConsumerMetrics diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index 5b54a759a9844..eeeb18d8ad4cc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientConfigsSender; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.GroupRebalanceConfig; @@ -143,6 +144,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { private final List assignors; // Init value is needed to avoid NPE in case of exception raised in the constructor private Optional clientTelemetryReporter = Optional.empty(); + private Optional clientConfigsSender = Optional.empty(); // currentThread holds the threadId of the current thread accessing this Consumer // and is used to prevent multi-threaded access @@ -175,6 +177,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { List reporters = CommonClientConfigs.metricsReporters(clientId, config); this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config); this.clientTelemetryReporter.ifPresent(reporters::add); + this.clientConfigsSender = CommonClientConfigs.configsSender(config); this.metrics = createMetrics(config, time, reporters); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); @@ -204,7 +207,8 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { metadata, fetchMetricsManager.throttleTimeSensor(), retryBackoffMs, - clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null)); + clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), + clientConfigsSender.orElse(null)); this.assignors = ConsumerPartitionAssignor.getAssignorInstances( config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java index 2f715e206cc07..f6d2b8c88003d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientConfigsSender; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.GroupRebalanceConfig; @@ -87,7 +88,8 @@ public static ConsumerNetworkClient createConsumerNetworkClient(ConsumerConfig c Metadata metadata, Sensor throttleTimeSensor, long retryBackoffMs, - ClientTelemetrySender clientTelemetrySender) { + ClientTelemetrySender clientTelemetrySender, + ClientConfigsSender clientConfigsSender) { NetworkClient netClient = ClientUtils.createNetworkClient(config, metrics, CONSUMER_METRIC_GROUP_PREFIX, @@ -97,7 +99,8 @@ public static ConsumerNetworkClient createConsumerNetworkClient(ConsumerConfig c CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION, metadata, throttleTimeSensor, - clientTelemetrySender); + clientTelemetrySender, + clientConfigsSender); // Will avoid blocking an extended period of time to prevent heartbeat thread starvation int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 42c1c73acb565..42d11e52c0df3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientConfigsSender; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.ClientUtils; @@ -469,6 +470,7 @@ public static Supplier supplier(final Time time, final Metrics metrics, final Sensor throttleTimeSensor, final ClientTelemetrySender clientTelemetrySender, + final ClientConfigsSender clientConfigsSender, final BackgroundEventHandler backgroundEventHandler, final boolean notifyMetadataErrorsViaErrorQueue, final AsyncConsumerMetrics asyncConsumerMetrics) { @@ -484,7 +486,8 @@ protected NetworkClientDelegate create() { CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION, metadata, throttleTimeSensor, - clientTelemetrySender); + clientTelemetrySender, + clientConfigsSender); return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics); } }; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 3a7eb81df6062..457c95d9dcf9d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -291,6 +291,7 @@ private void process(final ErrorEvent event) { metrics, shareFetchMetricsManager.throttleTimeSensor(), clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), + null, backgroundEventHandler, true, asyncConsumerMetrics diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 31a9f6b945c61..a1a30b34847e0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -536,7 +536,8 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat maxInflightRequests, metadata, throttleTimeSensor, - clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null)); + clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), + null); short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG)); return new Sender(logContext, diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ClientConfigPolicyException.java b/clients/src/main/java/org/apache/kafka/common/errors/ClientConfigPolicyException.java new file mode 100644 index 0000000000000..40b42b5244a45 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/ClientConfigPolicyException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * Exception thrown when a client configuration policy violation occurs. + *

+ * This is a generic exception for ClientConfigPolicy implementations to throw when + * client configurations fail validation or enforcement rules. + */ +public class ClientConfigPolicyException extends ApiException { + + private static final long serialVersionUID = 1L; + + public ClientConfigPolicyException(String message) { + super(message); + } + + public ClientConfigPolicyException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ConfigTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/ConfigTooLargeException.java new file mode 100644 index 0000000000000..3982eaf95d9a4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/ConfigTooLargeException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * Exception thrown when a client configuration payload exceeds the broker's ConfigMaxBytes limit. + */ +public class ConfigTooLargeException extends ApiException { + + private static final long serialVersionUID = 1L; + + public ConfigTooLargeException(String message) { + super(message); + } + + public ConfigTooLargeException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownConfigProfileException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownConfigProfileException.java new file mode 100644 index 0000000000000..bbdc6e667ac86 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownConfigProfileException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * Exception thrown when a client's configuration profile is unknown or not supported by the broker. + *

+ * This can occur in two scenarios: + *

    + *
  • During GetConfigProfileKeys: The client profile did not match any configuration profiles + * and the policy implementation does not allow for that case. This is a fatal error.
  • + *
  • During PushConfig: The client sent a request with an invalid or outdated configuration profile CRC, + * which means the configuration profile has changed. The client should retry the handshake.
  • + *
+ */ +public class UnknownConfigProfileException extends RetriableException { + + private static final long serialVersionUID = 1L; + + public UnknownConfigProfileException(String message) { + super(message); + } + + public UnknownConfigProfileException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java b/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java index cb99a8669e40f..de9feff496da2 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java @@ -21,14 +21,16 @@ public class ClientInformation { public static final String UNKNOWN_NAME_OR_VERSION = "unknown"; - public static final ClientInformation EMPTY = new ClientInformation(UNKNOWN_NAME_OR_VERSION, UNKNOWN_NAME_OR_VERSION); + public static final ClientInformation EMPTY = new ClientInformation(UNKNOWN_NAME_OR_VERSION, UNKNOWN_NAME_OR_VERSION, UNKNOWN_NAME_OR_VERSION); private final String softwareName; private final String softwareVersion; + private final String softwareRole; - public ClientInformation(String softwareName, String softwareVersion) { + public ClientInformation(String softwareName, String softwareVersion, String softwareRole) { this.softwareName = softwareName.isEmpty() ? UNKNOWN_NAME_OR_VERSION : softwareName; this.softwareVersion = softwareVersion.isEmpty() ? UNKNOWN_NAME_OR_VERSION : softwareVersion; + this.softwareRole = (softwareRole == null || softwareRole.isEmpty()) ? UNKNOWN_NAME_OR_VERSION : softwareRole; } public String softwareName() { @@ -39,15 +41,20 @@ public String softwareVersion() { return this.softwareVersion; } + public String softwareRole() { + return this.softwareRole; + } + @Override public String toString() { return "ClientInformation(softwareName=" + softwareName + - ", softwareVersion=" + softwareVersion + ")"; + ", softwareVersion=" + softwareVersion + + ", softwareRole=" + softwareRole + ")"; } @Override public int hashCode() { - return Objects.hash(softwareName, softwareVersion); + return Objects.hash(softwareName, softwareVersion, softwareRole); } @Override @@ -60,6 +67,7 @@ public boolean equals(Object o) { } ClientInformation other = (ClientInformation) o; return other.softwareName.equals(softwareName) && - other.softwareVersion.equals(softwareVersion); + other.softwareVersion.equals(softwareVersion) && + other.softwareRole.equals(softwareRole); } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 79b283b4f8d89..50e0b2e409a60 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -137,7 +137,9 @@ public enum ApiKeys { STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE), DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS), ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS), - DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS); + DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS), + GET_CONFIG_PROFILE_KEYS(ApiMessageType.GET_CONFIG_PROFILE_KEYS), + PUSH_CONFIG(ApiMessageType.PUSH_CONFIG); private static final Map> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index a27a7fcf23c77..5232c05810619 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.ConcurrentTransactionsException; +import org.apache.kafka.common.errors.ConfigTooLargeException; import org.apache.kafka.common.errors.ControllerMovedException; import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; @@ -134,6 +135,7 @@ import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdNotFoundException; import org.apache.kafka.common.errors.UnacceptableCredentialException; +import org.apache.kafka.common.errors.UnknownConfigProfileException; import org.apache.kafka.common.errors.UnknownControllerIdException; import org.apache.kafka.common.errors.UnknownLeaderEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; @@ -418,7 +420,9 @@ public enum Errors { STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.", StreamsInvalidTopologyException::new), STREAMS_INVALID_TOPOLOGY_EPOCH(131, "The supplied topology epoch is invalid.", StreamsInvalidTopologyEpochException::new), STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.", StreamsTopologyFencedException::new), - SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new); + SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new), + CONFIG_TOO_LARGE(134, "Configuration payload exceeds broker's ConfigMaxBytes limit.", ConfigTooLargeException::new), + UNKNOWN_CONFIG_PROFILE(135, "Client configuration profile is unknown or not supported.", UnknownConfigProfileException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index bc313078d7424..7a89110e60606 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -291,6 +291,10 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Readable readable, return AlterShareGroupOffsetsResponse.parse(readable, version); case DELETE_SHARE_GROUP_OFFSETS: return DeleteShareGroupOffsetsResponse.parse(readable, version); + case GET_CONFIG_PROFILE_KEYS: + return GetConfigProfileKeysResponse.parse(readable, version); + case PUSH_CONFIG: + return PushConfigResponse.parse(readable, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java index 1bdb0903c7d7d..4b9c4915997d6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java @@ -95,8 +95,9 @@ public boolean hasUnsupportedRequestVersion() { public boolean isValid() { if (version() >= 3) { - return SOFTWARE_NAME_VERSION_PATTERN.matcher(data.clientSoftwareName()).matches() && - SOFTWARE_NAME_VERSION_PATTERN.matcher(data.clientSoftwareVersion()).matches(); + boolean nameValid = SOFTWARE_NAME_VERSION_PATTERN.matcher(data.clientSoftwareName()).matches(); + boolean versionValid = SOFTWARE_NAME_VERSION_PATTERN.matcher(data.clientSoftwareVersion()).matches(); + return nameValid && versionValid; } else { return true; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GetConfigProfileKeysRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GetConfigProfileKeysRequest.java new file mode 100644 index 0000000000000..3d3ae6dd19aa4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/GetConfigProfileKeysRequest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.GetConfigProfileKeysRequestData; +import org.apache.kafka.common.message.GetConfigProfileKeysResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.Readable; + +public class GetConfigProfileKeysRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder { + private final GetConfigProfileKeysRequestData data; + + public Builder(GetConfigProfileKeysRequestData data) { + this(data, false); + } + + public Builder(GetConfigProfileKeysRequestData data, boolean enableUnstableLastVersion) { + super(ApiKeys.GET_CONFIG_PROFILE_KEYS, enableUnstableLastVersion); + this.data = data; + } + + @Override + public GetConfigProfileKeysRequest build(short version) { + return new GetConfigProfileKeysRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final GetConfigProfileKeysRequestData data; + + public GetConfigProfileKeysRequest(GetConfigProfileKeysRequestData data, short version) { + super(ApiKeys.GET_CONFIG_PROFILE_KEYS, version); + this.data = data; + } + + @Override + public GetConfigProfileKeysResponse getErrorResponse(int throttleTimeMs, Throwable e) { + GetConfigProfileKeysResponseData responseData = new GetConfigProfileKeysResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(Errors.forException(e).code()) + .setErrorMessage(e.getMessage()); + return new GetConfigProfileKeysResponse(responseData); + } + + @Override + public GetConfigProfileKeysRequestData data() { + return data; + } + + public static GetConfigProfileKeysRequest parse(Readable readable, short version) { + return new GetConfigProfileKeysRequest(new GetConfigProfileKeysRequestData(readable, version), version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GetConfigProfileKeysResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GetConfigProfileKeysResponse.java new file mode 100644 index 0000000000000..e08a0b964e925 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/GetConfigProfileKeysResponse.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.GetConfigProfileKeysResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.Readable; + +import java.util.EnumMap; +import java.util.Map; + +/** + * Possible error codes: + * - {@link Errors#UNKNOWN_CONFIG_PROFILE} + * - {@link Errors#UNSUPPORTED_VERSION} + * - {@link Errors#INVALID_REQUEST} + */ +public class GetConfigProfileKeysResponse extends AbstractResponse { + private final GetConfigProfileKeysResponseData data; + + public GetConfigProfileKeysResponse(GetConfigProfileKeysResponseData data) { + super(ApiKeys.GET_CONFIG_PROFILE_KEYS); + this.data = data; + } + + @Override + public GetConfigProfileKeysResponseData data() { + return data; + } + + @Override + public Map errorCounts() { + Map counts = new EnumMap<>(Errors.class); + updateErrorCounts(counts, Errors.forCode(data.errorCode())); + return counts; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public boolean hasError() { + return error() != Errors.NONE; + } + + public Errors error() { + return Errors.forCode(data.errorCode()); + } + + public static GetConfigProfileKeysResponse parse(Readable readable, short version) { + return new GetConfigProfileKeysResponse(new GetConfigProfileKeysResponseData(readable, version)); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PushConfigRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/PushConfigRequest.java new file mode 100644 index 0000000000000..d060765949ca6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/PushConfigRequest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.PushConfigRequestData; +import org.apache.kafka.common.message.PushConfigResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.Readable; + +public class PushConfigRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder { + private final PushConfigRequestData data; + + public Builder(PushConfigRequestData data) { + this(data, false); + } + + public Builder(PushConfigRequestData data, boolean enableUnstableLastVersion) { + super(ApiKeys.PUSH_CONFIG, enableUnstableLastVersion); + this.data = data; + } + + @Override + public PushConfigRequest build(short version) { + return new PushConfigRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final PushConfigRequestData data; + + public PushConfigRequest(PushConfigRequestData data, short version) { + super(ApiKeys.PUSH_CONFIG, version); + this.data = data; + } + + @Override + public PushConfigResponse getErrorResponse(int throttleTimeMs, Throwable e) { + PushConfigResponseData responseData = new PushConfigResponseData() + .setErrorCode(Errors.forException(e).code()) + .setThrottleTimeMs(throttleTimeMs); + return new PushConfigResponse(responseData); + } + + @Override + public PushConfigRequestData data() { + return data; + } + + public static PushConfigRequest parse(Readable readable, short version) { + return new PushConfigRequest(new PushConfigRequestData(readable, version), version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PushConfigResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/PushConfigResponse.java new file mode 100644 index 0000000000000..b1b97f19f4d87 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/PushConfigResponse.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.PushConfigResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.Readable; + +import java.util.EnumMap; +import java.util.Map; + +/** + * Possible error codes: + * - {@link Errors#CONFIG_TOO_LARGE} + * - {@link Errors#INVALID_CONFIG} + * - {@link Errors#CLIENT_CONFIG_POLICY_EXCEPTION} + * - {@link Errors#UNKNOWN_CONFIG_PROFILE} + * - {@link Errors#UNSUPPORTED_VERSION} + * - {@link Errors#INVALID_REQUEST} + */ +public class PushConfigResponse extends AbstractResponse { + private final PushConfigResponseData data; + + public PushConfigResponse(PushConfigResponseData data) { + super(ApiKeys.PUSH_CONFIG); + this.data = data; + } + + @Override + public PushConfigResponseData data() { + return data; + } + + @Override + public Map errorCounts() { + Map counts = new EnumMap<>(Errors.class); + updateErrorCounts(counts, Errors.forCode(data.errorCode())); + return counts; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public boolean hasError() { + return error() != Errors.NONE; + } + + public Errors error() { + return Errors.forCode(data.errorCode()); + } + + public static PushConfigResponse parse(Readable readable, short version) { + return new PushConfigResponse(new PushConfigResponseData(readable, version)); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index b84b5dc2abc94..5b87f2c25632a 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -579,7 +579,8 @@ else if (!apiVersionsRequest.isValid()) sendKafkaResponse(context, apiVersionsRequest.getErrorResponse(0, Errors.INVALID_REQUEST.exception())); else { metadataRegistry.registerClientInformation(new ClientInformation(apiVersionsRequest.data().clientSoftwareName(), - apiVersionsRequest.data().clientSoftwareVersion())); + apiVersionsRequest.data().clientSoftwareVersion(), + null)); sendKafkaResponse(context, apiVersionSupplier.apply(apiVersionsRequest.version())); setSaslState(SaslState.HANDSHAKE_REQUEST); } diff --git a/clients/src/main/java/org/apache/kafka/server/policy/ClientConfigPolicy.java b/clients/src/main/java/org/apache/kafka/server/policy/ClientConfigPolicy.java new file mode 100644 index 0000000000000..2717d8c4aaa78 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/server/policy/ClientConfigPolicy.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.policy; + +import org.apache.kafka.common.Reconfigurable; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.errors.ClientConfigPolicyException; +import org.apache.kafka.common.errors.ConfigTooLargeException; +import org.apache.kafka.common.errors.UnknownConfigProfileException; +import org.apache.kafka.common.utils.Crc32C; + +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.SortedSet; +import java.util.stream.Collectors; + +/** + * An interface for intercepting and enforcing client configuration. + * + *

Common use cases include: + *

    + *
  • Selecting which configuration keys to request from clients based on their profile
  • + *
  • Validating that client configurations match expected values or fall within acceptable ranges
  • + *
  • Storing client configuration snapshots for observability and troubleshooting
  • + *
+ * + *

If client.config.policy.class.name is defined, Kafka will create an instance of the specified class + * using the default constructor and will then pass the broker configs to its configure() method. + * During broker shutdown, the close() method will be invoked so that resources can be released (if + * necessary). + */ +@InterfaceStability.Evolving +public interface ClientConfigPolicy extends Reconfigurable, AutoCloseable { + + /** + * Computes the CRC that serves to identify a configuration profile. The + * value is generated by calculating a CRC32C of the configuration set. + *

+ * This default implementation sorts the keys for deterministic ordering and computes + * a CRC32C checksum. Implementations may override this method if a different CRC + * calculation is needed. + * + * @param configKeys The set of configuration keys in the profile + * @return CRC32C checksum of the configuration keys + */ + default long configurationProfileCrc(SortedSet configKeys) { + // Sort keys for deterministic ordering + String keysString = configKeys.stream() + .collect(Collectors.joining(",")); + byte[] keysBytes = keysString.getBytes(StandardCharsets.UTF_8); + return Crc32C.compute(keysBytes, 0, keysBytes.length); + } + + /** + * Select the configuration profile keys for a given client profile. + *

+ * How much of the client profile must match a configuration profile is left up to the + * implementation. It is not a requirement that all of the client profile (name, version, and + * any optional metadata) match exactly. For example, if the configuration of the client doesn't + * change between minor versions, there's no need to provide a distinct configuration profile + * for each minor client profile difference. + *

+ * If this method returns {@code Optional.empty()}, the client will not send a PushConfig request. + * + * @param clientProfile The client profile information from the request context + * @return Optional containing the configuration profile keys and CRC, or empty if no profile matches + * @throws UnknownConfigProfileException if the client profile did not match any configuration profiles + * and the policy implementation does not allow for that case + */ + Optional profileKeys(ClientProfile clientProfile) + throws UnknownConfigProfileException; + + /** + * Receive the client configuration data for observability, enforcement, etc. + *

+ * This method should avoid blocking. Implementations typically: + *

    + *
  • Validate configuration values against expected ranges or patterns
  • + *
  • Store configuration snapshots in external storage for auditing
  • + *
  • Track configuration changes over time
  • + *
+ *

+ * Note: this method will not be invoked if the {@code Configs} array + * of the PushConfig request was larger than {@code client.config.max.bytes}. + * + * @param pushConfigData The client profile and configuration data from the PushConfig request + * @throws UnknownConfigProfileException if the configuration profile CRC is invalid or outdated + * @throws ConfigTooLargeException if the configuration data exceeds size limits + * @throws ClientConfigPolicyException if the configurations violate the policy + */ + void process(ClientPushConfigData pushConfigData) + throws UnknownConfigProfileException, ConfigTooLargeException, ClientConfigPolicyException; + + /** + * Close this policy instance. Default implementation is a no-op. + */ + @Override + default void close() throws Exception { + // Default no-op + } +} diff --git a/clients/src/main/java/org/apache/kafka/server/policy/ClientConfigProfileKeys.java b/clients/src/main/java/org/apache/kafka/server/policy/ClientConfigProfileKeys.java new file mode 100644 index 0000000000000..d2d76a6c388be --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/server/policy/ClientConfigProfileKeys.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.policy; + +import java.util.Objects; +import java.util.SortedSet; + +/** + * Immutable class containing the configuration profile keys. + *

+ * A configuration profile defines the set of configuration keys to capture. The ClientConfigPolicy + * uses the information in the client profile to select the most appropriate configuration profile. + *

+ * The CRC is generated by calculating a CRC32C of the configuration keys and is used to detect + * changes to the configuration profile between GetConfigProfileKeys and PushConfig requests. + */ +public final class ClientConfigProfileKeys { + private final SortedSet keys; + private final long crc; + + /** + * Creates a new ClientConfigProfileKeys. + * + * @param keys The set of configuration keys the client should provide (sorted for deterministic ordering) + * @param crc CRC32C checksum of the configuration keys for profile change detection + */ + public ClientConfigProfileKeys(SortedSet keys, long crc) { + this.keys = keys; + this.crc = crc; + } + + public SortedSet keys() { + return keys; + } + + public long crc() { + return crc; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClientConfigProfileKeys that = (ClientConfigProfileKeys) o; + return crc == that.crc && Objects.equals(keys, that.keys); + } + + @Override + public int hashCode() { + return Objects.hash(keys, crc); + } + + @Override + public String toString() { + return "ClientConfigProfileKeys{" + + "keys=" + keys + + ", crc=" + crc + + '}'; + } +} diff --git a/clients/src/main/java/org/apache/kafka/server/policy/ClientProfile.java b/clients/src/main/java/org/apache/kafka/server/policy/ClientProfile.java new file mode 100644 index 0000000000000..528fea90bc1ac --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/server/policy/ClientProfile.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.policy; + +import org.apache.kafka.common.Uuid; + +import java.util.Objects; +import java.util.SortedMap; + +/** + * Immutable class containing the client profile from the RequestContext. + *

+ * A client profile is made up of the tuple of ClientSoftwareName, ClientSoftwareVersion, + * ClientInstanceId, and ClientMetadata. The client profile provides the ClientConfigPolicy + * implementation with a detailed view of the client in use, allowing it to distinguish between + * different client types (e.g., librdkafka 2.12.0 vs Apache Kafka Java 4.4.0 Producer). + */ +public final class ClientProfile { + private final Uuid clientInstanceId; + private final String clientSoftwareName; + private final String clientSoftwareVersion; + private final SortedMap clientMetadata; + + /** + * Creates a new ClientProfile. + * + * @param clientInstanceId Unique identifier for this client instance (UUID v4) + * @param clientSoftwareName The name of the client software (e.g., "apache-kafka-java") + * @param clientSoftwareVersion The version of the client software (e.g., "3.8.0") + * @param clientMetadata Optional metadata as key-value pairs for additional client context + */ + public ClientProfile( + Uuid clientInstanceId, + String clientSoftwareName, + String clientSoftwareVersion, + SortedMap clientMetadata + ) { + this.clientInstanceId = clientInstanceId; + this.clientSoftwareName = clientSoftwareName; + this.clientSoftwareVersion = clientSoftwareVersion; + this.clientMetadata = clientMetadata; + } + + public Uuid clientInstanceId() { + return clientInstanceId; + } + + public String clientSoftwareName() { + return clientSoftwareName; + } + + public String clientSoftwareVersion() { + return clientSoftwareVersion; + } + + public SortedMap clientMetadata() { + return clientMetadata; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClientProfile that = (ClientProfile) o; + return Objects.equals(clientInstanceId, that.clientInstanceId) && + Objects.equals(clientSoftwareName, that.clientSoftwareName) && + Objects.equals(clientSoftwareVersion, that.clientSoftwareVersion) && + Objects.equals(clientMetadata, that.clientMetadata); + } + + @Override + public int hashCode() { + return Objects.hash(clientInstanceId, clientSoftwareName, clientSoftwareVersion, clientMetadata); + } + + @Override + public String toString() { + return "ClientProfile{" + + "clientInstanceId=" + clientInstanceId + + ", clientSoftwareName='" + clientSoftwareName + '\'' + + ", clientSoftwareVersion='" + clientSoftwareVersion + '\'' + + ", clientMetadata=" + clientMetadata + + '}'; + } +} diff --git a/clients/src/main/java/org/apache/kafka/server/policy/ClientPushConfigData.java b/clients/src/main/java/org/apache/kafka/server/policy/ClientPushConfigData.java new file mode 100644 index 0000000000000..39806a995fe2d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/server/policy/ClientPushConfigData.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.policy; + +import java.util.Map; +import java.util.Objects; + +/** + * Immutable class containing the PushConfig API data. + *

+ * The client profile and configuration values come directly from the RPC. + * The broker supplies its current timestamp (UTC) for when the request was received. + */ +public final class ClientPushConfigData { + private final ClientProfile clientProfile; + private final Map configs; + private final long timestamp; + + /** + * Creates a new ClientPushConfigData. + * + * @param clientProfile The client profile information from the request context + * @param configs The configuration key-value pairs pushed by the client + * @param timestamp UTC timestamp (milliseconds) when the broker received the request + */ + public ClientPushConfigData( + ClientProfile clientProfile, + Map configs, + long timestamp + ) { + this.clientProfile = clientProfile; + this.configs = configs; + this.timestamp = timestamp; + } + + public ClientProfile clientProfile() { + return clientProfile; + } + + public Map configs() { + return configs; + } + + public long timestamp() { + return timestamp; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClientPushConfigData that = (ClientPushConfigData) o; + return timestamp == that.timestamp && + Objects.equals(clientProfile, that.clientProfile) && + Objects.equals(configs, that.configs); + } + + @Override + public int hashCode() { + return Objects.hash(clientProfile, configs, timestamp); + } + + @Override + public String toString() { + return "ClientPushConfigData{" + + "clientProfile=" + clientProfile + + ", configs=" + configs + + ", timestamp=" + timestamp + + '}'; + } +} diff --git a/clients/src/main/resources/common/message/ApiVersionsRequest.json b/clients/src/main/resources/common/message/ApiVersionsRequest.json index 56170c9667350..489a6c1cf953e 100644 --- a/clients/src/main/resources/common/message/ApiVersionsRequest.json +++ b/clients/src/main/resources/common/message/ApiVersionsRequest.json @@ -23,12 +23,25 @@ // Version 3 is the first flexible version and adds ClientSoftwareName and ClientSoftwareVersion. // // Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion in the response from being 0. - "validVersions": "0-4", + // + // Version 5 adds ClientInstanceId and ClientMetadata for KIP-CFG. + "validVersions": "0-5", "flexibleVersions": "3+", "fields": [ { "name": "ClientSoftwareName", "type": "string", "versions": "3+", "ignorable": true, "about": "The name of the client." }, { "name": "ClientSoftwareVersion", "type": "string", "versions": "3+", - "ignorable": true, "about": "The version of the client." } + "ignorable": true, "about": "The version of the client." }, + { "name": "ClientInstanceId", "type": "uuid", "versions": "0+", + "ignorable": false, "about": "Unique ID for this client instance, must be set to 0 on the first request." }, + { "name": "ClientMetadata", "type": "[]ClientMetadataEntry", "versions": "5+", + "ignorable": true, "about": "Client metadata as key-value pairs.", + "fields": [ + { "name": "Key", "type": "string", "versions": "5+", + "about": "Client metadata key." }, + { "name": "Value", "type": "string", "versions": "5+", + "about": "Client metadata value." } + ] + } ] } diff --git a/clients/src/main/resources/common/message/GetConfigProfileKeysRequest.json b/clients/src/main/resources/common/message/GetConfigProfileKeysRequest.json new file mode 100644 index 0000000000000..931cc290b23b7 --- /dev/null +++ b/clients/src/main/resources/common/message/GetConfigProfileKeysRequest.json @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 93, + "type": "request", + "listeners": ["broker"], + "name": "GetConfigProfileKeysRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [] +} diff --git a/clients/src/main/resources/common/message/GetConfigProfileKeysResponse.json b/clients/src/main/resources/common/message/GetConfigProfileKeysResponse.json new file mode 100644 index 0000000000000..8958b4bc8e608 --- /dev/null +++ b/clients/src/main/resources/common/message/GetConfigProfileKeysResponse.json @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 93, + "type": "response", + "name": "GetConfigProfileKeysResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." + }, + { + "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code." + }, + { + "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." + }, + { + "name": "ConfigurationProfileCrc", "type": "int64", "versions": "0+", + "about": "CRC for the current configuration profile." + }, + { + "name": "ConfigMaxBytes", "type": "int32", "versions": "0+", + "about": "The maximum number of bytes for ConfigKeys in its serialized form, as specified by client.config.max.bytes." + }, + { + "name": "ConfigKeys", "type": "[]string", "versions": "0+", + "about": "The client configuration keys the server wants the client to send." + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/PushConfigRequest.json b/clients/src/main/resources/common/message/PushConfigRequest.json new file mode 100644 index 0000000000000..4c5fc833c750a --- /dev/null +++ b/clients/src/main/resources/common/message/PushConfigRequest.json @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 94, + "type": "request", + "listeners": ["broker"], + "name": "PushConfigRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "ConfigurationProfileCrc", "type": "int64", "versions": "0+", + "about": "CRC for the current configuration profile." + }, + { + "name": "Configs", "type": "[]Config", "versions": "0+", + "about": "The client configuration entries.", + "fields": [ + { + "name": "ConfigKey", "type": "string", "versions": "0+", + "about": "The configuration key." + }, + { + "name": "ConfigValue", "type": "string", "versions": "0+", + "about": "The configuration value." + }, + { + "name": "ConfigType", "type": "int16", "versions": "0+", + "about": "Type (from ConfigDef.Type) of the ConfigValue field. Maps to ConfigDef.Type ordinal: 0=BOOLEAN, 1=STRING, 2=INT, 3=SHORT, 4=LONG, 5=DOUBLE, 6=LIST. CLASS(7) and PASSWORD(8) types are intentionally excluded." + } + ] + } + ] +} diff --git a/clients/src/main/resources/common/message/PushConfigResponse.json b/clients/src/main/resources/common/message/PushConfigResponse.json new file mode 100644 index 0000000000000..11f4207ed210f --- /dev/null +++ b/clients/src/main/resources/common/message/PushConfigResponse.json @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 94, + "type": "response", + "name": "PushConfigResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." + }, + { + "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no error." + }, + { + "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." + } + ] +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index e8dcf5843dcb8..8927d83f48ab2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -1151,7 +1151,7 @@ public void testReconnectAfterAddressChange() { NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, - time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender, + time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender, null, Long.MAX_VALUE, MetadataRecoveryStrategy.NONE); // Connect to one the initial addresses, then change the addresses and disconnect @@ -1212,7 +1212,7 @@ public void testFailedConnectionToFirstAddress() { NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, - time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender, + time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender, null, Long.MAX_VALUE, MetadataRecoveryStrategy.NONE); // First connection attempt should fail @@ -1265,7 +1265,7 @@ public void testFailedConnectionToFirstAddressAfterReconnect() { NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, - time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender, + time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender, null, Long.MAX_VALUE, MetadataRecoveryStrategy.NONE); // Connect to one the initial addresses, then change the addresses and disconnect @@ -1374,7 +1374,7 @@ public void testTelemetryRequest() { NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, - time, true, new ApiVersions(), null, new LogContext(), new DefaultHostResolver(), mockClientTelemetrySender, + time, true, new ApiVersions(), null, new LogContext(), new DefaultHostResolver(), mockClientTelemetrySender, null, Long.MAX_VALUE, MetadataRecoveryStrategy.NONE); // Send the ApiVersionsRequest diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 347f76135866d..865f4e6a6ca84 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -865,7 +865,7 @@ public void testConnectionsByClientMetric() throws Exception { selector.channel(node).channelMetadataRegistry().clientInformation()); // Metric with unknown / unknown should not be there, metric with A / B should be there - ClientInformation clientInformation = new ClientInformation("A", "B"); + ClientInformation clientInformation = new ClientInformation("A", "B", null); selector.channel(node).channelMetadataRegistry() .registerClientInformation(clientInformation); assertEquals(clientInformation, diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index ebc0f990ce1fe..0fd164ec3b272 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -1031,7 +1031,8 @@ private[kafka] class Processor( if (apiVersionsRequest.isValid) { channel.channelMetadataRegistry.registerClientInformation(new ClientInformation( apiVersionsRequest.data.clientSoftwareName, - apiVersionsRequest.data.clientSoftwareVersion)) + apiVersionsRequest.data.clientSoftwareVersion, + null)) } } requestChannel.sendRequest(req) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 0065986438470..e345797fb874c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -64,6 +64,8 @@ import org.apache.kafka.security.DelegationTokenManager import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, FetchManager, ProcessRole} import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.common.{GroupVersion, RequestLocal, ShareVersion, StreamsVersion, TransactionVersion} +import org.apache.kafka.server.config.ServerLogConfigs +import org.apache.kafka.server.policy.ClientConfigPolicy import org.apache.kafka.server.share.context.ShareFetchContext import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, SharePartitionKey} import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch @@ -72,6 +74,7 @@ import org.apache.kafka.server.transaction.AddPartitionsToTxnManager import org.apache.kafka.storage.internals.log.{AppendOrigin, RecordValidationStats} import org.apache.kafka.storage.log.metrics.BrokerTopicStats +import java.nio.charset.StandardCharsets import java.util import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{CompletableFuture, ConcurrentHashMap} @@ -120,6 +123,12 @@ class KafkaApis(val requestChannel: RequestChannel, val configManager = new ConfigAdminManager(brokerId, config, configRepository) val describeTopicPartitionsRequestHandler = new DescribeTopicPartitionsRequestHandler( metadataCache, authHelper, config) + val clientConfigPolicy: Option[ClientConfigPolicy] = + Option(config.getConfiguredInstance( + ServerLogConfigs.CLIENT_CONFIG_POLICY_CLASS_NAME_CONFIG, + classOf[ClientConfigPolicy])) + val clientConfigMaxBytes: Int = + config.getInt(ServerLogConfigs.CLIENT_CONFIG_MAX_BYTES_CONFIG) def close(): Unit = { aclApis.close() @@ -225,7 +234,9 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError) case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError) case ApiKeys.DESCRIBE_TOPIC_PARTITIONS => handleDescribeTopicPartitionsRequest(request) + case ApiKeys.GET_CONFIG_PROFILE_KEYS => handleGetConfigProfileKeysRequest(request) case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => handleGetTelemetrySubscriptionsRequest(request) + case ApiKeys.PUSH_CONFIG => handlePushConfigRequest(request) case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request) case ApiKeys.LIST_CONFIG_RESOURCES => handleListConfigResources(request) case ApiKeys.ADD_RAFT_VOTER => forwardToController(request) @@ -2977,6 +2988,122 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleGetConfigProfileKeysRequest(request: RequestChannel.Request): Unit = { + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + + val clientInfo = request.context.clientInformation + // TODO: Extract clientInstanceId and clientMetadata from ApiVersionsRequest context + // For now, using placeholder values - need to update RequestContext/ClientInformation + // to store these fields from ApiVersionsRequest v5 + val clientInstanceId = org.apache.kafka.common.Uuid.ZERO_UUID // TODO: Get from context + val clientMetadata = new java.util.TreeMap[String, String]() // TODO: Get from context + + val clientProfile = new org.apache.kafka.server.policy.ClientProfile( + clientInstanceId, + clientInfo.softwareName, + clientInfo.softwareVersion, + clientMetadata + ) + + val (errorCode, errorMessage, crc, configKeys) = clientConfigPolicy match { + case Some(policy) => + try { + val profileKeysOpt = policy.profileKeys(clientProfile) + OptionConverters.toScala(profileKeysOpt) match { + case Some(profileKeys) => + val keys: List[String] = profileKeys.keys().asScala.toList + (Errors.NONE.code, null, profileKeys.crc(), keys) + case None => + // No profile matched, return empty config keys + (Errors.NONE.code, null, 0L, List.empty[String]) + } + } catch { + case e: org.apache.kafka.common.errors.UnknownConfigProfileException => + (Errors.UNKNOWN_CONFIG_PROFILE.code, e.getMessage, 0L, List.empty[String]) + case e: Exception => + (Errors.UNKNOWN_SERVER_ERROR.code, e.getMessage, 0L, List.empty[String]) + } + case None => + // No policy configured, return empty config keys + (Errors.NONE.code, null, 0L, List.empty[String]) + } + + val configKeysList = configKeys.asJava + + val responseData = new GetConfigProfileKeysResponseData() + .setErrorCode(errorCode.toShort) + .setErrorMessage(errorMessage) + .setConfigurationProfileCrc(crc) + .setConfigMaxBytes(clientConfigMaxBytes) + .setConfigKeys(configKeysList) + + requestHelper.sendResponseMaybeThrottle(request, + requestThrottleMs => new GetConfigProfileKeysResponse(responseData.setThrottleTimeMs(requestThrottleMs))) + } + + def handlePushConfigRequest(request: RequestChannel.Request): Unit = { + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + + val clientInfo = request.context.clientInformation + val requestData = request.body[PushConfigRequest].data + val configsData = requestData.configs.asScala + val configs = configsData.map(c => c.configKey -> c.configValue).toMap + + // Calculate size + val configsSize = configs.foldLeft(0)((acc, kv) => + acc + kv._1.getBytes(StandardCharsets.UTF_8).length + + kv._2.getBytes(StandardCharsets.UTF_8).length) + + val (errorCode, errorMessage) = if (configsSize > clientConfigMaxBytes) { + (Errors.CONFIG_TOO_LARGE.code, s"Configuration payload size ($configsSize bytes) exceeds limit ($clientConfigMaxBytes bytes)") + } else { + clientConfigPolicy match { + case Some(policy) => + try { + // TODO: Extract clientInstanceId and clientMetadata from ApiVersionsRequest context + val clientInstanceId = org.apache.kafka.common.Uuid.ZERO_UUID // TODO: Get from context + val clientMetadata = new java.util.TreeMap[String, String]() // TODO: Get from context + + val clientProfile = new org.apache.kafka.server.policy.ClientProfile( + clientInstanceId, + clientInfo.softwareName, + clientInfo.softwareVersion, + clientMetadata + ) + + val timestamp = time.milliseconds() + val pushConfigData = new org.apache.kafka.server.policy.ClientPushConfigData( + clientProfile, + configs.asJava, + timestamp + ) + + policy.process(pushConfigData) + (Errors.NONE.code, null) + } catch { + case e: org.apache.kafka.common.errors.UnknownConfigProfileException => + (Errors.UNKNOWN_CONFIG_PROFILE.code, e.getMessage) + case e: org.apache.kafka.common.errors.ConfigTooLargeException => + (Errors.CONFIG_TOO_LARGE.code, e.getMessage) + case e: org.apache.kafka.common.errors.ClientConfigPolicyException => + (Errors.INVALID_CONFIG.code, e.getMessage) + case e: Exception => + (Errors.UNKNOWN_SERVER_ERROR.code, e.getMessage) + } + case None => + // No policy configured, accept all + (Errors.NONE.code, null) + } + } + + val responseData = new PushConfigResponseData() + .setErrorCode(errorCode) + .setErrorMessage(errorMessage) + + requestHelper.sendResponseMaybeThrottle(request, + requestThrottleMs => new PushConfigResponse(responseData.setThrottleTimeMs(requestThrottleMs))) + } + /** * Handle ListConfigResourcesRequest. If resourceTypes are not specified, it uses ListConfigResourcesRequest#supportedResourceTypes * to retrieve config resources. If resourceTypes are specified, it returns matched config resources. diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala index df10430ad7246..43ee4b3e4472d 100644 --- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala +++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala @@ -307,7 +307,7 @@ class RequestChannelTest { new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, - new ClientInformation("name", "version"), + new ClientInformation("name", "version", null), false) } diff --git a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala index 400e13dc6bef7..90ed44eba617a 100644 --- a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala +++ b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala @@ -128,7 +128,7 @@ class RequestConvertToJsonTest { new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, - new ClientInformation("name", "version"), + new ClientInformation("name", "version", null), false) } } diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java index d8ffd8a5e2f1d..453dae17da34b 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java @@ -164,6 +164,15 @@ public class ServerLogConfigs { "implement the org.apache.kafka.server.policy.AlterConfigPolicy interface. " + "

Note: This policy runs on the controller instead of the broker.

"; + public static final String CLIENT_CONFIG_POLICY_CLASS_NAME_CONFIG = "client.config.policy.class.name"; + public static final String CLIENT_CONFIG_POLICY_CLASS_NAME_DOC = "The client configuration policy class that should be used for validating client configurations. " + + "The class should implement the org.apache.kafka.server.policy.ClientConfigPolicy interface."; + + public static final String CLIENT_CONFIG_MAX_BYTES_CONFIG = "client.config.max.bytes"; + public static final int CLIENT_CONFIG_MAX_BYTES_DEFAULT = 1048576; // 1MB + public static final String CLIENT_CONFIG_MAX_BYTES_DOC = "Maximum size in bytes for client configuration data in PushConfig requests. " + + "Requests exceeding this limit will be rejected with CONFIG_TOO_LARGE error."; + public static final String LOG_INITIAL_TASK_DELAY_MS_CONFIG = LOG_PREFIX + "initial.task.delay.ms"; public static final long LOG_INITIAL_TASK_DELAY_MS_DEFAULT = 30 * 1000L; public static final String LOG_INITIAL_TASK_DELAY_MS_DOC = "The initial task delay in millisecond when initializing " + diff --git a/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java b/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java index be0e4337c1413..42cc5b6fc5417 100644 --- a/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java +++ b/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java @@ -120,7 +120,7 @@ public void testAllResponseTypesHandled() { @Test public void testClientInfoNode() { - ClientInformation clientInfo = new ClientInformation("name", "1"); + ClientInformation clientInfo = new ClientInformation("name", "1", null); ObjectNode expectedNode = JsonNodeFactory.instance.objectNode(); expectedNode.set("softwareName", new TextNode(clientInfo.softwareName())); expectedNode.set("softwareVersion", new TextNode(clientInfo.softwareVersion())); diff --git a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java index 8e8bb21b7da81..419ac130f0add 100644 --- a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java +++ b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java @@ -68,7 +68,7 @@ public static RequestContext requestContext() throws UnknownHostException { KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, - new ClientInformation("apache-kafka-java", "3.5.2"), + new ClientInformation("apache-kafka-java", "3.5.2", null), false); } @@ -94,7 +94,7 @@ public static RequestContext requestContextWithConnectionId(String connectionId) KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, - new ClientInformation("apache-kafka-java", "3.5.2"), + new ClientInformation("apache-kafka-java", "3.5.2", null), false); } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index f81d224e7ea7f..8f8dd9248e961 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -179,6 +179,8 @@ public Optional serverConfigName(String configName) { .define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC) .define(ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_DOC) .define(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_DOC) + .define(ServerLogConfigs.CLIENT_CONFIG_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.CLIENT_CONFIG_POLICY_CLASS_NAME_DOC) + .define(ServerLogConfigs.CLIENT_CONFIG_MAX_BYTES_CONFIG, INT, ServerLogConfigs.CLIENT_CONFIG_MAX_BYTES_DEFAULT, atLeast(0), LOW, ServerLogConfigs.CLIENT_CONFIG_MAX_BYTES_DOC) .define(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG, LONG, ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DEFAULT, atLeast(1), LOW, ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DOC) .defineInternal(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, LONG, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT, atLeast(0), LOW, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DOC);