diff --git a/CLAUDE.md b/CLAUDE.md index a352d79..7a784c8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -65,8 +65,10 @@ w.RunStdio() // or w.RunHttp("127.0.0.1:0") behind a --http flag - **Table**: a `vgi.TypedTableFunc[S]` wrapped with `vgi.AsTableFunction[S]`. Methods: `Name`, `Metadata`, `ArgumentSpecs` (from `vgi.DeriveArgSpecs(args{})`), `OnBind` (→ `vgi.BindSchema(schema)`), `NewState` (bind args with - `vgi.BindArgs`; do the network fetch here), `Process` (emit with `out.Emit`, - then `out.Finish()`). + `vgi.BindArgs`; do the network fetch here, store `Rows`+`Offset`), `Process` + (emit the next cursor slice with `out.Emit`, advance `Offset`, then + `out.Finish()` when the cursor is drained — see Sharp edge #1 for why the + cursor is mandatory for HTTP). **Argument struct tags** (`vgi.DeriveArgSpecs` / `vgi.BindArgs`): @@ -90,13 +92,31 @@ column (the NULL `cvss_score`), build a `array.Float64Builder` directly and call ## Sharp edges (learned the hard way) 1. **Table-function state is `gob`-encoded by the SDK** between `NewState` and - `Process` (it may cross a process boundary), and the SDK now **panics at + `Process` (it may cross a process boundary), and the SDK **panics at registration** if state isn't gob-encodable. So state `S` must have **exported, gob-encodable fields only** — no `arrow.Record`, no interfaces, - channels, funcs, or unexported fields. The pattern every table function here - uses: **fetch rows eagerly in `NewState`**, store plain exported Go slices - (`Rows []CVERow`) plus a `Done bool`, and **rebuild the Arrow batch in - `Process`**. `TestRegisterDoesNotPanic` guards against regressions. + channels, funcs, or unexported fields. (The exported-field check counts named + exported fields, so an embedded helper type must itself be **exported** — + `CursorState`, not `cursorState` — or the worker panics at startup.) The + pattern every table function here uses: **fetch rows eagerly in `NewState`**, + store a plain exported Go slice (`Rows []CVERow`) plus an explicit cursor, and + **rebuild the Arrow batch in `Process`**. `TestRegisterDoesNotPanic` guards + against regressions. + + **Streaming state MUST carry an explicit cursor, not a bare `Done bool`** + (the HTTP-continuation invariant). Over the **stateless HTTP transport** the + worker keeps no live state between `Process` ticks — the framework + round-trips the producer state through a continuation token (gob-snapshotting + the user state each tick, emitting ≤1 data batch per response, resuming from + the token). A `Done` flag flipped *after* the single `Emit` observes the + pre-`Emit` snapshot on resume, re-emits the same rows forever, and pins the + worker in an infinite loop (subprocess/unix hold live state in memory, so they + never hit it). The fix: an exported cursor `Rows []CVERow; Offset int` + (`CursorState`). `Process` emits a bounded slice from `Offset`, advances + `Offset` **before** yielding, and `out.Finish()`es when `Offset >= len(Rows)`. + The framework snapshots `Offset` into the token, so HTTP resumes correctly and + terminates. `TestCursorSurvivesContinuation` (gob round-trips between ticks) + guards this. 2. **`haybarn-unittest` silently SKIPS `require vgi`.** Under haybarn the extension is not autoloaded for `require`, so a `.test` using `require vgi` diff --git a/ci/README.md b/ci/README.md index 76490c6..eebee24 100644 --- a/ci/README.md +++ b/ci/README.md @@ -34,33 +34,39 @@ regardless of how DuckDB talks to the worker, so the script always builds and starts `mockserver`, exports `VGI_CVE_TEST_URL`, and trap-kills it (plus any out-of-band worker process) on exit. +The **full** suite (offline scalars **and** the table-function `cve_api.test`) +runs over all three transports — **no tests are gated**. + ### HTTP transport specifics -Two things are required for the **http** leg, both handled by -`run-integration.sh` automatically: - -1. **`httpfs` must be loaded.** The vgi extension drives the worker-RPC HTTP - POSTs through DuckDB's `HTTPUtil`, which is only registered once the signed - core `httpfs` extension is loaded. The `.test` files only `LOAD vgi`, so for - the http leg the script injects `INSTALL httpfs FROM core; LOAD httpfs;` - after each `LOAD vgi;` in the staged copies. Without it every worker request - fails with an `HTTP`-flavoured error that the runner silently skips. - -2. **`cve_api.test` is GATED on http** (runs on subprocess/unix only). - The `cve` / `cve_search` / `cpe_cves` table functions stream their result - across multiple `Process` exchanges and signal end-of-stream with - per-execution state (`state.Done`): the first `Process` emits the batch, the - next returns `Finish()`. The vgi extension's HTTP transport is **stateless** - — each RPC is an independent request, so the worker's per-execution state - does not persist between the two exchanges (the SDK itself disables its - deferred storage cleanup in HTTP mode: *"no reliable stream-end signal"*). - The `Done` flag resets on every request, `Process` re-emits the same batch - forever, and the scan never reaches `Finish()` — the worker spins - re-binding indefinitely. This is the documented *"partition-local state - across exchanges"* HTTP limitation, **not** a flaky failure, so we gate the - file rather than fake a pass. The offline CVSS scalars (`cvss_offline.test`) - are plain request/response with no streaming state and **do** run over http. - The gate list is `HTTP_GATED_TESTS` in `run-integration.sh`. +The **http** leg needs `httpfs` loaded, handled by `run-integration.sh` +automatically: the vgi extension drives the worker-RPC HTTP POSTs through +DuckDB's `HTTPUtil`, which is only registered once the signed core `httpfs` +extension is loaded. The `.test` files only `LOAD vgi`, so for the http leg the +script injects `INSTALL httpfs FROM core; LOAD httpfs;` after each `LOAD vgi;` +in the staged copies. Without it every worker request fails with an +`HTTP`-flavoured error that the runner silently skips. + +#### Streaming table functions over HTTP (the cursor pattern) + +The `cve` / `cve_search` / `cpe_cves` table functions stream their result across +multiple `Process` exchanges. Over the **stateless** HTTP transport the worker +holds no live state between ticks — the framework round-trips the producer state +through an opaque continuation token (gob-encoding the user state after each +tick, emitting at most one data batch per response, then resuming from the +token). The position therefore **must live in the serialized state**: a bare +post-`Emit` `Done bool` observes the pre-`Emit` snapshot on resume, re-emits the +same rows forever, and pins the worker in an infinite loop (subprocess/unix keep +the live state in memory, so they were unaffected and hid the bug). + +The fix is an explicit gob-encodable **cursor** in the state — `Rows []CVERow; +Offset int` (`CursorState` in `internal/cveworker/functions.go`). `Process` +emits a bounded slice starting at `Offset`, advances `Offset` **before** +yielding, and calls `out.Finish()` once `Offset >= len(Rows)`. Because the +framework snapshots `Offset` into each continuation token, HTTP resumes from the +right row and terminates. `TestCursorSurvivesContinuation` guards this by +gob-round-tripping the state between every simulated tick. This is the reference +pattern for every streaming Go table function that must work over HTTP. ### Silent-skip guard (no fake passes) diff --git a/ci/run-integration.sh b/ci/run-integration.sh index 5bae259..78e0954 100755 --- a/ci/run-integration.sh +++ b/ci/run-integration.sh @@ -160,38 +160,16 @@ case "$TRANSPORT" in ;; esac -# Tests GATED for the http transport (run on subprocess/unix only). See the -# block below and ci/README.md for the protocol reason — these are real -# stateless-HTTP limitations, not flaky failures, so we never fake a pass. -HTTP_GATED_TESTS="cve_api.test" - # --- Stage the preprocessed tests ------------------------------------------- +# The FULL suite runs over every transport, including http. The cve/cve_search/ +# cpe_cves streaming table functions work over the stateless HTTP transport +# because their state carries an explicit gob-encodable cursor (Rows + Offset) +# that the framework snapshots into the continuation token each tick — see the +# "WHY A CURSOR" comment in internal/cveworker/functions.go. No tests are gated. echo "Staging preprocessed tests into $STAGE ..." mkdir -p "$STAGE/test/sql" for f in "$REPO"/test/sql/*.test; do - base="$(basename "$f")" - # Gate stateful-streaming table-function tests out of the http leg. The cve/ - # cve_search/cpe_cves table functions stream their result across multiple - # Process exchanges, signalling end-of-stream with per-execution state - # (state.Done): the FIRST Process emits the batch, the NEXT returns Finish(). - # The vgi extension's HTTP transport is STATELESS — each RPC is an independent - # request, so the worker's per-execution state object does not persist across - # the two exchanges (the SDK itself disables deferred storage cleanup in HTTP - # mode: "no reliable stream-end signal"). The Done flag therefore resets every - # request, Process re-emits the same batch forever, and the scan never reaches - # Finish() — the worker spins re-binding indefinitely. This is the recipe's - # documented "partition-local state across exchanges" HTTP limitation. Run the - # offline-scalar coverage over http (it is request/response, no streaming - # state) and gate the table-function file to subprocess/unix. - if [ "$TRANSPORT" = "http" ]; then - gated="" - for g in $HTTP_GATED_TESTS; do [ "$g" = "$base" ] && gated=1; done - if [ -n "$gated" ]; then - echo "::notice::GATED on http: $base (stateful streaming table functions cannot stream over the stateless HTTP transport)" - continue - fi - fi - awk -f "$HERE/preprocess-require.awk" "$f" > "$STAGE/test/sql/$base" + awk -f "$HERE/preprocess-require.awk" "$f" > "$STAGE/test/sql/$(basename "$f")" done # The HTTP transport needs DuckDB's HTTP client, which the vgi extension drives diff --git a/go.mod b/go.mod index 1ecab42..7c3a8b4 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/Query-farm/vgi-cve go 1.25.0 require ( - github.com/Query-farm/vgi-go v0.1.2 + github.com/Query-farm/vgi-go v0.2.0 github.com/Query-farm/vgi-rpc-go v0.9.4 github.com/apache/arrow-go/v18 v18.5.2 ) diff --git a/go.sum b/go.sum index a1e1d5f..732e37e 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/Query-farm/vgi-go v0.1.2 h1:RLT7YjPwAj/+MH2nHEmxfxzfnKEZgp/hj7U9rXMz09g= -github.com/Query-farm/vgi-go v0.1.2/go.mod h1:LuHl5mhOtDaGj/j5sE0kacXG7cVrcr7E2iCgR3TBVTk= +github.com/Query-farm/vgi-go v0.2.0 h1:S0v+4nWvys7DBdzTVkkwqsSp2EHG91RdF9QEotsZ9tI= +github.com/Query-farm/vgi-go v0.2.0/go.mod h1:LuHl5mhOtDaGj/j5sE0kacXG7cVrcr7E2iCgR3TBVTk= github.com/Query-farm/vgi-rpc-go v0.9.4 h1:Da+0bNrQkTH2rHXfDZ3i52UldUov4xJx+fWe26n/1Lk= github.com/Query-farm/vgi-rpc-go v0.9.4/go.mod h1:XbQBjp31eFKIZaBqmg0q9r2Mxp/LTPeAG97w1qy5gmI= github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= diff --git a/internal/cveworker/functions.go b/internal/cveworker/functions.go index 263ba2c..9067216 100644 --- a/internal/cveworker/functions.go +++ b/internal/cveworker/functions.go @@ -26,12 +26,62 @@ const CatalogName = "cve" // between NewState and Process (it may cross a process boundary). State structs // must therefore hold only EXPORTED, gob-encodable fields — no arrow.Record, no // interfaces, channels, funcs, or unexported fields. Each table function fetches -// its rows eagerly in NewState, stores plain exported Go slices plus a Done -// flag, and rebuilds the Arrow batch in Process. - -// emitState carries the "already emitted" flag shared by the table functions. -type emitState struct { - Done bool +// its rows eagerly in NewState, stores a plain exported Go slice (Rows) plus an +// EXPLICIT CURSOR (Offset), and rebuilds the Arrow batch in Process. +// +// WHY A CURSOR, NOT A bool Done (the HTTP-continuation fix): +// +// Over the HTTP transport the worker is STATELESS across exchanges — there is no +// long-lived process holding the live state between Process ticks. Instead the +// framework round-trips the producer state through an opaque continuation token: +// after each tick it gob-encodes the state (snapshotting the LIVE user state), +// the client returns the token, and the worker resumes by gob-decoding it. The +// HTTP server emits at most `producerBatchLimit` data batches per response +// (the SDK sets this to 1), so a producer that has more to emit is always +// resumed mid-stream from its token. +// +// The position MUST therefore live in the serialized state. A bare `Done bool` +// that is only flipped *after* the single Emit does not reliably survive the +// limit-1 continuation boundary: the resumed tick observes the pre-Emit state, +// re-emits the same rows, and the scan never terminates (an infinite loop that +// pins the worker — subprocess/unix keep the live state in memory, so they were +// unaffected and hid the bug). Carrying an explicit Offset that Process advances +// BEFORE yielding makes the snapshot authoritative: the resume sees the advanced +// Offset and emits the next slice (or Finishes when Offset >= len(Rows)). This +// is the reference pattern for every streaming Go table function over HTTP. +// +// rowsPerTick bounds how many rows each Process tick emits. Emitting the whole +// result in one batch is fine for these small NVD result sets, but emitting a +// bounded slice and advancing the cursor each tick is what makes the cursor +// observable across the continuation boundary (and scales to large results). +const rowsPerTick = 64 + +// CursorState is the shared streaming cursor embedded by every table-function +// state: the eagerly fetched rows plus the offset of the next unemitted row. +// Both fields are exported so gob round-trips them through the HTTP continuation +// token. The TYPE is exported too (CursorState, not cursorState) because the SDK +// counts a state struct's exported FIELDS at registration to verify it is +// gob-encodable — an embedded field named after an unexported type would not be +// counted and the worker would panic at startup. +type CursorState struct { + Rows []CVERow + Offset int +} + +// nextSlice returns the next bounded slice of rows to emit and advances the +// cursor past them. It reports done=true once the cursor has consumed all rows, +// at which point Process should call out.Finish(). +func (c *CursorState) nextSlice() (slice []CVERow, done bool) { + if c.Offset >= len(c.Rows) { + return nil, true + } + end := c.Offset + rowsPerTick + if end > len(c.Rows) { + end = len(c.Rows) + } + slice = c.Rows[c.Offset:end] + c.Offset = end + return slice, false } // =========================================================================== @@ -160,10 +210,9 @@ type cveArgs struct { APIKey string `vgi:"name=api_key,default=,doc=NVD API key (raises the rate limit)"` } -// cveState holds the at-most-one fetched row (gob-encodable) plus the emit flag. +// cveState holds the at-most-one fetched row (gob-encodable) plus the cursor. type cveState struct { - emitState - Rows []CVERow + CursorState } // CVEFunction fetches one CVE by ID. @@ -202,15 +251,14 @@ func (f *CVEFunction) NewState(params *vgi.ProcessParams) (*cveState, error) { if row == nil { return &cveState{}, nil } - return &cveState{Rows: []CVERow{*row}}, nil + return &cveState{CursorState: CursorState{Rows: []CVERow{*row}}}, nil } func (f *CVEFunction) Process(_ context.Context, _ *vgi.ProcessParams, state *cveState, out *vgirpc.OutputCollector) error { - if state.Done { + r, done := state.nextSlice() + if done { return out.Finish() } - state.Done = true - r := state.Rows n := int64(len(r)) batch := array.NewRecordBatch(cveSchema, []arrow.Array{ vgi.BuildStringArray(n, func(i int64) string { return r[i].ID }), @@ -250,8 +298,7 @@ type cveSearchArgs struct { } type cveSearchState struct { - emitState - Rows []CVERow + CursorState } // CVESearchFunction runs a paginated keyword search. @@ -287,15 +334,14 @@ func (f *CVESearchFunction) NewState(params *vgi.ProcessParams) (*cveSearchState if err != nil { return nil, err } - return &cveSearchState{Rows: rows}, nil + return &cveSearchState{CursorState: CursorState{Rows: rows}}, nil } func (f *CVESearchFunction) Process(_ context.Context, _ *vgi.ProcessParams, state *cveSearchState, out *vgirpc.OutputCollector) error { - if state.Done { + r, done := state.nextSlice() + if done { return out.Finish() } - state.Done = true - r := state.Rows n := int64(len(r)) batch := array.NewRecordBatch(cveSearchSchema, []arrow.Array{ vgi.BuildStringArray(n, func(i int64) string { return r[i].ID }), @@ -330,8 +376,7 @@ type cpeCVEsArgs struct { } type cpeCVEsState struct { - emitState - Rows []CVERow + CursorState } // CPECVEsFunction lists CVEs for a CPE name. @@ -367,15 +412,14 @@ func (f *CPECVEsFunction) NewState(params *vgi.ProcessParams) (*cpeCVEsState, er if err != nil { return nil, err } - return &cpeCVEsState{Rows: rows}, nil + return &cpeCVEsState{CursorState: CursorState{Rows: rows}}, nil } func (f *CPECVEsFunction) Process(_ context.Context, _ *vgi.ProcessParams, state *cpeCVEsState, out *vgirpc.OutputCollector) error { - if state.Done { + r, done := state.nextSlice() + if done { return out.Finish() } - state.Done = true - r := state.Rows n := int64(len(r)) batch := array.NewRecordBatch(cpeCVEsSchema, []arrow.Array{ vgi.BuildStringArray(n, func(i int64) string { return r[i].ID }), diff --git a/internal/cveworker/functions_test.go b/internal/cveworker/functions_test.go index 9baf5cb..6cc3a3e 100644 --- a/internal/cveworker/functions_test.go +++ b/internal/cveworker/functions_test.go @@ -3,6 +3,8 @@ package cveworker import ( + "bytes" + "encoding/gob" "net/http/httptest" "testing" @@ -65,8 +67,8 @@ func TestCVEFunctionNewState(t *testing.T) { if st.Rows[0].ID != "CVE-2021-44228" || st.Rows[0].SeverityStr != "CRITICAL" { t.Errorf("unexpected row: %+v", st.Rows[0]) } - if st.Done { - t.Error("state should not be done before Process") + if st.Offset != 0 { + t.Errorf("cursor should start at offset 0 before Process, got %d", st.Offset) } } @@ -137,3 +139,76 @@ func TestRegisterDoesNotPanic(t *testing.T) { w := vgi.NewWorker(vgi.WithCatalogName(CatalogName)) Register(w) } + +// TestCursorSurvivesContinuation simulates the HTTP transport's stateless +// continuation: the producer state is gob round-tripped between every Process +// tick (exactly what the framework does when it snapshots the state into a +// continuation token and resumes from it). It asserts the cursor (a) advances +// across the boundary, (b) emits every row exactly once, and (c) terminates. +// +// This is the fast regression guard for the HTTP infinite-loop bug: a state that +// only carried a post-Emit `Done bool` would observe the pre-Emit snapshot on +// each resume, re-emit row 0 forever, and never finish. The explicit Offset +// cursor makes the snapshot authoritative, so this terminates with len(Rows) +// rows total. +func TestCursorSurvivesContinuation(t *testing.T) { + rows := make([]CVERow, 150) // > rowsPerTick, forcing several continuations + for i := range rows { + rows[i].ID = "CVE-X" + } + + // emitted counts how many rows each simulated tick produced. + type tick struct{ n int } + var ticks []tick + state := &cveSearchState{CursorState: CursorState{Rows: rows}} + + for i := 0; i < len(rows)+10; i++ { // generous upper bound; must terminate sooner + slice, done := state.nextSlice() + if done { + ticks = append(ticks, tick{n: -1}) // -1 marks the Finish tick + break + } + ticks = append(ticks, tick{n: len(slice)}) + + // Simulate the HTTP continuation boundary: gob-encode the live state and + // decode it back, exactly like the framework's token round-trip. If the + // cursor did not serialize, Offset would reset and the loop would not end. + encoded, err := gobRoundTrip(state) + if err != nil { + t.Fatalf("gob round-trip tick %d: %v", i, err) + } + state = encoded + } + + total := 0 + finished := false + for _, tk := range ticks { + if tk.n == -1 { + finished = true + break + } + total += tk.n + } + if !finished { + t.Fatalf("cursor never reached Finish() — emitted %d rows in %d ticks (infinite-loop regression)", total, len(ticks)) + } + if total != len(rows) { + t.Errorf("emitted %d rows across continuations, want %d (rows duplicated or dropped)", total, len(rows)) + } +} + +// gobRoundTrip encodes the cursor portion of a state through gob and decodes it +// back, mirroring how the HTTP transport serializes producer state into a +// continuation token. We round-trip the embedded CursorState (the part the +// framework's user-state snapshot carries) and rebuild the typed state. +func gobRoundTrip(s *cveSearchState) (*cveSearchState, error) { + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(s.CursorState); err != nil { + return nil, err + } + var cs CursorState + if err := gob.NewDecoder(&buf).Decode(&cs); err != nil { + return nil, err + } + return &cveSearchState{CursorState: cs}, nil +}