diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index db03332079596..7b3483f8e688f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -54,6 +54,7 @@ import static java.lang.String.format; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore; import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt; +import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN; /** * ProcessorStateManager is the source of truth for the current offset for each state store, @@ -307,7 +308,7 @@ void initializeStoreOffsets(final boolean storeDirIsEmpty) { final Long offset = store.stateStore.committedOffset(store.changelogPartition); if (offset != null) { - store.setOffset(offset); + store.setOffset(changelogOffsetFromCommittedOffset(offset)); log.info("State store {} initialized from checkpoint with offset {} at changelog {}", store.stateStore.name(), store.offset, store.changelogPartition); } else { @@ -509,10 +510,11 @@ public void commit() { final StateStore store = metadata.stateStore; log.trace("Committing store {}", store.name()); try { - if (metadata.changelogPartition == null || metadata.offset == null || metadata.corrupted || !store.persistent()) { + if (metadata.changelogPartition == null || metadata.corrupted || !store.persistent()) { store.commit(Map.of()); } else { - store.commit(Map.of(metadata.changelogPartition, metadata.offset)); + // logged store, persistent and valid end offset + store.commit(Map.of(metadata.changelogPartition, committableOffsetFromChangelogOffset(metadata.offset))); } if (!metadata.corrupted && metadata.commitCallback != null) { @@ -701,6 +703,16 @@ public void updateChangelogOffsets(final Map writtenOffset stateDirectory.updateTaskOffsets(taskId, changelogOffsets()); } + // Commit a sentinel value when the changelog offset is not yet initialized/known + private long committableOffsetFromChangelogOffset(final Long offset) { + return offset != null ? offset : OFFSET_UNKNOWN; + } + + // Convert the written offsets in the checkpoint file back to the changelog offset + private Long changelogOffsetFromCommittedOffset(final long offset) { + return offset != OFFSET_UNKNOWN ? offset : null; + } + private TopicPartition getStorePartition(final String storeName) { // NOTE we assume the partition of the topic can always be inferred from the task id; // if user ever use a custom partition grouper (deprecated in KIP-528) this would break and diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 76a10cf192a8f..be2c6830e3d8e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -593,6 +593,49 @@ public void shouldCommitAndCloseLegacyStores() throws IOException { } } + @Test + public void shouldCommitAndCloseLegacyStoresWithUnknownOffsetPositions() throws Exception { + checkpoint.write(emptyMap()); + + final File storeCheckpointFile = new File(stateDirectory.getOrCreateDirectoryForTask(taskId), CHECKPOINT_FILE_NAME + "_" + persistentStore.name()); + + // set up ack'ed offsets + final HashMap ackedOffsets = new HashMap<>(); + ackedOffsets.put(persistentStorePartition, null); + ackedOffsets.put(nonPersistentStorePartition, 456L); + ackedOffsets.put(new TopicPartition("nonRegisteredTopic", 1), 789L); + + final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); + contextRegistersStateStore(stateMgr); + try { + // make sure the checkpoint file is not written yet + assertFalse(storeCheckpointFile.exists()); + + stateMgr.registerStateStores(Arrays.asList(persistentStore, nonPersistentStore), context); + } finally { + stateMgr.commit(); + + assertTrue(persistentStore.committed); + assertTrue(nonPersistentStore.committed); + + // make sure that flush is called in the proper order + assertThat(persistentStore.getLastCommitCount(), Matchers.lessThan(nonPersistentStore.getLastCommitCount())); + + stateMgr.updateChangelogOffsets(ackedOffsets); + stateMgr.commit(); + stateMgr.close(); + assertTrue(persistentStore.closed); + assertTrue(nonPersistentStore.closed); + + assertTrue(storeCheckpointFile.exists()); + + // the checkpoint file should contain an offset from the persistent store only. + final OffsetCheckpoint storeCheckpoint = new OffsetCheckpoint(storeCheckpointFile); + final Map checkpointedOffsets = storeCheckpoint.read(); + assertThat(checkpointedOffsets, is(singletonMap(new TopicPartition(persistentStoreTopicName, 1), -4L))); + } + } + @Test public void shouldOverrideOffsetsWhenRestoreAndProcess() throws IOException { final Map offsets = singletonMap(persistentStorePartition, 99L);