Skip to content

KAFKA-18972: Use custom processor during read-only state store restoration#21882

Open
daguimu wants to merge 1 commit intoapache:trunkfrom
daguimu:fix/read-only-store-processor-restore-KAFKA-18972
Open

KAFKA-18972: Use custom processor during read-only state store restoration#21882
daguimu wants to merge 1 commit intoapache:trunkfrom
daguimu:fix/read-only-store-processor-restore-KAFKA-18972

Conversation

@daguimu
Copy link
Copy Markdown

@daguimu daguimu commented Mar 27, 2026

Problem

When a read-only state store is added via Topology.addReadOnlyStateStore() with a custom ProcessorSupplier, the custom processor is only invoked during normal processing (RUNNING state). During state restoration from the changelog topic, records bypass the processor entirely and are written directly to the store via StateRestoreCallback, discarding any custom transformation logic.

This means if the processor applies transformations (e.g., filtering, value mapping, aggregation) before writing to the store, those transformations are lost during restoration, causing the restored state to diverge from what normal processing would have produced.

Root Cause

Topology.addReadOnlyStateStore() connects the source topic as a changelog via connectSourceStoreAndTopic(), which causes StoreChangelogReader to restore records using ProcessorStateManager.restore(). This method uses only the store's built-in StateRestoreCallback (which writes raw key-value pairs directly), never routing records through the custom processor.

In contrast, addGlobalStore() correctly handles this by registering a ReprocessFactory in storeNameToReprocessOnRestore, which GlobalStateManagerImpl uses to instantiate the processor and call process() for each restored record.

Fix

  • Register a ReprocessFactory for read-only stores in InternalTopologyBuilder (via new registerReadOnlyStoreReprocessFactory() method), mirroring the existing GlobalStore pattern
  • Pass the storeNameToReprocessOnRestore map from ProcessorTopology to ProcessorStateManager (via ActiveTaskCreator and StandbyTaskCreator)
  • In ProcessorStateManager.restore(), check for a ReprocessFactory for the store being restored; if present, instantiate the custom processor and route each record through processor.process() instead of the default StateRestoreCallback
  • Cached processor instances are properly closed when the state manager is closed

Tests Added

Change Point Test
ReprocessFactory registered for read-only stores, processor used during restore ProcessorStateManagerTest.shouldRestoreViaReprocessFactoryWhenPresent() — verifies the custom processor is called with deserialized records during restore
Fallback to StateRestoreCallback when no ReprocessFactory exists ProcessorStateManagerTest.shouldFallbackToCallbackWhenNoReprocessFactory() — verifies existing stores without ReprocessFactory still use the callback path
End-to-end: custom processor transformation applied to store records ReadOnlyStoreTest.shouldUseCustomProcessorDuringRestorationWithTransformation() — verifies records in store have the processor's "processed-" prefix transformation applied
Null key handling during reprocess restore ReadOnlyStoreTest.shouldHandleNullKeyRecordsDuringReprocessRestore() — verifies basic record handling with non-null keys

Impact

  • Read-only state stores added via addReadOnlyStateStore() with a custom processor will now have their processor invoked during restoration, matching the documented behavior
  • No impact on regular state stores (no ReprocessFactory registered, fallback to existing callback path)
  • No impact on global state stores (they already use ReprocessFactory via GlobalStateManagerImpl)
  • Backward compatible: the old ProcessorStateManager constructor still works, defaulting to an empty ReprocessFactory map

Fixes KAFKA-18972

…ation

When a read-only state store is added via Topology.addReadOnlyStateStore()
with a custom ProcessorSupplier, the processor is only used during normal
processing but bypassed during state restoration. Records from the
changelog topic are written directly to the store via StateRestoreCallback,
skipping the custom processor logic entirely.

This fix registers a ReprocessFactory for read-only stores (mirroring the
existing GlobalStore approach) and modifies ProcessorStateManager.restore()
to route records through the custom processor when a ReprocessFactory is
available, ensuring consistent behavior between normal processing and
state restoration.
@github-actions github-actions bot added triage PRs from the community streams labels Mar 27, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

streams triage PRs from the community

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant