Skip to content

KAFKA-20297: move ImplicitLinkedHash, ByteUtils from utils to internals#21856

Open
cychiu8 wants to merge 10 commits intoapache:trunkfrom
cychiu8:20297-move-to-internals
Open

KAFKA-20297: move ImplicitLinkedHash, ByteUtils from utils to internals#21856
cychiu8 wants to merge 10 commits intoapache:trunkfrom
cychiu8:20297-move-to-internals

Conversation

@cychiu8
Copy link
Copy Markdown
Contributor

@cychiu8 cychiu8 commented Mar 24, 2026

Summary: Move ImplicitLinkedHash*, ByteUtils from common.utils to
common.utils.internals.

Reviewers: Chia-Ping Tsai
chia7712@gmail.com

@github-actions github-actions bot added triage PRs from the community streams core Kafka Broker performance kraft storage Pull requests that target the storage module KIP-932 Queues for Kafka build Gradle build or GitHub Actions generator RPC and Record code generator clients labels Mar 24, 2026
ByteUtils.writeUnsignedVarint(value, buf);
buf.flip();
assertArrayEquals(expectedEncoding, Utils.toArray(buf));
Assertions.assertArrayEquals(expectedEncoding, Utils.toArray(buf));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary change

import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.Utils;

import org.junit.jupiter.api.Assertions;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@github-actions github-actions bot removed the triage PRs from the community label Mar 24, 2026
while (iterator.hasNext()) {
TestElement element = iterator.next();
assertTrue(i < sequence.length, "Iterator yieled " + (i + 1) + " elements, but only " +
assertTrue(i < sequence.length, "Iterator yield " + (i + 1) + " elements, but only " +
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yielded?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for the feedbacks!

@cychiu8 cychiu8 force-pushed the 20297-move-to-internals branch from 0607e88 to 11c3809 Compare March 25, 2026 23:44
this.elements = new Element[calculateCapacity(expectedNumElements)];
this.size = 0;
}
this.size = 0;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we keep the original implementation? Those changes are out of scope for this PR :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

of course! thank you for the feedback!!

This reverts commit 8873d55.

# Conflicts:
#	clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollectionTest.java
#	clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollectionTest.java
Copilot AI review requested due to automatic review settings March 27, 2026 01:17
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR relocates low-level utility types (ByteUtils, ImplicitLinkedHash*) from org.apache.kafka.common.utils into org.apache.kafka.common.utils.internals, and updates Kafka’s internal modules, tests, benchmarks, and code generator to use the new internal package.

Changes:

  • Move ByteUtils and ImplicitLinkedHash{Collection,MultiCollection} into org.apache.kafka.common.utils.internals and update usages across modules.
  • Remove BytesUtils and consolidate its increment/comparator functionality into internal ByteUtils.
  • Update codegen constants and SpotBugs exclusions to reflect the new class names/packages.

Reviewed changes

Copilot reviewed 76 out of 76 changed files in this pull request and generated no comments.

Show a summary per file
File Description
streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java Update import to internal ByteUtils.
streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java Update import to internal ByteUtils.
streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java Update import to internal ByteUtils.
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java Replace BytesUtils usage with internal ByteUtils.
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java Replace BytesUtils usage with internal ByteUtils.
streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java Update import to internal ByteUtils.
streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java Update import to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java Update import to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java Update import to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java Update import to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java Update import to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java Switch comparator usage from BytesUtils to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java Switch increment usage and javadoc links to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java Switch comparator usage to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java Update import to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java Switch increment usage to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java Switch increment usage to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java Update import to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java Update import to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java Switch comparator usage to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java Switch increment usage to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java Update import to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java Switch range upper bound increment to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java Update import to internal ByteUtils.
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java Update import to internal ByteUtils.
storage/src/main/java/org/apache/kafka/storage/internals/log/SkimpyOffsetMap.java Update import to internal ByteUtils.
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java Update import to internal ByteUtils.
server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java Update import to internal ImplicitLinkedHashCollection.
server/src/test/java/org/apache/kafka/server/FetchSessionTest.java Update import to internal ImplicitLinkedHashCollection.
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java Update import to internal ImplicitLinkedHashCollection.
server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java Update import to internal ImplicitLinkedHashCollection.
server/src/main/java/org/apache/kafka/server/share/CachedSharePartition.java Update import to internal ImplicitLinkedHashCollection.
server/src/main/java/org/apache/kafka/server/FetchSessionCacheShard.java Update import to internal ImplicitLinkedHashCollection.
server/src/main/java/org/apache/kafka/server/FetchSession.java Update import to internal ImplicitLinkedHashCollection.
server/src/main/java/org/apache/kafka/server/FetchContext.java Update import to internal ImplicitLinkedHashCollection.
server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java Update import to internal ByteUtils.
raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java Update import to internal ByteUtils.
raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java Update import to internal ByteUtils.
raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java Update import to internal ByteUtils.
metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java Update import to internal ImplicitLinkedHashCollection.
metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java Update import to internal ByteUtils.
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java Switch comparator interface and constant to internal ByteUtils.
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java Update import to internal ByteUtils.
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java Update import to internal ByteUtils.
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ImplicitLinkedHashCollectionBenchmark.java Update import to internal ImplicitLinkedHashCollection.
gradle/spotbugs-exclude.xml Update exclusion to ByteUtils$LexicographicByteArrayComparator.
generator/src/main/java/org/apache/kafka/message/MessageGenerator.java Update generated FQCN constants to internal packages.
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala Update Scala imports to internal ImplicitLinkedHashCollection.
core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala Update Scala import to internal ByteUtils.
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java Update import to internal ImplicitLinkedHashCollection.
core/src/main/java/kafka/server/share/SharePartitionManager.java Update import to internal ImplicitLinkedHashCollection.
clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollectionTest.java Move test package to internals; fix “yieled” typo to “yielded”.
clients/src/test/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollectionTest.java Move test package to internals; fix “yieled” typo to “yielded”.
clients/src/test/java/org/apache/kafka/common/utils/internals/ByteUtilsTest.java Move test package to internals; add imports needed after package move.
clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java Switch to internal ByteUtils for increment/comparator usage.
clients/src/test/java/org/apache/kafka/common/record/internal/EndTransactionMarkerTest.java Update import to internal ByteUtils.
clients/src/test/java/org/apache/kafka/common/record/internal/DefaultRecordTest.java Update import to internal ByteUtils.
clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java Update import to internal ByteUtils.
clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java Update import to internal ByteUtils.
clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollection.java Change package to internals; update internal reference in javadoc text.
clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollection.java Change package to internals.
clients/src/main/java/org/apache/kafka/common/utils/internals/BytesUtils.java Remove class; functionality is migrated elsewhere.
clients/src/main/java/org/apache/kafka/common/utils/internals/ByteUtils.java Change package to internals; incorporate increment/comparator utilities.
clients/src/main/java/org/apache/kafka/common/utils/Bytes.java Update internal import/reference to internals.ByteUtils for comparator/increment delegation.
clients/src/main/java/org/apache/kafka/common/record/internal/LegacyRecord.java Update import to internal ByteUtils.
clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecordBatch.java Update import to internal ByteUtils.
clients/src/main/java/org/apache/kafka/common/record/internal/DefaultRecord.java Update import to internal ByteUtils.
clients/src/main/java/org/apache/kafka/common/record/internal/AbstractLegacyRecordBatch.java Update import to internal ByteUtils.
clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java Update import to internal ByteUtils.
clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java Update import to internal ByteUtils.
clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java Update import to internal ByteUtils.
clients/src/main/java/org/apache/kafka/common/protocol/SendBuilder.java Update import to internal ByteUtils.
clients/src/main/java/org/apache/kafka/common/protocol/DataOutputStreamWritable.java Update import to internal ByteUtils.
clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java Update import to internal ByteUtils.
clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java Update import to internal ByteUtils.
clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java Update import to internal ByteUtils.
Comments suppressed due to low confidence (3)

clients/src/main/java/org/apache/kafka/common/utils/internals/ByteUtils.java:20

  • Changing ByteUtils' package from org.apache.kafka.common.utils to org.apache.kafka.common.utils.internals is a source/binary incompatible change for any downstream code depending on the public FQCN. Since this repo is on 4.4.0-SNAPSHOT, consider keeping a deprecated forwarding class in the old package (delegating to the new internals implementation) to preserve compatibility within the 4.x line, or otherwise explicitly handle the migration strategy.
    clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashCollection.java:20
  • Moving ImplicitLinkedHashCollection into the .internals package is a breaking change for any external code referencing org.apache.kafka.common.utils.ImplicitLinkedHashCollection. To avoid unexpected source/binary incompatibility in 4.x, consider adding a deprecated wrapper/forwarder type in the old package that delegates to (or extends) the new internal implementation.
    clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollection.java:20
  • Moving ImplicitLinkedHashMultiCollection into the .internals package is a breaking change for any external code referencing org.apache.kafka.common.utils.ImplicitLinkedHashMultiCollection. If compatibility within the 4.x line is required, consider providing a deprecated wrapper/forwarder in the old package to preserve the original FQCN.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@chia7712
Copy link
Copy Markdown
Member

@cychiu8 please fix following build error

> The following files had format violations:
      src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
          @@ -18,7 +18,6 @@
           
           import·org.apache.kafka.common.TopicPartition;
           import·org.apache.kafka.common.utils.Bytes;
          -import·org.apache.kafka.common.utils.internals.ByteUtils;
           import·org.apache.kafka.streams.kstream.Windowed;
           import·org.apache.kafka.streams.processor.StateStore;
           import·org.apache.kafka.streams.processor.StateStoreContext;
  Run './gradlew spotlessApply' to fix all violations.

This reverts commit 8873d55
@cychiu8
Copy link
Copy Markdown
Contributor Author

cychiu8 commented Mar 30, 2026

@chia7712 thank you for noticing me the build error! I have updated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

build Gradle build or GitHub Actions ci-approved clients core Kafka Broker generator RPC and Record code generator KIP-932 Queues for Kafka kraft performance storage Pull requests that target the storage module streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants