Merged
41 changes: 41 additions & 0 deletions CLAUDE.md
@@ -48,6 +48,47 @@ function-shape split:
- **Structure (many rows per PDF) are table functions** (`tables`, `words`,
`pages`), which take the optional `page :=` filter.

## Table-function scan state = the HTTP-continuation cursor (READ THIS)

The structure table functions (`tables`, `words`, `pages`) emit **many rows per
PDF** — `words` is hundreds–thousands per page, `tables` one row per cell — so
the output routinely exceeds a single producer batch. That makes the
externalized **scan cursor** load-bearing, not optional.

Over the **stateless HTTP transport** the framework wire-serializes a producer's
per-scan state after every `process()` tick (`ArrowSerializableDataclass.
serialize_to_bytes()`), the client returns the continuation token, and the worker
resumes by deserializing it — emitting at most **one producer batch per HTTP
response**. A position-less `state: None` generator that did
`out.emit(...ALL rows...); out.finish()` would restart from row 0 on **every**
HTTP resume and **loop forever** once the output exceeds one batch. The
subprocess/unix transports keep the live state in-process, so they hide the bug;
**only the http leg (and the serialize-between-ticks unit test) expose it.**
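
A toy model of that failure, not the framework's code: `stateless_tick`, `drive`, and the constants below are hypothetical stand-ins, and the "transport" is reduced to delivering one batch per call and resuming with the round-tripped state (here always `None`).

```python
ROWS = list(range(200))  # pretend scan output
BATCH = 64               # assumed producer batch size
MAX_TICKS = 20           # guard so the demo fails cleanly instead of hanging


def stateless_tick(state: None) -> tuple[list[int], bool]:
    """Position-less producer: with no cursor, every resume starts at row 0.

    Returns the single batch delivered for this response and whether the whole
    output fit in that one batch.
    """
    batch = ROWS[:BATCH]
    finished = len(ROWS) <= BATCH
    return batch, finished


def drive(max_ticks: int) -> tuple[list[int], bool]:
    """Resume the producer once per response until it finishes or the guard trips."""
    received: list[int] = []
    for _ in range(max_ticks):
        batch, finished = stateless_tick(None)  # the state round-trips as None
        received.extend(batch)
        if finished:
            return received, True
    return received, False


received, terminated = drive(MAX_TICKS)
print(terminated)          # False
print(len(set(received)))  # 64
```

The client only ever sees rows 0..63, repeated on every resume, which is why the guard (rather than the producer) ends the loop.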

Fix (in `tables.py`, mirrors vgi-search's `ScanState`): every function is a
`TableFunctionGenerator[Args, ScanState]` with `initial_state() -> ScanState()`.
`ScanState(ArrowSerializableDataclass)` carries `started: bool`, `offset: int`,
`rows_ipc: bytes` — all plainly serializable, so they survive the continuation
token. On the **first** tick `process()` reads the PDF, materializes the **full**
result batch into `rows_ipc` (via `result_to_ipc`), and sets `started`; each tick
then emits a **bounded `ROWS_PER_TICK`-row slice** from `offset`, advances
`offset`, and `out.finish()`es once drained (an empty/NULL source materializes 0
rows and finishes immediately: `0 >= 0`). The `_build_*` helpers return the full
RecordBatch; `_stream_slice` does the cursor slicing. **The NULL/empty-source
early `out.finish()` paths stay.** Rows/schema are byte-identical to the old
emit-all path.
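
A minimal sketch of the cursor idea, assuming nothing about the real `tables.py`: JSON stands in for the Arrow serialization, a plain list stands in for `rows_ipc`, and `process`, `scan`, `to_bytes`, and `from_bytes` are hypothetical names chosen for illustration.

```python
import json
from dataclasses import asdict, dataclass, field

ROWS_PER_TICK = 64  # the value quoted in the tests


@dataclass
class ScanState:
    """Stand-in cursor; JSON replaces the Arrow serialization here."""

    started: bool = False
    offset: int = 0
    rows: list[int] = field(default_factory=list)  # plays the role of rows_ipc

    def to_bytes(self) -> bytes:
        return json.dumps(asdict(self)).encode()

    @classmethod
    def from_bytes(cls, raw: bytes) -> "ScanState":
        return cls(**json.loads(raw))


def process(state: ScanState, source: list[int], batches: list[list[int]]) -> bool:
    """One tick: materialize once, emit a bounded slice, report whether drained."""
    if not state.started:
        state.rows = list(source)  # first tick only: read the source in full
        state.started = True
    chunk = state.rows[state.offset : state.offset + ROWS_PER_TICK]
    if chunk:
        batches.append(chunk)
    state.offset += len(chunk)
    return state.offset >= len(state.rows)  # empty source: 0 >= 0 finishes at once


def scan(source: list[int]) -> list[list[int]]:
    """Drive ticks to completion, round-tripping the state between each one."""
    state, batches, finished = ScanState(), [], False
    while not finished:
        finished = process(state, source, batches)
        state = ScanState.from_bytes(state.to_bytes())  # simulate the HTTP hop
    return batches


batch_sizes = [len(b) for b in scan(list(range(200)))]
print(batch_sizes)  # [64, 64, 64, 8]
```

Because `offset` travels inside the serialized state, each resume continues where the previous tick stopped instead of restarting at row 0.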

Regression guard: `tests/harness.invoke_table_function(..., serialize_state=True)`
round-trips the state through `serialize_to_bytes`/`deserialize_from_bytes`
between every tick (1000-tick guard). `TestScanStateRoundTrip` /
`TestCursorSurvivesContinuation` in `test_tables.py` assert identical rows/order,
no dupes, termination, and bounded chunks (`>= 2` batches each `<= ROWS_PER_TICK`
— this is the **fail-old** assertion; old code emits exactly one batch). The
`structure.test` SQL case pages `manywords.pdf` (200 words > `ROWS_PER_TICK`) and
asserts `count(*) = 200` + an ordered head — over http that only terminates if
the cursor works. NOTE: a table-function arg can't be a `(SELECT ...)` subquery in
`.test`, so `tables`/`words` are driven via the **VARCHAR path overload** there.
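
The invariants listed above can be sketched as one helper. This is an illustration only: `check_paged` is a hypothetical name, plain lists of strings stand in for Arrow batches, and the no-dupes check assumes distinct rows, as in `manywords.pdf`.

```python
def check_paged(batches: list[list[str]], expected: list[str], rows_per_tick: int) -> None:
    """Raise AssertionError unless batches are a correct, bounded paging of expected."""
    flat = [row for batch in batches for row in batch]
    assert flat == expected, "rows or order differ"
    assert len(flat) == len(set(flat)), "duplicate rows"
    assert len(batches) >= 2, "output was not paged"
    assert all(len(b) <= rows_per_tick for b in batches), "batch exceeds ROWS_PER_TICK"


expected = [f"w{i:04d}" for i in range(200)]

# Cursor-style output: bounded slices in order.
paged = [expected[i : i + 64] for i in range(0, len(expected), 64)]
check_paged(paged, expected, 64)

# Old emit-all shape: one big batch, which the ">= 2 batches" check rejects.
try:
    check_paged([expected], expected, 64)
except AssertionError as exc:
    old_failure = str(exc)
else:
    old_failure = None

print(old_failure)  # output was not paged
```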

## The polymorphic `pdf` input (path OR bytes) — overloads, not AnyArrow

Every function accepts the PDF as a **`VARCHAR` path** *or* a **`BLOB` of
Binary file added test/sql/data/manywords.pdf
Binary file not shown.
25 changes: 25 additions & 0 deletions test/sql/structure.test
Expand Up @@ -83,6 +83,31 @@ SELECT bool_and(x0 < x1 AND top < bottom) FROM pdf.words('test/sql/data/words.pd
----
true

# ---- words: HTTP-continuation paging guard ----------------------------------
# manywords.pdf has 200 words (> ROWS_PER_TICK = 64), so the scan must page
# across the limit-1 continuation boundary. Over the http transport this only
# terminates (and returns the full count) if the externalized scan cursor
# survives the round-trip; the old emit-all + state:None code would loop forever.

query I
SELECT count(*) FROM pdf.words('test/sql/data/manywords.pdf');
----
200

# ordered head: words laid out in reading order recover the original sequence.
query T
SELECT text FROM pdf.words('test/sql/data/manywords.pdf') ORDER BY top, x0 LIMIT 3;
----
w0000
w0001
w0002

# distinct words == total rows: every word emitted exactly once (no dupes).
query I
SELECT count(DISTINCT text) FROM pdf.words('test/sql/data/manywords.pdf');
----
200

# ---- pages: geometry --------------------------------------------------------

query IRRI
31 changes: 31 additions & 0 deletions tests/fixtures.py
@@ -87,6 +87,33 @@ def make_table_pdf() -> bytes:
    return buf.getvalue()


def make_many_words_pdf(n: int = 200) -> bytes:
    """A page packed with ``n`` distinct, ordered words.

    The words are ``w0000``, ``w0001``, ... so the default total exceeds
    ``ROWS_PER_TICK`` and a scan that pages over the HTTP limit-1 continuation
    boundary must span several ticks. Words are laid out top-to-bottom,
    left-to-right so reading order (``ORDER BY top, x0``) recovers the original
    sequence. Deterministic.
    """
    buf = io.BytesIO()
    c = _canvas(buf)
    c.setTitle(KNOWN_TITLE)
    # 612x792 US Letter; leave margins. 10 words per line, lines every 18pt.
    per_line = 10
    x_step = 52
    y_top = 760
    y_step = 18
    for i in range(n):
        line = i // per_line
        col = i % per_line
        x = 36 + col * x_step
        y = y_top - line * y_step
        c.drawString(x, y, f"w{i:04d}")
    c.showPage()
    c.save()
    return buf.getvalue()


def make_multipage_pdf() -> bytes:
"""Three pages, so page-count / per-page filters are exercised."""
buf = io.BytesIO()
@@ -158,6 +185,10 @@ def regenerate_sql_fixtures(data_dir: str) -> None:
    files = {
        "table.pdf": make_table_pdf(),
        "words.pdf": make_words_pdf(),
        # manywords.pdf has > ROWS_PER_TICK (64) words so the SQL E2E suite makes
        # the scan page across the HTTP limit-1 continuation boundary -- it only
        # terminates over the http transport if the externalized cursor works.
        "manywords.pdf": make_many_words_pdf(200),
        "multipage.pdf": make_multipage_pdf(),
        "meta.pdf": make_meta_pdf(),
        "form.pdf": make_form_pdf(),
19 changes: 18 additions & 1 deletion tests/harness.py
@@ -54,8 +54,17 @@ def invoke_table_function(
    *,
    named: dict[str, pa.Scalar] | None = None,
    positional: tuple[pa.Scalar, ...] = (),
    serialize_state: bool = False,
) -> pa.Table:
    """Run a (source) table function through bind -> init -> process -> table."""
    """Run a (source) table function through bind -> init -> process -> table.

    When ``serialize_state`` is True, the scan state is round-tripped through its
    Arrow serialization between every ``process`` tick -- mimicking the stateless
    HTTP transport, which wire-serializes the continuation state after each tick
    and resumes by deserializing it. This proves the cursor survives batch
    boundaries (the old emit-all + ``state: None`` code loops forever here). A
    1000-tick guard turns an infinite loop into a clean failure instead of a hang.
    """
    args = Arguments(positional=positional, named=named or {})

    bind_req = BindRequest(
@@ -80,9 +89,17 @@ def invoke_table_function(
    )

    state = func_cls.initial_state(params)
    state_type = type(state) if state is not None else None
    out = MockOutputCollector(bind_resp.output_schema)

    guard = 0
    while not out.finished:
        guard += 1
        if guard > 1000:
            raise AssertionError("process did not finish within 1000 ticks")
        func_cls.process(params, state, out)
        if serialize_state and state is not None and state_type is not None:
            # Round-trip the state exactly as the HTTP transport would per tick.
            state = state_type.deserialize_from_bytes(state.serialize_to_bytes())

    return pa.Table.from_batches(out.batches, schema=bind_resp.output_schema)
98 changes: 98 additions & 0 deletions tests/test_tables.py
@@ -8,7 +8,9 @@
from __future__ import annotations

import pyarrow as pa
import pytest

from vgi_pdf import tables as tables_mod
from vgi_pdf.tables import (
    PagesBytesFunction,
    TablesBytesFunction,
@@ -68,3 +70,99 @@ def test_geometry(self) -> None:
        assert table.column("page").to_pylist() == [1, 2, 3]
        assert table.column("width").to_pylist() == [612.0, 612.0, 612.0]
        assert table.column("height").to_pylist() == [792.0, 792.0, 792.0]


class TestScanStateRoundTrip:
    """The HTTP-continuation regression guard.

    A ``words`` result that exceeds ``ROWS_PER_TICK`` must page across the
    stateless-transport limit-1 continuation boundary. Re-serializing the scan
    state between every ``process`` tick reproduces what the HTTP transport does
    on the wire. On the old emit-all + ``state: None`` code this loops forever
    (re-reading the PDF and re-emitting row 0 each resume) and trips the harness
    1000-tick guard; on the cursor code it terminates with identical rows.
    """

    def test_words_identical_with_and_without_serialization(self) -> None:
        # > ROWS_PER_TICK (64) words so the scan spans several ticks.
        pdf = fx.make_many_words_pdf(200)
        plain = invoke_table_function(WordsBytesFunction, positional=(_blob(pdf),))
        rt = invoke_table_function(
            WordsBytesFunction,
            positional=(_blob(pdf),),
            serialize_state=True,
        )
        assert plain.num_rows > tables_mod.ROWS_PER_TICK
        # Identical rows, identical order -- no dupes, no drops, full termination.
        assert rt.num_rows == plain.num_rows
        assert rt.to_pylist() == plain.to_pylist()
        # Each emitted word appears exactly once.
        texts = rt.column("text").to_pylist()
        assert len(texts) == len(set(texts))

    def test_words_chunks_bounded(self) -> None:
        # Drive the lifecycle directly to inspect per-tick emit sizes: no batch
        # may exceed ROWS_PER_TICK, and every word survives exactly once.
        from vgi.arguments import Arguments
        from vgi.function_storage import BoundStorage, FunctionStorageSqlite
        from vgi.invocation import FunctionType
        from vgi.protocol import BindRequest, InitRequest
        from vgi.table_function import ProcessParams

        from .harness import MockOutputCollector

        func = WordsBytesFunction
        pdf = fx.make_many_words_pdf(200)
        args = Arguments(positional=(_blob(pdf),), named={})
        bind_req = BindRequest(function_name=func.Meta.name, arguments=args, function_type=FunctionType.TABLE)
        bind_resp = func.bind(bind_req)
        init_req = InitRequest(bind_call=bind_req, output_schema=bind_resp.output_schema)
        init_resp = func.global_init(init_req)
        params = ProcessParams(
            args=func._parse_arguments(func.FunctionArguments, args),
            init_call=init_req,
            init_response=init_resp,
            output_schema=bind_resp.output_schema,
            settings={},
            secrets={},
            storage=BoundStorage(FunctionStorageSqlite(":memory:"), init_resp.execution_id),
        )
        state = func.initial_state(params)
        out = MockOutputCollector(bind_resp.output_schema)
        while not out.finished:
            func.process(params, state, out)
            state = type(state).deserialize_from_bytes(state.serialize_to_bytes())
        assert len(out.batches) >= 2  # genuinely paged
        for b in out.batches:
            assert b.num_rows <= tables_mod.ROWS_PER_TICK
        # Back the "exactly once" claim above: no word repeats across batches.
        texts = pa.Table.from_batches(out.batches, schema=bind_resp.output_schema).column("text").to_pylist()
        assert len(texts) == len(set(texts))


class TestCursorSurvivesContinuation:
    """``tables`` (one row per cell) also pages across the continuation boundary."""

    def test_tables_identical_with_serialization(self) -> None:
        pdf = fx.make_table_pdf()
        plain = invoke_table_function(TablesBytesFunction, positional=(_blob(pdf),))
        rt = invoke_table_function(TablesBytesFunction, positional=(_blob(pdf),), serialize_state=True)
        assert rt.to_pylist() == plain.to_pylist()

    def test_small_chunk_spans_batches(self, monkeypatch: pytest.MonkeyPatch) -> None:
        # Force ROWS_PER_TICK tiny so the 4-cell table genuinely spans batches,
        # then prove the cursor round-trips correctly across each one.
        monkeypatch.setattr(tables_mod, "ROWS_PER_TICK", 2)
        pdf = fx.make_table_pdf()
        plain = invoke_table_function(TablesBytesFunction, positional=(_blob(pdf),))
        rt = invoke_table_function(TablesBytesFunction, positional=(_blob(pdf),), serialize_state=True)
        assert plain.num_rows == 4
        assert rt.num_rows == 4
        assert rt.to_pylist() == plain.to_pylist()

    def test_empty_result_terminates(self) -> None:
        # Page filter selecting a non-existent page -> 0 rows; must still finish.
        rt = invoke_table_function(
            TablesBytesFunction,
            positional=(_blob(fx.make_table_pdf()),),
            named={"page": pa.scalar(2, type=pa.int32())},
            serialize_state=True,
        )
        assert rt.num_rows == 0