diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientConfigPushIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientConfigPushIntegrationTest.java
new file mode 100644
index 0000000000000..fee93d640df1c
--- /dev/null
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientConfigPushIntegrationTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ClientConfigPolicyException;
+import org.apache.kafka.common.errors.UnknownConfigProfileException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.server.policy.ClientConfigPolicy;
+import org.apache.kafka.server.policy.ClientConfigProfileKeys;
+import org.apache.kafka.server.policy.ClientProfile;
+import org.apache.kafka.server.policy.ClientPushConfigData;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the client configuration push handshake (KIP-CFG).
+ *
+ * Tests the end-to-end flow:
+ * 1. Client sends GetConfigProfileKeys request
+ * 2. Broker responds with profile keys and CRC
+ * 3. Client sends PushConfig request with collected configs
+ * 4. Broker validates and processes the config
+ */
+@ClusterTestDefaults(
+ brokers = 1,
+ serverProperties = {
+ @ClusterConfigProperty(key = ServerLogConfigs.CLIENT_CONFIG_POLICY_CLASS_NAME_CONFIG,
+ value = "org.apache.kafka.clients.ClientConfigPushIntegrationTest$TestClientConfigPolicy"),
+ @ClusterConfigProperty(key = ServerLogConfigs.CLIENT_CONFIG_MAX_BYTES_CONFIG, value = "1048576")
+ }
+)
+public class ClientConfigPushIntegrationTest {
+
+ private final ClusterInstance clusterInstance;
+ private TestClientConfigPolicy clientConfigPolicy;
+
+ public ClientConfigPushIntegrationTest(ClusterInstance clusterInstance) {
+ this.clusterInstance = clusterInstance;
+ }
+
+ @BeforeEach
+ public void setup() throws InterruptedException {
+ clusterInstance.waitForReadyBrokers();
+ clientConfigPolicy = new TestClientConfigPolicy();
+ }
+
+ @ClusterTest
+ public void testProducerConfigPushHandshake() throws Exception {
+ Map producerConfig = new java.util.HashMap<>();
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "test-producer-config-push");
+ producerConfig.put(CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG, true);
+
+ try (Producer producer = new KafkaProducer<>(producerConfig)) {
+ // Give time for handshake to complete
+ Thread.sleep(2000);
+
+ // Verify policy was called
+ assertTrue(clientConfigPolicy.profileKeysCallCount.get() > 0, "profileKeys() should have been called");
+ assertTrue(clientConfigPolicy.processCallCount.get() > 0, "process() should have been called");
+
+ // Verify we received a client profile
+ assertFalse(clientConfigPolicy.receivedProfiles.isEmpty(), "Should have received client profile");
+
+ // Verify we received configs
+ assertFalse(clientConfigPolicy.receivedConfigs.isEmpty(), "Should have received client configs");
+
+ // Verify configs were collected (should have client.id but not bootstrap.servers)
+ Map configs = clientConfigPolicy.receivedConfigs.values().iterator().next();
+ assertTrue(configs.containsKey(ProducerConfig.CLIENT_ID_CONFIG),
+ "Should contain client.id config");
+ assertFalse(configs.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
+ "Should NOT contain bootstrap.servers (sensitive)");
+ }
+ }
+
+ @ClusterTest
+ public void testConsumerConfigPushHandshake() throws Exception {
+ Map consumerConfig = new java.util.HashMap<>();
+ consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
+ consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-config-push");
+ consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerConfig.put(CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG, true);
+
+ try (Consumer consumer = new KafkaConsumer<>(consumerConfig)) {
+ // Give time for handshake to complete
+ Thread.sleep(2000);
+
+ // Verify policy was called
+ assertTrue(clientConfigPolicy.profileKeysCallCount.get() > 0, "profileKeys() should have been called");
+ assertTrue(clientConfigPolicy.processCallCount.get() > 0, "process() should have been called");
+
+ // Verify we received configs
+ assertFalse(clientConfigPolicy.receivedConfigs.isEmpty(), "Should have received client configs");
+
+ Map configs = clientConfigPolicy.receivedConfigs.values().iterator().next();
+ assertTrue(configs.containsKey(ConsumerConfig.GROUP_ID_CONFIG),
+ "Should contain group.id config");
+ assertTrue(configs.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
+ "Should contain auto.offset.reset config");
+ }
+ }
+
+ @ClusterTest
+ public void testConfigPushWithEmptyProfile() throws Exception {
+ // Configure policy to return empty profile (no config keys requested)
+ clientConfigPolicy.returnEmptyProfile = true;
+
+ Map producerConfig = new java.util.HashMap<>();
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerConfig.put(CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG, true);
+
+ try (Producer producer = new KafkaProducer<>(producerConfig)) {
+ // Give time for handshake to complete
+ Thread.sleep(2000);
+
+ // profileKeys() should be called
+ assertTrue(clientConfigPolicy.profileKeysCallCount.get() > 0, "profileKeys() should have been called");
+
+ // But process() should NOT be called since no configs were requested
+ assertEquals(0, clientConfigPolicy.processCallCount.get(), "process() should NOT have been called with empty profile");
+ }
+ }
+
+ @ClusterTest
+ public void testConfigPushDisabled() throws Exception {
+ Map producerConfig = new java.util.HashMap<>();
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ // Disable config push
+ producerConfig.put(CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG, false);
+
+ try (Producer producer = new KafkaProducer<>(producerConfig)) {
+ // Give time to ensure no handshake occurs
+ Thread.sleep(2000);
+
+ // Policy should NOT be called when disabled
+ assertEquals(0, clientConfigPolicy.profileKeysCallCount.get(), "profileKeys() should NOT be called when disabled");
+ assertEquals(0, clientConfigPolicy.processCallCount.get(), "process() should NOT be called when disabled");
+ }
+ }
+
+ @ClusterTest
+ public void testClientProfileContainsMetadata() throws Exception {
+ Map producerConfig = new java.util.HashMap<>();
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "metadata-test-producer");
+ producerConfig.put(CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG, true);
+
+ try (Producer producer = new KafkaProducer<>(producerConfig)) {
+ Thread.sleep(2000);
+
+ assertFalse(clientConfigPolicy.receivedProfiles.isEmpty(), "Should have received client profile");
+ ClientProfile profile = clientConfigPolicy.receivedProfiles.values().iterator().next();
+
+ assertNotNull(profile.clientInstanceId(), "Client instance ID should not be null");
+ assertNotNull(profile.clientSoftwareName(), "Client software name should not be null");
+ assertNotNull(profile.clientSoftwareVersion(), "Client software version should not be null");
+ assertNotNull(profile.clientMetadata(), "Client metadata should not be null");
+
+ // Verify software name is populated (e.g., "apache-kafka-java")
+ assertFalse(profile.clientSoftwareName().isEmpty(),
+ "Client software name should not be empty");
+ }
+ }
+
+ @ClusterTest
+ public void testCrcBasedProfileChangeDetection() throws Exception {
+ Map producerConfig = new java.util.HashMap<>();
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerConfig.put(CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG, true);
+
+ try (Producer producer = new KafkaProducer<>(producerConfig)) {
+ Thread.sleep(2000);
+
+ // Verify CRC was computed and is non-zero
+ assertTrue(clientConfigPolicy.profileKeysCallCount.get() > 0, "profileKeys() should have been called");
+
+ // The CRC should be deterministic based on the keys
+ // (We can't easily verify the exact value here, but we verified it's used)
+ }
+ }
+
+ /**
+ * Test implementation of ClientConfigPolicy for integration tests.
+ *
+ * This policy:
+ * - Returns a standard set of config keys for all clients
+ * - Captures received profiles and configs for verification
+ * - Supports test scenarios via static flags
+ */
+ public static class TestClientConfigPolicy implements ClientConfigPolicy {
+
+ private final Map receivedProfiles = new ConcurrentHashMap<>();
+ private final Map> receivedConfigs = new ConcurrentHashMap<>();
+ private final AtomicInteger profileKeysCallCount = new AtomicInteger(0);
+ private final AtomicInteger processCallCount = new AtomicInteger(0);
+ private boolean returnEmptyProfile;
+ private boolean throwUnknownProfileOnKeys;
+ private boolean rejectNextPush;
+
+ private final SortedSet standardConfigKeys = new TreeSet<>(Set.of(
+ "client.id",
+ "request.timeout.ms",
+ "retry.backoff.ms",
+ "metadata.max.age.ms",
+ "send.buffer.bytes",
+ "receive.buffer.bytes",
+ "reconnect.backoff.ms",
+ "reconnect.backoff.max.ms"
+ ));
+
+ @Override
+ public void configure(Map configs) {
+ // No configuration needed for test policy
+ }
+
+ @Override
+ public Set reconfigurableConfigs() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public void validateReconfiguration(Map configs) {
+ // No reconfiguration validation needed
+ }
+
+ @Override
+ public void reconfigure(Map configs) {
+ // No reconfiguration needed
+ }
+
+ @Override
+ public Optional profileKeys(ClientProfile clientProfile) {
+ profileKeysCallCount.incrementAndGet();
+
+ // Store the profile for verification
+ receivedProfiles.put(clientProfile.clientInstanceId(), clientProfile);
+
+ if (throwUnknownProfileOnKeys) {
+ throw new UnknownConfigProfileException("Test: Unknown profile");
+ }
+
+ if (returnEmptyProfile) {
+ return Optional.empty();
+ }
+
+ // Return standard config keys
+ long crc = configurationProfileCrc(standardConfigKeys);
+ return Optional.of(new ClientConfigProfileKeys(standardConfigKeys, crc));
+ }
+
+ @Override
+ public void process(ClientPushConfigData pushConfigData) {
+ processCallCount.incrementAndGet();
+
+ // Store configs for verification
+ receivedConfigs.put(
+ pushConfigData.clientProfile().clientInstanceId(),
+ pushConfigData.configs()
+ );
+
+ if (rejectNextPush) {
+ throw new ClientConfigPolicyException("Test: Config validation failed");
+ }
+
+ // Validate that we received some configs
+ if (pushConfigData.configs().isEmpty()) {
+ throw new ClientConfigPolicyException("No configs received");
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ // Cleanup if needed
+ }
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientConfigsSender.java b/clients/src/main/java/org/apache/kafka/clients/ClientConfigsSender.java
new file mode 100644
index 0000000000000..8298d49f38bdb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientConfigsSender.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.GetConfigProfileKeysResponse;
+import org.apache.kafka.common.requests.PushConfigResponse;
+
+import java.util.Optional;
+
+/**
+ * Interface for managing the client configuration push handshake with brokers.
+ *
+ * <p>This is a one-time, best-effort operation performed during client initialization
+ * to push non-sensitive configuration to the broker for observability purposes.
+ *
+ * <p>The handshake consists of two steps:
+ * <ol>
+ *     <li>GetConfigProfileKeys - Broker tells the client which configs it wants</li>
+ *     <li>PushConfig - Client sends the requested configs</li>
+ * </ol>
+ */
+public interface ClientConfigsSender extends AutoCloseable {
+
+    /**
+     * Returns true if the config push handshake needs to proceed.
+     *
+     * <p>Once the handshake is completed or fails, this should return false.
+     *
+     * @return true if the handshake should continue, false if a terminal state was reached
+     */
+    boolean shouldAttemptHandshake();
+
+    /**
+     * Creates the next request in the handshake flow based on the current state.
+     *
+     * <p>Returns a GetConfigProfileKeysRequest builder if profile keys are still needed,
+     * or a PushConfigRequest builder if the sender is ready to push configs.
+     *
+     * @return Optional containing the next request builder, or empty if no request is needed
+     */
+    Optional<AbstractRequest.Builder<?>> createRequest();
+
+    /**
+     * Handle a successful GetConfigProfileKeys response.
+     *
+     * <p>This extracts the profile keys (configuration profile CRC, requested keys, max bytes)
+     * and prepares for the PushConfig step.
+     *
+     * @param response the profile keys response from the broker
+     */
+    void handleResponse(GetConfigProfileKeysResponse response);
+
+    /**
+     * Handle a successful PushConfig response.
+     *
+     * <p>This completes the handshake or handles errors like UNKNOWN_CONFIG_PROFILE.
+     *
+     * @param response the push config response from the broker
+     */
+    void handleResponse(PushConfigResponse response);
+
+    /**
+     * Handle a failure of the get-configs-subscription request.
+     *
+     * @param kafkaException the fatal exception
+     */
+    void handleFailedGetConfigsSubscriptionRequest(KafkaException kafkaException);
+
+    /**
+     * Handle a failure of the push-configs request.
+     *
+     * @param kafkaException the fatal exception
+     */
+    void handleFailedPushConfigsRequest(KafkaException kafkaException);
+
+    /**
+     * Handle a disconnection during the handshake.
+     *
+     * <p>If a connection is lost during the handshake, mark the handshake as failed.
+     */
+    void handleDisconnect();
+
+    /**
+     * Returns the client instance ID.
+     *
+     * <p>This is a UUID v4 generated by the client before any network activity and is
+     * shared across KIP-714 (telemetry) and KIP-CFG (config push).
+     *
+     * @return the client instance ID
+     */
+    Uuid clientInstanceId();
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index c295713df2370..281dbbb3c1135 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -159,7 +159,8 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
int maxInFlightRequestsPerConnection,
Metadata metadata,
Sensor throttleTimeSensor,
- ClientTelemetrySender clientTelemetrySender) {
+ ClientTelemetrySender clientTelemetrySender,
+ ClientConfigsSender clientConfigsSender) {
return createNetworkClient(config,
config.getString(CommonClientConfigs.CLIENT_ID_CONFIG),
metrics,
@@ -173,7 +174,8 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
null,
new DefaultHostResolver(),
throttleTimeSensor,
- clientTelemetrySender);
+ clientTelemetrySender,
+ clientConfigsSender);
}
public static NetworkClient createNetworkClient(AbstractConfig config,
@@ -200,6 +202,7 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
metadataUpdater,
hostResolver,
null,
+ null,
null);
}
@@ -216,7 +219,8 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
MetadataUpdater metadataUpdater,
HostResolver hostResolver,
Sensor throttleTimeSensor,
- ClientTelemetrySender clientTelemetrySender) {
+ ClientTelemetrySender clientTelemetrySender,
+ ClientConfigsSender clientConfigsSender) {
ChannelBuilder channelBuilder = null;
Selector selector = null;
@@ -247,6 +251,7 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
logContext,
hostResolver,
clientTelemetrySender,
+ clientConfigsSender,
config.getLong(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG),
MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
);
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 08b861673e3d7..da1c173ade9e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -111,6 +111,9 @@ public class CommonClientConfigs {
public static final String ENABLE_METRICS_PUSH_CONFIG = "enable.metrics.push";
public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of client metrics to the cluster, if the cluster has a client metrics subscription which matches this client.";
+    public static final String ENABLE_CONFIGS_PUSH_CONFIG = "enable.configs.push";
+    // This constant lives in CommonClientConfigs and applies to producers, consumers and
+    // admin clients alike, so the doc refers to "the client" rather than "the consumer".
+    public static final String ENABLE_CONFIGS_PUSH_DOC = "When set to 'true', the client will push its configuration to the broker for observability and troubleshooting.";
+
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
public static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The window of time a metrics sample is computed over.";
@@ -327,4 +330,12 @@ public static Optional telemetryReporter(String clientI
telemetryReporter.configure(config.originals(Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId)));
return Optional.of(telemetryReporter);
}
+
+    /**
+     * Creates the client configs sender used for the configuration push handshake.
+     *
+     * @param config the client configuration
+     * @return an Optional containing the sender, or empty when
+     *         {@code enable.configs.push} is {@code false}
+     */
+    public static Optional<ClientConfigsSender> configsSender(AbstractConfig config) {
+        if (!config.getBoolean(CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG)) {
+            return Optional.empty();
+        }
+
+        return Optional.of(new DefaultClientConfigsSender(config));
+    }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/ConfigCollector.java b/clients/src/main/java/org/apache/kafka/clients/ConfigCollector.java
new file mode 100644
index 0000000000000..8093e9c9a223d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/ConfigCollector.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.message.PushConfigRequestData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+
+/**
+ * Utility class for collecting and filtering client configuration for transmission to brokers.
+ * This class handles filtering out sensitive configuration (passwords, security settings, etc.)
+ * and converting configuration to the format required by the PushConfig RPC.
+ */
+public class ConfigCollector {
+ private static final Logger log = LoggerFactory.getLogger(ConfigCollector.class);
+
+ /**
+ * Collect non-sensitive configuration values for transmission to broker.
+ *
+ * @param config Client configuration (ConsumerConfig, ProducerConfig, etc.)
+ * @param requestedKeys Keys requested by broker ("*" for all non-sensitive)
+ * @param maxBytes Maximum payload size in bytes
+ * @return List of config entries ready for PushConfigRequest
+ */
+ public static List collectConfigs(
+ AbstractConfig config,
+ List requestedKeys,
+ int maxBytes) {
+
+ List result = new ArrayList<>();
+
+ // Expand wildcard "*" to all keys
+ Set keysToInclude = expandKeys(config, requestedKeys);
+
+ // Filter and convert
+ int currentBytes = 0;
+ for (String key : keysToInclude) {
+ if (shouldExclude(key, config)) {
+ continue; // Skip sensitive configs
+ }
+
+ Object value = config.values().get(key);
+ if (value == null) {
+ continue;
+ }
+
+ ConfigDef.Type type = config.typeOf(key);
+ if (type == null) {
+ continue; // Unknown config
+ }
+
+ PushConfigRequestData.Config entry =
+ convertToConfig(key, value, type);
+
+ // Check size limit
+ int entrySize = estimateSize(entry);
+ if (currentBytes + entrySize > maxBytes) {
+ log.warn("Config payload would exceed {} bytes, truncating at {} entries",
+ maxBytes, result.size());
+ break;
+ }
+
+ result.add(entry);
+ currentBytes += entrySize;
+ }
+
+ log.debug("Collected {} config entries ({} bytes)", result.size(), currentBytes);
+ return result;
+ }
+
+ /**
+ * Expand wildcard or specific key list to actual keys to include.
+ */
+ private static Set expandKeys(AbstractConfig config, List requestedKeys) {
+ Set keysToInclude = new HashSet<>();
+
+ for (String requestedKey : requestedKeys) {
+ if ("*".equals(requestedKey)) {
+ // Wildcard - include all keys from config
+ keysToInclude.addAll(config.values().keySet());
+ } else {
+ // Specific key requested
+ keysToInclude.add(requestedKey);
+ }
+ }
+
+ return keysToInclude;
+ }
+
+ /**
+ * Determine if a config key should be excluded from transmission.
+ * Excludes passwords, security settings, class names, and other sensitive data.
+ */
+ private static boolean shouldExclude(String key, AbstractConfig config) {
+ ConfigDef.Type type = config.typeOf(key);
+
+ // 1. Exclude PASSWORD type
+ if (type == ConfigDef.Type.PASSWORD) {
+ return true;
+ }
+
+ // 2. Exclude CLASS type (per KIP requirement)
+ if (type == ConfigDef.Type.CLASS) {
+ return true;
+ }
+
+ // 3. Exclude bootstrap.servers
+ if ("bootstrap.servers".equals(key)) {
+ return true;
+ }
+
+ // 4. Exclude security/auth related keys
+ String lowerKey = key.toLowerCase(Locale.ROOT);
+ if (lowerKey.contains("sasl.") ||
+ lowerKey.contains("ssl.") ||
+ lowerKey.contains("security.")) {
+ return true;
+ }
+
+ // 5. Exclude keys ending with sensitive suffixes
+ if (lowerKey.endsWith(".password") ||
+ lowerKey.endsWith(".secret") ||
+ lowerKey.endsWith(".key") ||
+ lowerKey.endsWith(".token")) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Convert a config entry to the protocol format.
+ */
+ private static PushConfigRequestData.Config convertToConfig(
+ String key,
+ Object value,
+ ConfigDef.Type type) {
+
+ PushConfigRequestData.Config config = new PushConfigRequestData.Config();
+ config.setConfigKey(key);
+ config.setConfigValue(String.valueOf(value));
+ config.setConfigType(mapConfigType(type));
+ return config;
+ }
+
+ /**
+ * Convert ConfigDef.Type to protocol short value (int16).
+ * Maps to ConfigDef.Type ordinal values. CLASS and PASSWORD types are excluded by filtering.
+ */
+ private static short mapConfigType(ConfigDef.Type type) {
+ switch (type) {
+ case BOOLEAN:
+ return 0;
+ case STRING:
+ return 1;
+ case INT:
+ return 2;
+ case SHORT:
+ return 3;
+ case LONG:
+ return 4;
+ case DOUBLE:
+ return 5;
+ case LIST:
+ return 6;
+ case CLASS:
+ return 7; // Should never reach here due to filtering
+ case PASSWORD:
+ return 8; // Should never reach here due to filtering
+ default:
+ return 1; // Default to STRING
+ }
+ }
+
+ /**
+ * Estimate the size of a config entry in bytes.
+ * This is a rough estimate for checking against maxBytes limit.
+ */
+ private static int estimateSize(PushConfigRequestData.Config config) {
+ // Rough estimate: key length + value length + overhead for type and framing
+ return config.configKey().length() + config.configValue().length() + 10;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/DefaultClientConfigsSender.java b/clients/src/main/java/org/apache/kafka/clients/DefaultClientConfigsSender.java
new file mode 100644
index 0000000000000..9b7f2711f78ba
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/DefaultClientConfigsSender.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.message.GetConfigProfileKeysRequestData;
+import org.apache.kafka.common.message.GetConfigProfileKeysResponseData;
+import org.apache.kafka.common.message.PushConfigRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.GetConfigProfileKeysRequest;
+import org.apache.kafka.common.requests.GetConfigProfileKeysResponse;
+import org.apache.kafka.common.requests.PushConfigRequest;
+import org.apache.kafka.common.requests.PushConfigResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Default implementation of {@link ClientConfigsSender} that manages the config push handshake.
+ *
+ * <p>This implementation follows a simple state machine:
+ *
+ * <pre>
+ * NOT_STARTED -&gt; PROFILE_KEYS_IN_PROGRESS -&gt; PUSH_IN_PROGRESS -&gt; COMPLETED/FAILED
+ * </pre>
+ *
+ * <p>The handshake is best effort: any terminal failure leaves the sender in
+ * {@code FAILED} and the client continues operating normally without config push.
+ */
+public class DefaultClientConfigsSender implements ClientConfigsSender {
+
+    private static final Logger log = LoggerFactory.getLogger(DefaultClientConfigsSender.class);
+
+    private enum State {
+        NOT_STARTED,               // Initial state, need to send GetConfigProfileKeys
+        PROFILE_KEYS_IN_PROGRESS,  // Waiting for GetConfigProfileKeys response
+        PUSH_IN_PROGRESS,          // Waiting for PushConfig response
+        COMPLETED,                 // Successfully pushed config
+        FAILED                     // Failed (but client continues)
+    }
+
+    private final AbstractConfig clientConfig;
+    private volatile Uuid clientInstanceId = Uuid.ZERO_UUID;
+    private volatile State state = State.NOT_STARTED;
+    // CRC of the broker-side configuration profile; -1 means "not yet received".
+    private volatile long configurationProfileCrc = -1L;
+    // Maximum payload size the broker accepts for a PushConfig request.
+    private volatile int configMaxBytes = 0;
+    // Keys the broker asked us to push; replaced wholesale on each profile response.
+    private volatile List<String> requestedConfigKeys = new ArrayList<>();
+
+    public DefaultClientConfigsSender(AbstractConfig clientConfig) {
+        this.clientConfig = clientConfig;
+    }
+
+    @Override
+    public void close() throws Exception {
+        // No resources are held by this implementation; nothing to release.
+    }
+
+    @Override
+    public boolean shouldAttemptHandshake() {
+        return state != State.COMPLETED && state != State.FAILED;
+    }
+
+    /**
+     * Returns the next handshake request to send, if any, advancing the state machine.
+     *
+     * @return a request builder, or empty when no request is due in the current state
+     */
+    @Override
+    public synchronized Optional<AbstractRequest.Builder<?>> createRequest() {
+        switch (state) {
+            case NOT_STARTED:
+                log.debug("Creating GetConfigProfileKeys request");
+                state = State.PROFILE_KEYS_IN_PROGRESS;
+                return Optional.of(createGetConfigProfileKeysRequest());
+
+            case PROFILE_KEYS_IN_PROGRESS:
+                // Waiting for profile keys response, no new request to send.
+                return Optional.empty();
+
+            case PUSH_IN_PROGRESS:
+                // Check if we have subscription details and need to send the push request.
+                if (needsPushRequest()) {
+                    PushConfigRequest.Builder builder = createPushConfigRequest();
+                    if (builder != null) {
+                        // Clearing the keys makes needsPushRequest() false, so we never
+                        // build a duplicate push request for the same profile.
+                        requestedConfigKeys.clear();
+                        return Optional.of(builder);
+                    }
+                }
+                return Optional.empty();
+
+            default:
+                // Terminal states (COMPLETED, FAILED).
+                return Optional.empty();
+        }
+    }
+
+    /**
+     * Handles the broker's GetConfigProfileKeys response, recording the profile CRC,
+     * payload limit and requested keys, then arming the PushConfig step.
+     */
+    @Override
+    public synchronized void handleResponse(GetConfigProfileKeysResponse response) {
+        if (state != State.PROFILE_KEYS_IN_PROGRESS) {
+            log.warn("Received GetConfigProfileKeys response in unexpected state: {}", state);
+            return;
+        }
+
+        Errors error = response.error();
+        if (error != Errors.NONE) {
+            log.warn("GetConfigProfileKeys request failed with error: {} - {}",
+                error, response.data().errorMessage());
+            state = State.FAILED;
+            return;
+        }
+
+        GetConfigProfileKeysResponseData data = response.data();
+
+        // Store configuration profile CRC and the broker's payload limit.
+        configurationProfileCrc = data.configurationProfileCrc();
+        configMaxBytes = data.configMaxBytes();
+
+        // Extract requested keys (simple string array, not a nested structure).
+        requestedConfigKeys = new ArrayList<>(data.configKeys());
+
+        log.debug("Config profile received: crc={}, maxBytes={}, keys={}",
+            configurationProfileCrc, configMaxBytes, requestedConfigKeys.size());
+
+        if (requestedConfigKeys.isEmpty()) {
+            // Fix: the broker asked for nothing. Without this check the sender would sit in
+            // PUSH_IN_PROGRESS forever, because needsPushRequest() requires non-empty keys.
+            log.debug("Broker requested no config keys, completing handshake");
+            state = State.COMPLETED;
+            return;
+        }
+
+        // Transition to next state - PushConfig will be created on next createRequest() call.
+        state = State.PUSH_IN_PROGRESS;
+    }
+
+    /**
+     * Handles the broker's PushConfig response, moving the handshake to a terminal
+     * state, or back to NOT_STARTED when the profile changed and a retry is needed.
+     */
+    @Override
+    public synchronized void handleResponse(PushConfigResponse response) {
+        if (state != State.PUSH_IN_PROGRESS) {
+            log.warn("Received PushConfig response in unexpected state: {}", state);
+            return;
+        }
+
+        Errors error = response.error();
+
+        if (error == Errors.NONE) {
+            log.info("Configuration push completed successfully");
+            state = State.COMPLETED;
+
+        } else if (error == Errors.INVALID_CONFIG) {
+            // Log error message from the response
+            String errorMessage = response.data().errorMessage();
+            if (errorMessage != null && !errorMessage.isEmpty()) {
+                log.error("Configuration push failed: INVALID_CONFIG - {}", errorMessage);
+            } else {
+                log.error("Configuration push failed: INVALID_CONFIG (no details provided)");
+            }
+            state = State.FAILED;
+
+        } else if (error == Errors.UNKNOWN_CONFIG_PROFILE) {
+            log.warn("Configuration profile changed, retrying GetConfigProfileKeys");
+            // Reset to retry once
+            state = State.NOT_STARTED;
+            configurationProfileCrc = -1L;
+            requestedConfigKeys.clear();
+
+        } else if (error == Errors.CONFIG_TOO_LARGE) {
+            String errorMessage = response.data().errorMessage();
+            log.error("Config payload too large, cannot retry: {}",
+                errorMessage != null ? errorMessage : "");
+            state = State.FAILED;
+
+        } else {
+            String errorMessage = response.data().errorMessage();
+            log.warn("PushConfig failed with error: {} - {}",
+                error, errorMessage != null ? errorMessage : "");
+            state = State.FAILED;
+        }
+    }
+
+    @Override
+    public synchronized void handleFailedGetConfigsSubscriptionRequest(KafkaException kafkaException) {
+        // Fix: previously a no-op, which left the sender stuck in PROFILE_KEYS_IN_PROGRESS
+        // forever (createRequest() sends nothing in that state while shouldAttemptHandshake()
+        // keeps returning true). Mark the handshake failed so the client just carries on.
+        if (state == State.PROFILE_KEYS_IN_PROGRESS) {
+            log.warn("GetConfigProfileKeys request failed, abandoning config push handshake", kafkaException);
+            state = State.FAILED;
+        }
+    }
+
+    @Override
+    public synchronized void handleFailedPushConfigsRequest(KafkaException kafkaException) {
+        // Fix: same stall as handleFailedGetConfigsSubscriptionRequest, but for the
+        // PUSH_IN_PROGRESS state.
+        if (state == State.PUSH_IN_PROGRESS) {
+            log.warn("PushConfig request failed, abandoning config push handshake", kafkaException);
+            state = State.FAILED;
+        }
+    }
+
+    @Override
+    public synchronized void handleDisconnect() {
+        if (state == State.PROFILE_KEYS_IN_PROGRESS || state == State.PUSH_IN_PROGRESS) {
+            log.debug("Disconnected during config push handshake");
+            state = State.FAILED;
+        }
+    }
+
+    @Override
+    public Uuid clientInstanceId() {
+        return clientInstanceId;
+    }
+
+    private GetConfigProfileKeysRequest.Builder createGetConfigProfileKeysRequest() {
+        // No fields in GetConfigProfileKeysRequest - client profile comes from ApiVersionsRequest context
+        GetConfigProfileKeysRequestData requestData = new GetConfigProfileKeysRequestData();
+
+        return new GetConfigProfileKeysRequest.Builder(requestData);
+    }
+
+    /**
+     * Creates a PushConfig request with collected configuration.
+     * This should only be called after receiving a successful GetConfigProfileKeys response.
+     *
+     * @return the request builder, or null if config collection failed (state becomes FAILED)
+     */
+    private PushConfigRequest.Builder createPushConfigRequest() {
+        log.debug("Collecting and preparing config push");
+
+        // Collect configs using ConfigCollector
+        List<PushConfigRequestData.Config> configs;
+        try {
+            configs = ConfigCollector.collectConfigs(
+                clientConfig,
+                requestedConfigKeys,
+                configMaxBytes
+            );
+        } catch (Exception e) {
+            log.error("Failed to collect configs for push", e);
+            state = State.FAILED;
+            return null;
+        }
+
+        // Build request
+        PushConfigRequestData requestData = new PushConfigRequestData()
+            .setConfigurationProfileCrc(configurationProfileCrc)
+            .setConfigs(configs);
+
+        return new PushConfigRequest.Builder(requestData);
+    }
+
+    /**
+     * Checks if we need to send a PushConfig request: profile keys have been received
+     * and the push request has not been built yet.
+     */
+    synchronized boolean needsPushRequest() {
+        return state == State.PUSH_IN_PROGRESS &&
+            configurationProfileCrc != -1L &&
+            !requestedConfigKeys.isEmpty();
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 692847a8b1553..9d056503f51a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -23,7 +23,7 @@
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.NetworkReceive;
@@ -38,9 +38,11 @@
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CorrelationIdMismatchException;
+import org.apache.kafka.common.requests.GetConfigProfileKeysResponse;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.PushConfigResponse;
import org.apache.kafka.common.requests.PushTelemetryResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
@@ -138,6 +140,7 @@ private enum State {
 private final AtomicReference<State> state;
private final TelemetrySender telemetrySender;
+ private final ConfigsSender configsSender;
public NetworkClient(Selectable selector,
Metadata metadata,
@@ -155,8 +158,9 @@ public NetworkClient(Selectable selector,
ApiVersions apiVersions,
LogContext logContext,
MetadataRecoveryStrategy metadataRecoveryStrategy) {
- this(selector,
+ this(null,
metadata,
+ selector,
clientId,
maxInFlightRequestsPerConnection,
reconnectBackoffMs,
@@ -169,7 +173,11 @@ public NetworkClient(Selectable selector,
time,
discoverBrokerVersions,
apiVersions,
+ null,
logContext,
+ new DefaultHostResolver(),
+ null,
+ null,
Long.MAX_VALUE,
metadataRecoveryStrategy);
}
@@ -210,6 +218,7 @@ public NetworkClient(Selectable selector,
logContext,
new DefaultHostResolver(),
null,
+ null,
rebootstrapTriggerMs,
metadataRecoveryStrategy);
}
@@ -250,6 +259,7 @@ public NetworkClient(Selectable selector,
logContext,
new DefaultHostResolver(),
null,
+ null,
Long.MAX_VALUE,
metadataRecoveryStrategy);
}
@@ -289,6 +299,7 @@ public NetworkClient(Selectable selector,
logContext,
new DefaultHostResolver(),
null,
+ null,
Long.MAX_VALUE,
metadataRecoveryStrategy);
}
@@ -312,6 +323,7 @@ public NetworkClient(MetadataUpdater metadataUpdater,
LogContext logContext,
HostResolver hostResolver,
ClientTelemetrySender clientTelemetrySender,
+ ClientConfigsSender clientConfigsSender,
long rebootstrapTriggerMs,
MetadataRecoveryStrategy metadataRecoveryStrategy) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
@@ -344,6 +356,7 @@ public NetworkClient(MetadataUpdater metadataUpdater,
this.log = logContext.logger(NetworkClient.class);
this.state = new AtomicReference<>(State.ACTIVE);
this.telemetrySender = (clientTelemetrySender != null) ? new TelemetrySender(clientTelemetrySender) : null;
+ this.configsSender = (clientConfigsSender != null) ? new ConfigsSender(clientConfigsSender) : null;
this.rebootstrapTriggerMs = rebootstrapTriggerMs;
this.metadataRecoveryStrategy = metadataRecoveryStrategy;
}
@@ -430,6 +443,8 @@ private void cancelInFlightRequests(String nodeId,
metadataUpdater.handleFailedRequest(now, Optional.empty());
} else if (isTelemetryApi(request.header.apiKey()) && telemetrySender != null) {
telemetrySender.handleFailedRequest(request.header.apiKey(), null);
+ } else if (isConfigPushApi(request.header.apiKey()) && configsSender != null) {
+ configsSender.handleFailedRequest(request.header.apiKey(), null);
}
}
}
@@ -595,6 +610,8 @@ else if (clientRequest.apiKey() == ApiKeys.METADATA)
metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
else if (isTelemetryApi(clientRequest.apiKey()) && telemetrySender != null)
telemetrySender.handleFailedRequest(clientRequest.apiKey(), unsupportedVersionException);
+ else if (isConfigPushApi(clientRequest.apiKey()) && configsSender != null)
+ configsSender.handleFailedRequest(clientRequest.apiKey(), unsupportedVersionException);
}
}
@@ -1015,6 +1032,10 @@ else if (req.isInternalRequest && response instanceof GetTelemetrySubscriptionsR
telemetrySender.handleResponse((GetTelemetrySubscriptionsResponse) response);
else if (req.isInternalRequest && response instanceof PushTelemetryResponse)
telemetrySender.handleResponse((PushTelemetryResponse) response);
+ else if (req.isInternalRequest && response instanceof GetConfigProfileKeysResponse)
+ configsSender.handleResponse((GetConfigProfileKeysResponse) response);
+ else if (req.isInternalRequest && response instanceof PushConfigResponse)
+ configsSender.handleResponse((PushConfigResponse) response);
else
responses.add(req.completed(response, now));
}
@@ -1035,7 +1056,7 @@ private void handleApiVersionsResponse(List responses,
// If not provided, the client falls back to version 0.
short maxApiVersion = 0;
if (apiVersionsResponse.data().apiKeys().size() > 0) {
- ApiVersion apiVersion = apiVersionsResponse.data().apiKeys().find(ApiKeys.API_VERSIONS.id);
+ ApiVersionsResponseData.ApiVersion apiVersion = apiVersionsResponse.data().apiKeys().find(ApiKeys.API_VERSIONS.id);
if (apiVersion != null) {
maxApiVersion = apiVersion.maxVersion();
}
@@ -1050,6 +1071,7 @@ private void handleApiVersionsResponse(List responses,
apiVersionsResponse.data().finalizedFeatures(),
apiVersionsResponse.data().finalizedFeaturesEpoch());
apiVersions.update(node, nodeVersionInfo);
+
this.connectionStates.ready(node);
log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, API versions: {}.",
node, apiVersionsResponse.data().finalizedFeaturesEpoch(), apiVersionsResponse.data().finalizedFeatures(),
@@ -1171,6 +1193,10 @@ private boolean isTelemetryApi(ApiKeys apiKey) {
return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY;
}
+ /** Returns true for the two internal RPCs that form the config push handshake. */
+ private boolean isConfigPushApi(ApiKeys apiKey) {
+ return apiKey == ApiKeys.GET_CONFIG_PROFILE_KEYS || apiKey == ApiKeys.PUSH_CONFIG;
+ }
+
class DefaultMetadataUpdater implements MetadataUpdater {
/* the current cluster metadata */
@@ -1468,6 +1494,40 @@ public void close() {
}
}
+    /**
+     * Thin adapter that routes config-push responses and request failures from the
+     * network layer to the pluggable {@link ClientConfigsSender}.
+     */
+    class ConfigsSender {
+
+        private final ClientConfigsSender clientConfigsSender;
+
+        public ConfigsSender(ClientConfigsSender clientConfigsSender) {
+            this.clientConfigsSender = clientConfigsSender;
+        }
+
+        public void handleResponse(GetConfigProfileKeysResponse response) {
+            clientConfigsSender.handleResponse(response);
+        }
+
+        public void handleResponse(PushConfigResponse response) {
+            clientConfigsSender.handleResponse(response);
+        }
+
+        public void handleFailedRequest(ApiKeys apiKey, KafkaException maybeFatalException) {
+            if (apiKey == ApiKeys.PUSH_CONFIG) {
+                clientConfigsSender.handleFailedPushConfigsRequest(maybeFatalException);
+            } else if (apiKey == ApiKeys.GET_CONFIG_PROFILE_KEYS) {
+                clientConfigsSender.handleFailedGetConfigsSubscriptionRequest(maybeFatalException);
+            } else {
+                throw new IllegalStateException("Invalid api key for failed configs request");
+            }
+        }
+
+        public void close() {
+            try {
+                clientConfigsSender.close();
+            } catch (Exception exception) {
+                log.error("Failed to close client configs sender", exception);
+            }
+        }
+    }
+
@Override
public ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder> requestBuilder,
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 85f1e87459f01..5f8b57035d1c5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -563,7 +563,8 @@ static KafkaAdminClient createInternal(
metadataManager.updater(),
(hostResolver == null) ? new DefaultHostResolver() : hostResolver,
null,
- clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
+ clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
+ null);
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient,
timeoutProcessorFactory, logContext, clientTelemetryReporter);
} catch (Throwable exc) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 7ad52c2122627..9ef0ec109d389 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -262,6 +262,12 @@ public class ConsumerConfig extends AbstractConfig {
public static final String ENABLE_METRICS_PUSH_CONFIG = CommonClientConfigs.ENABLE_METRICS_PUSH_CONFIG;
public static final String ENABLE_METRICS_PUSH_DOC = CommonClientConfigs.ENABLE_METRICS_PUSH_DOC;
+ /**
+ * enable.configs.push
+ */
+ public static final String ENABLE_CONFIGS_PUSH_CONFIG = CommonClientConfigs.ENABLE_CONFIGS_PUSH_CONFIG;
+ public static final String ENABLE_CONFIGS_PUSH_DOC = CommonClientConfigs.ENABLE_CONFIGS_PUSH_DOC;
+
/**
* retry.backoff.max.ms
*/
@@ -701,6 +707,11 @@ public class ConsumerConfig extends AbstractConfig {
new ShareAcquireMode.Validator(),
Importance.MEDIUM,
ConsumerConfig.SHARE_ACQUIRE_MODE_DOC)
+ .define(ENABLE_CONFIGS_PUSH_CONFIG,
+ Type.BOOLEAN,
+ true,
+ Importance.LOW,
+ ENABLE_CONFIGS_PUSH_DOC)
.define(CONFIG_PROVIDERS_CONFIG,
ConfigDef.Type.LIST,
List.of(),
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 644613a8dee45..c0754e4cf2831 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientConfigsSender;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
@@ -324,6 +325,7 @@ private StreamsRebalanceListenerInvoker streamsRebalanceListenerInvoker() {
private volatile boolean closed = false;
// Init value is needed to avoid NPE in case of exception raised in the constructor
private Optional clientTelemetryReporter = Optional.empty();
+ private Optional<ClientConfigsSender> clientConfigsSender = Optional.empty();
private final PositionsValidator positionsValidator;
private AsyncPollEvent inflightPoll;
@@ -399,6 +401,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
List reporters = CommonClientConfigs.metricsReporters(clientId, config);
this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
this.clientTelemetryReporter.ifPresent(reporters::add);
+ this.clientConfigsSender = CommonClientConfigs.configsSender(config);
this.metrics = createMetrics(config, time, reporters);
this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, CONSUMER_METRIC_GROUP);
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics);
@@ -439,6 +442,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
metrics,
fetchMetricsManager.throttleTimeSensor(),
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
+ clientConfigsSender.orElse(null),
backgroundEventHandler,
false,
asyncConsumerMetrics
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 5b54a759a9844..eeeb18d8ad4cc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientConfigsSender;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
@@ -143,6 +144,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate {
private final List assignors;
// Init value is needed to avoid NPE in case of exception raised in the constructor
private Optional clientTelemetryReporter = Optional.empty();
+ private Optional<ClientConfigsSender> clientConfigsSender = Optional.empty();
// currentThread holds the threadId of the current thread accessing this Consumer
// and is used to prevent multi-threaded access
@@ -175,6 +177,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate {
List reporters = CommonClientConfigs.metricsReporters(clientId, config);
this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
this.clientTelemetryReporter.ifPresent(reporters::add);
+ this.clientConfigsSender = CommonClientConfigs.configsSender(config);
this.metrics = createMetrics(config, time, reporters);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
@@ -204,7 +207,8 @@ public class ClassicKafkaConsumer implements ConsumerDelegate {
metadata,
fetchMetricsManager.throttleTimeSensor(),
retryBackoffMs,
- clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
+ clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
+ clientConfigsSender.orElse(null));
this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
index 2f715e206cc07..f6d2b8c88003d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientConfigsSender;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
@@ -87,7 +88,8 @@ public static ConsumerNetworkClient createConsumerNetworkClient(ConsumerConfig c
Metadata metadata,
Sensor throttleTimeSensor,
long retryBackoffMs,
- ClientTelemetrySender clientTelemetrySender) {
+ ClientTelemetrySender clientTelemetrySender,
+ ClientConfigsSender clientConfigsSender) {
NetworkClient netClient = ClientUtils.createNetworkClient(config,
metrics,
CONSUMER_METRIC_GROUP_PREFIX,
@@ -97,7 +99,8 @@ public static ConsumerNetworkClient createConsumerNetworkClient(ConsumerConfig c
CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
metadata,
throttleTimeSensor,
- clientTelemetrySender);
+ clientTelemetrySender,
+ clientConfigsSender);
// Will avoid blocking an extended period of time to prevent heartbeat thread starvation
int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 42c1c73acb565..42d11e52c0df3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientConfigsSender;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClientUtils;
@@ -469,6 +470,7 @@ public static Supplier supplier(final Time time,
final Metrics metrics,
final Sensor throttleTimeSensor,
final ClientTelemetrySender clientTelemetrySender,
+ final ClientConfigsSender clientConfigsSender,
final BackgroundEventHandler backgroundEventHandler,
final boolean notifyMetadataErrorsViaErrorQueue,
final AsyncConsumerMetrics asyncConsumerMetrics) {
@@ -484,7 +486,8 @@ protected NetworkClientDelegate create() {
CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
metadata,
throttleTimeSensor,
- clientTelemetrySender);
+ clientTelemetrySender,
+ clientConfigsSender);
return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics);
}
};
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 3a7eb81df6062..457c95d9dcf9d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -291,6 +291,7 @@ private void process(final ErrorEvent event) {
metrics,
shareFetchMetricsManager.throttleTimeSensor(),
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
+ null,
backgroundEventHandler,
true,
asyncConsumerMetrics
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 31a9f6b945c61..a1a30b34847e0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -536,7 +536,8 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat
maxInflightRequests,
metadata,
throttleTimeSensor,
- clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
+ clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
+ null);
short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));
return new Sender(logContext,
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ClientConfigPolicyException.java b/clients/src/main/java/org/apache/kafka/common/errors/ClientConfigPolicyException.java
new file mode 100644
index 0000000000000..40b42b5244a45
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ClientConfigPolicyException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Exception thrown when a client configuration policy violation occurs.
+ *
+ * <p>This is a generic exception for ClientConfigPolicy implementations to throw when
+ * client configurations fail validation or enforcement rules.
+ */
+public class ClientConfigPolicyException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public ClientConfigPolicyException(String message) {
+ super(message);
+ }
+
+ public ClientConfigPolicyException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ConfigTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/ConfigTooLargeException.java
new file mode 100644
index 0000000000000..3982eaf95d9a4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ConfigTooLargeException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Exception thrown when a client configuration payload exceeds the broker's ConfigMaxBytes limit.
+ *
+ * <p>The limit is advertised to the client in the GetConfigProfileKeys response; clients
+ * treat this error as non-retriable and abandon the config push handshake.
+ */
+public class ConfigTooLargeException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public ConfigTooLargeException(String message) {
+ super(message);
+ }
+
+ public ConfigTooLargeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownConfigProfileException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownConfigProfileException.java
new file mode 100644
index 0000000000000..bbdc6e667ac86
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownConfigProfileException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Exception thrown when a client's configuration profile is unknown or not supported by the broker.
+ *
+ * This can occur in two scenarios:
+ *
+ * - During GetConfigProfileKeys: The client profile did not match any configuration profiles
+ * and the policy implementation does not allow for that case. This is a fatal error.
+ * - During PushConfig: The client sent a request with an invalid or outdated configuration profile CRC,
+ * which means the configuration profile has changed. The client should retry the handshake.
+ *
+ */
+public class UnknownConfigProfileException extends RetriableException {
+
+ private static final long serialVersionUID = 1L;
+
+ public UnknownConfigProfileException(String message) {
+ super(message);
+ }
+
+ public UnknownConfigProfileException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java b/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java
index cb99a8669e40f..de9feff496da2 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java
@@ -21,14 +21,16 @@
public class ClientInformation {
public static final String UNKNOWN_NAME_OR_VERSION = "unknown";
- public static final ClientInformation EMPTY = new ClientInformation(UNKNOWN_NAME_OR_VERSION, UNKNOWN_NAME_OR_VERSION);
+ public static final ClientInformation EMPTY = new ClientInformation(UNKNOWN_NAME_OR_VERSION, UNKNOWN_NAME_OR_VERSION, UNKNOWN_NAME_OR_VERSION);
private final String softwareName;
private final String softwareVersion;
+ private final String softwareRole;
- public ClientInformation(String softwareName, String softwareVersion) {
+ public ClientInformation(String softwareName, String softwareVersion, String softwareRole) {
this.softwareName = softwareName.isEmpty() ? UNKNOWN_NAME_OR_VERSION : softwareName;
this.softwareVersion = softwareVersion.isEmpty() ? UNKNOWN_NAME_OR_VERSION : softwareVersion;
+ this.softwareRole = (softwareRole == null || softwareRole.isEmpty()) ? UNKNOWN_NAME_OR_VERSION : softwareRole;
}
public String softwareName() {
@@ -39,15 +41,20 @@ public String softwareVersion() {
return this.softwareVersion;
}
+ public String softwareRole() {
+ return this.softwareRole;
+ }
+
@Override
public String toString() {
return "ClientInformation(softwareName=" + softwareName +
- ", softwareVersion=" + softwareVersion + ")";
+ ", softwareVersion=" + softwareVersion +
+ ", softwareRole=" + softwareRole + ")";
}
@Override
public int hashCode() {
- return Objects.hash(softwareName, softwareVersion);
+ return Objects.hash(softwareName, softwareVersion, softwareRole);
}
@Override
@@ -60,6 +67,7 @@ public boolean equals(Object o) {
}
ClientInformation other = (ClientInformation) o;
return other.softwareName.equals(softwareName) &&
- other.softwareVersion.equals(softwareVersion);
+ other.softwareVersion.equals(softwareVersion) &&
+ other.softwareRole.equals(softwareRole);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 79b283b4f8d89..50e0b2e409a60 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -137,7 +137,9 @@ public enum ApiKeys {
STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE),
DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS),
ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS),
- DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS);
+ DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS),
+ GET_CONFIG_PROFILE_KEYS(ApiMessageType.GET_CONFIG_PROFILE_KEYS),
+ PUSH_CONFIG(ApiMessageType.PUSH_CONFIG);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index a27a7fcf23c77..5232c05810619 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -22,6 +22,7 @@
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.ConcurrentTransactionsException;
+import org.apache.kafka.common.errors.ConfigTooLargeException;
import org.apache.kafka.common.errors.ControllerMovedException;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
@@ -134,6 +135,7 @@
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.errors.UnacceptableCredentialException;
+import org.apache.kafka.common.errors.UnknownConfigProfileException;
import org.apache.kafka.common.errors.UnknownControllerIdException;
import org.apache.kafka.common.errors.UnknownLeaderEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -418,7 +420,9 @@ public enum Errors {
STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.", StreamsInvalidTopologyException::new),
STREAMS_INVALID_TOPOLOGY_EPOCH(131, "The supplied topology epoch is invalid.", StreamsInvalidTopologyEpochException::new),
STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.", StreamsTopologyFencedException::new),
- SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new);
+ SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new),
+ CONFIG_TOO_LARGE(134, "Configuration payload exceeds broker's ConfigMaxBytes limit.", ConfigTooLargeException::new),
+ UNKNOWN_CONFIG_PROFILE(135, "Client configuration profile is unknown or not supported.", UnknownConfigProfileException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index bc313078d7424..7a89110e60606 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -291,6 +291,10 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Readable readable,
return AlterShareGroupOffsetsResponse.parse(readable, version);
case DELETE_SHARE_GROUP_OFFSETS:
return DeleteShareGroupOffsetsResponse.parse(readable, version);
+ case GET_CONFIG_PROFILE_KEYS:
+ return GetConfigProfileKeysResponse.parse(readable, version);
+ case PUSH_CONFIG:
+ return PushConfigResponse.parse(readable, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index 1bdb0903c7d7d..4b9c4915997d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -95,8 +95,9 @@ public boolean hasUnsupportedRequestVersion() {
public boolean isValid() {
if (version() >= 3) {
- return SOFTWARE_NAME_VERSION_PATTERN.matcher(data.clientSoftwareName()).matches() &&
- SOFTWARE_NAME_VERSION_PATTERN.matcher(data.clientSoftwareVersion()).matches();
+ boolean nameValid = SOFTWARE_NAME_VERSION_PATTERN.matcher(data.clientSoftwareName()).matches();
+ boolean versionValid = SOFTWARE_NAME_VERSION_PATTERN.matcher(data.clientSoftwareVersion()).matches();
+ return nameValid && versionValid;
} else {
return true;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GetConfigProfileKeysRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GetConfigProfileKeysRequest.java
new file mode 100644
index 0000000000000..3d3ae6dd19aa4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GetConfigProfileKeysRequest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.GetConfigProfileKeysRequestData;
+import org.apache.kafka.common.message.GetConfigProfileKeysResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.Readable;
+
+public class GetConfigProfileKeysRequest extends AbstractRequest {
+
+ public static class Builder extends AbstractRequest.Builder<GetConfigProfileKeysRequest> {
+ private final GetConfigProfileKeysRequestData data;
+
+ public Builder(GetConfigProfileKeysRequestData data) {
+ this(data, false);
+ }
+
+ public Builder(GetConfigProfileKeysRequestData data, boolean enableUnstableLastVersion) {
+ super(ApiKeys.GET_CONFIG_PROFILE_KEYS, enableUnstableLastVersion);
+ this.data = data;
+ }
+
+ @Override
+ public GetConfigProfileKeysRequest build(short version) {
+ return new GetConfigProfileKeysRequest(data, version);
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+ }
+
+ private final GetConfigProfileKeysRequestData data;
+
+ public GetConfigProfileKeysRequest(GetConfigProfileKeysRequestData data, short version) {
+ super(ApiKeys.GET_CONFIG_PROFILE_KEYS, version);
+ this.data = data;
+ }
+
+ @Override
+ public GetConfigProfileKeysResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ GetConfigProfileKeysResponseData responseData = new GetConfigProfileKeysResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode(Errors.forException(e).code())
+ .setErrorMessage(e.getMessage());
+ return new GetConfigProfileKeysResponse(responseData);
+ }
+
+ @Override
+ public GetConfigProfileKeysRequestData data() {
+ return data;
+ }
+
+ public static GetConfigProfileKeysRequest parse(Readable readable, short version) {
+ return new GetConfigProfileKeysRequest(new GetConfigProfileKeysRequestData(readable, version), version);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GetConfigProfileKeysResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GetConfigProfileKeysResponse.java
new file mode 100644
index 0000000000000..e08a0b964e925
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GetConfigProfileKeysResponse.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.GetConfigProfileKeysResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.Readable;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+/**
+ * Possible error codes:
+ * - {@link Errors#UNKNOWN_CONFIG_PROFILE}
+ * - {@link Errors#UNSUPPORTED_VERSION}
+ * - {@link Errors#INVALID_REQUEST}
+ */
+public class GetConfigProfileKeysResponse extends AbstractResponse {
+ private final GetConfigProfileKeysResponseData data;
+
+ public GetConfigProfileKeysResponse(GetConfigProfileKeysResponseData data) {
+ super(ApiKeys.GET_CONFIG_PROFILE_KEYS);
+ this.data = data;
+ }
+
+ @Override
+ public GetConfigProfileKeysResponseData data() {
+ return data;
+ }
+
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> counts = new EnumMap<>(Errors.class);
+ updateErrorCounts(counts, Errors.forCode(data.errorCode()));
+ return counts;
+ }
+
+ @Override
+ public int throttleTimeMs() {
+ return data.throttleTimeMs();
+ }
+
+ @Override
+ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+ data.setThrottleTimeMs(throttleTimeMs);
+ }
+
+ public boolean hasError() {
+ return error() != Errors.NONE;
+ }
+
+ public Errors error() {
+ return Errors.forCode(data.errorCode());
+ }
+
+ public static GetConfigProfileKeysResponse parse(Readable readable, short version) {
+ return new GetConfigProfileKeysResponse(new GetConfigProfileKeysResponseData(readable, version));
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PushConfigRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/PushConfigRequest.java
new file mode 100644
index 0000000000000..d060765949ca6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/PushConfigRequest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.PushConfigRequestData;
+import org.apache.kafka.common.message.PushConfigResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.Readable;
+
+public class PushConfigRequest extends AbstractRequest {
+
+ public static class Builder extends AbstractRequest.Builder<PushConfigRequest> {
+ private final PushConfigRequestData data;
+
+ public Builder(PushConfigRequestData data) {
+ this(data, false);
+ }
+
+ public Builder(PushConfigRequestData data, boolean enableUnstableLastVersion) {
+ super(ApiKeys.PUSH_CONFIG, enableUnstableLastVersion);
+ this.data = data;
+ }
+
+ @Override
+ public PushConfigRequest build(short version) {
+ return new PushConfigRequest(data, version);
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+ }
+
+ private final PushConfigRequestData data;
+
+ public PushConfigRequest(PushConfigRequestData data, short version) {
+ super(ApiKeys.PUSH_CONFIG, version);
+ this.data = data;
+ }
+
+ @Override
+ public PushConfigResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ PushConfigResponseData responseData = new PushConfigResponseData()
+ .setErrorCode(Errors.forException(e).code())
+ .setThrottleTimeMs(throttleTimeMs);
+ return new PushConfigResponse(responseData);
+ }
+
+ @Override
+ public PushConfigRequestData data() {
+ return data;
+ }
+
+ public static PushConfigRequest parse(Readable readable, short version) {
+ return new PushConfigRequest(new PushConfigRequestData(readable, version), version);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PushConfigResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/PushConfigResponse.java
new file mode 100644
index 0000000000000..b1b97f19f4d87
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/PushConfigResponse.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.PushConfigResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.Readable;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+/**
+ * Possible error codes:
+ * - {@link Errors#CONFIG_TOO_LARGE}
+ * - {@link Errors#INVALID_CONFIG}
+ * - {@link Errors#CLIENT_CONFIG_POLICY_EXCEPTION}
+ * - {@link Errors#UNKNOWN_CONFIG_PROFILE}
+ * - {@link Errors#UNSUPPORTED_VERSION}
+ * - {@link Errors#INVALID_REQUEST}
+ */
+public class PushConfigResponse extends AbstractResponse {
+ private final PushConfigResponseData data;
+
+ public PushConfigResponse(PushConfigResponseData data) {
+ super(ApiKeys.PUSH_CONFIG);
+ this.data = data;
+ }
+
+ @Override
+ public PushConfigResponseData data() {
+ return data;
+ }
+
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> counts = new EnumMap<>(Errors.class);
+ updateErrorCounts(counts, Errors.forCode(data.errorCode()));
+ return counts;
+ }
+
+ @Override
+ public int throttleTimeMs() {
+ return data.throttleTimeMs();
+ }
+
+ @Override
+ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+ data.setThrottleTimeMs(throttleTimeMs);
+ }
+
+ public boolean hasError() {
+ return error() != Errors.NONE;
+ }
+
+ public Errors error() {
+ return Errors.forCode(data.errorCode());
+ }
+
+ public static PushConfigResponse parse(Readable readable, short version) {
+ return new PushConfigResponse(new PushConfigResponseData(readable, version));
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index b84b5dc2abc94..5b87f2c25632a 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -579,7 +579,8 @@ else if (!apiVersionsRequest.isValid())
sendKafkaResponse(context, apiVersionsRequest.getErrorResponse(0, Errors.INVALID_REQUEST.exception()));
else {
metadataRegistry.registerClientInformation(new ClientInformation(apiVersionsRequest.data().clientSoftwareName(),
- apiVersionsRequest.data().clientSoftwareVersion()));
+ apiVersionsRequest.data().clientSoftwareVersion(),
+ null));
sendKafkaResponse(context, apiVersionSupplier.apply(apiVersionsRequest.version()));
setSaslState(SaslState.HANDSHAKE_REQUEST);
}
diff --git a/clients/src/main/java/org/apache/kafka/server/policy/ClientConfigPolicy.java b/clients/src/main/java/org/apache/kafka/server/policy/ClientConfigPolicy.java
new file mode 100644
index 0000000000000..2717d8c4aaa78
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/policy/ClientConfigPolicy.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.policy;
+
+import org.apache.kafka.common.Reconfigurable;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ClientConfigPolicyException;
+import org.apache.kafka.common.errors.ConfigTooLargeException;
+import org.apache.kafka.common.errors.UnknownConfigProfileException;
+import org.apache.kafka.common.utils.Crc32C;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.SortedSet;
+import java.util.stream.Collectors;
+
+/**
+ * An interface for intercepting and enforcing client configuration.
+ *
+ * Common use cases include:
+ *
+ * - Selecting which configuration keys to request from clients based on their profile
+ * - Validating that client configurations match expected values or fall within acceptable ranges
+ * - Storing client configuration snapshots for observability and troubleshooting
+ *
+ *
+ * If client.config.policy.class.name is defined, Kafka will create an instance of the specified class
+ * using the default constructor and will then pass the broker configs to its configure() method.
+ * During broker shutdown, the close() method will be invoked so that resources can be released (if
+ * necessary).
+ */
+@InterfaceStability.Evolving
+public interface ClientConfigPolicy extends Reconfigurable, AutoCloseable {
+
+ /**
+ * Computes the CRC that serves to identify a configuration profile. The
+ * value is generated by calculating a CRC32C of the configuration set.
+ *
+ * This default implementation sorts the keys for deterministic ordering and computes
+ * a CRC32C checksum. Implementations may override this method if a different CRC
+ * calculation is needed.
+ *
+ * @param configKeys The set of configuration keys in the profile
+ * @return CRC32C checksum of the configuration keys
+ */
+ default long configurationProfileCrc(SortedSet<String> configKeys) {
+ // SortedSet iteration order is already deterministic, so the CRC is stable
+ String keysString = configKeys.stream()
+ .collect(Collectors.joining(","));
+ byte[] keysBytes = keysString.getBytes(StandardCharsets.UTF_8);
+ return Crc32C.compute(keysBytes, 0, keysBytes.length);
+ }
+
+ /**
+ * Select the configuration profile keys for a given client profile.
+ *
+ * How much of the client profile must match a configuration profile is left up to the
+ * implementation. It is not a requirement that all of the client profile (name, version, and
+ * any optional metadata) match exactly. For example, if the configuration of the client doesn't
+ * change between minor versions, there's no need to provide a distinct configuration profile
+ * for each minor client profile difference.
+ *
+ * If this method returns {@code Optional.empty()}, the client will not send a PushConfig request.
+ *
+ * @param clientProfile The client profile information from the request context
+ * @return Optional containing the configuration profile keys and CRC, or empty if no profile matches
+ * @throws UnknownConfigProfileException if the client profile did not match any configuration profiles
+ * and the policy implementation does not allow for that case
+ */
+ Optional<ClientConfigProfileKeys> profileKeys(ClientProfile clientProfile)
+ throws UnknownConfigProfileException;
+
+ /**
+ * Receive the client configuration data for observability, enforcement, etc.
+ *
+ * This method should avoid blocking. Implementations typically:
+ *
+ * - Validate configuration values against expected ranges or patterns
+ * - Store configuration snapshots in external storage for auditing
+ * - Track configuration changes over time
+ *
+ *
+ * Note: this method will not be invoked if the {@code Configs} array
+ * of the PushConfig request was larger than {@code client.config.max.bytes}.
+ *
+ * @param pushConfigData The client profile and configuration data from the PushConfig request
+ * @throws UnknownConfigProfileException if the configuration profile CRC is invalid or outdated
+ * @throws ConfigTooLargeException if the configuration data exceeds size limits
+ * @throws ClientConfigPolicyException if the configurations violate the policy
+ */
+ void process(ClientPushConfigData pushConfigData)
+ throws UnknownConfigProfileException, ConfigTooLargeException, ClientConfigPolicyException;
+
+ /**
+ * Close this policy instance. Default implementation is a no-op.
+ */
+ @Override
+ default void close() throws Exception {
+ // Default no-op
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/server/policy/ClientConfigProfileKeys.java b/clients/src/main/java/org/apache/kafka/server/policy/ClientConfigProfileKeys.java
new file mode 100644
index 0000000000000..d2d76a6c388be
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/policy/ClientConfigProfileKeys.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.policy;
+
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * Immutable class containing the configuration profile keys.
+ *
+ * A configuration profile defines the set of configuration keys to capture. The ClientConfigPolicy
+ * uses the information in the client profile to select the most appropriate configuration profile.
+ *
+ * The CRC is generated by calculating a CRC32C of the configuration keys and is used to detect
+ * changes to the configuration profile between GetConfigProfileKeys and PushConfig requests.
+ */
+public final class ClientConfigProfileKeys {
+ private final SortedSet<String> keys;
+ private final long crc;
+
+ /**
+ * Creates a new ClientConfigProfileKeys.
+ *
+ * @param keys The set of configuration keys the client should provide (sorted for deterministic ordering)
+ * @param crc CRC32C checksum of the configuration keys for profile change detection
+ */
+ public ClientConfigProfileKeys(SortedSet<String> keys, long crc) {
+ this.keys = keys;
+ this.crc = crc;
+ }
+
+ public SortedSet<String> keys() {
+ return keys;
+ }
+
+ public long crc() {
+ return crc;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ClientConfigProfileKeys that = (ClientConfigProfileKeys) o;
+ return crc == that.crc && Objects.equals(keys, that.keys);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(keys, crc);
+ }
+
+ @Override
+ public String toString() {
+ return "ClientConfigProfileKeys{" +
+ "keys=" + keys +
+ ", crc=" + crc +
+ '}';
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/server/policy/ClientProfile.java b/clients/src/main/java/org/apache/kafka/server/policy/ClientProfile.java
new file mode 100644
index 0000000000000..528fea90bc1ac
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/policy/ClientProfile.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.policy;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Objects;
+import java.util.SortedMap;
+
+/**
+ * Immutable class containing the client profile from the RequestContext.
+ *
+ * A client profile is made up of the tuple of ClientSoftwareName, ClientSoftwareVersion,
+ * ClientInstanceId, and ClientMetadata. The client profile provides the ClientConfigPolicy
+ * implementation with a detailed view of the client in use, allowing it to distinguish between
+ * different client types (e.g., librdkafka 2.12.0 vs Apache Kafka Java 4.4.0 Producer).
+ */
+public final class ClientProfile {
+    private final Uuid clientInstanceId;
+    private final String clientSoftwareName;
+    private final String clientSoftwareVersion;
+    private final SortedMap<String, String> clientMetadata; // sorted for deterministic iteration ordering
+
+    /**
+     * Creates a new ClientProfile.
+     *
+     * @param clientInstanceId Unique identifier for this client instance (UUID v4)
+     * @param clientSoftwareName The name of the client software (e.g., "apache-kafka-java")
+     * @param clientSoftwareVersion The version of the client software (e.g., "3.8.0")
+     * @param clientMetadata Optional metadata as key-value pairs for additional client context
+     */
+    public ClientProfile(
+        Uuid clientInstanceId,
+        String clientSoftwareName,
+        String clientSoftwareVersion,
+        SortedMap<String, String> clientMetadata
+    ) {
+        this.clientInstanceId = clientInstanceId;
+        this.clientSoftwareName = clientSoftwareName;
+        this.clientSoftwareVersion = clientSoftwareVersion;
+        this.clientMetadata = clientMetadata;
+    }
+
+    public Uuid clientInstanceId() {
+        return clientInstanceId;
+    }
+
+    public String clientSoftwareName() {
+        return clientSoftwareName;
+    }
+
+    public String clientSoftwareVersion() {
+        return clientSoftwareVersion;
+    }
+
+    public SortedMap<String, String> clientMetadata() { // callers must treat the returned map as read-only
+        return clientMetadata;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ClientProfile that = (ClientProfile) o;
+        return Objects.equals(clientInstanceId, that.clientInstanceId) &&
+            Objects.equals(clientSoftwareName, that.clientSoftwareName) &&
+            Objects.equals(clientSoftwareVersion, that.clientSoftwareVersion) &&
+            Objects.equals(clientMetadata, that.clientMetadata);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(clientInstanceId, clientSoftwareName, clientSoftwareVersion, clientMetadata);
+    }
+
+    @Override
+    public String toString() {
+        return "ClientProfile{" +
+            "clientInstanceId=" + clientInstanceId +
+            ", clientSoftwareName='" + clientSoftwareName + '\'' +
+            ", clientSoftwareVersion='" + clientSoftwareVersion + '\'' +
+            ", clientMetadata=" + clientMetadata +
+            '}';
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/server/policy/ClientPushConfigData.java b/clients/src/main/java/org/apache/kafka/server/policy/ClientPushConfigData.java
new file mode 100644
index 0000000000000..39806a995fe2d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/policy/ClientPushConfigData.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.policy;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Immutable class containing the PushConfig API data.
+ *
+ * The client profile and configuration values come directly from the RPC.
+ * The broker supplies its current timestamp (UTC) for when the request was received.
+ */
+public final class ClientPushConfigData {
+    private final ClientProfile clientProfile;
+    private final Map<String, String> configs;
+    private final long timestamp;
+
+    /**
+     * Creates a new ClientPushConfigData.
+     *
+     * @param clientProfile The client profile information from the request context
+     * @param configs The configuration key-value pairs pushed by the client
+     * @param timestamp UTC timestamp (milliseconds) when the broker received the request
+     */
+    public ClientPushConfigData(
+        ClientProfile clientProfile,
+        Map<String, String> configs,
+        long timestamp
+    ) {
+        this.clientProfile = clientProfile;
+        this.configs = configs;
+        this.timestamp = timestamp;
+    }
+
+    public ClientProfile clientProfile() {
+        return clientProfile;
+    }
+
+    public Map<String, String> configs() { // callers must treat the returned map as read-only
+        return configs;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ClientPushConfigData that = (ClientPushConfigData) o;
+        return timestamp == that.timestamp &&
+            Objects.equals(clientProfile, that.clientProfile) &&
+            Objects.equals(configs, that.configs);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(clientProfile, configs, timestamp);
+    }
+
+    @Override
+    public String toString() {
+        return "ClientPushConfigData{" +
+            "clientProfile=" + clientProfile +
+            ", configs=" + configs +
+            ", timestamp=" + timestamp +
+            '}';
+    }
+}
diff --git a/clients/src/main/resources/common/message/ApiVersionsRequest.json b/clients/src/main/resources/common/message/ApiVersionsRequest.json
index 56170c9667350..489a6c1cf953e 100644
--- a/clients/src/main/resources/common/message/ApiVersionsRequest.json
+++ b/clients/src/main/resources/common/message/ApiVersionsRequest.json
@@ -23,12 +23,25 @@
// Version 3 is the first flexible version and adds ClientSoftwareName and ClientSoftwareVersion.
//
// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion in the response from being 0.
- "validVersions": "0-4",
+ //
+ // Version 5 adds ClientInstanceId and ClientMetadata for KIP-CFG.
+ "validVersions": "0-5",
"flexibleVersions": "3+",
"fields": [
{ "name": "ClientSoftwareName", "type": "string", "versions": "3+",
"ignorable": true, "about": "The name of the client." },
{ "name": "ClientSoftwareVersion", "type": "string", "versions": "3+",
- "ignorable": true, "about": "The version of the client." }
+ "ignorable": true, "about": "The version of the client." },
+    { "name": "ClientInstanceId", "type": "uuid", "versions": "5+",
+      "ignorable": false, "about": "Unique ID for this client instance, must be set to 0 on the first request." },
+ { "name": "ClientMetadata", "type": "[]ClientMetadataEntry", "versions": "5+",
+ "ignorable": true, "about": "Client metadata as key-value pairs.",
+ "fields": [
+ { "name": "Key", "type": "string", "versions": "5+",
+ "about": "Client metadata key." },
+ { "name": "Value", "type": "string", "versions": "5+",
+ "about": "Client metadata value." }
+ ]
+ }
]
}
diff --git a/clients/src/main/resources/common/message/GetConfigProfileKeysRequest.json b/clients/src/main/resources/common/message/GetConfigProfileKeysRequest.json
new file mode 100644
index 0000000000000..931cc290b23b7
--- /dev/null
+++ b/clients/src/main/resources/common/message/GetConfigProfileKeysRequest.json
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 93,
+ "type": "request",
+ "listeners": ["broker"],
+ "name": "GetConfigProfileKeysRequest",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": []
+}
diff --git a/clients/src/main/resources/common/message/GetConfigProfileKeysResponse.json b/clients/src/main/resources/common/message/GetConfigProfileKeysResponse.json
new file mode 100644
index 0000000000000..8958b4bc8e608
--- /dev/null
+++ b/clients/src/main/resources/common/message/GetConfigProfileKeysResponse.json
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 93,
+ "type": "response",
+ "name": "GetConfigProfileKeysResponse",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ {
+ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+ "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."
+ },
+ {
+ "name": "ErrorCode", "type": "int16", "versions": "0+",
+ "about": "The top-level error code."
+ },
+ {
+ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+ "about": "The top-level error message, or null if there was no error."
+ },
+ {
+ "name": "ConfigurationProfileCrc", "type": "int64", "versions": "0+",
+ "about": "CRC for the current configuration profile."
+ },
+ {
+ "name": "ConfigMaxBytes", "type": "int32", "versions": "0+",
+ "about": "The maximum number of bytes for ConfigKeys in its serialized form, as specified by client.config.max.bytes."
+ },
+ {
+ "name": "ConfigKeys", "type": "[]string", "versions": "0+",
+ "about": "The client configuration keys the server wants the client to send."
+ }
+ ]
+}
\ No newline at end of file
diff --git a/clients/src/main/resources/common/message/PushConfigRequest.json b/clients/src/main/resources/common/message/PushConfigRequest.json
new file mode 100644
index 0000000000000..4c5fc833c750a
--- /dev/null
+++ b/clients/src/main/resources/common/message/PushConfigRequest.json
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 94,
+ "type": "request",
+ "listeners": ["broker"],
+ "name": "PushConfigRequest",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ {
+ "name": "ConfigurationProfileCrc", "type": "int64", "versions": "0+",
+ "about": "CRC for the current configuration profile."
+ },
+ {
+ "name": "Configs", "type": "[]Config", "versions": "0+",
+ "about": "The client configuration entries.",
+ "fields": [
+ {
+ "name": "ConfigKey", "type": "string", "versions": "0+",
+ "about": "The configuration key."
+ },
+ {
+ "name": "ConfigValue", "type": "string", "versions": "0+",
+ "about": "The configuration value."
+ },
+ {
+ "name": "ConfigType", "type": "int16", "versions": "0+",
+ "about": "Type (from ConfigDef.Type) of the ConfigValue field. Maps to ConfigDef.Type ordinal: 0=BOOLEAN, 1=STRING, 2=INT, 3=SHORT, 4=LONG, 5=DOUBLE, 6=LIST. CLASS(7) and PASSWORD(8) types are intentionally excluded."
+ }
+ ]
+ }
+ ]
+}
diff --git a/clients/src/main/resources/common/message/PushConfigResponse.json b/clients/src/main/resources/common/message/PushConfigResponse.json
new file mode 100644
index 0000000000000..11f4207ed210f
--- /dev/null
+++ b/clients/src/main/resources/common/message/PushConfigResponse.json
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 94,
+ "type": "response",
+ "name": "PushConfigResponse",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ {
+ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+ "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."
+ },
+ {
+ "name": "ErrorCode", "type": "int16", "versions": "0+",
+ "about": "The error code, or 0 if there was no error."
+ },
+ {
+ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+ "about": "The top-level error message, or null if there was no error."
+ }
+ ]
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index e8dcf5843dcb8..8927d83f48ab2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -1151,7 +1151,7 @@ public void testReconnectAfterAddressChange() {
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
- time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender,
+ time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender, null,
Long.MAX_VALUE, MetadataRecoveryStrategy.NONE);
// Connect to one the initial addresses, then change the addresses and disconnect
@@ -1212,7 +1212,7 @@ public void testFailedConnectionToFirstAddress() {
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
- time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender,
+ time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender, null,
Long.MAX_VALUE, MetadataRecoveryStrategy.NONE);
// First connection attempt should fail
@@ -1265,7 +1265,7 @@ public void testFailedConnectionToFirstAddressAfterReconnect() {
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
- time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender,
+ time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender, null,
Long.MAX_VALUE, MetadataRecoveryStrategy.NONE);
// Connect to one the initial addresses, then change the addresses and disconnect
@@ -1374,7 +1374,7 @@ public void testTelemetryRequest() {
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
- time, true, new ApiVersions(), null, new LogContext(), new DefaultHostResolver(), mockClientTelemetrySender,
+ time, true, new ApiVersions(), null, new LogContext(), new DefaultHostResolver(), mockClientTelemetrySender, null,
Long.MAX_VALUE, MetadataRecoveryStrategy.NONE);
// Send the ApiVersionsRequest
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 347f76135866d..865f4e6a6ca84 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -865,7 +865,7 @@ public void testConnectionsByClientMetric() throws Exception {
selector.channel(node).channelMetadataRegistry().clientInformation());
// Metric with unknown / unknown should not be there, metric with A / B should be there
- ClientInformation clientInformation = new ClientInformation("A", "B");
+ ClientInformation clientInformation = new ClientInformation("A", "B", null);
selector.channel(node).channelMetadataRegistry()
.registerClientInformation(clientInformation);
assertEquals(clientInformation,
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index ebc0f990ce1fe..0fd164ec3b272 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -1031,7 +1031,8 @@ private[kafka] class Processor(
if (apiVersionsRequest.isValid) {
channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
apiVersionsRequest.data.clientSoftwareName,
- apiVersionsRequest.data.clientSoftwareVersion))
+ apiVersionsRequest.data.clientSoftwareVersion,
+ null))
}
}
requestChannel.sendRequest(req)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0065986438470..e345797fb874c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -64,6 +64,8 @@ import org.apache.kafka.security.DelegationTokenManager
import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, FetchManager, ProcessRole}
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{GroupVersion, RequestLocal, ShareVersion, StreamsVersion, TransactionVersion}
+import org.apache.kafka.server.config.ServerLogConfigs
+import org.apache.kafka.server.policy.ClientConfigPolicy
import org.apache.kafka.server.share.context.ShareFetchContext
import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, SharePartitionKey}
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
@@ -72,6 +74,7 @@ import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.{AppendOrigin, RecordValidationStats}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
+import java.nio.charset.StandardCharsets
import java.util
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
@@ -120,6 +123,12 @@ class KafkaApis(val requestChannel: RequestChannel,
val configManager = new ConfigAdminManager(brokerId, config, configRepository)
val describeTopicPartitionsRequestHandler = new DescribeTopicPartitionsRequestHandler(
metadataCache, authHelper, config)
+ val clientConfigPolicy: Option[ClientConfigPolicy] =
+ Option(config.getConfiguredInstance(
+ ServerLogConfigs.CLIENT_CONFIG_POLICY_CLASS_NAME_CONFIG,
+ classOf[ClientConfigPolicy]))
+ val clientConfigMaxBytes: Int =
+ config.getInt(ServerLogConfigs.CLIENT_CONFIG_MAX_BYTES_CONFIG)
def close(): Unit = {
aclApis.close()
@@ -225,7 +234,9 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError)
case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError)
case ApiKeys.DESCRIBE_TOPIC_PARTITIONS => handleDescribeTopicPartitionsRequest(request)
+ case ApiKeys.GET_CONFIG_PROFILE_KEYS => handleGetConfigProfileKeysRequest(request)
case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => handleGetTelemetrySubscriptionsRequest(request)
+ case ApiKeys.PUSH_CONFIG => handlePushConfigRequest(request)
case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request)
case ApiKeys.LIST_CONFIG_RESOURCES => handleListConfigResources(request)
case ApiKeys.ADD_RAFT_VOTER => forwardToController(request)
@@ -2977,6 +2988,122 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
+  def handleGetConfigProfileKeysRequest(request: RequestChannel.Request): Unit = { // KIP: returns the config keys + CRC the client must push (GetConfigProfileKeys API)
+    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+
+    val clientInfo = request.context.clientInformation
+    // TODO: Extract clientInstanceId and clientMetadata from ApiVersionsRequest context
+    // For now, using placeholder values - need to update RequestContext/ClientInformation
+    // to store these fields from ApiVersionsRequest v5
+    val clientInstanceId = org.apache.kafka.common.Uuid.ZERO_UUID // TODO: Get from context
+    val clientMetadata = new java.util.TreeMap[String, String]() // TODO: Get from context
+
+    val clientProfile = new org.apache.kafka.server.policy.ClientProfile(
+      clientInstanceId,
+      clientInfo.softwareName,
+      clientInfo.softwareVersion,
+      clientMetadata
+    )
+
+    val (errorCode, errorMessage, crc, configKeys) = clientConfigPolicy match { // the configured policy selects the key set for this client profile
+      case Some(policy) =>
+        try {
+          val profileKeysOpt = policy.profileKeys(clientProfile)
+          OptionConverters.toScala(profileKeysOpt) match {
+            case Some(profileKeys) =>
+              val keys: List[String] = profileKeys.keys().asScala.toList
+              (Errors.NONE.code, null, profileKeys.crc(), keys)
+            case None =>
+              // No profile matched, return empty config keys
+              (Errors.NONE.code, null, 0L, List.empty[String])
+          }
+        } catch {
+          case e: org.apache.kafka.common.errors.UnknownConfigProfileException =>
+            (Errors.UNKNOWN_CONFIG_PROFILE.code, e.getMessage, 0L, List.empty[String])
+          case e: Exception =>
+            (Errors.UNKNOWN_SERVER_ERROR.code, e.getMessage, 0L, List.empty[String]) // NOTE(review): getMessage may be null; ErrorMessage is nullable on the wire so that is safe
+        }
+      case None =>
+        // No policy configured, return empty config keys
+        (Errors.NONE.code, null, 0L, List.empty[String])
+    }
+
+    val configKeysList = configKeys.asJava
+
+    val responseData = new GetConfigProfileKeysResponseData()
+      .setErrorCode(errorCode.toShort)
+      .setErrorMessage(errorMessage)
+      .setConfigurationProfileCrc(crc)
+      .setConfigMaxBytes(clientConfigMaxBytes) // echoes client.config.max.bytes so the client can size its PushConfig payload
+      .setConfigKeys(configKeysList)
+
+    requestHelper.sendResponseMaybeThrottle(request,
+      requestThrottleMs => new GetConfigProfileKeysResponse(responseData.setThrottleTimeMs(requestThrottleMs)))
+  }
+
+  def handlePushConfigRequest(request: RequestChannel.Request): Unit = { // KIP: accepts client-pushed configs and hands them to the ClientConfigPolicy
+    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+
+    val clientInfo = request.context.clientInformation
+    val requestData = request.body[PushConfigRequest].data // NOTE(review): requestData.configurationProfileCrc is never validated here — confirm the policy is meant to do it
+    val configsData = requestData.configs.asScala
+    val configs = configsData.map(c => c.configKey -> c.configValue).toMap // duplicate keys collapse to the last entry
+
+    // Calculate size
+    val configsSize = configs.foldLeft(0)((acc, kv) =>
+      acc + kv._1.getBytes(StandardCharsets.UTF_8).length +
+      kv._2.getBytes(StandardCharsets.UTF_8).length) // NOTE(review): counts raw UTF-8 key+value bytes, not the serialized form the config doc describes — confirm intended
+
+    val (errorCode, errorMessage) = if (configsSize > clientConfigMaxBytes) {
+      (Errors.CONFIG_TOO_LARGE.code, s"Configuration payload size ($configsSize bytes) exceeds limit ($clientConfigMaxBytes bytes)")
+    } else {
+      clientConfigPolicy match {
+        case Some(policy) =>
+          try {
+            // TODO: Extract clientInstanceId and clientMetadata from ApiVersionsRequest context
+            val clientInstanceId = org.apache.kafka.common.Uuid.ZERO_UUID // TODO: Get from context
+            val clientMetadata = new java.util.TreeMap[String, String]() // TODO: Get from context
+
+            val clientProfile = new org.apache.kafka.server.policy.ClientProfile(
+              clientInstanceId,
+              clientInfo.softwareName,
+              clientInfo.softwareVersion,
+              clientMetadata
+            )
+
+            val timestamp = time.milliseconds() // broker receipt time (UTC millis) passed through to the policy
+            val pushConfigData = new org.apache.kafka.server.policy.ClientPushConfigData(
+              clientProfile,
+              configs.asJava,
+              timestamp
+            )
+
+            policy.process(pushConfigData)
+            (Errors.NONE.code, null)
+          } catch {
+            case e: org.apache.kafka.common.errors.UnknownConfigProfileException =>
+              (Errors.UNKNOWN_CONFIG_PROFILE.code, e.getMessage)
+            case e: org.apache.kafka.common.errors.ConfigTooLargeException =>
+              (Errors.CONFIG_TOO_LARGE.code, e.getMessage)
+            case e: org.apache.kafka.common.errors.ClientConfigPolicyException =>
+              (Errors.INVALID_CONFIG.code, e.getMessage)
+            case e: Exception =>
+              (Errors.UNKNOWN_SERVER_ERROR.code, e.getMessage) // NOTE(review): getMessage may be null; ErrorMessage is nullable on the wire
+          }
+        case None =>
+          // No policy configured, accept all
+          (Errors.NONE.code, null)
+      }
+    }
+
+    val responseData = new PushConfigResponseData()
+      .setErrorCode(errorCode)
+      .setErrorMessage(errorMessage)
+
+    requestHelper.sendResponseMaybeThrottle(request,
+      requestThrottleMs => new PushConfigResponse(responseData.setThrottleTimeMs(requestThrottleMs)))
+  }
+
/**
* Handle ListConfigResourcesRequest. If resourceTypes are not specified, it uses ListConfigResourcesRequest#supportedResourceTypes
* to retrieve config resources. If resourceTypes are specified, it returns matched config resources.
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index df10430ad7246..43ee4b3e4472d 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -307,7 +307,7 @@ class RequestChannelTest {
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"),
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT,
- new ClientInformation("name", "version"),
+ new ClientInformation("name", "version", null),
false)
}
diff --git a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
index 400e13dc6bef7..90ed44eba617a 100644
--- a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
@@ -128,7 +128,7 @@ class RequestConvertToJsonTest {
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"),
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT,
- new ClientInformation("name", "version"),
+ new ClientInformation("name", "version", null),
false)
}
}
diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
index d8ffd8a5e2f1d..453dae17da34b 100644
--- a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
+++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
@@ -164,6 +164,15 @@ public class ServerLogConfigs {
"implement the org.apache.kafka.server.policy.AlterConfigPolicy interface. " +
"Note: This policy runs on the controller instead of the broker.
";
+ public static final String CLIENT_CONFIG_POLICY_CLASS_NAME_CONFIG = "client.config.policy.class.name";
+ public static final String CLIENT_CONFIG_POLICY_CLASS_NAME_DOC = "The client configuration policy class that should be used for validating client configurations. " +
+ "The class should implement the org.apache.kafka.server.policy.ClientConfigPolicy interface.";
+
+ public static final String CLIENT_CONFIG_MAX_BYTES_CONFIG = "client.config.max.bytes";
+ public static final int CLIENT_CONFIG_MAX_BYTES_DEFAULT = 1048576; // 1MB
+ public static final String CLIENT_CONFIG_MAX_BYTES_DOC = "Maximum size in bytes for client configuration data in PushConfig requests. " +
+ "Requests exceeding this limit will be rejected with CONFIG_TOO_LARGE error.";
+
public static final String LOG_INITIAL_TASK_DELAY_MS_CONFIG = LOG_PREFIX + "initial.task.delay.ms";
public static final long LOG_INITIAL_TASK_DELAY_MS_DEFAULT = 30 * 1000L;
public static final String LOG_INITIAL_TASK_DELAY_MS_DOC = "The initial task delay in millisecond when initializing " +
diff --git a/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java b/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
index be0e4337c1413..42cc5b6fc5417 100644
--- a/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
+++ b/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
@@ -120,7 +120,7 @@ public void testAllResponseTypesHandled() {
@Test
public void testClientInfoNode() {
- ClientInformation clientInfo = new ClientInformation("name", "1");
+ ClientInformation clientInfo = new ClientInformation("name", "1", null);
ObjectNode expectedNode = JsonNodeFactory.instance.objectNode();
expectedNode.set("softwareName", new TextNode(clientInfo.softwareName()));
expectedNode.set("softwareVersion", new TextNode(clientInfo.softwareVersion()));
diff --git a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
index 8e8bb21b7da81..419ac130f0add 100644
--- a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
+++ b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
@@ -68,7 +68,7 @@ public static RequestContext requestContext() throws UnknownHostException {
KafkaPrincipal.ANONYMOUS,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT,
- new ClientInformation("apache-kafka-java", "3.5.2"),
+ new ClientInformation("apache-kafka-java", "3.5.2", null),
false);
}
@@ -94,7 +94,7 @@ public static RequestContext requestContextWithConnectionId(String connectionId)
KafkaPrincipal.ANONYMOUS,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT,
- new ClientInformation("apache-kafka-java", "3.5.2"),
+ new ClientInformation("apache-kafka-java", "3.5.2", null),
false);
}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index f81d224e7ea7f..8f8dd9248e961 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -179,6 +179,8 @@ public Optional serverConfigName(String configName) {
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC)
.define(ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_DOC)
.define(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_DOC)
+ .define(ServerLogConfigs.CLIENT_CONFIG_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.CLIENT_CONFIG_POLICY_CLASS_NAME_DOC)
+ .define(ServerLogConfigs.CLIENT_CONFIG_MAX_BYTES_CONFIG, INT, ServerLogConfigs.CLIENT_CONFIG_MAX_BYTES_DEFAULT, atLeast(0), LOW, ServerLogConfigs.CLIENT_CONFIG_MAX_BYTES_DOC)
.define(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG, LONG, ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DEFAULT, atLeast(1), LOW, ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DOC)
.defineInternal(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, LONG, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT, atLeast(0), LOW, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DOC);