34 changes: 27 additions & 7 deletions CLAUDE.md
@@ -65,8 +65,10 @@ w.RunStdio() // or w.RunHttp("127.0.0.1:0") behind a --http flag
- **Table**: a `vgi.TypedTableFunc[S]` wrapped with `vgi.AsTableFunction[S]`.
Methods: `Name`, `Metadata`, `ArgumentSpecs` (from `vgi.DeriveArgSpecs(args{})`),
`OnBind` (→ `vgi.BindSchema(schema)`), `NewState` (bind args with
`vgi.BindArgs`; do the network fetch here), `Process` (emit with `out.Emit`,
then `out.Finish()`).
`vgi.BindArgs`; do the network fetch here, store `Rows`+`Offset`), `Process`
(emit the next cursor slice with `out.Emit`, advance `Offset`, then
`out.Finish()` when the cursor is drained — see Sharp edge #1 for why the
cursor is mandatory for HTTP).

**Argument struct tags** (`vgi.DeriveArgSpecs` / `vgi.BindArgs`):

@@ -90,13 +92,31 @@ column (the NULL `cvss_score`), build a `array.Float64Builder` directly and call
## Sharp edges (learned the hard way)

1. **Table-function state is `gob`-encoded by the SDK** between `NewState` and
`Process` (it may cross a process boundary), and the SDK now **panics at
`Process` (it may cross a process boundary), and the SDK **panics at
registration** if state isn't gob-encodable. So state `S` must have
**exported, gob-encodable fields only** — no `arrow.Record`, no interfaces,
channels, funcs, or unexported fields. The pattern every table function here
uses: **fetch rows eagerly in `NewState`**, store plain exported Go slices
(`Rows []CVERow`) plus a `Done bool`, and **rebuild the Arrow batch in
`Process`**. `TestRegisterDoesNotPanic` guards against regressions.
channels, funcs, or unexported fields. (The exported-field check counts named
exported fields, so an embedded helper type must itself be **exported** —
`CursorState`, not `cursorState` — or the worker panics at startup.) The
pattern every table function here uses: **fetch rows eagerly in `NewState`**,
store a plain exported Go slice (`Rows []CVERow`) plus an explicit cursor, and
**rebuild the Arrow batch in `Process`**. `TestRegisterDoesNotPanic` guards
against regressions.

**Streaming state MUST carry an explicit cursor, not a bare `Done bool`**
(the HTTP-continuation invariant). Over the **stateless HTTP transport** the
worker keeps no live state between `Process` ticks — the framework
round-trips the producer state through a continuation token (gob-snapshotting
the user state each tick, emitting ≤1 data batch per response, resuming from
the token). A `Done` flag flipped *after* the single `Emit` observes the
pre-`Emit` snapshot on resume, re-emits the same rows forever, and pins the
worker in an infinite loop (subprocess/unix hold live state in memory, so they
never hit it). The fix: an exported cursor `Rows []CVERow; Offset int`
(`CursorState`). `Process` emits a bounded slice from `Offset`, advances
`Offset` **before** yielding, and `out.Finish()`es when `Offset >= len(Rows)`.
The framework snapshots `Offset` into the token, so HTTP resumes correctly and
terminates. `TestCursorSurvivesContinuation` (gob round-trips between ticks)
guards this.

2. **`haybarn-unittest` silently SKIPS `require vgi`.** Under haybarn the
extension is not autoloaded for `require`, so a `.test` using `require vgi`
56 changes: 31 additions & 25 deletions ci/README.md
@@ -34,33 +34,39 @@ regardless of how DuckDB talks to the worker, so the script always builds and
starts `mockserver`, exports `VGI_CVE_TEST_URL`, and trap-kills it (plus any
out-of-band worker process) on exit.

The **full** suite (offline scalars **and** the table-function `cve_api.test`)
runs over all three transports — **no tests are gated**.

### HTTP transport specifics

Two things are required for the **http** leg, both handled by
`run-integration.sh` automatically:

1. **`httpfs` must be loaded.** The vgi extension drives the worker-RPC HTTP
POSTs through DuckDB's `HTTPUtil`, which is only registered once the signed
core `httpfs` extension is loaded. The `.test` files only `LOAD vgi`, so for
the http leg the script injects `INSTALL httpfs FROM core; LOAD httpfs;`
after each `LOAD vgi;` in the staged copies. Without it every worker request
fails with an `HTTP`-flavoured error that the runner silently skips.

2. **`cve_api.test` is GATED on http** (runs on subprocess/unix only).
The `cve` / `cve_search` / `cpe_cves` table functions stream their result
across multiple `Process` exchanges and signal end-of-stream with
per-execution state (`state.Done`): the first `Process` emits the batch, the
next returns `Finish()`. The vgi extension's HTTP transport is **stateless**
— each RPC is an independent request, so the worker's per-execution state
does not persist between the two exchanges (the SDK itself disables its
deferred storage cleanup in HTTP mode: *"no reliable stream-end signal"*).
The `Done` flag resets on every request, `Process` re-emits the same batch
forever, and the scan never reaches `Finish()` — the worker spins
re-binding indefinitely. This is the documented *"partition-local state
across exchanges"* HTTP limitation, **not** a flaky failure, so we gate the
file rather than fake a pass. The offline CVSS scalars (`cvss_offline.test`)
are plain request/response with no streaming state and **do** run over http.
The gate list is `HTTP_GATED_TESTS` in `run-integration.sh`.
The **http** leg needs `httpfs` loaded, handled by `run-integration.sh`
automatically: the vgi extension drives the worker-RPC HTTP POSTs through
DuckDB's `HTTPUtil`, which is only registered once the signed core `httpfs`
extension is loaded. The `.test` files only `LOAD vgi`, so for the http leg the
script injects `INSTALL httpfs FROM core; LOAD httpfs;` after each `LOAD vgi;`
in the staged copies. Without it every worker request fails with an
`HTTP`-flavoured error that the runner silently skips.
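
The injection step described above can be pictured with a small sketch. `run-integration.sh` is a shell script and its actual implementation is not shown in this diff; the Go function below, `injectHTTPFS`, is a hypothetical equivalent that assumes each `LOAD vgi;` sits on a line of its own in the staged `.test` copy.

```go
package main

import (
	"fmt"
	"strings"
)

// httpfsLoad is the statement pair the README says is injected for the http leg.
const httpfsLoad = "INSTALL httpfs FROM core; LOAD httpfs;"

// injectHTTPFS (hypothetical helper) returns src with httpfsLoad added on a
// new line after every line that is exactly `LOAD vgi;`, ignoring surrounding
// whitespace. All other lines are passed through unchanged.
func injectHTTPFS(src string) string {
	lines := strings.Split(src, "\n")
	out := make([]string, 0, len(lines)+1)
	for _, line := range lines {
		out = append(out, line)
		if strings.TrimSpace(line) == "LOAD vgi;" {
			out = append(out, httpfsLoad)
		}
	}
	return strings.Join(out, "\n")
}

func main() {
	// A made-up staged test fragment, for illustration only.
	fmt.Println(injectHTTPFS("statement ok\nLOAD vgi;\n\nquery I\nSELECT 1;"))
}
```

Operating on the staged copies rather than the checked-in `.test` files keeps the subprocess and unix legs free of the `httpfs` dependency.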

#### Streaming table functions over HTTP (the cursor pattern)

The `cve` / `cve_search` / `cpe_cves` table functions stream their result across
multiple `Process` exchanges. Over the **stateless** HTTP transport the worker
holds no live state between ticks — the framework round-trips the producer state
through an opaque continuation token (gob-encoding the user state after each
tick, emitting at most one data batch per response, then resuming from the
token). The position therefore **must live in the serialized state**: a bare
post-`Emit` `Done bool` observes the pre-`Emit` snapshot on resume, re-emits the
same rows forever, and pins the worker in an infinite loop (subprocess/unix keep
the live state in memory, so they were unaffected and hid the bug).

The fix is an explicit gob-encodable **cursor** in the state — `Rows []CVERow;
Offset int` (`CursorState` in `internal/cveworker/functions.go`). `Process`
emits a bounded slice starting at `Offset`, advances `Offset` **before**
yielding, and calls `out.Finish()` once `Offset >= len(Rows)`. Because the
framework snapshots `Offset` into each continuation token, HTTP resumes from the
right row and terminates. `TestCursorSurvivesContinuation` guards this by
gob-round-tripping the state between every simulated tick. This is the reference
pattern for every streaming Go table function that must work over HTTP.

### Silent-skip guard (no fake passes)

34 changes: 6 additions & 28 deletions ci/run-integration.sh
@@ -160,38 +160,16 @@ case "$TRANSPORT" in
;;
esac

# Tests GATED for the http transport (run on subprocess/unix only). See the
# block below and ci/README.md for the protocol reason — these are real
# stateless-HTTP limitations, not flaky failures, so we never fake a pass.
HTTP_GATED_TESTS="cve_api.test"

# --- Stage the preprocessed tests -------------------------------------------
# The FULL suite runs over every transport, including http. The cve/cve_search/
# cpe_cves streaming table functions work over the stateless HTTP transport
# because their state carries an explicit gob-encodable cursor (Rows + Offset)
# that the framework snapshots into the continuation token each tick — see the
# "WHY A CURSOR" comment in internal/cveworker/functions.go. No tests are gated.
echo "Staging preprocessed tests into $STAGE ..."
mkdir -p "$STAGE/test/sql"
for f in "$REPO"/test/sql/*.test; do
base="$(basename "$f")"
# Gate stateful-streaming table-function tests out of the http leg. The cve/
# cve_search/cpe_cves table functions stream their result across multiple
# Process exchanges, signalling end-of-stream with per-execution state
# (state.Done): the FIRST Process emits the batch, the NEXT returns Finish().
# The vgi extension's HTTP transport is STATELESS — each RPC is an independent
# request, so the worker's per-execution state object does not persist across
# the two exchanges (the SDK itself disables deferred storage cleanup in HTTP
# mode: "no reliable stream-end signal"). The Done flag therefore resets every
# request, Process re-emits the same batch forever, and the scan never reaches
# Finish() — the worker spins re-binding indefinitely. This is the recipe's
# documented "partition-local state across exchanges" HTTP limitation. Run the
# offline-scalar coverage over http (it is request/response, no streaming
# state) and gate the table-function file to subprocess/unix.
if [ "$TRANSPORT" = "http" ]; then
gated=""
for g in $HTTP_GATED_TESTS; do [ "$g" = "$base" ] && gated=1; done
if [ -n "$gated" ]; then
echo "::notice::GATED on http: $base (stateful streaming table functions cannot stream over the stateless HTTP transport)"
continue
fi
fi
awk -f "$HERE/preprocess-require.awk" "$f" > "$STAGE/test/sql/$base"
awk -f "$HERE/preprocess-require.awk" "$f" > "$STAGE/test/sql/$(basename "$f")"
done

# The HTTP transport needs DuckDB's HTTP client, which the vgi extension drives
2 changes: 1 addition & 1 deletion go.mod
@@ -3,7 +3,7 @@ module github.com/Query-farm/vgi-cve
go 1.25.0

require (
github.com/Query-farm/vgi-go v0.1.2
github.com/Query-farm/vgi-go v0.2.0
github.com/Query-farm/vgi-rpc-go v0.9.4
github.com/apache/arrow-go/v18 v18.5.2
)
4 changes: 2 additions & 2 deletions go.sum
@@ -1,5 +1,5 @@
github.com/Query-farm/vgi-go v0.1.2 h1:RLT7YjPwAj/+MH2nHEmxfxzfnKEZgp/hj7U9rXMz09g=
github.com/Query-farm/vgi-go v0.1.2/go.mod h1:LuHl5mhOtDaGj/j5sE0kacXG7cVrcr7E2iCgR3TBVTk=
github.com/Query-farm/vgi-go v0.2.0 h1:S0v+4nWvys7DBdzTVkkwqsSp2EHG91RdF9QEotsZ9tI=
github.com/Query-farm/vgi-go v0.2.0/go.mod h1:LuHl5mhOtDaGj/j5sE0kacXG7cVrcr7E2iCgR3TBVTk=
github.com/Query-farm/vgi-rpc-go v0.9.4 h1:Da+0bNrQkTH2rHXfDZ3i52UldUov4xJx+fWe26n/1Lk=
github.com/Query-farm/vgi-rpc-go v0.9.4/go.mod h1:XbQBjp31eFKIZaBqmg0q9r2Mxp/LTPeAG97w1qy5gmI=
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
94 changes: 69 additions & 25 deletions internal/cveworker/functions.go
@@ -26,12 +26,62 @@ const CatalogName = "cve"
// between NewState and Process (it may cross a process boundary). State structs
// must therefore hold only EXPORTED, gob-encodable fields — no arrow.Record, no
// interfaces, channels, funcs, or unexported fields. Each table function fetches
// its rows eagerly in NewState, stores plain exported Go slices plus a Done
// flag, and rebuilds the Arrow batch in Process.

// emitState carries the "already emitted" flag shared by the table functions.
type emitState struct {
Done bool
// its rows eagerly in NewState, stores a plain exported Go slice (Rows) plus an
// EXPLICIT CURSOR (Offset), and rebuilds the Arrow batch in Process.
//
// WHY A CURSOR, NOT A bool Done (the HTTP-continuation fix):
//
// Over the HTTP transport the worker is STATELESS across exchanges — there is no
// long-lived process holding the live state between Process ticks. Instead the
// framework round-trips the producer state through an opaque continuation token:
// after each tick it gob-encodes the state (snapshotting the LIVE user state),
// the client returns the token, and the worker resumes by gob-decoding it. The
// HTTP server emits at most `producerBatchLimit` data batches per response
// (the SDK sets this to 1), so a producer that has more to emit is always
// resumed mid-stream from its token.
//
// The position MUST therefore live in the serialized state. A bare `Done bool`
// that is only flipped *after* the single Emit does not reliably survive the
// limit-1 continuation boundary: the resumed tick observes the pre-Emit state,
// re-emits the same rows, and the scan never terminates (an infinite loop that
// pins the worker — subprocess/unix keep the live state in memory, so they were
// unaffected and hid the bug). Carrying an explicit Offset that Process advances
// BEFORE yielding makes the snapshot authoritative: the resume sees the advanced
// Offset and emits the next slice (or Finishes when Offset >= len(Rows)). This
// is the reference pattern for every streaming Go table function over HTTP.
//
// rowsPerTick bounds how many rows each Process tick emits. Emitting the whole
// result in one batch is fine for these small NVD result sets, but emitting a
// bounded slice and advancing the cursor each tick is what makes the cursor
// observable across the continuation boundary (and scales to large results).
const rowsPerTick = 64

// CursorState is the shared streaming cursor embedded by every table-function
// state: the eagerly fetched rows plus the offset of the next unemitted row.
// Both fields are exported so gob round-trips them through the HTTP continuation
// token. The TYPE is exported too (CursorState, not cursorState) because the SDK
// counts a state struct's exported FIELDS at registration to verify it is
// gob-encodable — an embedded field named after an unexported type would not be
// counted and the worker would panic at startup.
type CursorState struct {
Rows []CVERow
Offset int
}

// nextSlice returns the next bounded slice of rows to emit and advances the
// cursor past them. It reports done=true once the cursor has consumed all rows,
// at which point Process should call out.Finish().
func (c *CursorState) nextSlice() (slice []CVERow, done bool) {
if c.Offset >= len(c.Rows) {
return nil, true
}
end := c.Offset + rowsPerTick
if end > len(c.Rows) {
end = len(c.Rows)
}
slice = c.Rows[c.Offset:end]
c.Offset = end
return slice, false
}

// ===========================================================================
@@ -160,10 +210,9 @@ type cveArgs struct {
APIKey string `vgi:"name=api_key,default=,doc=NVD API key (raises the rate limit)"`
}

// cveState holds the at-most-one fetched row (gob-encodable) plus the emit flag.
// cveState holds the at-most-one fetched row (gob-encodable) plus the cursor.
type cveState struct {
emitState
Rows []CVERow
CursorState
}

// CVEFunction fetches one CVE by ID.
@@ -202,15 +251,14 @@ func (f *CVEFunction) NewState(params *vgi.ProcessParams) (*cveState, error) {
if row == nil {
return &cveState{}, nil
}
return &cveState{Rows: []CVERow{*row}}, nil
return &cveState{CursorState: CursorState{Rows: []CVERow{*row}}}, nil
}

func (f *CVEFunction) Process(_ context.Context, _ *vgi.ProcessParams, state *cveState, out *vgirpc.OutputCollector) error {
if state.Done {
r, done := state.nextSlice()
if done {
return out.Finish()
}
state.Done = true
r := state.Rows
n := int64(len(r))
batch := array.NewRecordBatch(cveSchema, []arrow.Array{
vgi.BuildStringArray(n, func(i int64) string { return r[i].ID }),
@@ -250,8 +298,7 @@ type cveSearchArgs struct {
}

type cveSearchState struct {
emitState
Rows []CVERow
CursorState
}

// CVESearchFunction runs a paginated keyword search.
@@ -287,15 +334,14 @@ func (f *CVESearchFunction) NewState(params *vgi.ProcessParams) (*cveSearchState
if err != nil {
return nil, err
}
return &cveSearchState{Rows: rows}, nil
return &cveSearchState{CursorState: CursorState{Rows: rows}}, nil
}

func (f *CVESearchFunction) Process(_ context.Context, _ *vgi.ProcessParams, state *cveSearchState, out *vgirpc.OutputCollector) error {
if state.Done {
r, done := state.nextSlice()
if done {
return out.Finish()
}
state.Done = true
r := state.Rows
n := int64(len(r))
batch := array.NewRecordBatch(cveSearchSchema, []arrow.Array{
vgi.BuildStringArray(n, func(i int64) string { return r[i].ID }),
@@ -330,8 +376,7 @@ type cpeCVEsArgs struct {
}

type cpeCVEsState struct {
emitState
Rows []CVERow
CursorState
}

// CPECVEsFunction lists CVEs for a CPE name.
@@ -367,15 +412,14 @@ func (f *CPECVEsFunction) NewState(params *vgi.ProcessParams) (*cpeCVEsState, er
if err != nil {
return nil, err
}
return &cpeCVEsState{Rows: rows}, nil
return &cpeCVEsState{CursorState: CursorState{Rows: rows}}, nil
}

func (f *CPECVEsFunction) Process(_ context.Context, _ *vgi.ProcessParams, state *cpeCVEsState, out *vgirpc.OutputCollector) error {
if state.Done {
r, done := state.nextSlice()
if done {
return out.Finish()
}
state.Done = true
r := state.Rows
n := int64(len(r))
batch := array.NewRecordBatch(cpeCVEsSchema, []arrow.Array{
vgi.BuildStringArray(n, func(i int64) string { return r[i].ID }),