Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,29 @@
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.query.Position;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra blank line

@ExtendWith(MockitoExtension.class)
abstract class AbstractColumnFamilyAccessorTest {
Expand Down Expand Up @@ -67,41 +71,75 @@ public void setUp() {

@Test
public void shouldOpenClean() throws RocksDBException {
// NOTE(review): this stubbing targets a Mockito mock, but dbAccessor is
// reassigned to a real InMemoryRocksDBAccessor on the very next statement —
// this looks like both sides of a diff merged together; confirm against the
// repository which variant is current.
when(dbAccessor.get(offsetsCF, toBytes("status"))).thenReturn(closedValue);

dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
// Open the ColumnFamily
accessor.open(dbAccessor, false);
// NOTE(review): verify(...) only works on a Mockito mock; with the in-memory
// accessor the assertArrayEquals below is the effective check — opening must
// persist the "open" status marker in the offsets column family.
verify(dbAccessor).put(eq(offsetsCF), eq(toBytes("status")), eq(openValue));
assertArrayEquals(openValue, dbAccessor.get(offsetsCF, toBytes("status")));

// Now close the ColumnFamily
accessor.close(dbAccessor);
// Closing must flip the status marker to the "closed" value.
verify(dbAccessor).put(eq(offsetsCF), eq(toBytes("status")), eq(closedValue));
assertArrayEquals(closedValue, dbAccessor.get(offsetsCF, toBytes("status")));

// Open clean again
accessor.open(dbAccessor, false);
assertArrayEquals(openValue, dbAccessor.get(offsetsCF, toBytes("status")));
}

@Test
public void shouldThrowOnOpenAfterAUncleanClose() throws RocksDBException {
// NOTE(review): the stub below targets a mock that is immediately replaced by
// a real InMemoryRocksDBAccessor — likely a leftover from an older mock-based
// version of this test (both sides of a diff merged); confirm which is current.
when(dbAccessor.get(offsetsCF, toBytes("status"))).thenReturn(openValue);
dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
// First, open clean
accessor.open(dbAccessor, false);

// Try to open again, with ignoreUncleanClose=false, which should throw since the store is already open
final ProcessorStateException thrown = assertThrowsExactly(ProcessorStateException.class, () -> accessor.open(dbAccessor, false));
// The message pins the precondition: the status must be empty or "closed".
assertEquals("Invalid state during store open. Expected state to be either empty or closed", thrown.getMessage());
}

@Test
public void shouldIgnoreExceptionAfterUncleanClose() throws RocksDBException {
// NOTE(review): stub on a mock that is replaced by a real in-memory accessor
// on the next line — looks like a merged-diff leftover; confirm.
when(dbAccessor.get(offsetsCF, toBytes("status"))).thenReturn(openValue);
dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
// First, open clean
accessor.open(dbAccessor, false);
// Now reopen in an invalid state
// With ignoreUncleanClose=true, reopening while still marked "open" must succeed.
accessor.open(dbAccessor, true);
assertTrue(storeOpen.get());
// NOTE(review): verify(...) requires a Mockito mock; with the in-memory
// accessor the assertArrayEquals below is the effective check — confirm.
verify(dbAccessor).put(eq(offsetsCF), eq(toBytes("status")), eq(openValue));
assertArrayEquals(openValue, dbAccessor.get(offsetsCF, toBytes("status")));
}

@Test
public void shouldCommitOffsets() throws RocksDBException {
dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
final TopicPartition tp0 = new TopicPartition("testTopic", 0);
final TopicPartition tp1 = new TopicPartition("testTopic", 1);
final Map<TopicPartition, Long> changelogOffsets = Map.of(tp0, 10L, tp1, 20L);
// Committing should flush and then persist one offset entry per partition.
accessor.commit(dbAccessor, changelogOffsets);
// NOTE(review): the three verify(...) calls below require a Mockito mock, but
// dbAccessor is a real InMemoryRocksDBAccessor here — these look like both
// sides of a diff merged together; the assertEquals checks are the effective
// assertions. Confirm against the repository.
verify(dbAccessor).flush(any(ColumnFamilyHandle[].class));
verify(dbAccessor).put(eq(offsetsCF), eq(toBytes(tp0.toString())), eq(toBytes(10L)));
verify(dbAccessor).put(eq(offsetsCF), eq(toBytes(tp1.toString())), eq(toBytes(20L)));
assertEquals(10L, accessor.getCommittedOffset(dbAccessor, tp0));
assertEquals(20L, accessor.getCommittedOffset(dbAccessor, tp1));
}

@Test
public void shouldCommitPosition() throws RocksDBException {
dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
// Build a position spanning two partitions of the same topic.
final String topic = "testTopic";
final TopicPartition partition0 = new TopicPartition(topic, 0);
final TopicPartition partition1 = new TopicPartition(topic, 1);
final Map<Integer, Long> offsetsByPartition = mkMap(mkEntry(partition0.partition(), 10L), mkEntry(partition1.partition(), 20L));
final Position committed = Position.fromMap(mkMap(mkEntry(topic, offsetsByPartition)));

// Committing must persist the serialized position under the "position" key
// of the offsets column family.
accessor.commit(dbAccessor, committed);
final byte[] stored = dbAccessor.get(offsetsCF, toBytes("position"));
assertEquals(committed, PositionSerde.deserialize(ByteBuffer.wrap(stored)));
}

@Test
public void shouldWipeCommittedOffsetsOnEmptyCommit() throws RocksDBException {
dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
final TopicPartition partition0 = new TopicPartition("testTopic", 0);
final TopicPartition partition1 = new TopicPartition("testTopic", 1);

// Commit offsets for both partitions and check they can be read back.
accessor.commit(dbAccessor, Map.of(partition0, 10L, partition1, 20L));
assertEquals(10L, accessor.getCommittedOffset(dbAccessor, partition0));
assertEquals(20L, accessor.getCommittedOffset(dbAccessor, partition1));

// An empty commit wipes every previously committed offset.
accessor.commit(dbAccessor, Map.of());
assertNull(accessor.getCommittedOffset(dbAccessor, partition0));
assertNull(accessor.getCommittedOffset(dbAccessor, partition1));
}

private byte[] toBytes(final String s) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1626,7 +1626,7 @@ public void shouldMeasureExpiredRecords() {
}

@Test
public void shouldLoadPositionFromFile() {
public void shouldMigrateExistingPositionFromFile() {
final Position position = Position.fromMap(mkMap(mkEntry("topic", mkMap(mkEntry(0, 1L)))));
final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new File(stateDir, storeName + ".position"));
StoreQueryUtils.checkpointPosition(positionCheckpoint, position);
Expand All @@ -1639,6 +1639,43 @@ public void shouldLoadPositionFromFile() {
bytesStore.close();
}

@Test
public void shouldRestoreMergedPositionFromMultipleSegmentsAfterRestart() {
final AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment> store = getBytesStore();
// The store starts with no segments.
store.init(context, store);

// Write records whose record contexts carry offsets for two partitions of topic "t1".
context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "t1", new RecordHeaders()));
store.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(10));
context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "t1", new RecordHeaders()));
store.put(serializeKey(new Windowed<>("a", windows[1])), serializeValue(10));
context.setRecordContext(new ProcessorRecordContext(0, 1, 1, "t1", new RecordHeaders()));
store.put(serializeKey(new Windowed<>("a", windows[2])), serializeValue(10));
context.setRecordContext(new ProcessorRecordContext(0, 3, 1, "t1", new RecordHeaders()));
store.put(serializeKey(new Windowed<>("a", windows[3])), serializeValue(10));

