Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ public final void commit(final RocksDBStore.DBAccessor accessor, final Map<Topic
}
}
}
// We need to remove this flush call when implementing KAFKA-19712
this.flush(accessor, offsetColumnFamilyHandle);

}

@Override
Expand Down Expand Up @@ -112,15 +111,6 @@ public final Long getCommittedOffset(final RocksDBStore.DBAccessor accessor, fin
return null;
}

/**
* Invokes commit in the underlying ColumnFamilyAccessor.
* Subclasses should implement this method to define specific commit behavior.
* This method will be removed when implementing KAFKA-19712
*
* @param accessor the RocksDB accessor used to interact with the database
* @throws RocksDBException if an error occurs during the commit operation
*/
protected abstract void flush(final RocksDBStore.DBAccessor accessor, final ColumnFamilyHandle offsetColumnFamilyHandle) throws RocksDBException;

private void wipeOffsets(final RocksDBStore.DBAccessor accessor) throws RocksDBException {
try (final RocksIterator iter = accessor.newIterator(offsetColumnFamilyHandle)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,6 @@ public long approximateNumEntries(final DBAccessor accessor)
+ accessor.approximateNumEntries(newColumnFamily);
}

@Override
public void flush(final DBAccessor accessor, final ColumnFamilyHandle offsetColumnFamilyHandle) throws RocksDBException {
accessor.flush(oldColumnFamily, newColumnFamily, offsetColumnFamilyHandle);
}

@Override
public void addToBatch(final byte[] key,
final byte[] value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1042,11 +1042,6 @@ public long approximateNumEntries(final DBAccessor accessor) throws RocksDBExcep
return accessor.approximateNumEntries(columnFamily);
}

@Override
public void flush(final DBAccessor accessor, final ColumnFamilyHandle offsetColumnFamilyHandle) throws RocksDBException {
accessor.flush(columnFamily, offsetColumnFamilyHandle);
}

@Override
public void addToBatch(final byte[] key,
final byte[] value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -99,7 +98,6 @@ public void shouldCommitOffsets() throws RocksDBException {
final TopicPartition tp1 = new TopicPartition("testTopic", 1);
final Map<TopicPartition, Long> changelogOffsets = Map.of(tp0, 10L, tp1, 20L);
accessor.commit(dbAccessor, changelogOffsets);
verify(dbAccessor).flush(any(ColumnFamilyHandle[].class));
verify(dbAccessor).put(eq(offsetsCF), eq(toBytes(tp0.toString())), eq(toBytes(10L)));
verify(dbAccessor).put(eq(offsetsCF), eq(toBytes(tp1.toString())), eq(toBytes(20L)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,6 @@ public void shouldFlushBothColumnFamiliesOnCommit() throws RocksDBException {

accessor.commit(dbAccessor, offsets);

verify(dbAccessor).flush(oldCF, newCF, offsetsCF);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add another assertion here? Otherwise it's an empty test.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is safe to remove this test entirely because the flush operation has been removed. Instead, the CF accessors now do commits, and we already test that in the AbstractColumnFamilyAccessorTest

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack, go ahead and remove it

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
import org.rocksdb.Statistics;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.math.BigInteger;
import java.util.ArrayList;
Expand Down Expand Up @@ -918,15 +917,6 @@ public void shouldReturnValueOnRange() {
}
}

@Test
public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOException {
rocksDBStore.init(context, rocksDBStore);
Utils.delete(dir);
rocksDBStore.put(
new Bytes(stringSerializer.serialize(null, "anyKey")),
stringSerializer.serialize(null, "anyValue"));
assertThrows(ProcessorStateException.class, () -> rocksDBStore.commit(Map.of()));
}

@Test
public void shouldHandleToggleOfEnablingBloomFilters() {
Expand Down
Loading