diff --git a/docs/apis/_index.md b/docs/apis/_index.md
index c73b0ca16d251..eeafa904a2d2a 100644
--- a/docs/apis/_index.md
+++ b/docs/apis/_index.md
@@ -43,13 +43,14 @@ The Producer API allows applications to send streams of data to topics in the Ka
Examples of using the producer are shown in the [javadocs](/{version}/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html "Kafka 4.3 Javadoc").
To use the producer, add the following Maven dependency to your project:
-
-
-    <dependency>
-        <groupId>org.apache.kafka</groupId>
-        <artifactId>kafka-clients</artifactId>
-        <version>4.3.0</version>
-    </dependency>
+
+```xml
+<dependency>
+    <groupId>org.apache.kafka</groupId>
+    <artifactId>kafka-clients</artifactId>
+    <version>4.3.0</version>
+</dependency>
+```
# Consumer API
@@ -58,13 +59,14 @@ The Consumer API allows applications to read streams of data from topics in the
Examples of using the consumer are shown in the [javadocs](/{version}/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html "Kafka 4.3 Javadoc").
To use the consumer, add the following Maven dependency to your project:
-
-
-    <dependency>
-        <groupId>org.apache.kafka</groupId>
-        <artifactId>kafka-clients</artifactId>
-        <version>4.3.0</version>
-    </dependency>
+
+```xml
+<dependency>
+    <groupId>org.apache.kafka</groupId>
+    <artifactId>kafka-clients</artifactId>
+    <version>4.3.0</version>
+</dependency>
+```
# Share Consumer API
@@ -73,13 +75,14 @@ The Share Consumer API enables applications in a share group to cooperatively co
Examples of using the share consumer are shown in the [javadocs](/{version}/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaShareConsumer.html "Kafka 4.3 Javadoc").
To use the share consumer, add the following Maven dependency to your project:
-
-
-    <dependency>
-        <groupId>org.apache.kafka</groupId>
-        <artifactId>kafka-clients</artifactId>
-        <version>4.3.0</version>
-    </dependency>
+
+```xml
+<dependency>
+    <groupId>org.apache.kafka</groupId>
+    <artifactId>kafka-clients</artifactId>
+    <version>4.3.0</version>
+</dependency>
+```
# Streams API
@@ -90,13 +93,14 @@ Examples of using this library are shown in the [javadocs](/{version}/javadoc/in
Additional documentation on using the Streams API is available [here](/43/documentation/streams).
To use Kafka Streams, add the following Maven dependency to your project:
-
-
-    <dependency>
-        <groupId>org.apache.kafka</groupId>
-        <artifactId>kafka-streams</artifactId>
-        <version>4.3.0</version>
-    </dependency>
+
+```xml
+<dependency>
+    <groupId>org.apache.kafka</groupId>
+    <artifactId>kafka-streams</artifactId>
+    <version>4.3.0</version>
+</dependency>
+```
When using Scala, you may optionally include the `kafka-streams-scala` library. Additional documentation on using the Kafka Streams DSL for Scala is available [in the developer guide](/43/documentation/streams/developer-guide/dsl-api.html#scala-dsl).
@@ -105,12 +109,14 @@ To use Kafka Streams DSL for Scala 2.13, add the following Maven dependency to y
> **⚠️ DEPRECATION NOTICE**: The `kafka-streams-scala` library is deprecated as of Kafka 4.3
> and will be removed in Kafka 5.0. Please migrate to using the Java Streams API directly from Scala.
> See the [migration guide](/{version}/streams/developer-guide/scala-migration) for details.
-
-    <dependency>
-        <groupId>org.apache.kafka</groupId>
-        <artifactId>kafka-streams-scala_2.13</artifactId>
-        <version>4.3.0</version>
-    </dependency>
+
+```xml
+<dependency>
+    <groupId>org.apache.kafka</groupId>
+    <artifactId>kafka-streams-scala_2.13</artifactId>
+    <version>4.3.0</version>
+</dependency>
+```
# Connect API
@@ -125,12 +131,13 @@ Those who want to implement custom connectors can see the [javadoc](/{version}/j
The Admin API supports managing and inspecting topics, brokers, ACLs, and other Kafka objects.
To use the Admin API, add the following Maven dependency to your project:
-
-
-    <dependency>
-        <groupId>org.apache.kafka</groupId>
-        <artifactId>kafka-clients</artifactId>
-        <version>4.3.0</version>
-    </dependency>
+
+```xml
+<dependency>
+    <groupId>org.apache.kafka</groupId>
+    <artifactId>kafka-clients</artifactId>
+    <version>4.3.0</version>
+</dependency>
+```
For more information about the Admin APIs, see the [javadoc](/{version}/javadoc/index.html?org/apache/kafka/clients/admin/Admin.html "Kafka 4.3 Javadoc").
diff --git a/docs/configuration/broker-configs.md b/docs/configuration/broker-configs.md
index 86b85f74ac8b8..bca732b31f835 100644
--- a/docs/configuration/broker-configs.md
+++ b/docs/configuration/broker-configs.md
@@ -47,34 +47,40 @@ From Kafka version 1.1 onwards, some of the broker configs can be updated withou
* `cluster-wide`: May be updated dynamically as a cluster-wide default. May also be updated as a per-broker value for testing.
To alter the current broker configs for broker id 0 (for example, the number of log cleaner threads):
-
-
- $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2
+
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2
+```
To describe the current dynamic broker configs for broker id 0:
-
-
- $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe
+
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe
+```
To delete a config override and revert to the statically configured or default value for broker id 0 (for example, the number of log cleaner threads):
-
-
- $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads
-To update the log level for a logger on broker id 0:
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads
+```
+To update the log level for a logger on broker id 0:
- $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --broker-logger 0 --add-config org.apache.kafka.server.quota.ClientQuotaManager\$ThrottledChannelReaper=DEBUG --alter
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --broker-logger 0 --add-config org.apache.kafka.server.quota.ClientQuotaManager\$ThrottledChannelReaper=DEBUG --alter
+```
Some configs may be configured as a cluster-wide default to maintain consistent values across the whole cluster. All brokers in the cluster will process the cluster default update. For example, to update log cleaner threads on all brokers:
-
-
- $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2
+
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2
+```
To describe the currently configured dynamic cluster-wide default configs:
-
-
- $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe
+
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe
+```
All configs that are configurable at cluster level may also be configured at per-broker level (e.g. for testing). If a config value is defined at different levels, the following order of precedence is used:
diff --git a/docs/configuration/configuration-providers.md b/docs/configuration/configuration-providers.md
index 5a6eb24e6a7fe..3d02912ca71f6 100644
--- a/docs/configuration/configuration-providers.md
+++ b/docs/configuration/configuration-providers.md
@@ -45,26 +45,29 @@ To use a configuration provider, specify it in your configuration using the `con
Configuration providers allow you to pass parameters and retrieve configuration data from various sources.
To specify configuration providers, you use a comma-separated list of aliases and the fully-qualified class names that implement the configuration providers:
-
-
- config.providers=provider1,provider2
- config.providers.provider1.class=com.example.Provider1
- config.providers.provider2.class=com.example.Provider2
+
+```properties
+config.providers=provider1,provider2
+config.providers.provider1.class=com.example.Provider1
+config.providers.provider2.class=com.example.Provider2
+```
Each provider can have its own set of parameters, which are passed in a specific format:
-
-
-    config.providers.<provider-alias>.param.<param-name>=<param-value>
+
+```properties
+config.providers.<provider-alias>.param.<param-name>=<param-value>
+```
The `ConfigProvider` interface serves as a base for all configuration providers. Custom implementations of this interface can be created to retrieve configuration data from various sources. You can package the implementation as a JAR file, add the JAR to your classpath, and reference the provider's class in your configuration.
**Example custom provider configuration**
-
-
- config.providers=customProvider
- config.providers.customProvider.class=com.example.customProvider
- config.providers.customProvider.param.param1=value1
- config.providers.customProvider.param.param2=value2
+
+```properties
+config.providers=customProvider
+config.providers.customProvider.class=com.example.customProvider
+config.providers.customProvider.param.param1=value1
+config.providers.customProvider.param.param2=value2
+```
## DirectoryConfigProvider
@@ -75,16 +78,18 @@ Each file represents a key, and its content is the value. This provider is usefu
To restrict the files that the `DirectoryConfigProvider` can access, use the `allowed.paths` parameter. This parameter accepts a comma-separated list of paths that the provider is allowed to access. If not set, all paths are allowed.
**Example `DirectoryConfigProvider` configuration**
-
-
- config.providers=dirProvider
- config.providers.dirProvider.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider
- config.providers.dirProvider.param.allowed.paths=/path/to/dir1,/path/to/dir2
+
+```properties
+config.providers=dirProvider
+config.providers.dirProvider.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider
+config.providers.dirProvider.param.allowed.paths=/path/to/dir1,/path/to/dir2
+```
To reference a value supplied by the `DirectoryConfigProvider`, use the correct placeholder syntax:
-
-
-    ${dirProvider:<path-to-directory>:<file-name>}
+
+```text
+${dirProvider:<path-to-directory>:<file-name>}
+```
## EnvVarConfigProvider
@@ -97,16 +102,18 @@ This provider is useful for configuring applications running in containers, for
To restrict which environment variables the `EnvVarConfigProvider` can access, use the `allowlist.pattern` parameter. This parameter accepts a regular expression that environment variable names must match to be used by the provider.
**Example `EnvVarConfigProvider` configuration**
-
-
- config.providers=envVarProvider
- config.providers.envVarProvider.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider
- config.providers.envVarProvider.param.allowlist.pattern=^MY_ENVAR1_.*
+
+```properties
+config.providers=envVarProvider
+config.providers.envVarProvider.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider
+config.providers.envVarProvider.param.allowlist.pattern=^MY_ENVAR1_.*
+```
To reference a value supplied by the `EnvVarConfigProvider`, use the correct placeholder syntax:
-
-
-    ${envVarProvider:<env-var-name>}
+
+```text
+${envVarProvider:<env-var-name>}
+```
## FileConfigProvider
@@ -117,41 +124,46 @@ This provider is useful for loading configuration data from mounted files.
To restrict the file paths that the `FileConfigProvider` can access, use the `allowed.paths` parameter. This parameter accepts a comma-separated list of paths that the provider is allowed to access. If not set, all paths are allowed.
**Example `FileConfigProvider` configuration**
-
-
- config.providers=fileProvider
- config.providers.fileProvider.class=org.apache.kafka.common.config.provider.FileConfigProvider
- config.providers.fileProvider.param.allowed.paths=/path/to/config1,/path/to/config2
+
+```properties
+config.providers=fileProvider
+config.providers.fileProvider.class=org.apache.kafka.common.config.provider.FileConfigProvider
+config.providers.fileProvider.param.allowed.paths=/path/to/config1,/path/to/config2
+```
To reference a value supplied by the `FileConfigProvider`, use the correct placeholder syntax:
-
-
-    ${fileProvider:<path-to-file>:<property-name>}
+
+```text
+${fileProvider:<path-to-file>:<property-name>}
+```
## Example: Referencing files
Here’s an example that uses a file configuration provider with Kafka Connect to provide authentication credentials to a database for a connector.
First, create a `connector-credentials.properties` configuration file with the following credentials:
-
-
- dbUsername=my-username
- dbPassword=my-password
+
+```properties
+dbUsername=my-username
+dbPassword=my-password
+```
Specify a `FileConfigProvider` in the Kafka Connect configuration:
**Example Kafka Connect configuration with a `FileConfigProvider`**
-
-
- config.providers=fileProvider
- config.providers.fileProvider.class=org.apache.kafka.common.config.provider.FileConfigProvider
+
+```properties
+config.providers=fileProvider
+config.providers.fileProvider.class=org.apache.kafka.common.config.provider.FileConfigProvider
+```
Next, reference the properties from the file in the connector configuration.
**Example connector configuration referencing file properties**
-
-
- database.user=${fileProvider:/path/to/connector-credentials.properties:dbUsername}
- database.password=${fileProvider:/path/to/connector-credentials.properties:dbPassword}
+
+```properties
+database.user=${fileProvider:/path/to/connector-credentials.properties:dbUsername}
+database.password=${fileProvider:/path/to/connector-credentials.properties:dbPassword}
+```
At runtime, the configuration provider reads and extracts the values from the properties file.
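The `ConfigProvider` mechanism described above can be illustrated with a dependency-free sketch. Everything below is hypothetical: `ConfigProviderSketch` stands in for the real `org.apache.kafka.common.config.provider.ConfigProvider` interface (whose `get()` actually returns a `ConfigData` wrapper rather than a plain map), and `InMemoryConfigProvider` is an invented example class, not part of Kafka.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for the real ConfigProvider interface, kept
// dependency-free so the sketch compiles on its own.
interface ConfigProviderSketch {
    void configure(Map<String, ?> params);
    Map<String, String> get(String path);
}

// Hypothetical custom provider that serves values from an in-memory map.
public class InMemoryConfigProvider implements ConfigProviderSketch {
    private final Map<String, String> values = new HashMap<>();

    @Override
    public void configure(Map<String, ?> params) {
        // Receives the config.providers.<alias>.param.* entries.
        for (Map.Entry<String, ?> e : params.entrySet()) {
            values.put(e.getKey(), String.valueOf(e.getValue()));
        }
    }

    @Override
    public Map<String, String> get(String path) {
        // A real provider would resolve values from the given path;
        // this sketch ignores it and returns the configured parameters.
        return new HashMap<>(values);
    }
}
```

A worker would instantiate such a class from the `config.providers.<alias>.class` setting and substitute its values wherever the `${alias:...}` placeholder syntax appears.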
diff --git a/docs/configuration/system-properties.md b/docs/configuration/system-properties.md
index 2637441ffbfa7..6a82c1410416b 100644
--- a/docs/configuration/system-properties.md
+++ b/docs/configuration/system-properties.md
@@ -33,8 +33,10 @@ Kafka supports some configuration that can be enabled through Java system proper
This system property is used to determine which files, if any, are allowed to be read by the SASL OAUTHBEARER plugin. This property accepts a comma-separated list of files. By default, the value is an empty list.
If users want to enable some files, they need to explicitly set the system property as shown below.
-
- -Dorg.apache.kafka.sasl.oauthbearer.allowed.files=/tmp/token,/tmp/private_key.pem
+
+```bash
+-Dorg.apache.kafka.sasl.oauthbearer.allowed.files=/tmp/token,/tmp/private_key.pem
+```
@@ -61,8 +63,10 @@ Default Value:
This system property is used to set the allowed URLs as SASL OAUTHBEARER token or JWKS endpoints. This property accepts a comma-separated list of URLs. By default, the value is an empty list.
If users want to enable some URLs, they need to explicitly set the system property as shown below.
-
- -Dorg.apache.kafka.sasl.oauthbearer.allowed.urls=https://www.example.com,file:///tmp/token
+
+```bash
+-Dorg.apache.kafka.sasl.oauthbearer.allowed.urls=https://www.example.com,file:///tmp/token
+```
@@ -89,12 +93,16 @@ Default Value:
This system property is used to disable the use of problematic login modules in SASL JAAS configuration. This property accepts a comma-separated list of login module names. By default, the **com.sun.security.auth.module.JndiLoginModule** and **com.sun.security.auth.module.LdapLoginModule** login modules are disabled.
If users want to enable JndiLoginModule or LdapLoginModule, they need to explicitly reset the system property as shown below. We advise users to validate configurations and only allow trusted JNDI configurations. For more details, see [CVE-2023-25194](/community/cve-list/#CVE-2023-25194).
-
- -Dorg.apache.kafka.disallowed.login.modules=
+
+```bash
+-Dorg.apache.kafka.disallowed.login.modules=
+```
To disable more login modules, update the system property with a comma-separated list of login module names. Make sure to explicitly include the **JndiLoginModule** name in the list, as shown below.
-
- -Dorg.apache.kafka.disallowed.login.modules=com.sun.security.auth.module.JndiLoginModule,com.ibm.security.auth.module.LdapLoginModule,com.ibm.security.auth.module.Krb5LoginModule
+
+```bash
+-Dorg.apache.kafka.disallowed.login.modules=com.sun.security.auth.module.JndiLoginModule,com.ibm.security.auth.module.LdapLoginModule,com.ibm.security.auth.module.Krb5LoginModule
+```
This configuration is deprecated and will be removed in a future release. Please use **org.apache.kafka.allowed.login.modules** instead.
@@ -153,16 +161,22 @@ Default Value:
This system property controls the automatic loading of ConfigProvider implementations in Apache Kafka. ConfigProviders are used to dynamically supply configuration values from sources such as files, directories, or environment variables. This property accepts a comma-separated list of ConfigProvider names. By default, all built-in ConfigProviders are enabled, including **FileConfigProvider**, **DirectoryConfigProvider**, and **EnvVarConfigProvider**.
If users want to disable all automatic ConfigProviders, they need to explicitly set the system property as shown below. Disabling automatic ConfigProviders is recommended in environments where configuration data comes from untrusted sources or where increased security is required. For more details, see [CVE-2024-31141](/community/cve-list/#CVE-2024-31141).
-
- -Dorg.apache.kafka.automatic.config.providers=none
+
+```bash
+-Dorg.apache.kafka.automatic.config.providers=none
+```
To allow specific ConfigProviders, update the system property with a comma-separated list of fully qualified ConfigProvider class names. For example, to enable only the **EnvVarConfigProvider**, set the property as follows:
-
- -Dorg.apache.kafka.automatic.config.providers=org.apache.kafka.common.config.provider.EnvVarConfigProvider
+
+```bash
+-Dorg.apache.kafka.automatic.config.providers=org.apache.kafka.common.config.provider.EnvVarConfigProvider
+```
To use multiple ConfigProviders, include their names in a comma-separated list as shown below:
-
- -Dorg.apache.kafka.automatic.config.providers=org.apache.kafka.common.config.provider.FileConfigProvider,org.apache.kafka.common.config.provider.EnvVarConfigProvider
+
+```bash
+-Dorg.apache.kafka.automatic.config.providers=org.apache.kafka.common.config.provider.FileConfigProvider,org.apache.kafka.common.config.provider.EnvVarConfigProvider
+```
diff --git a/docs/configuration/tiered-storage-configs.md b/docs/configuration/tiered-storage-configs.md
index ac6041b660be5..a6d5d38bb2f0e 100644
--- a/docs/configuration/tiered-storage-configs.md
+++ b/docs/configuration/tiered-storage-configs.md
@@ -39,16 +39,17 @@ All configurations here should start with the prefix defined by `remote.log.meta
The implementation of `TopicBasedRemoteLogMetadataManager` needs to create admin, producer, and consumer clients for the internal topic `__remote_log_metadata`.
Additional configurations can be provided for different types of clients using the following configuration properties:
-
-
-    # Configs for admin, producer, and consumer clients
-    <prefix>.remote.log.metadata.common.client.<config-name> = <config-value>
-
-    # Configs only for admin client
-    <prefix>.remote.log.metadata.admin.<config-name> = <config-value>
-
-    # Configs only for producer client
-    <prefix>.remote.log.metadata.producer.<config-name> = <config-value>
-
-    # Configs only for consumer client
-    <prefix>.remote.log.metadata.consumer.<config-name> = <config-value>
+
+```properties
+# Configs for admin, producer, and consumer clients
+<prefix>.remote.log.metadata.common.client.<config-name> = <config-value>
+
+# Configs only for admin client
+<prefix>.remote.log.metadata.admin.<config-name> = <config-value>
+
+# Configs only for producer client
+<prefix>.remote.log.metadata.producer.<config-name> = <config-value>
+
+# Configs only for consumer client
+<prefix>.remote.log.metadata.consumer.<config-name> = <config-value>
+```
diff --git a/docs/configuration/topic-configs.md b/docs/configuration/topic-configs.md
index d95936c002370..dffba75e8d87f 100644
--- a/docs/configuration/topic-configs.md
+++ b/docs/configuration/topic-configs.md
@@ -27,26 +27,30 @@ type: docs
Configurations pertinent to topics have both a server default as well as an optional per-topic override. If no per-topic configuration is given, the server default is used. The override can be set at topic creation time by giving one or more `--config` options. This example creates a topic named _my-topic_ with a custom max message size and flush rate:
-
-
- $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 \
- --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
+
+```bash
+$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 \
+ --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
+```
Overrides can also be changed or set later using the alter configs command. This example updates the max message size for _my-topic_:
-
-
- $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
- --alter --add-config max.message.bytes=128000
+
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic \
+    --alter --add-config max.message.bytes=128000
+```
To check overrides set on the topic, you can do:
-
-
- $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe
+
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe
+```
To remove an override, you can do:
-
-
- $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
- --alter --delete-config max.message.bytes
+
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic \
+    --alter --delete-config max.message.bytes
+```
Below is the topic configuration. The server's default configuration for this property is given under the Server Default Property heading. A given server default config value only applies to a topic if it does not have an explicit topic config override. {{< include-html file="/static/{version}/generated/topic_config.html" >}}
diff --git a/docs/design/design.md b/docs/design/design.md
index 24d66c198627c..e5eb380826394 100644
--- a/docs/design/design.md
+++ b/docs/design/design.md
@@ -370,17 +370,18 @@ Log compaction ensures that Kafka will always retain at least the last known val
So far we have described only the simpler approach to data retention where old log data is discarded after a fixed period of time or when the log reaches some predetermined size. This works well for temporal event data such as logging where each record stands alone. However, an important class of data streams is the log of changes to keyed, mutable data (for example, the changes to a database table).
Let's discuss a concrete example of such a stream. Say we have a topic containing user email addresses; every time a user updates their email address we send a message to this topic using their user id as the primary key. Now say we send the following messages over some time period for a user with id 123, each message corresponding to a change in email address (messages for other ids are omitted):
-
-
- 123 => bill@microsoft.com
- .
- .
- .
- 123 => bill@gatesfoundation.org
- .
- .
- .
- 123 => bill@gmail.com
+
+```text
+123 => bill@microsoft.com
+ .
+ .
+ .
+123 => bill@gatesfoundation.org
+ .
+ .
+ .
+123 => bill@gmail.com
+```
Log compaction gives us a more granular retention mechanism so that we are guaranteed to retain at least the last update for each primary key (e.g. `bill@gmail.com`). By doing this we guarantee that the log contains a full snapshot of the final value for every key not just keys that changed recently. This means downstream consumers can restore their own state off this topic without us having to retain a complete log of all changes.
@@ -436,19 +437,22 @@ Log compaction is handled by the log cleaner, a pool of background threads that
### Configuring The Log Cleaner
The log cleaner is enabled by default. This will start the pool of cleaner threads. To enable log cleaning on a particular topic, add the log-specific property
-
-
- log.cleanup.policy=compact
+
+```properties
+log.cleanup.policy=compact
+```
The `log.cleanup.policy` property is a broker configuration setting defined in the broker's `server.properties` file; it affects all of the topics in the cluster that do not have a configuration override in place as documented [here](/documentation.html#brokerconfigs). The log cleaner can be configured to retain a minimum amount of the uncompacted "head" of the log. This is enabled by setting the compaction time lag.
-
-
- log.cleaner.min.compaction.lag.ms
+
+```properties
+log.cleaner.min.compaction.lag.ms
+```
This can be used to prevent messages newer than a minimum message age from being subject to compaction. If not set, all log segments are eligible for compaction except for the last segment, i.e. the one currently being written to. The active segment will not be compacted even if all of its messages are older than the minimum compaction time lag. The log cleaner can be configured to ensure a maximum delay after which the uncompacted "head" of the log becomes eligible for log compaction.
-
-
- log.cleaner.max.compaction.lag.ms
+
+```properties
+log.cleaner.max.compaction.lag.ms
+```
This can be used to prevent a log with a low produce rate from remaining ineligible for compaction for an unbounded duration. If not set, logs that do not exceed min.cleanable.dirty.ratio are not compacted. Note that this compaction deadline is not a hard guarantee since it is still subject to the availability of log cleaner threads and the actual compaction time. You will want to monitor the uncleanable-partitions-count, max-clean-time-secs and max-compaction-delay-secs metrics.
diff --git a/docs/design/protocol.md b/docs/design/protocol.md
index 87966feee10f3..8807e5ebbbcf7 100644
--- a/docs/design/protocol.md
+++ b/docs/design/protocol.md
@@ -145,11 +145,12 @@ The [BNF](https://en.wikipedia.org/wiki/Backus%E2%80%93Naur_Form)s below give an
### Common Request and Response Structure
All requests and responses originate from the following grammar, which will be incrementally described through the rest of this document:
-
-
- RequestOrResponse => Size (RequestMessage | ResponseMessage)
- Size => int32
-
+
+```text
+RequestOrResponse => Size (RequestMessage | ResponseMessage)
+ Size => int32
+```
+
@@ -197,10 +198,11 @@ The following are the numeric codes that the stable ApiKey in the request can ta
This section gives details on each of the individual API Messages, their usage, their binary format, and the meaning of their fields.
The message consists of the header and body:
-
-
- Message => RequestOrResponseHeader Body
-
+
+```text
+Message => RequestOrResponseHeader Body
+```
+
`RequestOrResponseHeader` is the versioned request or response header. `Body` is the message-specific body.
diff --git a/docs/implementation/log.md b/docs/implementation/log.md
index ef54104964824..e7f6c257906fc 100644
--- a/docs/implementation/log.md
+++ b/docs/implementation/log.md
@@ -47,24 +47,25 @@ The actual process of reading from an offset requires first locating the log seg
The log provides the capability of getting the most recently written message to allow clients to start subscribing as of "right now". This is also useful in the case the consumer fails to consume its data within its SLA-specified number of days. In this case when the client attempts to consume a non-existent offset it is given an OutOfRangeException and can either reset itself or fail as appropriate to the use case.
The following is the format of the results sent to the consumer.
-
-
- MessageSetSend (fetch result)
-
- total length : 4 bytes
- error code : 2 bytes
- message 1 : x bytes
- ...
- message n : x bytes
-
-
- MultiMessageSetSend (multiFetch result)
-
- total length : 4 bytes
- error code : 2 bytes
- messageSetSend 1
- ...
- messageSetSend n
+
+```text
+MessageSetSend (fetch result)
+
+total length : 4 bytes
+error code : 2 bytes
+message 1 : x bytes
+...
+message n : x bytes
+
+
+MultiMessageSetSend (multiFetch result)
+
+total length : 4 bytes
+error code : 2 bytes
+messageSetSend 1
+...
+messageSetSend n
+```
## Deletes
diff --git a/docs/implementation/message-format.md b/docs/implementation/message-format.md
index d93cc025b32c5..29078b759c4b7 100644
--- a/docs/implementation/message-format.md
+++ b/docs/implementation/message-format.md
@@ -31,33 +31,34 @@ Messages (aka Records) are always written in batches. The technical term for a b
## Record Batch
The following is the on-disk format of a RecordBatch.
-
-
- baseOffset: int64
- batchLength: int32
- partitionLeaderEpoch: int32
- magic: int8 (current magic value is 2)
- crc: uint32
- attributes: int16
- bit 0~2:
- 0: no compression
- 1: gzip
- 2: snappy
- 3: lz4
- 4: zstd
- bit 3: timestampType
- bit 4: isTransactional (0 means not transactional)
- bit 5: isControlBatch (0 means not a control batch)
- bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
- bit 7~15: unused
- lastOffsetDelta: int32
- baseTimestamp: int64
- maxTimestamp: int64
- producerId: int64
- producerEpoch: int16
- baseSequence: int32
- recordsCount: int32
- records: [Record]
+
+```text
+baseOffset: int64
+batchLength: int32
+partitionLeaderEpoch: int32
+magic: int8 (current magic value is 2)
+crc: uint32
+attributes: int16
+ bit 0~2:
+ 0: no compression
+ 1: gzip
+ 2: snappy
+ 3: lz4
+ 4: zstd
+ bit 3: timestampType
+ bit 4: isTransactional (0 means not transactional)
+ bit 5: isControlBatch (0 means not a control batch)
+ bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
+ bit 7~15: unused
+lastOffsetDelta: int32
+baseTimestamp: int64
+maxTimestamp: int64
+producerId: int64
+producerEpoch: int16
+baseSequence: int32
+recordsCount: int32
+records: [Record]
+```
Note that when compression is enabled, the compressed record data is serialized directly following the count of the number of records.
@@ -74,37 +75,40 @@ Compaction may also modify the baseTimestamp if the record batch contains record
A control batch contains a single record called the control record. Control records should not be passed on to applications. Instead, they are used by consumers to filter out aborted transactional messages.
The key of a control record conforms to the following schema:
-
-
- version: int16 (current version is 0)
- type: int16 (0 indicates an abort marker, 1 indicates a commit)
+
+```text
+version: int16 (current version is 0)
+type: int16 (0 indicates an abort marker, 1 indicates a commit)
+```
The schema for the value of a control record is dependent on the type. The value is opaque to clients.
## Record
The on-disk format of each record is delineated below.
-
-
- length: varint
- attributes: int8
- bit 0~7: unused
- timestampDelta: varlong
- offsetDelta: varint
- keyLength: varint
- key: byte[]
- valueLength: varint
- value: byte[]
- headersCount: varint
- Headers => [Header]
+
+```text
+length: varint
+attributes: int8
+ bit 0~7: unused
+timestampDelta: varlong
+offsetDelta: varint
+keyLength: varint
+key: byte[]
+valueLength: varint
+value: byte[]
+headersCount: varint
+Headers => [Header]
+```
### Record Header
-
-
- headerKeyLength: varint
- headerKey: String
- headerValueLength: varint
- Value: byte[]
+
+```text
+headerKeyLength: varint
+headerKey: String
+headerValueLength: varint
+Value: byte[]
+```
The key of a record header is guaranteed to be non-null, while the value of a record header may be null. The order of headers in a record is preserved when producing and consuming.
diff --git a/docs/kafka-connect/administration.md b/docs/kafka-connect/administration.md
index 742a531742d40..0c7da8dfec683 100644
--- a/docs/kafka-connect/administration.md
+++ b/docs/kafka-connect/administration.md
@@ -39,22 +39,23 @@ If a Connect worker leaves the group, intentionally or due to a failure, Connect
The new Connect protocol is enabled when all the workers that form the Connect cluster are configured with `connect.protocol=compatible`, which is also the default value when this property is missing. Therefore, upgrading to the new Connect protocol happens automatically when all the workers upgrade to 2.3.0. A rolling upgrade of the Connect cluster will activate incremental cooperative rebalancing when the last worker joins on version 2.3.0.
You can use the REST API to view the current status of a connector and its tasks, including the ID of the worker to which each was assigned. For example, the `GET /connectors/file-source/status` request shows the status of a connector named `file-source`:
-
-
- {
- "name": "file-source",
- "connector": {
+
+```json
+{
+ "name": "file-source",
+ "connector": {
+ "state": "RUNNING",
+ "worker_id": "192.168.1.208:8083"
+ },
+ "tasks": [
+ {
+ "id": 0,
"state": "RUNNING",
- "worker_id": "192.168.1.208:8083"
- },
- "tasks": [
- {
- "id": 0,
- "state": "RUNNING",
- "worker_id": "192.168.1.209:8083"
- }
- ]
- }
+ "worker_id": "192.168.1.209:8083"
+ }
+ ]
+}
+```
Connectors and their tasks publish status updates to a shared topic (configured with `status.storage.topic`) which all workers in the cluster monitor. Because the workers consume this topic asynchronously, there is typically a (short) delay before a state change is visible through the status API. The following states are possible for a connector or one of its tasks:
diff --git a/docs/kafka-connect/connector-development-guide.md b/docs/kafka-connect/connector-development-guide.md
index d8544666e9620..6b8ab3d910cd7 100644
--- a/docs/kafka-connect/connector-development-guide.md
+++ b/docs/kafka-connect/connector-development-guide.md
@@ -57,58 +57,63 @@ The rest of this section will walk through some code to demonstrate the key step
### Connector Example
We'll cover the `SourceConnector` as a simple example. `SinkConnector` implementations are very similar. Pick a package and class name; these examples will use `FileStreamSourceConnector`, but substitute your own class name where appropriate. In order to make the plugin discoverable at runtime, add a ServiceLoader manifest to your resources in `META-INF/services/org.apache.kafka.connect.source.SourceConnector` with your fully-qualified class name on a single line:
-
-
- com.example.FileStreamSourceConnector
+
+```text
+com.example.FileStreamSourceConnector
+```
Create a class that inherits from `SourceConnector` and add a field that will store the configuration to be propagated to the task(s) (the topic to send data to and, optionally, the filename to read from and the maximum batch size):
-
-
- package com.example;
-
- public class FileStreamSourceConnector extends SourceConnector {
- private Map<String, String> props;
+
+```java
+package com.example;
+
+public class FileStreamSourceConnector extends SourceConnector {
+ private Map<String, String> props;
+```
The easiest method to fill in is `taskClass()`, which defines the class that should be instantiated in worker processes to actually read the data:
-
-
- @Override
- public Class<? extends Task> taskClass() {
- return FileStreamSourceTask.class;
- }
+
+```java
+@Override
+public Class<? extends Task> taskClass() {
+ return FileStreamSourceTask.class;
+}
+```
We will define the `FileStreamSourceTask` class below. Next, we add some standard lifecycle methods, `start()` and `stop()`:
-
-
- @Override
- public void start(Map<String, String> props) {
- // Initialization logic and setting up of resources can take place in this method.
- // This connector doesn't need to do any of that, but we do log a helpful message to the user.
-
- this.props = props;
- AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
- String filename = config.getString(FILE_CONFIG);
- filename = (filename == null || filename.isEmpty()) ? "standard input" : config.getString(FILE_CONFIG);
- log.info("Starting file source connector reading from {}", filename);
- }
-
- @Override
- public void stop() {
- // Nothing to do since no background monitoring is required.
- }
+
+```java
+@Override
+public void start(Map<String, String> props) {
+ // Initialization logic and setting up of resources can take place in this method.
+ // This connector doesn't need to do any of that, but we do log a helpful message to the user.
+
+ this.props = props;
+ AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
+ String filename = config.getString(FILE_CONFIG);
+ filename = (filename == null || filename.isEmpty()) ? "standard input" : config.getString(FILE_CONFIG);
+ log.info("Starting file source connector reading from {}", filename);
+}
+
+@Override
+public void stop() {
+ // Nothing to do since no background monitoring is required.
+}
+```
Finally, the real core of the implementation is in `taskConfigs()`. In this case we are only handling a single file, so even though we may be permitted to generate more tasks as per the `maxTasks` argument, we return a list with only one entry:
-
-
- @Override
- public List |