// The merged position keeps the highest offset seen per partition (0 -> 2, 1 -> 3).
final Position expected = Position.fromMap(mkMap(mkEntry("t1", mkMap(mkEntry(0, 2L), mkEntry(1, 3L)))));

// While open, every segment must expose the same merged position.
for (final KeyValueSegment openSegment : store.getSegments()) {
assertEquals(expected, openSegment.getPosition());
}

// Persist the position and simulate a full restart of the store.
store.commit(Map.of());
store.segments.writePosition();
store.close();
store.init(context, store);

// After re-init the store-level position equals the merged position …
assertEquals(expected, store.getPosition());

// … and so does every restored segment.
for (final KeyValueSegment restoredSegment : store.getSegments()) {
assertEquals(expected, restoredSegment.getPosition());
}
}

private Set<String> segmentDirs() {
final File windowDir = new File(stateDir, storeName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ public void shouldNotThrowWhenRestoringOnMissingHeaders(final SegmentedBytesStor

@ParameterizedTest
@MethodSource("getKeySchemas")
public void shouldLoadPositionFromFile(final SegmentedBytesStore.KeySchema schema) {
public void shouldMigrateExistingPositionFromFile(final SegmentedBytesStore.KeySchema schema) {
before(schema);
final Position position = Position.fromMap(mkMap(mkEntry("topic", mkMap(mkEntry(0, 1L)))));
final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new File(context.stateDir(), storeName + ".position"));
Expand All @@ -725,6 +725,47 @@ public void shouldLoadPositionFromFile(final SegmentedBytesStore.KeySchema schem
assertEquals(position, bytesStore.getPosition());
}

@ParameterizedTest
@MethodSource("getKeySchemas")
public void shouldRestoreMergedPositionFromMultipleSegmentsAfterRestart(final SegmentedBytesStore.KeySchema schema) {
before(schema);
bytesStore = getBytesStore();
// 0 segments initially.
bytesStore.init(context, bytesStore);

// Writes record to different partitions
context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "t1", new RecordHeaders()));
bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(10));
context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "t1", new RecordHeaders()));
bytesStore.put(serializeKey(new Windowed<>("a", windows[1])), serializeValue(10));
context.setRecordContext(new ProcessorRecordContext(0, 1, 1, "t1", new RecordHeaders()));
bytesStore.put(serializeKey(new Windowed<>("a", windows[2])), serializeValue(10));
context.setRecordContext(new ProcessorRecordContext(0, 3, 1, "t1", new RecordHeaders()));
bytesStore.put(serializeKey(new Windowed<>("a", windows[3])), serializeValue(10));
final Position expected = Position.fromMap(mkMap(mkEntry("t1", mkMap(mkEntry(0, 2L), mkEntry(1, 3L)))));

// Each open segment should share the same position.
for (final S segment : bytesStore.getSegments()) {
assertEquals(expected, segment.getPosition());
}

// Persist the merged position and simulate a full store restart.
bytesStore.commit(Map.of());
for (final S segment : bytesStore.getSegments()) {
segment.writePosition();
}
bytesStore.close();
bytesStore.init(context, bytesStore);

// The store-level position should be restored from the merged position.
assertEquals(expected, bytesStore.getPosition());

// Restored segments should all have the same merged position.
for (final S segment : bytesStore.getSegments()) {
assertEquals(expected, segment.getPosition());
}
}

private List<ConsumerRecord<byte[], byte[]>> getChangelogRecords() {
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
final Headers headers = new RecordHeaders();
Expand Down
Loading