From 76f166e122c05ef7c8eced9ca3c61d31f2e9b682 Mon Sep 17 00:00:00 2001 From: Eduwer Camacaro Date: Wed, 25 Mar 2026 12:09:54 -0500 Subject: [PATCH 1/2] RocksDBStores now manage their own offsets by storing them in a offsets ColumnFamily. Also, these stores use a RocksDB atomic flush to guarantee consistency between the default and offsets ColumnFamilies. As described in KIP-1035, this new behavior enables RocksDB stores to avoid explicitly flushing the memtables to the SST files --- .../internals/AbstractColumnFamilyAccessor.java | 12 +----------- .../state/internals/DualColumnFamilyAccessor.java | 5 ----- .../kafka/streams/state/internals/RocksDBStore.java | 5 ----- .../internals/AbstractColumnFamilyAccessorTest.java | 2 -- .../internals/DualColumnFamilyAccessorTest.java | 1 - .../streams/state/internals/RocksDBStoreTest.java | 10 ---------- 6 files changed, 1 insertion(+), 34 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java index 0288872f567d4..79a8cc0cdecf8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java @@ -69,8 +69,7 @@ public final void commit(final RocksDBStore.DBAccessor accessor, final Map changelogOffsets = Map.of(tp0, 10L, tp1, 20L); accessor.commit(dbAccessor, changelogOffsets); - verify(dbAccessor).flush(any(ColumnFamilyHandle[].class)); verify(dbAccessor).put(eq(offsetsCF), eq(toBytes(tp0.toString())), eq(toBytes(10L))); verify(dbAccessor).put(eq(offsetsCF), eq(toBytes(tp1.toString())), eq(toBytes(20L))); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java index 94efbb18a66c5..2f8dea36eaa7a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java @@ -348,7 +348,6 @@ public void shouldFlushBothColumnFamiliesOnCommit() throws RocksDBException { accessor.commit(dbAccessor, offsets); - verify(dbAccessor).flush(oldCF, newCF, offsetsCF); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 35d152bd07f4d..a59dc0a51668f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -86,7 +86,6 @@ import org.rocksdb.Statistics; import java.io.File; -import java.io.IOException; import java.lang.reflect.Field; import java.math.BigInteger; import java.util.ArrayList; @@ -918,15 +917,6 @@ public void shouldReturnValueOnRange() { } } - @Test - public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOException { - rocksDBStore.init(context, rocksDBStore); - Utils.delete(dir); - rocksDBStore.put( - new Bytes(stringSerializer.serialize(null, "anyKey")), - stringSerializer.serialize(null, "anyValue")); - assertThrows(ProcessorStateException.class, () -> rocksDBStore.commit(Map.of())); - } @Test public void shouldHandleToggleOfEnablingBloomFilters() { From cc0d3d3e1fb4c4a3b2d9368e70287d2b0cfc2c7a Mon Sep 17 00:00:00 2001 From: Eduwer Camacaro Date: Mon, 30 Mar 2026 16:49:39 -0500 Subject: [PATCH 2/2] Remove deprecated test case --- .../internals/DualColumnFamilyAccessorTest.java | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java index 2f8dea36eaa7a..437352b5c9005 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; @@ -35,9 +34,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.function.Function; import static org.junit.jupiter.api.Assertions.assertArrayEquals; @@ -341,15 +338,6 @@ public void shouldReturnSumOfEntriesFromBothColumnFamilies() throws RocksDBExcep assertEquals(150L, result); } - @Test - public void shouldFlushBothColumnFamiliesOnCommit() throws RocksDBException { - final Map offsets = new HashMap<>(); - offsets.put(new TopicPartition("topic", 0), 100L); - - accessor.commit(dbAccessor, offsets); - - } - @Test public void shouldCreateRangeIterator() { final RocksIterator iterNewFormat = mock(RocksIterator.class);