fix: externalize finalize cursor so HTTP continuation pages correctly#1
Merged
Merged
Conversation
The buffering finalize streams (ate, propensity_scores, att) used a
position-less DrainState{done}: emit the whole result in one out.emit(),
then set done. Over the stateless HTTP transport the framework wire-
serializes finalize state between ticks and emits at most one batch per
response, resuming from the continuation token. An emit-all+done cursor
re-runs from row 0 on every resume and loops forever once the output
exceeds one response -- which propensity_scores (one row per input
subject, unbounded) routinely does. subprocess/unix keep live state in
process and hide the bug; only http exposes it.
Replace DrainState with an offset cursor: started flag, integer offset,
and the computed result batch as IPC bytes (result_ipc) -- all wire-
serializable. The first finalize tick computes the estimate once into the
cursor; every tick emits at most ROWS_PER_TICK (64) rows from offset,
advances offset, and finishes when drained. Because offset survives the
round-trip, a resumed tick emits the next slice -- never re-runs the
estimator, never restarts from row 0. ate (3 rows) and att (1 row) are
bounded but use the identical cursor for uniformity. Results are byte-
identical to before. SinkBuffer.drain_result holds the shared cursor loop;
each finalize is now a one-line call.
Validation:
- tests/harness.py run_buffering gains serialize_state=True, which re-
serializes finalize state between every tick and caps each tick at one
response worth of rows (rejecting an over-cap emit and resuming from the
pre-tick token), with a 10000-tick overrun guard. This models the HTTP
continuation faithfully: the old emit-all cursor overruns the guard; the
offset cursor pages and terminates.
- TestCursorSurvivesContinuation (test_tables.py) drives propensity_scores
over an 800-row cohort (>> ROWS_PER_TICK) normal vs serialize_state=True
and asserts identical rows/order, no dupes, termination; a second case
monkeypatches ROWS_PER_TICK=2 so ate/att also page.
- causal.test adds a 200-row generate_series cohort and asserts
propensity_scores returns count 200, distinct 200, all valid
probabilities, and an ordered head -- so the http leg genuinely pages.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
The bug
The buffering finalize streams (
ate,propensity_scores,att) carried a position-lessDrainState{done: bool}: emit the whole result in oneout.emit(), then setdone. Over the stateless HTTP transport the framework wire-serializes finalize state between ticks and emits at most one data batch per response, resuming from the continuation token. An emit-all+donecursor restarts from row 0 on every HTTP resume and loops forever once the output exceeds one response — whichpropensity_scores(one row per input subject, genuinely unbounded) routinely does.subprocess/unixkeep live state in-process and hide the bug; onlyhttpexposes it.The fix
Replace
DrainState{done}with an offset cursor:startedflag, integeroffset, and the already-computed result batch as Arrow IPC bytes (result_ipc) — all wire-serializable. The first finalize tick computes the estimate once into the cursor; every tick emits at mostROWS_PER_TICK(64) rows fromoffset, advancesoffset, andout.finish()es when drained. Becauseoffsetsurvives the round-trip, a resumed tick emits the next slice — never re-runs the estimator, never restarts from row 0.ate(3 rows) andatt(1 row) are bounded but use the identical cursor for uniformity. Results are byte-identical to before.SinkBuffer.drain_resultholds the shared cursor loop; eachfinalizeis now a one-liner.Validation (fails on old code, passes on new)
tests/harness.pyrun_buffering(..., serialize_state=True)— re-serializes finalize state between every tick (deserialize_from_bytes(serialize_to_bytes())) and caps each tick at one response worth of rows, rejecting an over-cap emit and resuming from the pre-tick token, with a 10 000-tick overrun guard. Models the HTTP continuation faithfully: the old emit-all cursor overruns the guard; the offset cursor pages and terminates.TestCursorSurvivesContinuation(tests/test_tables.py) — drivespropensity_scoresover an 800-row cohort (>>ROWS_PER_TICK) normal vsserialize_state=True, asserting identical rows/order, no dupes, termination; a second case monkeypatchesROWS_PER_TICK=2soate/attalso page.test/sql/causal.test— adds a 200-rowgenerate_seriescohort and assertspropensity_scoresreturnscount(*) = 200,count(distinct id) = 200, all valid probabilities, and an ordered head — so the http leg genuinely pages.Confirmed locally:
pytest -qgreen,ruff/mypy/pydoclintclean, and the SQL E2E suite passes over all three transports (subprocess, http, unix). Verified fail-old (overrun guard fires) / pass-new (800 rows) directly.🤖 Generated with Claude Code