From 143f081eb860407d78c76871d3b1277e8a066885 Mon Sep 17 00:00:00 2001 From: cychiu Date: Mon, 23 Mar 2026 20:41:18 +0900 Subject: [PATCH 1/9] refactor: move ImplicitLinkedHash* to internals --- .../utils/{ => internals}/ImplicitLinkedHashCollection.java | 2 +- .../{ => internals}/ImplicitLinkedHashMultiCollection.java | 4 ++-- .../{ => internals}/ImplicitLinkedHashCollectionTest.java | 2 +- .../ImplicitLinkedHashMultiCollectionTest.java | 4 ++-- .../main/java/kafka/server/share/SharePartitionManager.java | 2 +- .../java/kafka/server/share/SharePartitionManagerTest.java | 2 +- core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 3 ++- .../main/java/org/apache/kafka/message/MessageGenerator.java | 4 ++-- .../jmh/common/ImplicitLinkedHashCollectionBenchmark.java | 2 +- .../test/java/org/apache/kafka/metadata/RecordTestUtils.java | 2 +- .../src/main/java/org/apache/kafka/server/FetchContext.java | 2 +- .../src/main/java/org/apache/kafka/server/FetchSession.java | 2 +- .../java/org/apache/kafka/server/FetchSessionCacheShard.java | 2 +- .../org/apache/kafka/server/share/CachedSharePartition.java | 2 +- .../org/apache/kafka/server/share/session/ShareSession.java | 2 +- .../apache/kafka/server/share/session/ShareSessionCache.java | 2 +- .../test/java/org/apache/kafka/server/FetchSessionTest.java | 2 +- .../kafka/server/share/session/ShareSessionCacheTest.java | 2 +- 18 files changed, 22 insertions(+), 21 deletions(-) rename clients/src/main/java/org/apache/kafka/common/utils/{ => internals}/ImplicitLinkedHashCollection.java (99%) rename clients/src/main/java/org/apache/kafka/common/utils/{ => internals}/ImplicitLinkedHashMultiCollection.java (97%) rename clients/src/test/java/org/apache/kafka/common/utils/{ => internals}/ImplicitLinkedHashCollectionTest.java (99%) rename clients/src/test/java/org/apache/kafka/common/utils/{ => internals}/ImplicitLinkedHashMultiCollectionTest.java (97%) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java b/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollection.java similarity index 99% rename from clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java rename to clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollection.java index 524b20b71887d..284e1055b41f9 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollection.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kafka.common.utils; +package org.apache.kafka.common.utils.internals; import java.util.AbstractCollection; import java.util.AbstractSequentialList; diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollection.java b/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollection.java similarity index 97% rename from clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollection.java rename to clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollection.java index 6e0d28adb4ca9..17252a5285c32 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollection.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollection.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kafka.common.utils; +package org.apache.kafka.common.utils.internals; import java.util.ArrayList; import java.util.Collections; @@ -24,7 +24,7 @@ /** * A memory-efficient hash multiset which tracks the order of insertion of elements. - * See org.apache.kafka.common.utils.ImplicitLinkedHashCollection for implementation details. + * See org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection for implementation details. * * This class is a multi-set because it allows multiple elements to be inserted that * have equivalent keys. diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollectionTest.java similarity index 99% rename from clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java rename to clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollectionTest.java index 4491fc88fc13f..5794b5de9786e 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollectionTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.utils; +package org.apache.kafka.common.utils.internals; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollectionTest.java similarity index 97% rename from clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java rename to clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollectionTest.java index a04f23fa4efe8..880664e1aa80e 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollectionTest.java @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.utils; +package org.apache.kafka.common.utils.internals; -import org.apache.kafka.common.utils.ImplicitLinkedHashCollectionTest.TestElement; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollectionTest.TestElement; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 27da4cdafa623..48c1ac3be48f0 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -29,7 +29,7 @@ import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ShareRequestMetadata; -import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider; import org.apache.kafka.server.common.ShareVersion; diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 9039d8369ef74..0b73d71abd4f2 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -48,7 +48,7 @@ import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.ShareFetchResponse; import org.apache.kafka.common.requests.ShareRequestMetadata; -import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.group.GroupConfigManager; import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider; diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index c5974e7e3179b..4ce1ef84f6a6a 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -75,7 +75,8 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _} import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource -import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAndEpoch, SecurityUtils, Utils} +import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils} +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG} import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager, GroupCoordinator, GroupCoordinatorConfig} diff --git a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java index f235bfa17dc74..2c791e1c2caf9 100644 --- a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java +++ b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java @@ -84,10 +84,10 @@ public final class MessageGenerator { static final String ARRAYLIST_CLASS = "java.util.ArrayList"; static final String IMPLICIT_LINKED_HASH_COLLECTION_CLASS = - "org.apache.kafka.common.utils.ImplicitLinkedHashCollection"; + "org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection"; static final String IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS = - "org.apache.kafka.common.utils.ImplicitLinkedHashMultiCollection"; + "org.apache.kafka.common.utils.internals.ImplicitLinkedHashMultiCollection"; static final String UNSUPPORTED_VERSION_EXCEPTION_CLASS = "org.apache.kafka.common.errors.UnsupportedVersionException"; diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ImplicitLinkedHashCollectionBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ImplicitLinkedHashCollectionBenchmark.java index 8861428103a52..0c41b0fc02b81 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ImplicitLinkedHashCollectionBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ImplicitLinkedHashCollectionBenchmark.java @@ -18,7 +18,7 @@ package org.apache.kafka.jmh.common; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java index bd15406cbb056..913174a6c07b7 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Message; import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.util.MockRandom; diff --git a/server/src/main/java/org/apache/kafka/server/FetchContext.java b/server/src/main/java/org/apache/kafka/server/FetchContext.java index 69c8dffb752b9..85c51072fce86 100644 --- a/server/src/main/java/org/apache/kafka/server/FetchContext.java +++ b/server/src/main/java/org/apache/kafka/server/FetchContext.java @@ -25,7 +25,7 @@ import org.apache.kafka.common.requests.FetchMetadata; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; -import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.FetchSession.CachedPartition; import org.apache.kafka.server.FetchSession.FetchSessionCache; diff --git a/server/src/main/java/org/apache/kafka/server/FetchSession.java b/server/src/main/java/org/apache/kafka/server/FetchSession.java index 45ad79bb02f69..a8f90e4c10cb2 100644 --- a/server/src/main/java/org/apache/kafka/server/FetchSession.java +++ b/server/src/main/java/org/apache/kafka/server/FetchSession.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; -import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.metrics.KafkaMetricsGroup; diff --git a/server/src/main/java/org/apache/kafka/server/FetchSessionCacheShard.java b/server/src/main/java/org/apache/kafka/server/FetchSessionCacheShard.java index b2d808542ec01..c59c222e887a9 100644 --- a/server/src/main/java/org/apache/kafka/server/FetchSessionCacheShard.java +++ b/server/src/main/java/org/apache/kafka/server/FetchSessionCacheShard.java @@ -17,7 +17,7 @@ package org.apache.kafka.server; import org.apache.kafka.common.requests.FetchMetadata; -import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.server.FetchSession.EvictableKey; import org.apache.kafka.server.FetchSession.LastUsedKey; diff --git a/server/src/main/java/org/apache/kafka/server/share/CachedSharePartition.java b/server/src/main/java/org/apache/kafka/server/share/CachedSharePartition.java index 8cfc936503969..bafbf43b3a3d3 100644 --- a/server/src/main/java/org/apache/kafka/server/share/CachedSharePartition.java +++ b/server/src/main/java/org/apache/kafka/server/share/CachedSharePartition.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.message.ShareFetchResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ShareFetchResponse; -import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import java.util.Objects; import java.util.Optional; diff --git a/server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java b/server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java index 7f9a47e29df78..9df63c10b41ac 100644 --- a/server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java +++ b/server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java @@ -18,7 +18,7 @@ package org.apache.kafka.server.share.session; import org.apache.kafka.common.TopicIdPartition; -import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.server.share.CachedSharePartition; import java.util.ArrayList; diff --git a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java index 0105169803942..29a33146543d6 100644 --- a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java +++ b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java @@ -18,7 +18,7 @@ package org.apache.kafka.server.share.session; import org.apache.kafka.common.requests.ShareRequestMetadata; -import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.server.metrics.KafkaMetricsGroup; import org.apache.kafka.server.network.ConnectionDisconnectListener; import org.apache.kafka.server.share.CachedSharePartition; diff --git a/server/src/test/java/org/apache/kafka/server/FetchSessionTest.java b/server/src/test/java/org/apache/kafka/server/FetchSessionTest.java index e929b789aac7c..a9308ed3b9b2a 100644 --- a/server/src/test/java/org/apache/kafka/server/FetchSessionTest.java +++ b/server/src/test/java/org/apache/kafka/server/FetchSessionTest.java @@ -28,7 +28,7 @@ import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchRequest.PartitionData; import org.apache.kafka.common.requests.FetchResponse; -import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.server.FetchContext.FullFetchContext; import org.apache.kafka.server.FetchContext.IncrementalFetchContext; import org.apache.kafka.server.FetchContext.SessionErrorContext; diff --git a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java index 16da3016d294a..c2cc3af450e76 100644 --- a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java +++ b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.server.share.session; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.server.share.CachedSharePartition; import org.apache.kafka.server.share.ShareGroupListener; import org.apache.kafka.test.TestUtils; From 48ee934c632fc10a9eef0f7eb281254f638a699d Mon Sep 17 00:00:00 2001 From: cychiu Date: Tue, 24 Mar 2026 08:53:54 +0900 Subject: [PATCH 2/9] refactor: move Byteutils to internals --- .../org/apache/kafka/common/compress/Lz4BlockOutputStream.java | 2 +- .../java/org/apache/kafka/common/network/SslTransportLayer.java | 2 +- .../org/apache/kafka/common/protocol/ByteBufferAccessor.java | 2 +- .../apache/kafka/common/protocol/DataOutputStreamWritable.java | 2 +- .../main/java/org/apache/kafka/common/protocol/SendBuilder.java | 2 +- .../org/apache/kafka/common/protocol/types/CompactArrayOf.java | 2 +- .../org/apache/kafka/common/protocol/types/TaggedFields.java | 2 +- .../main/java/org/apache/kafka/common/protocol/types/Type.java | 2 +- .../kafka/common/record/internal/AbstractLegacyRecordBatch.java | 2 +- .../org/apache/kafka/common/record/internal/DefaultRecord.java | 2 +- .../apache/kafka/common/record/internal/DefaultRecordBatch.java | 2 +- .../org/apache/kafka/common/record/internal/LegacyRecord.java | 2 +- .../apache/kafka/common/utils/{ => internals}/ByteUtils.java | 2 +- .../apache/kafka/common/message/SimpleExampleMessageTest.java | 2 +- .../kafka/common/protocol/types/ProtocolSerializationTest.java | 2 +- .../apache/kafka/common/record/internal/DefaultRecordTest.java | 2 +- .../kafka/common/record/internal/EndTransactionMarkerTest.java | 2 +- .../test/java/org/apache/kafka/common/utils/ByteUtilsTest.java | 1 + core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala | 2 +- .../apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java | 2 +- .../main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java | 2 +- .../java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java | 2 +- .../main/java/org/apache/kafka/raft/internals/BatchBuilder.java | 2 +- .../java/org/apache/kafka/raft/internals/RecordsIterator.java | 2 +- .../org/apache/kafka/raft/internals/BatchAccumulatorTest.java | 2 +- .../server/common/serialization/AbstractApiMessageSerde.java | 2 +- .../kafka/storage/internals/log/ProducerStateManager.java | 2 +- .../org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java | 2 +- .../kafka/streams/kstream/internals/ChangedDeserializer.java | 2 +- .../kafka/streams/kstream/internals/ChangedSerializer.java | 2 +- .../state/internals/AggregationWithHeadersSerializer.java | 2 +- .../kafka/streams/state/internals/HeadersDeserializer.java | 2 +- .../apache/kafka/streams/state/internals/HeadersSerializer.java | 2 +- .../apache/kafka/streams/state/internals/RecordConverters.java | 2 +- .../state/internals/TimestampedToHeadersWindowStoreAdapter.java | 2 +- .../java/org/apache/kafka/streams/state/internals/Utils.java | 2 +- .../state/internals/ValueTimestampHeadersDeserializer.java | 2 +- .../state/internals/ValueTimestampHeadersSerializer.java | 2 +- .../kafka/streams/kstream/internals/ChangedSerdeTest.java | 2 +- .../org/apache/kafka/streams/state/HeadersBytesStoreTest.java | 2 +- .../state/internals/TimeOrderedSessionStoreUpgradeTest.java | 2 +- .../state/internals/TimeOrderedWindowStoreUpgradeTest.java | 2 +- .../org/apache/kafka/streams/state/internals/UtilsTest.java | 2 +- 43 files changed, 43 insertions(+), 42 deletions(-) rename clients/src/main/java/org/apache/kafka/common/utils/{ => internals}/ByteUtils.java (99%) diff --git a/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java index 8d96afd2737f3..8137fcb9279b7 100644 --- a/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java @@ -17,7 +17,7 @@ package org.apache.kafka.common.compress; import org.apache.kafka.common.record.internal.CompressionType; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Factory; diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index d047c79ce8a1c..02ce229c407fd 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.errors.SslAuthenticationException; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.utils.ByteBufferUnmapper; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java b/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java index c3e2886e656a7..160cf272c784f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java @@ -17,7 +17,7 @@ package org.apache.kafka.common.protocol; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import java.nio.ByteBuffer; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/DataOutputStreamWritable.java b/clients/src/main/java/org/apache/kafka/common/protocol/DataOutputStreamWritable.java index dcf53c4e52b29..fd4f39fbc0a0a 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/DataOutputStreamWritable.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/DataOutputStreamWritable.java @@ -17,7 +17,7 @@ package org.apache.kafka.common.protocol; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Utils; import java.io.Closeable; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SendBuilder.java b/clients/src/main/java/org/apache/kafka/common/protocol/SendBuilder.java index 11239d8e38eaf..509243b223732 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/SendBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/SendBuilder.java @@ -24,7 +24,7 @@ import org.apache.kafka.common.record.internal.UnalignedMemoryRecords; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.ResponseHeader; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import java.nio.ByteBuffer; import java.util.ArrayDeque; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java index e41a4c0dc7f3c..219858c28ed39 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java @@ -17,7 +17,7 @@ package org.apache.kafka.common.protocol.types; import org.apache.kafka.common.protocol.types.Type.DocumentedType; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import java.nio.ByteBuffer; import java.util.Optional; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java index 521f6346ed916..4ffa3b4bcb552 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java @@ -17,7 +17,7 @@ package org.apache.kafka.common.protocol.types; import org.apache.kafka.common.protocol.types.Type.DocumentedType; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import java.nio.ByteBuffer; import java.util.Collections; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java index 2a9c594cfe4d8..2a633b5b8ee5b 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.record.internal.BaseRecords; import org.apache.kafka.common.record.internal.MemoryRecords; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; diff --git a/clients/src/main/java/org/apache/kafka/common/record/internal/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/internal/AbstractLegacyRecordBatch.java index aca803cf5f338..5cdb79f4e6a22 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/internal/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/internal/AbstractLegacyRecordBatch.java @@ -25,7 +25,7 @@ import org.apache.kafka.common.utils.AbstractIterator; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferOutputStream; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Utils; diff --git a/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecord.java index 8a3e37e839acb..983b993c8f820 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecord.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Utils; import java.io.DataOutputStream; diff --git a/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecordBatch.java index acc590fd953dc..adc59d159366a 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecordBatch.java @@ -24,7 +24,7 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferOutputStream; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Crc32C; diff --git a/clients/src/main/java/org/apache/kafka/common/record/internal/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/internal/LegacyRecord.java index a481380c15bb1..3442c29f63606 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/internal/LegacyRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/internal/LegacyRecord.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.ByteBufferOutputStream; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Checksums; import org.apache.kafka.common.utils.Utils; diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/internals/ByteUtils.java similarity index 99% rename from clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java rename to clients/src/main/java/org/apache/kafka/common/utils/internals/ByteUtils.java index 4554d83f01557..b784b0584f073 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/internals/ByteUtils.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.utils; +package org.apache.kafka.common.utils.internals; import java.io.DataInput; import java.io.DataOutput; diff --git a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java index ba3bc23bf6054..2277891adc1af 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.common.protocol.ObjectSerializationCache; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import com.fasterxml.jackson.databind.JsonNode; diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java index de4e58f5d9625..14bd2ae2982c5 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.protocol.types; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/clients/src/test/java/org/apache/kafka/common/record/internal/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/internal/DefaultRecordTest.java index 5e73c1df43542..4afe2767b04b5 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/internal/DefaultRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/internal/DefaultRecordTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.ByteBufferInputStream; import org.apache.kafka.common.utils.ByteBufferOutputStream; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.junit.jupiter.api.Test; diff --git a/clients/src/test/java/org/apache/kafka/common/record/internal/EndTransactionMarkerTest.java b/clients/src/test/java/org/apache/kafka/common/record/internal/EndTransactionMarkerTest.java index 7438e9bba2cb8..df6478af79270 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/internal/EndTransactionMarkerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/internal/EndTransactionMarkerTest.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.junit.jupiter.api.Test; diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java index 1388caba568da..76c7bdecc79fc 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.utils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala index cc4d9e377ddfa..186b8d1d26047 100755 --- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, ByteBufferAccessor, Errors} import org.apache.kafka.common.record.internal.{MemoryRecords, SimpleRecord} import org.apache.kafka.common.requests.{ProduceResponse, ResponseHeader} import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.utils.ByteUtils +import org.apache.kafka.common.utils.internals.ByteUtils import org.apache.kafka.common.{TopicPartition, Uuid, requests} import org.apache.kafka.server.config.ServerLogConfigs import org.junit.jupiter.api.Assertions._ diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java index 6fca76f18eb02..09a47d2856ef0 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.state.HeadersBytesStore; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.internals.Utils; diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java index be609087b23f0..1d0ee29edfed3 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java @@ -17,7 +17,7 @@ package org.apache.kafka.jmh.util; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.CompilerControl; diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java index 1e61f10d8fe3b..16cb21ee28924 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.ObjectSerializationCache; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.serialization.MetadataParseException; diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java index d31cb9a1c4539..dda02eb72ae1b 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java @@ -27,7 +27,7 @@ import org.apache.kafka.common.record.internal.MemoryRecords; import org.apache.kafka.common.record.internal.RecordBatch; import org.apache.kafka.common.utils.ByteBufferOutputStream; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.server.common.serialization.RecordSerde; import java.io.DataOutputStream; diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java index b89439c61110e..31c73926ad903 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.record.internal.MutableRecordBatch; import org.apache.kafka.common.record.internal.Records; import org.apache.kafka.common.utils.BufferSupplier; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.Batch; diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java index 7c5b59c4fa5ff..aad0b553ff6d5 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java @@ -28,7 +28,7 @@ import org.apache.kafka.common.record.internal.DefaultRecord; import org.apache.kafka.common.record.internal.MemoryRecordsBuilder; import org.apache.kafka.common.record.internal.RecordBatch; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; diff --git a/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java index a29b6a00fc005..9651b1e3951a2 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.protocol.Readable; import org.apache.kafka.common.protocol.Writable; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.server.common.ApiMessageAndVersion; /** diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java index 345021d3026bd..41069bcb95050 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.common.record.internal.RecordBatch; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Crc32C; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java index 17800cc27abe6..a6e1f5f9ca3c9 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.storage.internals.log; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java index 3ae46843289c2..6966ec12373e0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.internals.SerdeGetter; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java index 219115c3b93dc..d7d88f71b9f05 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.internals.UpgradeFromValues; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java index 4792052aa9e5f..9d982f6ba49f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.state.AggregationWithHeaders; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java index 12ab040a68e53..18090042ce864 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java index 6608a55f67b2f..f0df5900ed9de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java index 058edad52c8b2..c040e831f8b8b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java @@ -18,7 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import java.nio.ByteBuffer; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java index 2b054532a6bf7..5ac6a640a0fe2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java index c67bfc8dfbe85..0f7b12c32dbab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.StateSerdes; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java index a0f9acc4c9b39..73f41be794ca7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.state.ValueTimestampHeaders; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java index f4e6fe07a22d6..df9dcba74ee09 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.state.ValueTimestampHeaders; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java index 4d1b2b0b2124c..50dea706aeb56 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java index 37cacde22c0af..d597b97daed17 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.state; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.junit.jupiter.api.Test; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java index 260567f137c90..865f145b45e97 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.DslStoreFormat; import org.apache.kafka.streams.KeyValue; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java index df278a3597282..dc26920680314 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.WindowStore; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java index 631f906ee833f..51732bd82833c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.ValueTimestampHeaders; From dab4c839f5ad538a6b07e3a218d6e2f6cb51450b Mon Sep 17 00:00:00 2001 From: cychiu Date: Thu, 19 Mar 2026 16:35:14 +0900 Subject: [PATCH 3/9] refactor: replace BytesUtils with ByteUtils across the codebase --- .../org/apache/kafka/common/utils/Bytes.java | 14 +-- .../common/utils/internals/ByteUtils.java | 64 +++++++++++++ .../common/utils/internals/BytesUtils.java | 93 ------------------- .../apache/kafka/common/utils/BytesTest.java | 10 +- .../kafka/message/MessageGenerator.java | 2 +- gradle/spotbugs-exclude.xml | 2 +- .../kafka/jmh/util/BytesCompareBenchmark.java | 6 +- .../ForeignTableJoinProcessorSupplier.java | 4 +- .../state/internals/CachingKeyValueStore.java | 4 +- .../internals/DualColumnFamilyAccessor.java | 6 +- .../internals/InMemoryKeyValueStore.java | 4 +- .../internals/MemoryNavigableLRUCache.java | 4 +- .../state/internals/RocksDBRangeIterator.java | 4 +- .../streams/state/internals/RocksDBStore.java | 8 +- .../internals/SegmentedCacheFunction.java | 4 +- .../ChangeLoggingKeyValueBytesStoreTest.java | 4 +- .../internals/RocksDBRangeIteratorTest.java | 6 +- 17 files changed, 105 insertions(+), 134 deletions(-) delete mode 100644 clients/src/main/java/org/apache/kafka/common/utils/internals/BytesUtils.java diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java index 5791deec3d95e..24cfe3c8079df 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.utils; -import org.apache.kafka.common.utils.internals.BytesUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import java.io.Serializable; import java.util.Arrays; @@ -113,7 +113,7 @@ public boolean equals(Object other) { @Override public int compareTo(Bytes that) { - return BytesUtils.BYTES_LEXICO_COMPARATOR.compare(this.bytes, that.bytes); + return ByteUtils.BYTES_LEXICO_COMPARATOR.compare(this.bytes, that.bytes); } @Override @@ -166,17 +166,17 @@ private static String toString(final byte[] b, int off, int len) { * @return A new copy of the incremented byte array. * @throws IndexOutOfBoundsException if incrementing causes the underlying input byte array to overflow. * @deprecated This method is not part of the public API and will be removed in version 5.0. - * Internal Kafka code should use {@link org.apache.kafka.common.utils.internals.BytesUtils#increment(Bytes)} instead. + * Internal Kafka code should use {@link org.apache.kafka.common.utils.internals.ByteUtils#increment(Bytes)} instead. */ @Deprecated(since = "4.3", forRemoval = true) public static Bytes increment(Bytes input) throws IndexOutOfBoundsException { - return BytesUtils.increment(input); + return ByteUtils.increment(input); } /** * A byte array comparator based on lexicographic ordering. * @deprecated This field is not part of the public API and will be removed in version 5.0. - * Internal Kafka code should use {@link org.apache.kafka.common.utils.internals.BytesUtils#BYTES_LEXICO_COMPARATOR} instead. + * Internal Kafka code should use {@link org.apache.kafka.common.utils.internals.ByteUtils#BYTES_LEXICO_COMPARATOR} instead. */ @Deprecated(since = "4.3", forRemoval = true) public static final ByteArrayComparator BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator(); @@ -185,7 +185,7 @@ public static Bytes increment(Bytes input) throws IndexOutOfBoundsException { * A byte array comparator interface. * * @deprecated This interface is not part of the public API and will be removed in version 5.0. - * Internal Kafka code should use {@link org.apache.kafka.common.utils.internals.BytesUtils.ByteArrayComparator} instead. + * Internal Kafka code should use {@link org.apache.kafka.common.utils.internals.ByteUtils.ByteArrayComparator} instead. */ @Deprecated(since = "4.3", forRemoval = true) public interface ByteArrayComparator extends Comparator, Serializable { @@ -194,6 +194,6 @@ int compare(final byte[] buffer1, int offset1, int length1, final byte[] buffer2, int offset2, int length2); } - private static class LexicographicByteArrayComparator extends BytesUtils.LexicographicByteArrayComparator implements ByteArrayComparator { + private static class LexicographicByteArrayComparator extends ByteUtils.LexicographicByteArrayComparator implements ByteArrayComparator { } } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/internals/ByteUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/internals/ByteUtils.java index b784b0584f073..9a20b8cb91fa2 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/internals/ByteUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/internals/ByteUtils.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.utils.internals; +import org.apache.kafka.common.utils.Bytes; + import java.io.DataInput; import java.io.DataOutput; import java.io.EOFException; @@ -23,6 +25,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Comparator; /** * This classes exposes low-level methods for reading/writing from byte streams or buffers. @@ -36,6 +40,66 @@ public final class ByteUtils { private ByteUtils() {} + /** + * Increment the underlying byte array by adding 1. + * + * @param input - The byte array to increment + * @return A new copy of the incremented byte array + * @throws IndexOutOfBoundsException if incrementing causes the underlying input byte array to overflow + */ + public static Bytes increment(Bytes input) throws IndexOutOfBoundsException { + byte[] inputArr = input.get(); + byte[] ret = new byte[inputArr.length]; + int carry = 1; + for (int i = inputArr.length - 1; i >= 0; i--) { + if (inputArr[i] == (byte) 0xFF && carry == 1) { + ret[i] = (byte) 0x00; + } else { + ret[i] = (byte) (inputArr[i] + carry); + carry = 0; + } + } + if (carry == 0) { + return Bytes.wrap(ret); + } else { + throw new IndexOutOfBoundsException(); + } + } + + /** + * A byte array comparator based on lexicographic ordering. + */ + public static final ByteArrayComparator BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator(); + + public interface ByteArrayComparator extends Comparator { + + int compare(final byte[] buffer1, int offset1, int length1, + final byte[] buffer2, int offset2, int length2); + } + + public static class LexicographicByteArrayComparator implements ByteArrayComparator { + + @Override + public int compare(byte[] buffer1, byte[] buffer2) { + return compare(buffer1, 0, buffer1.length, buffer2, 0, buffer2.length); + } + + public int compare(final byte[] buffer1, int offset1, int length1, + final byte[] buffer2, int offset2, int length2) { + + // short circuit equal case + if (buffer1 == buffer2 && + offset1 == offset2 && + length1 == length2) { + return 0; + } + + int end1 = offset1 + length1; + int end2 = offset2 + length2; + return Arrays.compareUnsigned(buffer1, offset1, end1, buffer2, offset2, end2); + } + } + /** * Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes * diff --git a/clients/src/main/java/org/apache/kafka/common/utils/internals/BytesUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/internals/BytesUtils.java deleted file mode 100644 index 4823905aa4bf1..0000000000000 --- a/clients/src/main/java/org/apache/kafka/common/utils/internals/BytesUtils.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.utils.internals; - -import org.apache.kafka.common.utils.Bytes; - -import java.util.Arrays; -import java.util.Comparator; - -/** - * Internal utility class for Bytes-related operations. - * This class is for internal Kafka use only and is not part of the public API. - */ -public final class BytesUtils { - - private BytesUtils() { - // Utility class, prevent instantiation - } - - /** - * Increment the underlying byte array by adding 1. - * - * @param input - The byte array to increment - * @return A new copy of the incremented byte array - * @throws IndexOutOfBoundsException if incrementing causes the underlying input byte array to overflow - */ - public static Bytes increment(Bytes input) throws IndexOutOfBoundsException { - byte[] inputArr = input.get(); - byte[] ret = new byte[inputArr.length]; - int carry = 1; - for (int i = inputArr.length - 1; i >= 0; i--) { - if (inputArr[i] == (byte) 0xFF && carry == 1) { - ret[i] = (byte) 0x00; - } else { - ret[i] = (byte) (inputArr[i] + carry); - carry = 0; - } - } - if (carry == 0) { - return Bytes.wrap(ret); - } else { - throw new IndexOutOfBoundsException(); - } - } - - /** - * A byte array comparator based on lexicographic ordering. - */ - public static final ByteArrayComparator BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator(); - - public interface ByteArrayComparator extends Comparator { - - int compare(final byte[] buffer1, int offset1, int length1, - final byte[] buffer2, int offset2, int length2); - } - - public static class LexicographicByteArrayComparator implements ByteArrayComparator { - - @Override - public int compare(byte[] buffer1, byte[] buffer2) { - return compare(buffer1, 0, buffer1.length, buffer2, 0, buffer2.length); - } - - public int compare(final byte[] buffer1, int offset1, int length1, - final byte[] buffer2, int offset2, int length2) { - - // short circuit equal case - if (buffer1 == buffer2 && - offset1 == offset2 && - length1 == length2) { - return 0; - } - - int end1 = offset1 + length1; - int end2 = offset2 + length2; - return Arrays.compareUnsigned(buffer1, offset1, end1, buffer2, offset2, end2); - } - } -} diff --git a/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java b/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java index c68527243cde8..d3aee800ed20f 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.utils; -import org.apache.kafka.common.utils.internals.BytesUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.junit.jupiter.api.Test; @@ -36,14 +36,14 @@ public class BytesTest { public void testIncrement() { byte[] input = new byte[]{(byte) 0xAB, (byte) 0xCD, (byte) 0xFF}; byte[] expected = new byte[]{(byte) 0xAB, (byte) 0xCE, (byte) 0x00}; - Bytes output = BytesUtils.increment(Bytes.wrap(input)); + Bytes output = ByteUtils.increment(Bytes.wrap(input)); assertArrayEquals(output.get(), expected); } @Test public void testIncrementUpperBoundary() { byte[] input = new byte[]{(byte) 0xFF, (byte) 0xFF, (byte) 0xFF}; - assertThrows(IndexOutOfBoundsException.class, () -> BytesUtils.increment(Bytes.wrap(input))); + assertThrows(IndexOutOfBoundsException.class, () -> ByteUtils.increment(Bytes.wrap(input))); } @Test @@ -66,7 +66,7 @@ public void testIncrementWithSubmap() { map.put(key5, val); Bytes prefix = key1; - Bytes prefixEnd = BytesUtils.increment(prefix); + Bytes prefixEnd = ByteUtils.increment(prefix); Comparator comparator = map.comparator(); final int result = comparator == null ? prefix.compareTo(prefixEnd) : comparator.compare(prefix, prefixEnd); @@ -114,7 +114,7 @@ public void testBytesLexicographicCases() { } private int cmp(String l, String r) { - return BytesUtils.BYTES_LEXICO_COMPARATOR.compare( + return ByteUtils.BYTES_LEXICO_COMPARATOR.compare( l.getBytes(StandardCharsets.UTF_8), r.getBytes(StandardCharsets.UTF_8)); } diff --git a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java index 2c791e1c2caf9..f09baf230ab8a 100644 --- a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java +++ b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java @@ -120,7 +120,7 @@ public final class MessageGenerator { static final String RESPONSE_SUFFIX = "Response"; - static final String BYTE_UTILS_CLASS = "org.apache.kafka.common.utils.ByteUtils"; + static final String BYTE_UTILS_CLASS = "org.apache.kafka.common.utils.internals.ByteUtils"; static final String STANDARD_CHARSETS = "java.nio.charset.StandardCharsets"; diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 455c9814c7835..6581d229ba6f3 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -678,7 +678,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - + diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java index 23889576ca1a0..7fda479cc8796 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.jmh.util; -import org.apache.kafka.common.utils.internals.BytesUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Fork; @@ -45,7 +45,7 @@ public class BytesCompareBenchmark { private byte[][] tv; private TreeMap oldMap = new TreeMap<>(new HandwrittenLexicoComparator()); - private TreeMap newMap = new TreeMap<>(BytesUtils.BYTES_LEXICO_COMPARATOR); + private TreeMap newMap = new TreeMap<>(ByteUtils.BYTES_LEXICO_COMPARATOR); @Setup public void setup() { @@ -74,7 +74,7 @@ public void samePrefixLexicoJdk(Blackhole bh) { } } - static class HandwrittenLexicoComparator implements BytesUtils.ByteArrayComparator { + static class HandwrittenLexicoComparator implements ByteUtils.ByteArrayComparator { @Override public int compare(byte[] buffer1, byte[] buffer2) { return compare(buffer1, 0, buffer1.length, buffer2, 0, buffer2.length); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java index fd6038afda090..55660826efbc4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.internals.BytesUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.processor.api.ContextualProcessor; @@ -124,7 +124,7 @@ public void process(final Record> record) { //Perform the prefixScan and propagate the results try (final KeyValueIterator>> prefixScanResults = - subscriptionStore.range(prefixBytes, BytesUtils.increment(prefixBytes))) { + subscriptionStore.range(prefixBytes, ByteUtils.increment(prefixBytes))) { while (prefixScanResults.hasNext()) { final KeyValue>> next = prefixScanResults.next(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 3d6efc243a0c8..8ef68e0d06c12 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.internals.BytesUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.processor.StateStore; @@ -427,7 +427,7 @@ public , P> KeyValueIterator prefixScan( validateStoreOpen(); final KeyValueIterator storeIterator = wrapped().prefixScan(prefix, prefixKeySerializer); final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix)); - final Bytes to = BytesUtils.increment(from); + final Bytes to = ByteUtils.increment(from); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = internalContext.cache().range(cacheName, from, to, false); return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java index 4d0ae0222ca43..db4274cc62582 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.internals.BytesUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.state.internals.RocksDBStore.DBAccessor; @@ -271,7 +271,7 @@ private static class RocksDBDualCFIterator // RocksDB's JNI interface does not expose getters/setters that allow the // comparator to be pluggable, and the default is lexicographic, so it's // safe to just force lexicographic comparator here for now. - private final Comparator comparator = BytesUtils.BYTES_LEXICO_COMPARATOR; + private final Comparator comparator = ByteUtils.BYTES_LEXICO_COMPARATOR; private final String storeName; private final RocksIterator iterNewFormat; @@ -401,7 +401,7 @@ private static class RocksDBDualCFRangeIterator extends RocksDBDualCFIterator { // RocksDB's JNI interface does not expose getters/setters that allow the // comparator to be pluggable, and the default is lexicographic, so it's // safe to just force lexicographic comparator here for now. - private final Comparator comparator = BytesUtils.BYTES_LEXICO_COMPARATOR; + private final Comparator comparator = ByteUtils.BYTES_LEXICO_COMPARATOR; private final byte[] rawLastKey; private final boolean forward; private final boolean toInclusive; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index f74d8e6387a01..4b5336169bfb0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.internals.BytesUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateStore; @@ -174,7 +174,7 @@ public synchronized void putAll(final List> entries) { public synchronized , P> KeyValueIterator prefixScan(final P prefix, final PS prefixKeySerializer) { final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix)); - final Bytes to = BytesUtils.increment(from); + final Bytes to = ByteUtils.increment(from); return new InMemoryKeyValueIterator(map.subMap(from, true, to, false).keySet(), true); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java index 1e5e102c0a13c..0406f217aecae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.internals.BytesUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; @@ -90,7 +90,7 @@ private Iterator getIterator(final TreeMap treeMap, final public , P> KeyValueIterator prefixScan(final P prefix, final PS prefixKeySerializer) { final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix)); - final Bytes to = BytesUtils.increment(from); + final Bytes to = ByteUtils.increment(from); final TreeMap treeMap = toTreeMap(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java index a8f7c6fcf79ee..0617349c72417 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.internals.BytesUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.KeyValue; import org.rocksdb.RocksIterator; @@ -28,7 +28,7 @@ class RocksDBRangeIterator extends RocksDbIterator { // RocksDB's JNI interface does not expose getters/setters that allow the // comparator to be pluggable, and the default is lexicographic, so it's // safe to just force lexicographic comparator here for now. - private final Comparator comparator = BytesUtils.BYTES_LEXICO_COMPARATOR; + private final Comparator comparator = ByteUtils.BYTES_LEXICO_COMPARATOR; private final byte[] rawLastKey; private final boolean forward; private final boolean toInclusive; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 8de21f4d40f66..7af8879ed9bc5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.common.utils.internals.BytesUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.InvalidStateStoreException; @@ -564,7 +564,7 @@ void deleteRange(final Bytes keyFrom, final Bytes keyTo) { // RocksDB's deleteRange() does not support a null upper bound so in the event // of overflow from increment(), the operation cannot be performed and an // IndexOutOfBoundsException will be thrown. - cfAccessor.deleteRange(dbAccessor, keyFrom.get(), BytesUtils.increment(keyTo).get()); + cfAccessor.deleteRange(dbAccessor, keyFrom.get(), ByteUtils.increment(keyTo).get()); } @Override @@ -1095,7 +1095,7 @@ public Position getPosition() { } /** - * Same as {@link BytesUtils#increment(Bytes)} but {@code null} is returned instead of throwing + * Same as {@link ByteUtils#increment(Bytes)} but {@code null} is returned instead of throwing * {@code IndexOutOfBoundsException} in the event of overflow. * * @param input bytes to increment @@ -1104,7 +1104,7 @@ public Position getPosition() { */ static Bytes incrementWithoutOverflow(final Bytes input) { try { - return BytesUtils.increment(input); + return ByteUtils.increment(input); } catch (final IndexOutOfBoundsException e) { return null; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java index 1d3cbea4a9f46..089c680126985 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java @@ -18,7 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.internals.BytesUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.state.internals.SegmentedBytesStore.KeySchema; import java.nio.ByteBuffer; @@ -78,7 +78,7 @@ int compareSegmentedKeys(final Bytes cacheKey, final Bytes storeKey) { if (segmentCompare == 0) { final byte[] cacheKeyBytes = cacheKey.get(); final byte[] storeKeyBytes = storeKey.get(); - return BytesUtils.BYTES_LEXICO_COMPARATOR.compare( + return ByteUtils.BYTES_LEXICO_COMPARATOR.compare( cacheKeyBytes, SEGMENT_ID_BYTES, cacheKeyBytes.length - SEGMENT_ID_BYTES, storeKeyBytes, 0, storeKeyBytes.length ); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java index 05d64e31ed6d1..504b7b3e07c79 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java @@ -25,7 +25,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.internals.BytesUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig.InternalConfig; @@ -221,7 +221,7 @@ public void shouldReturnValueOnGetWhenExists() { @Test public void shouldGetRecordsWithPrefixKey() { store.put(hi, there); - store.put(BytesUtils.increment(hi), world); + store.put(ByteUtils.increment(hi), world); final List keys = new ArrayList<>(); final List values = new ArrayList<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java index 8ef4046794dc9..0ed49a3c75c27 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.internals.BytesUtils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -125,7 +125,7 @@ public void shouldReturnAllKeysInTheRangeReverseDirection() { @Test public void shouldReturnAllKeysWhenLastKeyIsGreaterThanLargestKeyInStateStoreInForwardDirection() { - final Bytes toBytes = BytesUtils.increment(key4Bytes); + final Bytes toBytes = ByteUtils.increment(key4Bytes); final RocksIterator rocksIterator = mock(RocksIterator.class); doNothing().when(rocksIterator).seek(key1Bytes.get()); when(rocksIterator.isValid()) @@ -341,7 +341,7 @@ public void shouldReturnTheCurrentKeyOnInvokingPeekNextKeyInForwardDirection() { @Test public void shouldReturnTheCurrentKeyOnInvokingPeekNextKeyInReverseDirection() { final RocksIterator rocksIterator = mock(RocksIterator.class); - final Bytes toBytes = BytesUtils.increment(key4Bytes); + final Bytes toBytes = ByteUtils.increment(key4Bytes); doNothing().when(rocksIterator).seekForPrev(toBytes.get()); when(rocksIterator.isValid()) .thenReturn(true) From 8873d5560e662b00e1e6255ac86076bfe0e831e5 Mon Sep 17 00:00:00 2001 From: cychiu Date: Thu, 19 Mar 2026 16:08:13 +0900 Subject: [PATCH 4/9] fix typo --- .../utils/internals/ImplicitLinkedHashCollection.java | 7 ++----- .../utils/internals/ImplicitLinkedHashCollectionTest.java | 8 ++++---- .../internals/ImplicitLinkedHashMultiCollectionTest.java | 8 ++++---- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollection.java b/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollection.java index 284e1055b41f9..a21e9cab1867e 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollection.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollection.java @@ -594,12 +594,11 @@ public final void clear(int expectedNumElements) { // Optimize away object allocations for empty sets. this.head = HeadElement.EMPTY; this.elements = EMPTY_ELEMENTS; - this.size = 0; } else { this.head = new HeadElement(); this.elements = new Element[calculateCapacity(expectedNumElements)]; - this.size = 0; } + this.size = 0; } /** @@ -689,8 +688,6 @@ public void sort(Comparator comparator) { array.add(e); } array.sort(comparator); - for (E e : array) { - add(e); - } + this.addAll(array); } } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollectionTest.java index 5794b5de9786e..1dbc928e8df36 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollectionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollectionTest.java @@ -138,12 +138,12 @@ static void expectTraversal(Iterator iterator, Integer... sequence) int i = 0; while (iterator.hasNext()) { TestElement element = iterator.next(); - assertTrue(i < sequence.length, "Iterator yieled " + (i + 1) + " elements, but only " + + assertTrue(i < sequence.length, "Iterator yield " + (i + 1) + " elements, but only " + sequence.length + " were expected."); assertEquals(sequence[i].intValue(), element.key, "Iterator value number " + (i + 1) + " was incorrect."); i = i + 1; } - assertEquals(sequence.length, i, "Iterator yieled " + (i + 1) + " elements, but " + + assertEquals(sequence.length, i, "Iterator yield " + (i + 1) + " elements, but " + sequence.length + " were expected."); } @@ -151,13 +151,13 @@ static void expectTraversal(Iterator iter, Iterator expect int i = 0; while (iter.hasNext()) { TestElement element = iter.next(); - assertTrue(expectedIter.hasNext(), "Iterator yieled " + (i + 1) + " elements, but only " + i + + assertTrue(expectedIter.hasNext(), "Iterator yield " + (i + 1) + " elements, but only " + i + " were expected."); Integer expected = expectedIter.next(); assertEquals(expected.intValue(), element.key, "Iterator value number " + (i + 1) + " was incorrect."); i = i + 1; } - assertFalse(expectedIter.hasNext(), "Iterator yieled " + i + " elements, but at least " + (i + 1) + + assertFalse(expectedIter.hasNext(), "Iterator yield " + i + " elements, but at least " + (i + 1) + " were expected."); } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollectionTest.java index 880664e1aa80e..287283d2d58da 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollectionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollectionTest.java @@ -92,14 +92,14 @@ static void expectExactTraversal(Iterator iterator, TestElement... int i = 0; while (iterator.hasNext()) { TestElement element = iterator.next(); - assertTrue(i < sequence.length, "Iterator yieled " + (i + 1) + " elements, but only " + + assertTrue(i < sequence.length, "Iterator yield " + (i + 1) + " elements, but only " + sequence.length + " were expected."); if (sequence[i] != element) { fail("Iterator value number " + (i + 1) + " was incorrect."); } i = i + 1; } - assertEquals(sequence.length, i, "Iterator yieled " + (i + 1) + " elements, but " + + assertEquals(sequence.length, i, "Iterator yield " + (i + 1) + " elements, but " + sequence.length + " were expected."); } @@ -158,12 +158,12 @@ void expectTraversal(Iterator iter, Iterator expectedI while (iter.hasNext()) { TestElement element = iter.next(); assertTrue(expectedIter.hasNext(), - "Iterator yieled " + (i + 1) + " elements, but only " + i + " were expected."); + "Iterator yield " + (i + 1) + " elements, but only " + i + " were expected."); TestElement expected = expectedIter.next(); assertSame(expected, element, "Iterator value number " + (i + 1) + " was incorrect."); i = i + 1; } assertFalse(expectedIter.hasNext(), - "Iterator yieled " + i + " elements, but at least " + (i + 1) + " were expected."); + "Iterator yield " + i + " elements, but at least " + (i + 1) + " were expected."); } } From a6491164383565911383ba5928d8d0d2768cc16f Mon Sep 17 00:00:00 2001 From: cychiu Date: Tue, 24 Mar 2026 09:00:53 +0900 Subject: [PATCH 5/9] refactor: move Byteutils to internals --- .../common/utils/{ => internals}/ByteUtilsTest.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) rename clients/src/test/java/org/apache/kafka/common/utils/{ => internals}/ByteUtilsTest.java (98%) diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/internals/ByteUtilsTest.java similarity index 98% rename from clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java rename to clients/src/test/java/org/apache/kafka/common/utils/internals/ByteUtilsTest.java index 76c7bdecc79fc..916df6f565586 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/internals/ByteUtilsTest.java @@ -14,9 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.utils; +package org.apache.kafka.common.utils.internals; -import org.apache.kafka.common.utils.internals.ByteUtils; +import org.apache.kafka.common.utils.ByteBufferInputStream; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.Utils; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -422,7 +425,7 @@ private void assertUnsignedVarintSerde(int value, byte[] expectedEncoding) throw ByteBuffer buf = ByteBuffer.allocate(MAX_LENGTH_VARINT); ByteUtils.writeUnsignedVarint(value, buf); buf.flip(); - assertArrayEquals(expectedEncoding, Utils.toArray(buf)); + Assertions.assertArrayEquals(expectedEncoding, Utils.toArray(buf)); assertEquals(value, ByteUtils.readUnsignedVarint(buf.duplicate())); buf.rewind(); From 4bbcaef522ec0a56252d08961bd267478701ff1a Mon Sep 17 00:00:00 2001 From: cychiu Date: Thu, 26 Mar 2026 08:38:04 +0900 Subject: [PATCH 6/9] spotlessApply # Conflicts: # core/src/main/java/kafka/server/share/SharePartitionManager.java --- .../java/org/apache/kafka/common/network/SslTransportLayer.java | 2 +- .../apache/kafka/common/protocol/DataOutputStreamWritable.java | 2 +- .../main/java/org/apache/kafka/common/protocol/types/Type.java | 2 +- .../kafka/common/record/internal/AbstractLegacyRecordBatch.java | 2 +- .../org/apache/kafka/common/record/internal/DefaultRecord.java | 2 +- .../apache/kafka/common/record/internal/DefaultRecordBatch.java | 2 +- .../org/apache/kafka/common/record/internal/LegacyRecord.java | 2 +- .../org/apache/kafka/common/utils/internals/ByteUtilsTest.java | 1 + .../src/main/java/kafka/server/share/SharePartitionManager.java | 2 +- .../test/java/kafka/server/share/SharePartitionManagerTest.java | 2 +- .../java/org/apache/kafka/raft/internals/RecordsIterator.java | 2 +- .../org/apache/kafka/raft/internals/BatchAccumulatorTest.java | 2 +- server/src/main/java/org/apache/kafka/server/FetchContext.java | 2 +- server/src/main/java/org/apache/kafka/server/FetchSession.java | 2 +- .../java/org/apache/kafka/server/FetchSessionCacheShard.java | 2 +- .../kafka/storage/internals/log/ProducerStateManager.java | 2 +- .../org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java | 2 +- .../state/internals/TimestampedToHeadersWindowStoreAdapter.java | 2 +- .../java/org/apache/kafka/streams/state/internals/Utils.java | 2 +- .../state/internals/TimeOrderedSessionStoreUpgradeTest.java | 2 +- .../state/internals/TimeOrderedWindowStoreUpgradeTest.java | 2 +- 21 files changed, 21 insertions(+), 20 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index 02ce229c407fd..a52f19469a14e 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -19,9 +19,9 @@ import org.apache.kafka.common.errors.SslAuthenticationException; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.utils.ByteBufferUnmapper; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.slf4j.Logger; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/DataOutputStreamWritable.java b/clients/src/main/java/org/apache/kafka/common/protocol/DataOutputStreamWritable.java index fd4f39fbc0a0a..e47e966e1a717 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/DataOutputStreamWritable.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/DataOutputStreamWritable.java @@ -17,8 +17,8 @@ package org.apache.kafka.common.protocol; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.utils.internals.ByteUtils; import java.io.Closeable; import java.io.DataOutputStream; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java index 2a633b5b8ee5b..b2919dc52f24d 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java @@ -19,8 +19,8 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.record.internal.BaseRecords; import org.apache.kafka.common.record.internal.MemoryRecords; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.utils.internals.ByteUtils; import java.nio.ByteBuffer; import java.util.Optional; diff --git a/clients/src/main/java/org/apache/kafka/common/record/internal/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/internal/AbstractLegacyRecordBatch.java index 5cdb79f4e6a22..7738f4059d8c3 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/internal/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/internal/AbstractLegacyRecordBatch.java @@ -25,9 +25,9 @@ import org.apache.kafka.common.utils.AbstractIterator; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferOutputStream; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.utils.internals.ByteUtils; import java.io.DataOutputStream; import java.io.IOException; diff --git a/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecord.java index 983b993c8f820..e8241eb0b7085 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecord.java @@ -20,8 +20,8 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.utils.internals.ByteUtils; import java.io.DataOutputStream; import java.io.IOException; diff --git a/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecordBatch.java index adc59d159366a..2b6bc7ee11f54 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecordBatch.java @@ -24,9 +24,9 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferOutputStream; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.internals.ByteUtils; import java.io.IOException; import java.io.InputStream; diff --git a/clients/src/main/java/org/apache/kafka/common/record/internal/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/internal/LegacyRecord.java index 3442c29f63606..1f5d7d825ff53 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/internal/LegacyRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/internal/LegacyRecord.java @@ -20,9 +20,9 @@ import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.ByteBufferOutputStream; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Checksums; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.utils.internals.ByteUtils; import java.io.DataOutputStream; import java.io.IOException; diff --git a/clients/src/test/java/org/apache/kafka/common/utils/internals/ByteUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/internals/ByteUtilsTest.java index 916df6f565586..5b51a8106c57c 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/internals/ByteUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/internals/ByteUtilsTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.utils.ByteBufferInputStream; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.Utils; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 48c1ac3be48f0..a6311c44bd896 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -29,8 +29,8 @@ import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ShareRequestMetadata; -import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider; import org.apache.kafka.server.common.ShareVersion; import org.apache.kafka.server.partition.PartitionListener; diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 0b73d71abd4f2..d4699b5dc0999 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -48,8 +48,8 @@ import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.ShareFetchResponse; import org.apache.kafka.common.requests.ShareRequestMetadata; -import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.coordinator.group.GroupConfigManager; import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider; import org.apache.kafka.server.common.ShareVersion; diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java index 31c73926ad903..d18d6742ef65c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java @@ -23,9 +23,9 @@ import org.apache.kafka.common.record.internal.MutableRecordBatch; import org.apache.kafka.common.record.internal.Records; import org.apache.kafka.common.utils.BufferSupplier; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.raft.Batch; import org.apache.kafka.raft.ControlRecord; import org.apache.kafka.server.common.serialization.RecordSerde; diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java index aad0b553ff6d5..9b1694bfbe489 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java @@ -28,9 +28,9 @@ import org.apache.kafka.common.record.internal.DefaultRecord; import org.apache.kafka.common.record.internal.MemoryRecordsBuilder; import org.apache.kafka.common.record.internal.RecordBatch; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.junit.jupiter.api.Test; import org.mockito.Mockito; diff --git a/server/src/main/java/org/apache/kafka/server/FetchContext.java b/server/src/main/java/org/apache/kafka/server/FetchContext.java index 85c51072fce86..dc240a01174b2 100644 --- a/server/src/main/java/org/apache/kafka/server/FetchContext.java +++ b/server/src/main/java/org/apache/kafka/server/FetchContext.java @@ -25,8 +25,8 @@ import org.apache.kafka.common.requests.FetchMetadata; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; -import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.server.FetchSession.CachedPartition; import org.apache.kafka.server.FetchSession.FetchSessionCache; diff --git a/server/src/main/java/org/apache/kafka/server/FetchSession.java b/server/src/main/java/org/apache/kafka/server/FetchSession.java index a8f90e4c10cb2..db4af175c80bb 100644 --- a/server/src/main/java/org/apache/kafka/server/FetchSession.java +++ b/server/src/main/java/org/apache/kafka/server/FetchSession.java @@ -22,8 +22,8 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; -import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.server.metrics.KafkaMetricsGroup; import java.util.ArrayList; diff --git a/server/src/main/java/org/apache/kafka/server/FetchSessionCacheShard.java b/server/src/main/java/org/apache/kafka/server/FetchSessionCacheShard.java index c59c222e887a9..fd59b0ca1a337 100644 --- a/server/src/main/java/org/apache/kafka/server/FetchSessionCacheShard.java +++ b/server/src/main/java/org/apache/kafka/server/FetchSessionCacheShard.java @@ -17,8 +17,8 @@ package org.apache.kafka.server; import org.apache.kafka.common.requests.FetchMetadata; -import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection; import org.apache.kafka.server.FetchSession.EvictableKey; import org.apache.kafka.server.FetchSession.LastUsedKey; diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java index 41069bcb95050..6f9f736b8cee2 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java @@ -20,11 +20,11 @@ import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.common.record.internal.RecordBatch; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Crc32C; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.server.log.remote.metadata.storage.generated.ProducerSnapshot; import org.slf4j.Logger; diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java index a6e1f5f9ca3c9..3e7fb195c7b56 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.storage.internals.log; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.utils.internals.ByteUtils; import java.nio.ByteBuffer; import java.security.DigestException; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java index 5ac6a640a0fe2..47eb66c581073 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java @@ -17,8 +17,8 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java index 0f7b12c32dbab..6e9ad91f8107d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java @@ -20,8 +20,8 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.StateSerdes; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java index 865f145b45e97..f254aaf8440eb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java @@ -20,8 +20,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.DslStoreFormat; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java index dc26920680314..7c9d79fb10713 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java @@ -19,8 +19,8 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.InternalMockProcessorContext; From 11c38095701763e89cdaa5e0b8f26513abad79c8 Mon Sep 17 00:00:00 2001 From: cychiu Date: Tue, 24 Mar 2026 15:37:24 +0900 Subject: [PATCH 7/9] fix: unnecessary change, typo --- .../kafka/common/utils/internals/ByteUtilsTest.java | 3 +-- .../utils/internals/ImplicitLinkedHashCollectionTest.java | 8 ++++---- .../internals/ImplicitLinkedHashMultiCollectionTest.java | 8 ++++---- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/utils/internals/ByteUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/internals/ByteUtilsTest.java index 5b51a8106c57c..e8d92362d47e9 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/internals/ByteUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/internals/ByteUtilsTest.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.Utils; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -426,7 +425,7 @@ private void assertUnsignedVarintSerde(int value, byte[] expectedEncoding) throw ByteBuffer buf = ByteBuffer.allocate(MAX_LENGTH_VARINT); ByteUtils.writeUnsignedVarint(value, buf); buf.flip(); - Assertions.assertArrayEquals(expectedEncoding, Utils.toArray(buf)); + assertArrayEquals(expectedEncoding, Utils.toArray(buf)); assertEquals(value, ByteUtils.readUnsignedVarint(buf.duplicate())); buf.rewind(); diff --git a/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollectionTest.java index 1dbc928e8df36..fc43fa9487870 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollectionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollectionTest.java @@ -138,12 +138,12 @@ static void expectTraversal(Iterator iterator, Integer... sequence) int i = 0; while (iterator.hasNext()) { TestElement element = iterator.next(); - assertTrue(i < sequence.length, "Iterator yield " + (i + 1) + " elements, but only " + + assertTrue(i < sequence.length, "Iterator yielded " + (i + 1) + " elements, but only " + sequence.length + " were expected."); assertEquals(sequence[i].intValue(), element.key, "Iterator value number " + (i + 1) + " was incorrect."); i = i + 1; } - assertEquals(sequence.length, i, "Iterator yield " + (i + 1) + " elements, but " + + assertEquals(sequence.length, i, "Iterator yielded " + (i + 1) + " elements, but " + sequence.length + " were expected."); } @@ -151,13 +151,13 @@ static void expectTraversal(Iterator iter, Iterator expect int i = 0; while (iter.hasNext()) { TestElement element = iter.next(); - assertTrue(expectedIter.hasNext(), "Iterator yield " + (i + 1) + " elements, but only " + i + + assertTrue(expectedIter.hasNext(), "Iterator yielded " + (i + 1) + " elements, but only " + i + " were expected."); Integer expected = expectedIter.next(); assertEquals(expected.intValue(), element.key, "Iterator value number " + (i + 1) + " was incorrect."); i = i + 1; } - assertFalse(expectedIter.hasNext(), "Iterator yield " + i + " elements, but at least " + (i + 1) + + assertFalse(expectedIter.hasNext(), "Iterator yielded " + i + " elements, but at least " + (i + 1) + " were expected."); } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollectionTest.java index 287283d2d58da..b8dc2b7d5de19 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollectionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollectionTest.java @@ -92,14 +92,14 @@ static void expectExactTraversal(Iterator iterator, TestElement... int i = 0; while (iterator.hasNext()) { TestElement element = iterator.next(); - assertTrue(i < sequence.length, "Iterator yield " + (i + 1) + " elements, but only " + + assertTrue(i < sequence.length, "Iterator yielded " + (i + 1) + " elements, but only " + sequence.length + " were expected."); if (sequence[i] != element) { fail("Iterator value number " + (i + 1) + " was incorrect."); } i = i + 1; } - assertEquals(sequence.length, i, "Iterator yield " + (i + 1) + " elements, but " + + assertEquals(sequence.length, i, "Iterator yielded " + (i + 1) + " elements, but " + sequence.length + " were expected."); } @@ -158,12 +158,12 @@ void expectTraversal(Iterator iter, Iterator expectedI while (iter.hasNext()) { TestElement element = iter.next(); assertTrue(expectedIter.hasNext(), - "Iterator yield " + (i + 1) + " elements, but only " + i + " were expected."); + "Iterator yielded " + (i + 1) + " elements, but only " + i + " were expected."); TestElement expected = expectedIter.next(); assertSame(expected, element, "Iterator value number " + (i + 1) + " was incorrect."); i = i + 1; } assertFalse(expectedIter.hasNext(), - "Iterator yield " + i + " elements, but at least " + (i + 1) + " were expected."); + "Iterator yielded " + i + " elements, but at least " + (i + 1) + " were expected."); } } From 4d1561458156561275842f72cc0b423a6e3cdd5b Mon Sep 17 00:00:00 2001 From: cychiu Date: Fri, 27 Mar 2026 10:17:05 +0900 Subject: [PATCH 8/9] Revert "fix typo" This reverts commit 8873d5560e662b00e1e6255ac86076bfe0e831e5. # Conflicts: # clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollectionTest.java # clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollectionTest.java --- .../utils/internals/ImplicitLinkedHashCollection.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollection.java b/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollection.java index a21e9cab1867e..284e1055b41f9 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollection.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollection.java @@ -594,11 +594,12 @@ public final void clear(int expectedNumElements) { // Optimize away object allocations for empty sets. this.head = HeadElement.EMPTY; this.elements = EMPTY_ELEMENTS; + this.size = 0; } else { this.head = new HeadElement(); this.elements = new Element[calculateCapacity(expectedNumElements)]; + this.size = 0; } - this.size = 0; } /** @@ -688,6 +689,8 @@ public void sort(Comparator comparator) { array.add(e); } array.sort(comparator); - this.addAll(array); + for (E e : array) { + add(e); + } } } From 86a95f304840e7c5f2660c687e4886623dd30a32 Mon Sep 17 00:00:00 2001 From: cychiu Date: Sun, 29 Mar 2026 20:46:31 +0900 Subject: [PATCH 9/9] spotlessApply This reverts commit 8873d556 --- .../state/internals/TimestampedToHeadersWindowStoreAdapter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java index 9e6c3a6e6da2e..cde8ac607d87d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext;