Fix InMemory stream Version not advancing on append#275
Merged
MPapst merged 2 commits intoPapstIO:mainfrom May 6, 2026
Merged
Conversation
The InMemoryEventStream's Version is set in the constructor and never updated, so each AppendAsync writes events with the same (initial) Version + 1 and the stream tip never advances. This breaks incremental aggregation, which uses stream.Version as the target version. Pin the contract: - Direct AppendAsync produces events at versions 1, 2, 3, ... - AppendAsync advances stream.Version - AppendSnapshotAsync advances stream.Version - ListAsync(2, ...) returns the second event after two appends The Cosmos implementation already satisfies all of these.
Version was a get-only property set in the constructor and never updated, so AppendAsync wrote each event with `Version + 1` against the unchanging initial value — every direct append produced version 1, and the stream tip stayed at 0. Incremental aggregation (`AggregateAsync(stream, target)`) reads stream.Version as its target version and skips empty event sets when stream.Version doesn't reflect recent appends. Derive Version from the events list (matching how LatestSnapshotVersion works in the same class), falling back to the constructor-supplied initial version when the stream is empty. Per-event Version assignment in AppendAsync / AppendSnapshotAsync now reads the live tip, and the transactional batch path stays self-contained. The Cosmos implementation already advances Version via the patch response on each append, so this only affected the in-memory store.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
InMemoryEventStream.Versionwas a get-only property set in the constructor and never updated.AppendAsyncwrites each event withVersion + 1, but sinceVersionnever moved off its initial value, every direct append produced an event at version 1 and the stream's reported tip stayed at 0.This breaks any caller that relies on
stream.Versionreflecting the actual tip — most visibly the incremental aggregation overloadAggregateAsync(stream, target, ct), which readsstream.Versionas itstargetVersion. With a stale tip, the loop'starget.Version == targetVersionbreak can fire before any newly-appended events are applied, so incremental aggregation silently no-ops after the first append.The Cosmos implementation already keeps
Versionin sync via thePatchItemAsyncresponse on each append; the issue was specific to the in-memory store.Fix
Derive
Versionfrom the events list, mirroring howLatestSnapshotVersionalready works in the same class:The constructor-supplied
versionis kept as_initialVersionso an empty stream still reports the seed value.AppendAsyncandAppendSnapshotAsynckeep theirVersion = Version + 1lines unchanged — they now read the live tip.InMemoryTransactionalBatchis unaffected.Test plan
InMemoryEventStreamTestspin the contract:AppendAsyncproduces events at versions 1, 2, 3, …AppendAsyncadvancesstream.VersionAppendSnapshotAsyncadvancesstream.VersionListAsync(2, …)returns the second event after two appends