Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static java.lang.String.format;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;

/**
* ProcessorStateManager is the source of truth for the current offset for each state store,
Expand Down Expand Up @@ -307,7 +308,7 @@ void initializeStoreOffsets(final boolean storeDirIsEmpty) {
final Long offset = store.stateStore.committedOffset(store.changelogPartition);

if (offset != null) {
store.setOffset(offset);
store.setOffset(changelogOffsetFromCommittedOffset(offset));
log.info("State store {} initialized from checkpoint with offset {} at changelog {}",
store.stateStore.name(), store.offset, store.changelogPartition);
} else {
Expand Down Expand Up @@ -509,10 +510,11 @@ public void commit() {
final StateStore store = metadata.stateStore;
log.trace("Committing store {}", store.name());
try {
if (metadata.changelogPartition == null || metadata.offset == null || metadata.corrupted || !store.persistent()) {
if (metadata.changelogPartition == null || metadata.corrupted || !store.persistent()) {
store.commit(Map.of());
} else {
store.commit(Map.of(metadata.changelogPartition, metadata.offset));
// logged store, persistent and valid end offset
store.commit(Map.of(metadata.changelogPartition, committableOffsetFromChangelogOffset(metadata.offset)));
}

if (!metadata.corrupted && metadata.commitCallback != null) {
Expand Down Expand Up @@ -701,6 +703,16 @@ public void updateChangelogOffsets(final Map<TopicPartition, Long> writtenOffset
stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
}

// Commit a sentinel value when the changelog offset is not yet initialized/known
private long committableOffsetFromChangelogOffset(final Long offset) {
return offset != null ? offset : OFFSET_UNKNOWN;
}

// Convert the written offsets in the checkpoint file back to the changelog offset
private Long changelogOffsetFromCommittedOffset(final long offset) {
return offset != OFFSET_UNKNOWN ? offset : null;
}

private TopicPartition getStorePartition(final String storeName) {
// NOTE we assume the partition of the topic can always be inferred from the task id;
// if user ever use a custom partition grouper (deprecated in KIP-528) this would break and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,49 @@ public void shouldCommitAndCloseLegacyStores() throws IOException {
}
}

@Test
public void shouldCommitAndCloseLegacyStoresWithUnknownOffsetPositions() throws Exception {
checkpoint.write(emptyMap());

final File storeCheckpointFile = new File(stateDirectory.getOrCreateDirectoryForTask(taskId), CHECKPOINT_FILE_NAME + "_" + persistentStore.name());

// set up ack'ed offsets
final HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
ackedOffsets.put(persistentStorePartition, null);
ackedOffsets.put(nonPersistentStorePartition, 456L);
ackedOffsets.put(new TopicPartition("nonRegisteredTopic", 1), 789L);

final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
contextRegistersStateStore(stateMgr);
try {
// make sure the checkpoint file is not written yet
assertFalse(storeCheckpointFile.exists());

stateMgr.registerStateStores(Arrays.asList(persistentStore, nonPersistentStore), context);
} finally {
stateMgr.commit();

assertTrue(persistentStore.committed);
assertTrue(nonPersistentStore.committed);

// make sure that flush is called in the proper order
assertThat(persistentStore.getLastCommitCount(), Matchers.lessThan(nonPersistentStore.getLastCommitCount()));

stateMgr.updateChangelogOffsets(ackedOffsets);
stateMgr.commit();
stateMgr.close();
assertTrue(persistentStore.closed);
assertTrue(nonPersistentStore.closed);

assertTrue(storeCheckpointFile.exists());

// the checkpoint file should contain an offset from the persistent store only.
final OffsetCheckpoint storeCheckpoint = new OffsetCheckpoint(storeCheckpointFile);
final Map<TopicPartition, Long> checkpointedOffsets = storeCheckpoint.read();
assertThat(checkpointedOffsets, is(singletonMap(new TopicPartition(persistentStoreTopicName, 1), -4L)));
}
}

@Test
public void shouldOverrideOffsetsWhenRestoreAndProcess() throws IOException {
final Map<TopicPartition, Long> offsets = singletonMap(persistentStorePartition, 99L);
Expand Down
Loading