From af1bb83faea0a1d76bd993e110703992517e7403 Mon Sep 17 00:00:00 2001 From: Eduwer Camacaro Date: Mon, 23 Mar 2026 13:52:40 -0500 Subject: [PATCH 1/8] Use inmemory accessor for column family tests --- .../AbstractColumnFamilyAccessorTest.java | 51 ++++++--- .../internals/InMemoryRocksDBAccessor.java | 106 ++++++++++++++++++ 2 files changed, 143 insertions(+), 14 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java index b1f1a1ec25567..47fac885bb1a3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.query.Position; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -30,16 +31,17 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; +import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrowsExactly; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; + @ExtendWith(MockitoExtension.class) abstract class AbstractColumnFamilyAccessorTest { @@ -67,41 +69,62 @@ public void setUp() { @Test public void shouldOpenClean() throws RocksDBException { - when(dbAccessor.get(offsetsCF, toBytes("status"))).thenReturn(closedValue); - + dbAccessor = new InMemoryRocksDBAccessor(); // Open the ColumnFamily accessor.open(dbAccessor, false); - verify(dbAccessor).put(eq(offsetsCF), eq(toBytes("status")), eq(openValue)); + assertArrayEquals(openValue, dbAccessor.get(offsetsCF, toBytes("status"))); // Now close the ColumnFamily accessor.close(dbAccessor); - verify(dbAccessor).put(eq(offsetsCF), eq(toBytes("status")), eq(closedValue)); + assertArrayEquals(closedValue, dbAccessor.get(offsetsCF, toBytes("status"))); + + // Open clean again + accessor.open(dbAccessor, false); + assertArrayEquals(openValue, dbAccessor.get(offsetsCF, toBytes("status"))); } @Test public void shouldThrowOnOpenAfterAUncleanClose() throws RocksDBException { - when(dbAccessor.get(offsetsCF, toBytes("status"))).thenReturn(openValue); + dbAccessor = new InMemoryRocksDBAccessor(); + // First, open clean + accessor.open(dbAccessor, false); + + // Try to open again, with ignoreUncleanClose=false, which should throw since the store is already open final ProcessorStateException thrown = assertThrowsExactly(ProcessorStateException.class, () -> accessor.open(dbAccessor, false)); assertEquals("Invalid state during store open. Expected state to be either empty or closed", thrown.getMessage()); } @Test public void shouldIgnoreExceptionAfterUncleanClose() throws RocksDBException { - when(dbAccessor.get(offsetsCF, toBytes("status"))).thenReturn(openValue); + dbAccessor = new InMemoryRocksDBAccessor(); + // First, open clean + accessor.open(dbAccessor, false); + // Now reopen in an invalid state accessor.open(dbAccessor, true); assertTrue(storeOpen.get()); - verify(dbAccessor).put(eq(offsetsCF), eq(toBytes("status")), eq(openValue)); + assertArrayEquals(openValue, dbAccessor.get(offsetsCF, toBytes("status"))); } @Test public void shouldCommitOffsets() throws RocksDBException { + dbAccessor = new InMemoryRocksDBAccessor(); final TopicPartition tp0 = new TopicPartition("testTopic", 0); final TopicPartition tp1 = new TopicPartition("testTopic", 1); final Map changelogOffsets = Map.of(tp0, 10L, tp1, 20L); accessor.commit(dbAccessor, changelogOffsets); - verify(dbAccessor).flush(any(ColumnFamilyHandle[].class)); - verify(dbAccessor).put(eq(offsetsCF), eq(toBytes(tp0.toString())), eq(toBytes(10L))); - verify(dbAccessor).put(eq(offsetsCF), eq(toBytes(tp1.toString())), eq(toBytes(20L))); + assertArrayEquals(toBytes(10L), dbAccessor.get(offsetsCF, toBytes(tp0.toString()))); + assertArrayEquals(toBytes(20L), dbAccessor.get(offsetsCF, toBytes(tp1.toString()))); + } + + @Test + public void shouldCommitPosition() throws RocksDBException { + dbAccessor = new InMemoryRocksDBAccessor(); + final String topic = "testTopic"; + final TopicPartition tp0 = new TopicPartition("testTopic", 0); + final TopicPartition tp1 = new TopicPartition("testTopic", 1); + final Position positionToStore = Position.fromMap(mkMap(mkEntry(topic, mkMap(mkEntry(tp0.partition(), 10L), mkEntry(tp1.partition(), 20L))))); + accessor.commit(dbAccessor, positionToStore); + assertEquals(positionToStore, PositionSerde.deserialize(ByteBuffer.wrap(dbAccessor.get(offsetsCF, toBytes("position"))))); } private byte[] toBytes(final String s) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java new file mode 100644 index 0000000000000..79cd7b714054f --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksIterator; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.IdentityHashMap; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +/** + * An in-memory implementation of {@link RocksDBStore.DBAccessor} intended for use in tests, + * eliminating the need for mocks. Data is kept in a {@link NavigableMap} per + * {@link ColumnFamilyHandle} (identified by object identity), providing full key-ordering semantics + * equivalent to RocksDB's default comparator. + */ +public class InMemoryRocksDBAccessor implements RocksDBStore.DBAccessor { + + /** + * Unsigned lexicographic byte-array comparator — matches RocksDB's default comparator. + */ + private static final Comparator BYTES_COMPARATOR = Arrays::compare; + + /** + * Per-CF stores, keyed by {@link ColumnFamilyHandle} object identity. + */ + private final Map> stores = new IdentityHashMap<>(); + + private NavigableMap storeFor(final ColumnFamilyHandle columnFamily) { + return stores.computeIfAbsent(columnFamily, cf -> new TreeMap<>(BYTES_COMPARATOR)); + } + + @Override + public byte[] get(final ColumnFamilyHandle columnFamily, final byte[] key) { + return storeFor(columnFamily).get(key); + } + + @Override + public byte[] get(final ColumnFamilyHandle columnFamily, final ReadOptions readOptions, final byte[] key) { + // ReadOptions (snapshots, fill-cache, etc.) are not meaningful in-memory; delegate to plain get. + return get(columnFamily, key); + } + + @Override + public RocksIterator newIterator(final ColumnFamilyHandle columnFamily) { + throw new UnsupportedOperationException("newIterator not supported in-memory"); + } + + @Override + public void put(final ColumnFamilyHandle columnFamily, final byte[] key, final byte[] value) { + if (value == null) { + delete(columnFamily, key); + } else { + storeFor(columnFamily).put(key, value); + } + } + + @Override + public void delete(final ColumnFamilyHandle columnFamily, final byte[] key) { + storeFor(columnFamily).remove(key); + } + + @Override + public void deleteRange(final ColumnFamilyHandle columnFamily, final byte[] from, final byte[] to) { + throw new UnsupportedOperationException("deleteRange not supported in-memory"); + } + + @Override + public long approximateNumEntries(final ColumnFamilyHandle columnFamily) { + return storeFor(columnFamily).size(); + } + + @Override + public void flush(final ColumnFamilyHandle... columnFamilies) { + // No-op: in-memory writes are immediately durable. + } + + @Override + public void reset() { + stores.clear(); + } + + @Override + public void close() { + // No native resources to release. + } +} From 375b10345171171f95e083473c561d305ebc27f2 Mon Sep 17 00:00:00 2001 From: Eduwer Camacaro Date: Mon, 23 Mar 2026 15:32:37 -0500 Subject: [PATCH 2/8] Add unit tests --- .../AbstractColumnFamilyAccessorTest.java | 34 +++-- .../internals/InMemoryRocksDBAccessor.java | 143 +++++++++++++++++- 2 files changed, 167 insertions(+), 10 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java index 47fac885bb1a3..fc4fc23c305bb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java @@ -29,6 +29,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import java.nio.ByteBuffer; @@ -39,8 +40,10 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrowsExactly; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; @ExtendWith(MockitoExtension.class) @@ -69,7 +72,7 @@ public void setUp() { @Test public void shouldOpenClean() throws RocksDBException { - dbAccessor = new InMemoryRocksDBAccessor(); + dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class)); // Open the ColumnFamily accessor.open(dbAccessor, false); assertArrayEquals(openValue, dbAccessor.get(offsetsCF, toBytes("status"))); @@ -85,7 +88,7 @@ public void shouldOpenClean() throws RocksDBException { @Test public void shouldThrowOnOpenAfterAUncleanClose() throws RocksDBException { - dbAccessor = new InMemoryRocksDBAccessor(); + dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class)); // First, open clean accessor.open(dbAccessor, false); @@ -96,7 +99,7 @@ public void shouldThrowOnOpenAfterAUncleanClose() throws RocksDBException { @Test public void shouldIgnoreExceptionAfterUncleanClose() throws RocksDBException { - dbAccessor = new InMemoryRocksDBAccessor(); + dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class)); // First, open clean accessor.open(dbAccessor, false); // Now reopen in an invalid state @@ -107,26 +110,39 @@ public void shouldIgnoreExceptionAfterUncleanClose() throws RocksDBException { @Test public void shouldCommitOffsets() throws RocksDBException { - dbAccessor = new InMemoryRocksDBAccessor(); + dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class)); final TopicPartition tp0 = new TopicPartition("testTopic", 0); final TopicPartition tp1 = new TopicPartition("testTopic", 1); final Map changelogOffsets = Map.of(tp0, 10L, tp1, 20L); accessor.commit(dbAccessor, changelogOffsets); - assertArrayEquals(toBytes(10L), dbAccessor.get(offsetsCF, toBytes(tp0.toString()))); - assertArrayEquals(toBytes(20L), dbAccessor.get(offsetsCF, toBytes(tp1.toString()))); + assertEquals(10L, accessor.getCommittedOffset(dbAccessor, tp0)); + assertEquals(20L, accessor.getCommittedOffset(dbAccessor, tp1)); } @Test public void shouldCommitPosition() throws RocksDBException { - dbAccessor = new InMemoryRocksDBAccessor(); + dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class)); final String topic = "testTopic"; - final TopicPartition tp0 = new TopicPartition("testTopic", 0); - final TopicPartition tp1 = new TopicPartition("testTopic", 1); + final TopicPartition tp0 = new TopicPartition(topic, 0); + final TopicPartition tp1 = new TopicPartition(topic, 1); final Position positionToStore = Position.fromMap(mkMap(mkEntry(topic, mkMap(mkEntry(tp0.partition(), 10L), mkEntry(tp1.partition(), 20L))))); accessor.commit(dbAccessor, positionToStore); assertEquals(positionToStore, PositionSerde.deserialize(ByteBuffer.wrap(dbAccessor.get(offsetsCF, toBytes("position"))))); } + @Test + public void shouldWipeCommitedOffsetsOnEmptyCommit() throws RocksDBException { + dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class)); + final TopicPartition tp0 = new TopicPartition("testTopic", 0); + final TopicPartition tp1 = new TopicPartition("testTopic", 1); + accessor.commit(dbAccessor, Map.of(tp0, 10L, tp1, 20L)); + assertEquals(10L, accessor.getCommittedOffset(dbAccessor, tp0)); + assertEquals(20L, accessor.getCommittedOffset(dbAccessor, tp1)); + accessor.commit(dbAccessor, Map.of()); + assertNull(accessor.getCommittedOffset(dbAccessor, tp0)); + assertNull(accessor.getCommittedOffset(dbAccessor, tp1)); + } + private byte[] toBytes(final String s) { return keySerializer.serialize("", s); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java index 79cd7b714054f..8d728d228bd5a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java @@ -18,8 +18,11 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Comparator; import java.util.IdentityHashMap; @@ -40,10 +43,16 @@ public class InMemoryRocksDBAccessor implements RocksDBStore.DBAccessor { */ private static final Comparator BYTES_COMPARATOR = Arrays::compare; + /** * Per-CF stores, keyed by {@link ColumnFamilyHandle} object identity. */ private final Map> stores = new IdentityHashMap<>(); + private final RocksDB rocksDB; + + public InMemoryRocksDBAccessor(final RocksDB rocksDB) { + this.rocksDB = rocksDB; + } private NavigableMap storeFor(final ColumnFamilyHandle columnFamily) { return stores.computeIfAbsent(columnFamily, cf -> new TreeMap<>(BYTES_COMPARATOR)); @@ -62,7 +71,7 @@ public byte[] get(final ColumnFamilyHandle columnFamily, final ReadOptions readO @Override public RocksIterator newIterator(final ColumnFamilyHandle columnFamily) { - throw new UnsupportedOperationException("newIterator not supported in-memory"); + return new InMemoryRocksIterator(rocksDB, storeFor(columnFamily)); } @Override @@ -103,4 +112,136 @@ public void reset() { public void close() { // No native resources to release. } + + /** + * In-memory iterator backed by a navigable map. + */ + private static class InMemoryRocksIterator extends RocksIterator { + private final NavigableMap data; + private byte[] currentKey; + private boolean valid; + + InMemoryRocksIterator(final RocksDB rocksDB, final NavigableMap data) { + super(rocksDB, 0L); + this.data = data; + this.currentKey = null; + this.valid = false; + } + + @Override + protected void disposeInternal() { + // No native resources to release. + } + + @Override + public boolean isValid() { + return valid && currentKey != null && data.containsKey(currentKey); + } + + @Override + public void seekToFirst() { + if (data.isEmpty()) { + invalidate(); + } else { + currentKey = data.firstKey(); + valid = true; + } + } + + @Override + public void seekToLast() { + if (data.isEmpty()) { + invalidate(); + } else { + currentKey = data.lastKey(); + valid = true; + } + } + + @Override + public void seek(final byte[] target) { + final Map.Entry entry = data.ceilingEntry(target); + if (entry == null) { + invalidate(); + } else { + currentKey = entry.getKey(); + valid = true; + } + } + + @Override + public void seekForPrev(final byte[] target) { + final Map.Entry entry = data.floorEntry(target); + if (entry == null) { + invalidate(); + } else { + currentKey = entry.getKey(); + valid = true; + } + } + + @Override + public void seek(final ByteBuffer target) { + final ByteBuffer duplicate = target.duplicate(); + final byte[] bytes = new byte[duplicate.remaining()]; + duplicate.get(bytes); + seek(bytes); + } + + @Override + public void seekForPrev(final ByteBuffer target) { + final ByteBuffer duplicate = target.duplicate(); + final byte[] bytes = new byte[duplicate.remaining()]; + duplicate.get(bytes); + seekForPrev(bytes); + } + + @Override + public void next() { + final Map.Entry entry = data.higherEntry(currentKey); + if (entry == null) { + invalidate(); + } else { + currentKey = entry.getKey(); + } + } + + @Override + public void prev() { + if (!isValid()) { + return; + } + final Map.Entry entry = data.lowerEntry(currentKey); + if (entry == null) { + invalidate(); + } else { + currentKey = entry.getKey(); + } + } + + @Override + public byte[] key() { + return currentKey; + } + + @Override + public byte[] value() { + return isValid() ? data.get(currentKey) : null; + } + + @Override + public void status() throws RocksDBException { + // In-memory iterator never enters an error state. + } + + @Override + public void close() { + invalidate(); + } + + private void invalidate() { + valid = false; + currentKey = null; + } + } } From 8e1d425dd7e88040b22411faf4f4c2a65fe4c55a Mon Sep 17 00:00:00 2001 From: Eduwer Camacaro Date: Tue, 24 Mar 2026 10:57:17 -0500 Subject: [PATCH 3/8] Add unit test for dual schema segmented stores --- ...lSchemaRocksDBSegmentedBytesStoreTest.java | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java index 3b2265959fe05..861ed349ed232 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java @@ -1626,7 +1626,7 @@ public void shouldMeasureExpiredRecords() { } @Test - public void shouldLoadPositionFromFile() { + public void shouldMigrateExistingPositionFromFile() { final Position position = Position.fromMap(mkMap(mkEntry("topic", mkMap(mkEntry(0, 1L))))); final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new File(stateDir, storeName + ".position")); StoreQueryUtils.checkpointPosition(positionCheckpoint, position); @@ -1639,6 +1639,43 @@ public void shouldLoadPositionFromFile() { bytesStore.close(); } + @Test + public void shouldRestoreMergedPositionFromMultipleSegmentsAfterRestart() { + final AbstractDualSchemaRocksDBSegmentedBytesStore bytesStore = getBytesStore(); + // 0 segments initially. + bytesStore.init(context, bytesStore); + + // Writes record to different partitions + context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "t1", new RecordHeaders())); + bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(10)); + context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "t1", new RecordHeaders())); + bytesStore.put(serializeKey(new Windowed<>("a", windows[1])), serializeValue(10)); + context.setRecordContext(new ProcessorRecordContext(0, 1, 1, "t1", new RecordHeaders())); + bytesStore.put(serializeKey(new Windowed<>("a", windows[2])), serializeValue(10)); + context.setRecordContext(new ProcessorRecordContext(0, 3, 1, "t1", new RecordHeaders())); + bytesStore.put(serializeKey(new Windowed<>("a", windows[3])), serializeValue(10)); + final Position expected = Position.fromMap(mkMap(mkEntry("t1", mkMap(mkEntry(0, 2L), mkEntry(1, 3L))))); + + // Each open segment should share the same position. + for (final KeyValueSegment segment : bytesStore.getSegments()) { + assertEquals(expected, segment.getPosition()); + } + + // Persist the merged position and simulate a full store restart. + bytesStore.commit(Map.of()); + bytesStore.segments.writePosition(); + bytesStore.close(); + bytesStore.init(context, bytesStore); + + // The store-level position should be restored from the merged position. + assertEquals(expected, bytesStore.getPosition()); + + // Restored segments should all have the same merged position. + for (final KeyValueSegment segment : bytesStore.getSegments()) { + assertEquals(expected, segment.getPosition()); + } + } + private Set segmentDirs() { final File windowDir = new File(stateDir, storeName); From 7a8c80197f01635cd62c3c8e7bf9e758af70df15 Mon Sep 17 00:00:00 2001 From: Eduwer Camacaro Date: Tue, 24 Mar 2026 11:05:08 -0500 Subject: [PATCH 4/8] Add unit tests for segmented bytes store --- .../AbstractRocksDBSegmentedBytesStore.java | 2 + ...bstractRocksDBSegmentedBytesStoreTest.java | 43 ++++++++++++++++++- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index c6b57411067c2..d200d03f7cb2a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -350,6 +350,8 @@ List getSegments() { return segments.allSegments(false); } + + // Visible for testing void restoreAllInternal(final Collection> records) { synchronized (position) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java index 5206f48ad407f..201db4bebe3d3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java @@ -712,7 +712,7 @@ public void shouldNotThrowWhenRestoringOnMissingHeaders(final SegmentedBytesStor @ParameterizedTest @MethodSource("getKeySchemas") - public void shouldLoadPositionFromFile(final SegmentedBytesStore.KeySchema schema) { + public void shouldMigrateExistingPositionFromFile(final SegmentedBytesStore.KeySchema schema) { before(schema); final Position position = Position.fromMap(mkMap(mkEntry("topic", mkMap(mkEntry(0, 1L))))); final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new File(context.stateDir(), storeName + ".position")); @@ -725,6 +725,47 @@ public void shouldLoadPositionFromFile(final SegmentedBytesStore.KeySchema schem assertEquals(position, bytesStore.getPosition()); } + @ParameterizedTest + @MethodSource("getKeySchemas") + public void shouldRestoreMergedPositionFromMultipleSegmentsAfterRestart(final SegmentedBytesStore.KeySchema schema) { + before(schema); + bytesStore = getBytesStore(); + // 0 segments initially. + bytesStore.init(context, bytesStore); + + // Writes record to different partitions + context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "t1", new RecordHeaders())); + bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(10)); + context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "t1", new RecordHeaders())); + bytesStore.put(serializeKey(new Windowed<>("a", windows[1])), serializeValue(10)); + context.setRecordContext(new ProcessorRecordContext(0, 1, 1, "t1", new RecordHeaders())); + bytesStore.put(serializeKey(new Windowed<>("a", windows[2])), serializeValue(10)); + context.setRecordContext(new ProcessorRecordContext(0, 3, 1, "t1", new RecordHeaders())); + bytesStore.put(serializeKey(new Windowed<>("a", windows[3])), serializeValue(10)); + final Position expected = Position.fromMap(mkMap(mkEntry("t1", mkMap(mkEntry(0, 2L), mkEntry(1, 3L))))); + + // Each open segment should share the same position. + for (final S segment : bytesStore.getSegments()) { + assertEquals(expected, segment.getPosition()); + } + + // Persist the merged position and simulate a full store restart. + bytesStore.commit(Map.of()); + for (final S segment : bytesStore.getSegments()) { + segment.writePosition(); + } + bytesStore.close(); + bytesStore.init(context, bytesStore); + + // The store-level position should be restored from the merged position. + assertEquals(expected, bytesStore.getPosition()); + + // Restored segments should all have the same merged position. + for (final S segment : bytesStore.getSegments()) { + assertEquals(expected, segment.getPosition()); + } + } + private List> getChangelogRecords() { final List> records = new ArrayList<>(); final Headers headers = new RecordHeaders(); From ef4627639eb1f60f1479e0d064b4681b40defb15 Mon Sep 17 00:00:00 2001 From: Eduwer Camacaro Date: Tue, 24 Mar 2026 11:18:07 -0500 Subject: [PATCH 5/8] Rename test names --- .../apache/kafka/streams/state/internals/RocksDBStoreTest.java | 2 +- .../streams/state/internals/RocksDBVersionedStoreTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 35d152bd07f4d..1a6a83848d782 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -1255,7 +1255,7 @@ public void shouldNotThrowWhenRestoringOnMissingHeaders() { } @Test - public void shouldLoadPositionFromFile() { + public void shouldMigrateExistingPositionFromFile() { final Position position = Position.fromMap(mkMap(mkEntry("topic", mkMap(mkEntry(0, 1L))))); final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new File(context.stateDir(), rocksDBStore.name + ".position")); StoreQueryUtils.checkpointPosition(positionCheckpoint, position); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java index f843a38455aff..340ca7eb21642 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java @@ -834,7 +834,7 @@ public void shouldAllowZeroHistoryRetention() { } @Test - public void shouldLoadPositionFromFile() { + public void shouldMigrateExistingPositionFromFile() { final Position position = Position.fromMap(mkMap(mkEntry("topic", mkMap(mkEntry(0, 1L))))); final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new File(context.stateDir(), store.name() + ".position")); StoreQueryUtils.checkpointPosition(positionCheckpoint, position); From 3d958978ea8a71542f008d1f8b74f628ab03dfb3 Mon Sep 17 00:00:00 2001 From: Eduwer Camacaro Date: Tue, 24 Mar 2026 11:22:32 -0500 Subject: [PATCH 6/8] rollback changes in segmented bytes store --- .../state/internals/AbstractRocksDBSegmentedBytesStore.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index d200d03f7cb2a..c6b57411067c2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -350,8 +350,6 @@ List getSegments() { return segments.allSegments(false); } - - // Visible for testing void restoreAllInternal(final Collection> records) { synchronized (position) { From 1321cd873a5988038143732397de78aabb9ba0d3 Mon Sep 17 00:00:00 2001 From: Eduwer Camacaro Date: Mon, 30 Mar 2026 15:19:38 -0500 Subject: [PATCH 7/8] Update streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java Co-authored-by: Bill Bejeck --- .../state/internals/AbstractColumnFamilyAccessorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java index fc4fc23c305bb..786ef11266df6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java @@ -131,7 +131,7 @@ public void shouldCommitPosition() throws RocksDBException { } @Test - public void shouldWipeCommitedOffsetsOnEmptyCommit() throws RocksDBException { + public void shouldWipeCommittedOffsetsOnEmptyCommit() throws RocksDBException { dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class)); final TopicPartition tp0 = new TopicPartition("testTopic", 0); final TopicPartition tp1 = new TopicPartition("testTopic", 1); From b2f0ee279595ad063f54c118ccd83b22f0336ae6 Mon Sep 17 00:00:00 2001 From: Eduwer Camacaro Date: Mon, 30 Mar 2026 15:34:10 -0500 Subject: [PATCH 8/8] small suggestions --- .../state/internals/AbstractColumnFamilyAccessorTest.java | 1 - .../kafka/streams/state/internals/InMemoryRocksDBAccessor.java | 1 - 2 files changed, 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java index 786ef11266df6..b06ee50874661 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java @@ -45,7 +45,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; - @ExtendWith(MockitoExtension.class) abstract class AbstractColumnFamilyAccessorTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java index 8d728d228bd5a..6e25c97c881da 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java @@ -43,7 +43,6 @@ public class InMemoryRocksDBAccessor implements RocksDBStore.DBAccessor { */ private static final Comparator BYTES_COMPARATOR = Arrays::compare; - /** * Per-CF stores, keyed by {@link ColumnFamilyHandle} object identity. */