diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
index b1f1a1ec25567..b06ee50874661 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.query.Position;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -28,18 +29,21 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
 
 @ExtendWith(MockitoExtension.class)
 abstract class AbstractColumnFamilyAccessorTest {
@@ -67,41 +71,75 @@ public void setUp() {
 
     @Test
     public void shouldOpenClean() throws RocksDBException {
-        when(dbAccessor.get(offsetsCF, toBytes("status"))).thenReturn(closedValue);
-
+        dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
         // Open the ColumnFamily
         accessor.open(dbAccessor, false);
-        verify(dbAccessor).put(eq(offsetsCF), eq(toBytes("status")), eq(openValue));
+        assertArrayEquals(openValue, dbAccessor.get(offsetsCF, toBytes("status")));
 
         // Now close the ColumnFamily
         accessor.close(dbAccessor);
-        verify(dbAccessor).put(eq(offsetsCF), eq(toBytes("status")), eq(closedValue));
+        assertArrayEquals(closedValue, dbAccessor.get(offsetsCF, toBytes("status")));
+
+        // Open clean again
+        accessor.open(dbAccessor, false);
+        assertArrayEquals(openValue, dbAccessor.get(offsetsCF, toBytes("status")));
     }
 
     @Test
     public void shouldThrowOnOpenAfterAUncleanClose() throws RocksDBException {
-        when(dbAccessor.get(offsetsCF, toBytes("status"))).thenReturn(openValue);
+        dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+        // First, open clean
+        accessor.open(dbAccessor, false);
+
+        // Try to open again, with ignoreUncleanClose=false, which should throw since the store is already open
         final ProcessorStateException thrown = assertThrowsExactly(ProcessorStateException.class, () -> accessor.open(dbAccessor, false));
         assertEquals("Invalid state during store open. Expected state to be either empty or closed", thrown.getMessage());
     }
 
     @Test
     public void shouldIgnoreExceptionAfterUncleanClose() throws RocksDBException {
-        when(dbAccessor.get(offsetsCF, toBytes("status"))).thenReturn(openValue);
+        dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+        // First, open clean
+        accessor.open(dbAccessor, false);
+        // Now reopen in an invalid state
         accessor.open(dbAccessor, true);
         assertTrue(storeOpen.get());
-        verify(dbAccessor).put(eq(offsetsCF), eq(toBytes("status")), eq(openValue));
+        assertArrayEquals(openValue, dbAccessor.get(offsetsCF, toBytes("status")));
     }
 
     @Test
     public void shouldCommitOffsets() throws RocksDBException {
+        dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
        final TopicPartition tp0 = new TopicPartition("testTopic", 0);
         final TopicPartition tp1 = new TopicPartition("testTopic", 1);
         final Map<TopicPartition, Long> changelogOffsets = Map.of(tp0, 10L, tp1, 20L);
         accessor.commit(dbAccessor, changelogOffsets);
-        verify(dbAccessor).flush(any(ColumnFamilyHandle[].class));
-        verify(dbAccessor).put(eq(offsetsCF), eq(toBytes(tp0.toString())), eq(toBytes(10L)));
-        verify(dbAccessor).put(eq(offsetsCF), eq(toBytes(tp1.toString())), eq(toBytes(20L)));
+        assertEquals(10L, accessor.getCommittedOffset(dbAccessor, tp0));
+        assertEquals(20L, accessor.getCommittedOffset(dbAccessor, tp1));
+    }
+
+    @Test
+    public void shouldCommitPosition() throws RocksDBException {
+        dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+        final String topic = "testTopic";
+        final TopicPartition tp0 = new TopicPartition(topic, 0);
+        final TopicPartition tp1 = new TopicPartition(topic, 1);
+        final Position positionToStore = Position.fromMap(mkMap(mkEntry(topic, mkMap(mkEntry(tp0.partition(), 10L), mkEntry(tp1.partition(), 20L)))));
+        accessor.commit(dbAccessor, positionToStore);
+        assertEquals(positionToStore, PositionSerde.deserialize(ByteBuffer.wrap(dbAccessor.get(offsetsCF, toBytes("position")))));
+    }
+
+    @Test
+    public void shouldWipeCommittedOffsetsOnEmptyCommit() throws RocksDBException {
+        dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+        final TopicPartition tp0 = new TopicPartition("testTopic", 0);
+        final TopicPartition tp1 = new TopicPartition("testTopic", 1);
+        accessor.commit(dbAccessor, Map.of(tp0, 10L, tp1, 20L));
+        assertEquals(10L, accessor.getCommittedOffset(dbAccessor, tp0));
+        assertEquals(20L, accessor.getCommittedOffset(dbAccessor, tp1));
+        accessor.commit(dbAccessor, Map.of());
+        assertNull(accessor.getCommittedOffset(dbAccessor, tp0));
+        assertNull(accessor.getCommittedOffset(dbAccessor, tp1));
     }
 
     private byte[] toBytes(final String s) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 3b2265959fe05..861ed349ed232 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1626,7 +1626,7 @@ public void shouldMeasureExpiredRecords() {
     }
 
     @Test
-    public void shouldLoadPositionFromFile() {
+    public void shouldMigrateExistingPositionFromFile() {
         final Position position = Position.fromMap(mkMap(mkEntry("topic", mkMap(mkEntry(0, 1L)))));
         final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new File(stateDir, storeName + ".position"));
         StoreQueryUtils.checkpointPosition(positionCheckpoint, position);
@@ -1639,6 +1639,43 @@ public void shouldLoadPositionFromFile() {
         bytesStore.close();
     }
 
+    @Test
+    public void shouldRestoreMergedPositionFromMultipleSegmentsAfterRestart() {
+        final AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment> bytesStore = getBytesStore();
+        // 0 segments initially.
+        bytesStore.init(context, bytesStore);
+
+        // Write records to different partitions
+        context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "t1", new RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(10));
+        context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "t1", new RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[1])), serializeValue(10));
+        context.setRecordContext(new ProcessorRecordContext(0, 1, 1, "t1", new RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[2])), serializeValue(10));
+        context.setRecordContext(new ProcessorRecordContext(0, 3, 1, "t1", new RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[3])), serializeValue(10));
+        final Position expected = Position.fromMap(mkMap(mkEntry("t1", mkMap(mkEntry(0, 2L), mkEntry(1, 3L)))));
+
+        // Each open segment should share the same position.
+        for (final KeyValueSegment segment : bytesStore.getSegments()) {
+            assertEquals(expected, segment.getPosition());
+        }
+
+        // Persist the merged position and simulate a full store restart.
+        bytesStore.commit(Map.of());
+        bytesStore.segments.writePosition();
+        bytesStore.close();
+        bytesStore.init(context, bytesStore);
+
+        // The store-level position should be restored from the merged position.
+        assertEquals(expected, bytesStore.getPosition());
+
+        // Restored segments should all have the same merged position.
+        for (final KeyValueSegment segment : bytesStore.getSegments()) {
+            assertEquals(expected, segment.getPosition());
+        }
+    }
+
     private Set<String> segmentDirs() {
         final File windowDir = new File(stateDir, storeName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 5206f48ad407f..201db4bebe3d3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -712,7 +712,7 @@ public void shouldNotThrowWhenRestoringOnMissingHeaders(final SegmentedBytesStor
 
     @ParameterizedTest
     @MethodSource("getKeySchemas")
-    public void shouldLoadPositionFromFile(final SegmentedBytesStore.KeySchema schema) {
+    public void shouldMigrateExistingPositionFromFile(final SegmentedBytesStore.KeySchema schema) {
         before(schema);
         final Position position = Position.fromMap(mkMap(mkEntry("topic", mkMap(mkEntry(0, 1L)))));
         final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new File(context.stateDir(), storeName + ".position"));
@@ -725,6 +725,47 @@ public void shouldLoadPositionFromFile(final SegmentedBytesStore.KeySchema schem
         assertEquals(position, bytesStore.getPosition());
     }
 
+    @ParameterizedTest
+    @MethodSource("getKeySchemas")
+    public void shouldRestoreMergedPositionFromMultipleSegmentsAfterRestart(final SegmentedBytesStore.KeySchema schema) {
+        before(schema);
+        bytesStore = getBytesStore();
+        // 0 segments initially.
+        bytesStore.init(context, bytesStore);
+
+        // Write records to different partitions
+        context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "t1", new RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(10));
+        context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "t1", new RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[1])), serializeValue(10));
+        context.setRecordContext(new ProcessorRecordContext(0, 1, 1, "t1", new RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[2])), serializeValue(10));
+        context.setRecordContext(new ProcessorRecordContext(0, 3, 1, "t1", new RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[3])), serializeValue(10));
+        final Position expected = Position.fromMap(mkMap(mkEntry("t1", mkMap(mkEntry(0, 2L), mkEntry(1, 3L)))));
+
+        // Each open segment should share the same position.
+        for (final S segment : bytesStore.getSegments()) {
+            assertEquals(expected, segment.getPosition());
+        }
+
+        // Persist the merged position and simulate a full store restart.
+        bytesStore.commit(Map.of());
+        for (final S segment : bytesStore.getSegments()) {
+            segment.writePosition();
+        }
+        bytesStore.close();
+        bytesStore.init(context, bytesStore);
+
+        // The store-level position should be restored from the merged position.
+        assertEquals(expected, bytesStore.getPosition());
+
+        // Restored segments should all have the same merged position.
+        for (final S segment : bytesStore.getSegments()) {
+            assertEquals(expected, segment.getPosition());
+        }
+    }
+
     private List<ConsumerRecord<byte[], byte[]>> getChangelogRecords() {
         final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
         final Headers headers = new RecordHeaders();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java
new file mode 100644
index 0000000000000..6e25c97c881da
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * An in-memory implementation of {@link RocksDBStore.DBAccessor} intended for use in tests,
+ * eliminating the need for mocks. Data is kept in a {@link NavigableMap} per
+ * {@link ColumnFamilyHandle} (identified by object identity), providing full key-ordering semantics
+ * equivalent to RocksDB's default comparator.
+ */
+public class InMemoryRocksDBAccessor implements RocksDBStore.DBAccessor {
+
+    /**
+     * Unsigned lexicographic byte-array comparator, matching RocksDB's default comparator.
+     */
+    private static final Comparator<byte[]> BYTES_COMPARATOR = Arrays::compareUnsigned;
+
+    /**
+     * Per-CF stores, keyed by {@link ColumnFamilyHandle} object identity.
+     */
+    private final Map<ColumnFamilyHandle, NavigableMap<byte[], byte[]>> stores = new IdentityHashMap<>();
+    private final RocksDB rocksDB;
+
+    public InMemoryRocksDBAccessor(final RocksDB rocksDB) {
+        this.rocksDB = rocksDB;
+    }
+
+    private NavigableMap<byte[], byte[]> storeFor(final ColumnFamilyHandle columnFamily) {
+        return stores.computeIfAbsent(columnFamily, cf -> new TreeMap<>(BYTES_COMPARATOR));
+    }
+
+    @Override
+    public byte[] get(final ColumnFamilyHandle columnFamily, final byte[] key) {
+        return storeFor(columnFamily).get(key);
+    }
+
+    @Override
+    public byte[] get(final ColumnFamilyHandle columnFamily, final ReadOptions readOptions, final byte[] key) {
+        // ReadOptions (snapshots, fill-cache, etc.) are not meaningful in-memory; delegate to plain get.
+        return get(columnFamily, key);
+    }
+
+    @Override
+    public RocksIterator newIterator(final ColumnFamilyHandle columnFamily) {
+        return new InMemoryRocksIterator(rocksDB, storeFor(columnFamily));
+    }
+
+    @Override
+    public void put(final ColumnFamilyHandle columnFamily, final byte[] key, final byte[] value) {
+        if (value == null) {
+            delete(columnFamily, key);
+        } else {
+            storeFor(columnFamily).put(key, value);
+        }
+    }
+
+    @Override
+    public void delete(final ColumnFamilyHandle columnFamily, final byte[] key) {
+        storeFor(columnFamily).remove(key);
+    }
+
+    @Override
+    public void deleteRange(final ColumnFamilyHandle columnFamily, final byte[] from, final byte[] to) {
+        throw new UnsupportedOperationException("deleteRange not supported in-memory");
+    }
+
+    @Override
+    public long approximateNumEntries(final ColumnFamilyHandle columnFamily) {
+        return storeFor(columnFamily).size();
+    }
+
+    @Override
+    public void flush(final ColumnFamilyHandle... columnFamilies) {
+        // No-op: in-memory writes are immediately durable.
+    }
+
+    @Override
+    public void reset() {
+        stores.clear();
+    }
+
+    @Override
+    public void close() {
+        // No native resources to release.
+    }
+
+    /**
+     * In-memory iterator backed by a navigable map.
+     */
+    private static class InMemoryRocksIterator extends RocksIterator {
+        private final NavigableMap<byte[], byte[]> data;
+        private byte[] currentKey;
+        private boolean valid;
+
+        InMemoryRocksIterator(final RocksDB rocksDB, final NavigableMap<byte[], byte[]> data) {
+            super(rocksDB, 0L);
+            this.data = data;
+            this.currentKey = null;
+            this.valid = false;
+        }
+
+        @Override
+        protected void disposeInternal() {
+            // No native resources to release.
+        }
+
+        @Override
+        public boolean isValid() {
+            return valid && currentKey != null && data.containsKey(currentKey);
+        }
+
+        @Override
+        public void seekToFirst() {
+            if (data.isEmpty()) {
+                invalidate();
+            } else {
+                currentKey = data.firstKey();
+                valid = true;
+            }
+        }
+
+        @Override
+        public void seekToLast() {
+            if (data.isEmpty()) {
+                invalidate();
+            } else {
+                currentKey = data.lastKey();
+                valid = true;
+            }
+        }
+
+        @Override
+        public void seek(final byte[] target) {
+            final Map.Entry<byte[], byte[]> entry = data.ceilingEntry(target);
+            if (entry == null) {
+                invalidate();
+            } else {
+                currentKey = entry.getKey();
+                valid = true;
+            }
+        }
+
+        @Override
+        public void seekForPrev(final byte[] target) {
+            final Map.Entry<byte[], byte[]> entry = data.floorEntry(target);
+            if (entry == null) {
+                invalidate();
+            } else {
+                currentKey = entry.getKey();
+                valid = true;
+            }
+        }
+
+        @Override
+        public void seek(final ByteBuffer target) {
+            final ByteBuffer duplicate = target.duplicate();
+            final byte[] bytes = new byte[duplicate.remaining()];
+            duplicate.get(bytes);
+            seek(bytes);
+        }
+
+        @Override
+        public void seekForPrev(final ByteBuffer target) {
+            final ByteBuffer duplicate = target.duplicate();
+            final byte[] bytes = new byte[duplicate.remaining()];
+            duplicate.get(bytes);
+            seekForPrev(bytes);
+        }
+
+        @Override
+        public void next() {
+            final Map.Entry<byte[], byte[]> entry = data.higherEntry(currentKey);
+            if (entry == null) {
+                invalidate();
+            } else {
+                currentKey = entry.getKey();
+            }
+        }
+
+        @Override
+        public void prev() {
+            if (!isValid()) {
+                return;
+            }
+            final Map.Entry<byte[], byte[]> entry = data.lowerEntry(currentKey);
+            if (entry == null) {
+                invalidate();
+            } else {
+                currentKey = entry.getKey();
+            }
+        }
+
+        @Override
+        public byte[] key() {
+            return currentKey;
+        }
+
+        @Override
+        public byte[] value() {
+            return isValid() ? data.get(currentKey) : null;
+        }
+
+        @Override
+        public void status() throws RocksDBException {
+            // In-memory iterator never enters an error state.
+        }
+
+        @Override
+        public void close() {
+            invalidate();
+        }
+
+        private void invalidate() {
+            valid = false;
+            currentKey = null;
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 35d152bd07f4d..1a6a83848d782 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -1255,7 +1255,7 @@ public void shouldNotThrowWhenRestoringOnMissingHeaders() {
     }
 
     @Test
-    public void shouldLoadPositionFromFile() {
+    public void shouldMigrateExistingPositionFromFile() {
         final Position position = Position.fromMap(mkMap(mkEntry("topic", mkMap(mkEntry(0, 1L)))));
         final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new File(context.stateDir(), rocksDBStore.name + ".position"));
         StoreQueryUtils.checkpointPosition(positionCheckpoint, position);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
index f843a38455aff..340ca7eb21642 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
@@ -834,7 +834,7 @@ public void shouldAllowZeroHistoryRetention() {
     }
 
     @Test
-    public void shouldLoadPositionFromFile() {
+    public void shouldMigrateExistingPositionFromFile() {
         final Position position = Position.fromMap(mkMap(mkEntry("topic", mkMap(mkEntry(0, 1L)))));
         final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new File(context.stateDir(), store.name() + ".position"));
         StoreQueryUtils.checkpointPosition(positionCheckpoint, position);