From ce340a0ff0d559041187e4314add48bb44cada46 Mon Sep 17 00:00:00 2001 From: Rusty Conover Date: Tue, 23 Jun 2026 23:11:58 -0400 Subject: [PATCH] Fix HTTP-continuation hang: externalize table-function scan state as a cursor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The structure table functions (tables/words/pages) were TableFunctionGenerator[Args] with `process(params, state: None, out)` that did `out.emit(...ALL rows...); out.finish()` in a single tick. Over the stateless HTTP transport the framework wire-serializes the per-scan state after each tick and resumes by deserializing it, emitting at most one producer batch per response — so a position-less `state: None` generator restarts from row 0 on every HTTP resume and loops forever once the output exceeds one batch. `words` (hundreds–thousands of rows/PDF) and `tables` (one row/cell) are genuinely unbounded, so this is a real hang on the http leg. subprocess/unix hide it by keeping state in-process. Convert all six functions to TableFunctionGenerator[Args, ScanState], mirroring vgi-search's ScanState pattern: - Add ROWS_PER_TICK = 64 and ScanState(ArrowSerializableDataclass) with started/offset/rows_ipc (all plainly serializable), plus result_to_ipc / ipc_to_table / _stream_slice helpers. - Add initial_state() -> ScanState(); refactor _emit_* into _build_* that return the full RecordBatch. process() materializes the full batch into rows_ipc on the first tick, then emits a bounded ROWS_PER_TICK slice from offset, advancing offset and finishing when drained. NULL/empty-source early finish paths stay; rows/schema are byte-identical to before. Validation: - tests/harness.invoke_table_function gains serialize_state=True, round-tripping the state through serialize_to_bytes/deserialize_from_bytes between every tick (1000-tick guard) — mimicking the HTTP wire. 
- TestScanStateRoundTrip / TestCursorSurvivesContinuation assert identical rows/order, no dupes, termination, and bounded chunks (>= 2 batches each <= ROWS_PER_TICK — fails on old emit-all code, which emits one 200-row batch). - New manywords.pdf fixture (200 words > ROWS_PER_TICK) + structure.test paging case (count = 200, ordered head, distinct = 200) — over http this only terminates if the cursor works. All three transports (subprocess/http/unix) pass locally; CLAUDE.md documents the cursor and why. Co-Authored-By: Claude Opus 4.8 (1M context) --- CLAUDE.md | 41 +++++ test/sql/data/manywords.pdf | Bin 0 -> 2139 bytes test/sql/structure.test | 25 ++++ tests/fixtures.py | 31 ++++ tests/harness.py | 19 ++- tests/test_tables.py | 98 ++++++++++++ vgi_pdf/tables.py | 288 +++++++++++++++++++++++++----------- 7 files changed, 414 insertions(+), 88 deletions(-) create mode 100644 test/sql/data/manywords.pdf diff --git a/CLAUDE.md b/CLAUDE.md index 4b5987e..cb58068 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -48,6 +48,47 @@ function-shape split: - **Structure (many rows per PDF) are table functions** (`tables`, `words`, `pages`), which take the optional `page :=` filter. +## Table-function scan state = the HTTP-continuation cursor (READ THIS) + +The structure table functions (`tables`, `words`, `pages`) emit **many rows per +PDF** — `words` is hundreds–thousands per page, `tables` one row per cell — so +the output routinely exceeds a single producer batch. That makes the +externalized **scan cursor** load-bearing, not optional. + +Over the **stateless HTTP transport** the framework wire-serializes a producer's +per-scan state after every `process()` tick (`ArrowSerializableDataclass. +serialize_to_bytes()`), the client returns the continuation token, and the worker +resumes by deserializing it — emitting at most **one producer batch per HTTP +response**. 
A position-less `state: None` generator that did +`out.emit(...ALL rows...); out.finish()` would restart from row 0 on **every** +HTTP resume and **loop forever** once the output exceeds one batch. +subprocess/unix keep the live state in-process so they hide the bug; **only the +http leg (and the serialize-between-ticks unit test) expose it.** + +Fix (in `tables.py`, mirrors vgi-search's `ScanState`): every function is a +`TableFunctionGenerator[Args, ScanState]` with `initial_state() -> ScanState()`. +`ScanState(ArrowSerializableDataclass)` carries `started: bool`, `offset: int`, +`rows_ipc: bytes` — all plainly serializable, so they survive the continuation +token. On the **first** tick `process()` reads the PDF, materializes the **full** +result batch into `rows_ipc` (via `result_to_ipc`), and sets `started`; each tick +then emits a **bounded `ROWS_PER_TICK`-row slice** from `offset`, advances +`offset`, and `out.finish()`es once drained (an empty result materializes 0 +rows and finishes immediately: `0 >= 0`). The `_build_*` helpers return the full +RecordBatch; `_stream_slice` does the cursor slicing. **The NULL/empty-source +early `out.finish()` paths stay** (they return before anything is materialized). Rows/schema are byte-identical to the old +emit-all path. + +Regression guard: `tests/harness.invoke_table_function(..., serialize_state=True)` +round-trips the state through `serialize_to_bytes`/`deserialize_from_bytes` +between every tick (1000-tick guard). `TestScanStateRoundTrip` / +`TestCursorSurvivesContinuation` in `test_tables.py` assert identical rows/order, +no dupes, termination, and bounded chunks (`>= 2` batches each `<= ROWS_PER_TICK` +— this is the **fail-old** assertion; old code emits exactly one batch). The +`structure.test` SQL case pages `manywords.pdf` (200 words > `ROWS_PER_TICK`) and +asserts `count(*) = 200` + an ordered head — over http that only terminates if +the cursor works. 
NOTE: a table-function arg can't be a `(SELECT ...)` subquery in +`.test`, so `tables`/`words` are driven via the **VARCHAR path overload** there. + ## The polymorphic `pdf` input (path OR bytes) — overloads, not AnyArrow Every function accepts the PDF as a **`VARCHAR` path** *or* a **`BLOB` of diff --git a/test/sql/data/manywords.pdf b/test/sql/data/manywords.pdf new file mode 100644 index 0000000000000000000000000000000000000000..e8e5819eb31b1f931f281bbcbc28cf44752a4322 GIT binary patch literal 2139 zcmah~+jgQz6n*C_$}wKhD2M{0#v6(nZv-`(s4GDzl>{lez?sfG%n!`G&HH@o6dF5@ zo#|$|Eb7#$UHj~Fs1s2eN-C4iQjx#^`tu*4K^I}ar>{T@IuPqVGy(AdCNlOr==dOx zT(%I=7 zSbrbo_pt+(gce{l4vqnIy`I2M5R@FM-KB z5j`p(>%MNI%^3!NhEd!5TZBPecaZb8Lwk<~jEte~TgYhy%;Jp#$8yys_omg!0{9$3DdN+k2e`D_D13QlyNc+upgLK~hhG%4mN`CeyV@ z0CNyjixw2XQOSe2XF3LTbK0s{HaRiKd~GCmI(fbUNzDcYXcHCRCj_YnosGW*d5H~x zJfBFtqguL`6>`QR6pBL*HA};1=H_8sL>ckMmDSwUa*)vG)5$dX9B)0kP`u5F%UsEv zd*ZP25J$4_@>)PnXal?_PPO}$li)~IqukHJ*LRF5I*AEprt_kgGQK$~W zk)kjoE}Iz_w^Z#1E{eM2gx)0n1DE!7o^G-|rr1wCEW(fND$8A&DW1LcuElN|r8hZx z93D38`D?w=dJMmwlWgRoAMEG&MICSO^p5+C?tNv_sW%gXs6Gv6IDH5qur>bcoDQ=y z4ael6%U#eYZriKnfnR24JIdW&X^xJvPt$q&A|K=ZFJkQ(3W}bG(Fd6PNWizO?qZ&; zZVkzbH6O;k>3AclqPmi63woGxmvj{BJdfm2YY}IpReAMg#$P1h$i65y)CS$o&u^6q~BcNiQM;l#eDw5&&RrD zLwx9|od%%hC4&{$P|O&4VZ|AFRt(++gO?;pT#0;+EC1YMNkz_D`{wi3o=O4JB5mfP t$x}?nStAfM7mXr)4f!BwR^)wT{SJwuhx?9P%m;}xqL8N|k>*gL{sn*?RwV!c literal 0 HcmV?d00001 diff --git a/test/sql/structure.test b/test/sql/structure.test index c7cd58f..3102378 100644 --- a/test/sql/structure.test +++ b/test/sql/structure.test @@ -83,6 +83,31 @@ SELECT bool_and(x0 < x1 AND top < bottom) FROM pdf.words('test/sql/data/words.pd ---- true +# ---- words: HTTP-continuation paging guard ---------------------------------- +# manywords.pdf has 200 words (> ROWS_PER_TICK = 64), so the scan must page +# across the limit-1 continuation boundary. 
Over the http transport this only +# terminates (and returns the full count) if the externalized scan cursor +# survives the round-trip; the old emit-all + state:None code would loop forever. + +query I +SELECT count(*) FROM pdf.words('test/sql/data/manywords.pdf'); +---- +200 + +# ordered head: words laid out in reading order recover the original sequence. +query T +SELECT text FROM pdf.words('test/sql/data/manywords.pdf') ORDER BY top, x0 LIMIT 3; +---- +w0000 +w0001 +w0002 + +# distinct words == total rows: every word emitted exactly once (no dupes). +query I +SELECT count(DISTINCT text) FROM pdf.words('test/sql/data/manywords.pdf'); +---- +200 + # ---- pages: geometry -------------------------------------------------------- query IRRI diff --git a/tests/fixtures.py b/tests/fixtures.py index 623a716..078ada5 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -87,6 +87,33 @@ def make_table_pdf() -> bytes: return buf.getvalue() +def make_many_words_pdf(n: int = 200) -> bytes: + """A page packed with ``n`` distinct, ordered words. + + Each word is ``w0000 w0001 ...`` so the total exceeds ``ROWS_PER_TICK`` and a + scan that pages over the HTTP limit-1 continuation boundary must span several + ticks. Words are laid out top-to-bottom, left-to-right so reading order + (``ORDER BY top, x0``) recovers the original sequence. Deterministic. + """ + buf = io.BytesIO() + c = _canvas(buf) + c.setTitle(KNOWN_TITLE) + # 612x792 US Letter; leave margins. ~10 words per line, lines every 18pt. 
+ per_line = 10 + x_step = 52 + y_top = 760 + y_step = 18 + for i in range(n): + line = i // per_line + col = i % per_line + x = 36 + col * x_step + y = y_top - line * y_step + c.drawString(x, y, f"w{i:04d}") + c.showPage() + c.save() + return buf.getvalue() + + def make_multipage_pdf() -> bytes: """Three pages, so page-count / per-page filters are exercised.""" buf = io.BytesIO() @@ -158,6 +185,10 @@ def regenerate_sql_fixtures(data_dir: str) -> None: files = { "table.pdf": make_table_pdf(), "words.pdf": make_words_pdf(), + # manywords.pdf has > ROWS_PER_TICK (64) words so the SQL E2E suite makes + # the scan page across the HTTP limit-1 continuation boundary -- it only + # terminates over the http transport if the externalized cursor works. + "manywords.pdf": make_many_words_pdf(200), "multipage.pdf": make_multipage_pdf(), "meta.pdf": make_meta_pdf(), "form.pdf": make_form_pdf(), diff --git a/tests/harness.py b/tests/harness.py index 8b7a374..1df52f9 100644 --- a/tests/harness.py +++ b/tests/harness.py @@ -54,8 +54,17 @@ def invoke_table_function( *, named: dict[str, pa.Scalar] | None = None, positional: tuple[pa.Scalar, ...] = (), + serialize_state: bool = False, ) -> pa.Table: - """Run a (source) table function through bind -> init -> process -> table.""" + """Run a (source) table function through bind -> init -> process -> table. + + When ``serialize_state`` is True, the scan state is round-tripped through its + Arrow serialization between every ``process`` tick -- mimicking the stateless + HTTP transport, which wire-serializes the continuation state after each tick + and resumes by deserializing it. This proves the cursor survives batch + boundaries (the old emit-all + ``state: None`` code loops forever here). A + 1000-tick guard turns an infinite loop into a clean failure instead of a hang. 
+ """ args = Arguments(positional=positional, named=named or {}) bind_req = BindRequest( @@ -80,9 +89,17 @@ def invoke_table_function( ) state = func_cls.initial_state(params) + state_type = type(state) if state is not None else None out = MockOutputCollector(bind_resp.output_schema) + guard = 0 while not out.finished: + guard += 1 + if guard > 1000: + raise AssertionError("process did not finish within 1000 ticks") func_cls.process(params, state, out) + if serialize_state and state is not None and state_type is not None: + # Round-trip the state exactly as the HTTP transport would per tick. + state = state_type.deserialize_from_bytes(state.serialize_to_bytes()) return pa.Table.from_batches(out.batches, schema=bind_resp.output_schema) diff --git a/tests/test_tables.py b/tests/test_tables.py index 4fbadcf..746e4bf 100644 --- a/tests/test_tables.py +++ b/tests/test_tables.py @@ -8,7 +8,9 @@ from __future__ import annotations import pyarrow as pa +import pytest +from vgi_pdf import tables as tables_mod from vgi_pdf.tables import ( PagesBytesFunction, TablesBytesFunction, @@ -68,3 +70,99 @@ def test_geometry(self) -> None: assert table.column("page").to_pylist() == [1, 2, 3] assert table.column("width").to_pylist() == [612.0, 612.0, 612.0] assert table.column("height").to_pylist() == [792.0, 792.0, 792.0] + + +class TestScanStateRoundTrip: + """The HTTP-continuation regression guard. + + A ``words`` result that exceeds ``ROWS_PER_TICK`` must page across the + stateless-transport limit-1 continuation boundary. Re-serializing the scan + state between every ``process`` tick reproduces what the HTTP transport does + on the wire. On the old emit-all + ``state: None`` code this loops forever + (re-reading the PDF and re-emitting row 0 each resume) and trips the harness + 1000-tick guard; on the cursor code it terminates with identical rows. 
+ """ + + def test_words_identical_with_and_without_serialization(self) -> None: + # > ROWS_PER_TICK (64) words so the scan spans several ticks. + pdf = fx.make_many_words_pdf(200) + plain = invoke_table_function(WordsBytesFunction, positional=(_blob(pdf),)) + rt = invoke_table_function( + WordsBytesFunction, + positional=(_blob(pdf),), + serialize_state=True, + ) + assert plain.num_rows > tables_mod.ROWS_PER_TICK + # Identical rows, identical order -- no dupes, no drops, full termination. + assert rt.num_rows == plain.num_rows + assert rt.to_pylist() == plain.to_pylist() + # Each emitted word appears exactly once. + texts = rt.column("text").to_pylist() + assert len(texts) == len(set(texts)) + + def test_words_chunks_bounded(self) -> None: + # Drive the lifecycle directly to inspect per-tick emit sizes: no batch + # may exceed ROWS_PER_TICK, and every word survives exactly once. + from vgi.arguments import Arguments + from vgi.function_storage import BoundStorage, FunctionStorageSqlite + from vgi.invocation import FunctionType + from vgi.protocol import BindRequest, InitRequest + from vgi.table_function import ProcessParams + + from .harness import MockOutputCollector + + func = WordsBytesFunction + pdf = fx.make_many_words_pdf(200) + args = Arguments(positional=(_blob(pdf),), named={}) + bind_req = BindRequest(function_name=func.Meta.name, arguments=args, function_type=FunctionType.TABLE) + bind_resp = func.bind(bind_req) + init_req = InitRequest(bind_call=bind_req, output_schema=bind_resp.output_schema) + init_resp = func.global_init(init_req) + params = ProcessParams( + args=func._parse_arguments(func.FunctionArguments, args), + init_call=init_req, + init_response=init_resp, + output_schema=bind_resp.output_schema, + settings={}, + secrets={}, + storage=BoundStorage(FunctionStorageSqlite(":memory:"), init_resp.execution_id), + ) + state = func.initial_state(params) + out = MockOutputCollector(bind_resp.output_schema) + while not out.finished: + 
func.process(params, state, out) + state = type(state).deserialize_from_bytes(state.serialize_to_bytes()) + assert len(out.batches) >= 2 # genuinely paged + for b in out.batches: + assert b.num_rows <= tables_mod.ROWS_PER_TICK + + +class TestCursorSurvivesContinuation: + """``tables`` (one row per cell) also pages across the continuation boundary.""" + + def test_tables_identical_with_serialization(self) -> None: + pdf = fx.make_table_pdf() + plain = invoke_table_function(TablesBytesFunction, positional=(_blob(pdf),)) + rt = invoke_table_function(TablesBytesFunction, positional=(_blob(pdf),), serialize_state=True) + assert rt.to_pylist() == plain.to_pylist() + + def test_small_chunk_spans_batches(self, monkeypatch: pytest.MonkeyPatch) -> None: + # Force ROWS_PER_TICK tiny so the 4-cell table genuinely spans batches, + # then prove the cursor round-trips correctly across each one. + monkeypatch.setattr(tables_mod, "ROWS_PER_TICK", 2) + pdf = fx.make_table_pdf() + plain = invoke_table_function(TablesBytesFunction, positional=(_blob(pdf),)) + rt = invoke_table_function(TablesBytesFunction, positional=(_blob(pdf),), serialize_state=True) + assert plain.num_rows == 4 + assert rt.num_rows == 4 + assert rt.to_pylist() == plain.to_pylist() + + def test_empty_result_terminates(self) -> None: + # Page filter selecting a non-existent page -> 0 rows; must still finish. + rt = invoke_table_function( + TablesBytesFunction, + positional=(_blob(fx.make_table_pdf()),), + named={"page": pa.scalar(2, type=pa.int32())}, + serialize_state=True, + ) + assert rt.num_rows == 0 diff --git a/vgi_pdf/tables.py b/vgi_pdf/tables.py index 3934d62..e4c8cde 100644 --- a/vgi_pdf/tables.py +++ b/vgi_pdf/tables.py @@ -39,12 +39,87 @@ bind_fixed_schema, init_single_worker, ) +from vgi_rpc import ArrowSerializableDataclass from vgi_rpc.rpc import OutputCollector from . 
import core from .core import PdfSource from .schema_utils import field +# --------------------------------------------------------------------------- +# Externalized scan cursor (HTTP-continuation fix) +# --------------------------------------------------------------------------- +# Over the stateless HTTP transport the framework wire-serializes a producer's +# per-scan state after every ``process`` tick and resumes by deserializing it, +# emitting at most one producer batch per response. A position-less ``state: +# None`` generator that emits ALL rows in one ``out.emit`` then finishes would +# restart from row 0 on every HTTP resume and loop forever once the output +# exceeds one producer batch (``words`` is hundreds-thousands of rows per PDF, +# ``tables`` one row per cell -- both genuinely unbounded). subprocess/unix +# keep state in-process so they hide the bug; only http (and the +# serialize-between-ticks unit test) expose it. +# +# Fix: carry an explicit cursor in the serializable ``ScanState`` -- the +# materialized full result batch (as IPC bytes) plus an integer ``offset``. +# Each tick emits a bounded ``ROWS_PER_TICK`` slice from ``offset``, advances +# ``offset``, and finishes when drained. Rows/schema are byte-identical to the +# old emit-all path. + +ROWS_PER_TICK = 64 # bounded slice per tick; cursor observable across HTTP limit-1 + + +@dataclass(kw_only=True) +class ScanState(ArrowSerializableDataclass): + """Externalized scan cursor, round-tripped across every ``process`` tick. + + ``started`` flips once the (possibly heavy) PDF source has been read and the + full result materialized; ``rows_ipc`` holds those result rows as IPC bytes; + ``offset`` is the next unemitted row. All fields wire-serialize through the + HTTP continuation token so a resumed tick sees the advanced offset and emits + the next slice (or finishes) -- never re-reads the PDF from row 0. + + ``started`` distinguishes "not yet read" from "read an empty/NULL source". 
+ """ + + started: bool = False + offset: int = 0 + rows_ipc: bytes = b"" + + +def result_to_ipc(batch: pa.RecordBatch) -> bytes: + """Serialize a single RecordBatch to Arrow IPC stream bytes.""" + sink = pa.BufferOutputStream() + with pa.ipc.new_stream(sink, batch.schema) as writer: # type: ignore[no-untyped-call] + writer.write_batch(batch) + result: bytes = sink.getvalue().to_pybytes() + return result + + +def ipc_to_table(value: bytes) -> pa.Table: + """Read Arrow IPC stream bytes back into a Table.""" + reader = pa.ipc.open_stream(pa.BufferReader(value)) # type: ignore[no-untyped-call] + return reader.read_all() + + +def _stream_slice(state: ScanState, schema: pa.Schema, out: OutputCollector) -> None: + """Emit one bounded slice from ``state.offset``; finish when drained. + + The materialized full batch lives in ``state.rows_ipc`` (the source of + truth across the wire). This emits at most ``ROWS_PER_TICK`` rows starting + at ``state.offset``, advances ``offset``, and calls ``out.finish()`` once + ``offset >= total`` (an empty result terminates immediately: 0 >= 0). + """ + table = ipc_to_table(state.rows_ipc) + total = table.num_rows + if state.offset >= total: + out.finish() + return + end = min(state.offset + ROWS_PER_TICK, total) + chunk = table.slice(state.offset, end - state.offset) + out.emit(chunk.combine_chunks().to_batches()[0]) + state.offset = end + + # Optional 1-based page filter shared by ``tables`` and ``words``. NULL means # "all pages". Explicit ``arrow_type`` so a supplied INTEGER binds correctly # (without it the ``None`` default makes the SDK infer a NULL Arrow type). 
@@ -80,25 +155,22 @@ class _PagesBytesArgs: pdf: Annotated[bytes | None, Arg(0, arrow_type=pa.binary(), doc="Raw PDF bytes.")] -def _emit_pages(src: PdfSource, out: OutputCollector, schema: pa.Schema) -> None: +def _build_pages(src: PdfSource, schema: pa.Schema) -> pa.RecordBatch: rows = core.pages(src) - out.emit( - pa.RecordBatch.from_pydict( - { - "page": [r[0] for r in rows], - "width": [r[1] for r in rows], - "height": [r[2] for r in rows], - "rotation": [r[3] for r in rows], - }, - schema=schema, - ) + return pa.RecordBatch.from_pydict( + { + "page": [r[0] for r in rows], + "width": [r[1] for r in rows], + "height": [r[2] for r in rows], + "rotation": [r[3] for r in rows], + }, + schema=schema, ) - out.finish() @init_single_worker @bind_fixed_schema -class PagesPathFunction(TableFunctionGenerator[_PagesPathArgs]): +class PagesPathFunction(TableFunctionGenerator[_PagesPathArgs, ScanState]): """``pages(path)`` -- per-page geometry of a PDF at a filesystem path.""" FIXED_SCHEMA: ClassVar[pa.Schema] = _PAGES_SCHEMA @@ -122,18 +194,26 @@ def cardinality(cls, params: BindParams[_PagesPathArgs]) -> TableCardinality: return TableCardinality(estimate=10, max=None) @classmethod - def process(cls, params: ProcessParams[_PagesPathArgs], state: None, out: OutputCollector) -> None: - """Emit output rows for the bound PDF input.""" - src = PdfSource.from_path(params.args.pdf) - if src is None: - out.finish() - return - _emit_pages(src, out, params.output_schema) + def initial_state(cls, params: ProcessParams[_PagesPathArgs]) -> ScanState: + """Return a fresh scan-state cursor for a new execution.""" + return ScanState() + + @classmethod + def process(cls, params: ProcessParams[_PagesPathArgs], state: ScanState, out: OutputCollector) -> None: + """Materialize the page batch once, then stream bounded slices.""" + if not state.started: + src = PdfSource.from_path(params.args.pdf) + if src is None: + out.finish() + return + state.rows_ipc = result_to_ipc(_build_pages(src, 
params.output_schema)) + state.started = True + _stream_slice(state, params.output_schema, out) @init_single_worker @bind_fixed_schema -class PagesBytesFunction(TableFunctionGenerator[_PagesBytesArgs]): +class PagesBytesFunction(TableFunctionGenerator[_PagesBytesArgs, ScanState]): """``pages(blob)`` -- per-page geometry of a PDF passed as bytes.""" FIXED_SCHEMA: ClassVar[pa.Schema] = _PAGES_SCHEMA @@ -157,13 +237,21 @@ def cardinality(cls, params: BindParams[_PagesBytesArgs]) -> TableCardinality: return TableCardinality(estimate=10, max=None) @classmethod - def process(cls, params: ProcessParams[_PagesBytesArgs], state: None, out: OutputCollector) -> None: - """Emit output rows for the bound PDF input.""" - src = PdfSource.from_bytes(params.args.pdf) - if src is None: - out.finish() - return - _emit_pages(src, out, params.output_schema) + def initial_state(cls, params: ProcessParams[_PagesBytesArgs]) -> ScanState: + """Return a fresh scan-state cursor for a new execution.""" + return ScanState() + + @classmethod + def process(cls, params: ProcessParams[_PagesBytesArgs], state: ScanState, out: OutputCollector) -> None: + """Materialize the page batch once, then stream bounded slices.""" + if not state.started: + src = PdfSource.from_bytes(params.args.pdf) + if src is None: + out.finish() + return + state.rows_ipc = result_to_ipc(_build_pages(src, params.output_schema)) + state.started = True + _stream_slice(state, params.output_schema, out) # --------------------------------------------------------------------------- @@ -194,27 +282,24 @@ class _WordsBytesArgs: page: Annotated[int | None, _PAGE] -def _emit_words(src: PdfSource, page: int | None, out: OutputCollector, schema: pa.Schema) -> None: +def _build_words(src: PdfSource, page: int | None, schema: pa.Schema) -> pa.RecordBatch: rows = core.words(src, page) - out.emit( - pa.RecordBatch.from_pydict( - { - "page": [r[0] for r in rows], - "text": [r[1] for r in rows], - "x0": [r[2] for r in rows], - "top": [r[3] 
for r in rows], - "x1": [r[4] for r in rows], - "bottom": [r[5] for r in rows], - }, - schema=schema, - ) + return pa.RecordBatch.from_pydict( + { + "page": [r[0] for r in rows], + "text": [r[1] for r in rows], + "x0": [r[2] for r in rows], + "top": [r[3] for r in rows], + "x1": [r[4] for r in rows], + "bottom": [r[5] for r in rows], + }, + schema=schema, ) - out.finish() @init_single_worker @bind_fixed_schema -class WordsPathFunction(TableFunctionGenerator[_WordsPathArgs]): +class WordsPathFunction(TableFunctionGenerator[_WordsPathArgs, ScanState]): """``words(path[, page := ...])`` -- per-word boxes for a PDF at a path.""" FIXED_SCHEMA: ClassVar[pa.Schema] = _WORDS_SCHEMA @@ -242,18 +327,26 @@ def cardinality(cls, params: BindParams[_WordsPathArgs]) -> TableCardinality: return TableCardinality(estimate=500, max=None) @classmethod - def process(cls, params: ProcessParams[_WordsPathArgs], state: None, out: OutputCollector) -> None: - """Emit output rows for the bound PDF input.""" - src = PdfSource.from_path(params.args.pdf) - if src is None: - out.finish() - return - _emit_words(src, params.args.page, out, params.output_schema) + def initial_state(cls, params: ProcessParams[_WordsPathArgs]) -> ScanState: + """Return a fresh scan-state cursor for a new execution.""" + return ScanState() + + @classmethod + def process(cls, params: ProcessParams[_WordsPathArgs], state: ScanState, out: OutputCollector) -> None: + """Materialize the word batch once, then stream bounded slices.""" + if not state.started: + src = PdfSource.from_path(params.args.pdf) + if src is None: + out.finish() + return + state.rows_ipc = result_to_ipc(_build_words(src, params.args.page, params.output_schema)) + state.started = True + _stream_slice(state, params.output_schema, out) @init_single_worker @bind_fixed_schema -class WordsBytesFunction(TableFunctionGenerator[_WordsBytesArgs]): +class WordsBytesFunction(TableFunctionGenerator[_WordsBytesArgs, ScanState]): """``words(blob[, page := ...])`` -- 
per-word boxes for a PDF as bytes.""" FIXED_SCHEMA: ClassVar[pa.Schema] = _WORDS_SCHEMA @@ -277,13 +370,21 @@ def cardinality(cls, params: BindParams[_WordsBytesArgs]) -> TableCardinality: return TableCardinality(estimate=500, max=None) @classmethod - def process(cls, params: ProcessParams[_WordsBytesArgs], state: None, out: OutputCollector) -> None: - """Emit output rows for the bound PDF input.""" - src = PdfSource.from_bytes(params.args.pdf) - if src is None: - out.finish() - return - _emit_words(src, params.args.page, out, params.output_schema) + def initial_state(cls, params: ProcessParams[_WordsBytesArgs]) -> ScanState: + """Return a fresh scan-state cursor for a new execution.""" + return ScanState() + + @classmethod + def process(cls, params: ProcessParams[_WordsBytesArgs], state: ScanState, out: OutputCollector) -> None: + """Materialize the word batch once, then stream bounded slices.""" + if not state.started: + src = PdfSource.from_bytes(params.args.pdf) + if src is None: + out.finish() + return + state.rows_ipc = result_to_ipc(_build_words(src, params.args.page, params.output_schema)) + state.started = True + _stream_slice(state, params.output_schema, out) # --------------------------------------------------------------------------- @@ -316,26 +417,23 @@ class _TablesBytesArgs: page: Annotated[int | None, _PAGE] -def _emit_tables(src: PdfSource, page: int | None, out: OutputCollector, schema: pa.Schema) -> None: +def _build_tables(src: PdfSource, page: int | None, schema: pa.Schema) -> pa.RecordBatch: rows = core.tables(src, page) - out.emit( - pa.RecordBatch.from_pydict( - { - "page": [r[0] for r in rows], - "table_index": [r[1] for r in rows], - "row": [r[2] for r in rows], - "col": [r[3] for r in rows], - "value": [r[4] for r in rows], - }, - schema=schema, - ) + return pa.RecordBatch.from_pydict( + { + "page": [r[0] for r in rows], + "table_index": [r[1] for r in rows], + "row": [r[2] for r in rows], + "col": [r[3] for r in rows], + "value": [r[4] 
for r in rows], + }, + schema=schema, ) - out.finish() @init_single_worker @bind_fixed_schema -class TablesPathFunction(TableFunctionGenerator[_TablesPathArgs]): +class TablesPathFunction(TableFunctionGenerator[_TablesPathArgs, ScanState]): """``tables(path[, page := ...])`` -- long-format table cells (PDF at a path).""" FIXED_SCHEMA: ClassVar[pa.Schema] = _TABLES_SCHEMA @@ -363,18 +461,26 @@ def cardinality(cls, params: BindParams[_TablesPathArgs]) -> TableCardinality: return TableCardinality(estimate=100, max=None) @classmethod - def process(cls, params: ProcessParams[_TablesPathArgs], state: None, out: OutputCollector) -> None: - """Emit output rows for the bound PDF input.""" - src = PdfSource.from_path(params.args.pdf) - if src is None: - out.finish() - return - _emit_tables(src, params.args.page, out, params.output_schema) + def initial_state(cls, params: ProcessParams[_TablesPathArgs]) -> ScanState: + """Return a fresh scan-state cursor for a new execution.""" + return ScanState() + + @classmethod + def process(cls, params: ProcessParams[_TablesPathArgs], state: ScanState, out: OutputCollector) -> None: + """Materialize the cell batch once, then stream bounded slices.""" + if not state.started: + src = PdfSource.from_path(params.args.pdf) + if src is None: + out.finish() + return + state.rows_ipc = result_to_ipc(_build_tables(src, params.args.page, params.output_schema)) + state.started = True + _stream_slice(state, params.output_schema, out) @init_single_worker @bind_fixed_schema -class TablesBytesFunction(TableFunctionGenerator[_TablesBytesArgs]): +class TablesBytesFunction(TableFunctionGenerator[_TablesBytesArgs, ScanState]): """``tables(blob[, page := ...])`` -- long-format table cells (PDF as bytes).""" FIXED_SCHEMA: ClassVar[pa.Schema] = _TABLES_SCHEMA @@ -398,13 +504,21 @@ def cardinality(cls, params: BindParams[_TablesBytesArgs]) -> TableCardinality: return TableCardinality(estimate=100, max=None) @classmethod - def process(cls, params: 
ProcessParams[_TablesBytesArgs], state: None, out: OutputCollector) -> None: - """Emit output rows for the bound PDF input.""" - src = PdfSource.from_bytes(params.args.pdf) - if src is None: - out.finish() - return - _emit_tables(src, params.args.page, out, params.output_schema) + def initial_state(cls, params: ProcessParams[_TablesBytesArgs]) -> ScanState: + """Return a fresh scan-state cursor for a new execution.""" + return ScanState() + + @classmethod + def process(cls, params: ProcessParams[_TablesBytesArgs], state: ScanState, out: OutputCollector) -> None: + """Materialize the cell batch once, then stream bounded slices.""" + if not state.started: + src = PdfSource.from_bytes(params.args.pdf) + if src is None: + out.finish() + return + state.rows_ipc = result_to_ipc(_build_tables(src, params.args.page, params.output_schema)) + state.started = True + _stream_slice(state, params.output_schema, out) TABLE_FUNCTIONS: list[type] = [