KAFKA-18972: Use custom processor during read-only state store restoration #21882
Open
daguimu wants to merge 1 commit into apache:trunk from
…ation When a read-only state store is added via Topology.addReadOnlyStateStore() with a custom ProcessorSupplier, the processor is only used during normal processing but bypassed during state restoration. Records from the changelog topic are written directly to the store via StateRestoreCallback, skipping the custom processor logic entirely. This fix registers a ReprocessFactory for read-only stores (mirroring the existing GlobalStore approach) and modifies ProcessorStateManager.restore() to route records through the custom processor when a ReprocessFactory is available, ensuring consistent behavior between normal processing and state restoration.
Problem
When a read-only state store is added via Topology.addReadOnlyStateStore() with a custom ProcessorSupplier, the custom processor is only invoked during normal processing (RUNNING state). During state restoration from the changelog topic, records bypass the processor entirely and are written directly to the store via StateRestoreCallback, discarding any custom transformation logic.
This means that if the processor applies transformations (e.g., filtering, value mapping, aggregation) before writing to the store, those transformations are lost during restoration, causing the restored state to diverge from what normal processing would have produced.
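To make the divergence concrete, here is a minimal sketch in plain Java. It is a toy model under stated assumptions, not Kafka Streams code: the class RestoreDivergenceSketch, its methods, and the use of a Map as the "store" are all hypothetical stand-ins. The "processed-" prefix is borrowed from the transformation mentioned in the tests of this PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types are Kafka Streams classes.
final class RestoreDivergenceSketch {

    // Hypothetical stand-in for a custom processor that transforms values before storing them.
    static void processInto(Map<String, String> store, String key, String value) {
        store.put(key, "processed-" + value);
    }

    // Hypothetical stand-in for the default restore callback: the raw record goes straight into the store.
    static void restoreRawInto(Map<String, String> store, String key, String value) {
        store.put(key, value);
    }

    // Store contents after the topic has been consumed through the custom processor.
    static Map<String, String> afterNormalProcessing(List<String[]> topic) {
        Map<String, String> store = new LinkedHashMap<>();
        for (String[] record : topic) {
            processInto(store, record[0], record[1]);
        }
        return store;
    }

    // Store contents after the same topic has been replayed through the raw restore path.
    static Map<String, String> afterRestoration(List<String[]> topic) {
        Map<String, String> store = new LinkedHashMap<>();
        for (String[] record : topic) {
            restoreRawInto(store, record[0], record[1]);
        }
        return store;
    }

    public static void main(String[] args) {
        List<String[]> topic = List.of(new String[] {"k1", "v1"}, new String[] {"k2", "v2"});
        System.out.println("normal:   " + afterNormalProcessing(topic));  // {k1=processed-v1, k2=processed-v2}
        System.out.println("restored: " + afterRestoration(topic));       // {k1=v1, k2=v2}
    }
}
```

The same input topic yields two different store contents depending on which path wrote the records, which is the inconsistency described above.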
Root Cause
Topology.addReadOnlyStateStore() connects the source topic as a changelog via connectSourceStoreAndTopic(), which causes StoreChangelogReader to restore records using ProcessorStateManager.restore(). This method uses only the store's built-in StateRestoreCallback (which writes raw key-value pairs directly), never routing records through the custom processor.
In contrast, addGlobalStore() correctly handles this by registering a ReprocessFactory in storeNameToReprocessOnRestore, which GlobalStateManagerImpl uses to instantiate the processor and call process() for each restored record.
Fix
- Register a ReprocessFactory for read-only stores in InternalTopologyBuilder (via the new registerReadOnlyStoreReprocessFactory() method), mirroring the existing GlobalStore pattern
- Pass the storeNameToReprocessOnRestore map from ProcessorTopology to ProcessorStateManager (via ActiveTaskCreator and StandbyTaskCreator)
- In ProcessorStateManager.restore(), check for a ReprocessFactory for the store being restored; if present, instantiate the custom processor and route each record through processor.process() instead of the default StateRestoreCallback
Tests Added
- ReprocessFactory registered for read-only stores, processor used during restore
- ProcessorStateManagerTest.shouldRestoreViaReprocessFactoryWhenPresent(): verifies the custom processor is called with deserialized records during restore
- Fallback to StateRestoreCallback when no ReprocessFactory exists
- ProcessorStateManagerTest.shouldFallbackToCallbackWhenNoReprocessFactory(): verifies existing stores without ReprocessFactory still use the callback path
- ReadOnlyStoreTest.shouldUseCustomProcessorDuringRestorationWithTransformation(): verifies records in store have the processor's "processed-" prefix transformation applied
- ReadOnlyStoreTest.shouldHandleNullKeyRecordsDuringReprocessRestore(): verifies basic record handling with non-null keys
Impact
- … addReadOnlyStateStore() with a custom processor will now have their processor invoked during restoration, matching the documented behavior
- … ReprocessFactory registered, fallback to existing callback path)
- … ReprocessFactory via GlobalStateManagerImpl)
- … ProcessorStateManager constructor still works, defaulting to an empty ReprocessFactory map
Fixes KAFKA-18972
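The restore-time dispatch described under Fix can be sketched in plain Java. This is an illustrative model under stated assumptions, not the code in this PR: RestoreDispatchSketch, ProcessorFactory, and the Map-backed store are hypothetical, and the real change works with deserialized Kafka records and the actual ReprocessFactory type rather than these stand-ins.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy model only: a hypothetical analogue of looking up a reprocess factory by store name.
final class RestoreDispatchSketch {

    // Hypothetical stand-in for a ReprocessFactory: builds a processor that writes into the given store.
    interface ProcessorFactory {
        BiConsumer<String, String> create(Map<String, String> store);
    }

    // Hypothetical stand-in for the storeNameToReprocessOnRestore map.
    private final Map<String, ProcessorFactory> reprocessFactories;

    RestoreDispatchSketch(Map<String, ProcessorFactory> reprocessFactories) {
        this.reprocessFactories = reprocessFactories;
    }

    // Restores records into the named store, preferring the custom processor when one is registered.
    void restore(String storeName, Map<String, String> store, List<String[]> records) {
        ProcessorFactory factory = reprocessFactories.get(storeName);
        BiConsumer<String, String> writer;
        if (factory != null) {
            writer = factory.create(store);   // custom processor path
        } else {
            writer = store::put;              // fallback: raw write, like the default callback
        }
        for (String[] record : records) {
            writer.accept(record[0], record[1]);
        }
    }

    public static void main(String[] args) {
        Map<String, ProcessorFactory> factories = new HashMap<>();
        factories.put("read-only-store", s -> (k, v) -> s.put(k, "processed-" + v));
        RestoreDispatchSketch sketch = new RestoreDispatchSketch(factories);

        List<String[]> records = List.of(new String[] {"k1", "v1"}, new String[] {"k2", "v2"});
        Map<String, String> readOnly = new HashMap<>();
        Map<String, String> regular = new HashMap<>();
        sketch.restore("read-only-store", readOnly, records);
        sketch.restore("regular-store", regular, records);

        System.out.println(readOnly.get("k1"));  // processed-v1
        System.out.println(regular.get("k1"));   // v1
    }
}
```

Stores without a registered factory keep the existing raw-write behavior, which is the backward-compatibility property the Impact list refers to.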