Johnny/preview #3117

Draft
jgraettinger wants to merge 7 commits into master from johnny/preview

@jgraettinger

Description:

(Describe the high level scope of new or changed features)

Workflow steps:

(How does one use this feature, and how has it changed)

Documentation links affected:

(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)

Notes for reviewers:

(anything that might help someone review this PR)

A broker aborts an Append RPC when its client can't sustain the minimum
data rate (gazette's MinAppendRate flow-control policing), shedding a slow
or contended client so others may proceed over the exclusively-owned,
highly-contended append pipeline. The append rolls back cleanly and is
safe to retry — indeed gazette's own Go AppendService retries it
indefinitely with backoff.

Gazette surfaces this as ErrFlowControlUnderflow: a bare error that leaves
the resolved status OK, so the client sees it as a gRPC status with code
Unknown and message "append stream: client stream didn't sustain the
minimum append data rate" rather than an AppendResponse.Status.

`Error::is_transient()` only treated Unknown-coded statuses as transient
for two hardcoded transport strings, so runtime-next's publisher (which
gates retries on is_transient) latched it as terminal and failed the
shard. Classify the underflow as transient so it rides the existing retry
+ backoff loop in journal::Client::append, matching legacy behavior.
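
A minimal sketch of the classification and retry described above. `Code`, `Status`, and `retry_transient` are hypothetical stand-ins, not the actual gRPC or `journal::Client` types; the two existing transport strings are omitted, and the bounded attempt count and backoff constants are assumptions made only to keep the example testable.

```rust
use std::time::Duration;

/// Hypothetical stand-in for a gRPC status code; only the variants used here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Code {
    Unknown,
    InvalidArgument,
}

/// Hypothetical stand-in for the gRPC status the client observes.
#[derive(Debug, Clone)]
pub struct Status {
    pub code: Code,
    pub message: String,
}

// Substring of the message quoted in the description above.
const UNDERFLOW: &str = "didn't sustain the minimum append data rate";

/// Returns true if the status is safe to retry.
pub fn is_transient(status: &Status) -> bool {
    match status.code {
        // Unknown-coded statuses are transient only for recognized messages.
        Code::Unknown => status.message.contains(UNDERFLOW),
        _ => false,
    }
}

/// Retries `op` while its error is transient, up to `max_attempts` calls,
/// doubling the backoff after each failure. `sleep` is injected so tests
/// can observe the delays instead of actually waiting.
pub fn retry_transient<T>(
    max_attempts: u32,
    mut op: impl FnMut() -> Result<T, Status>,
    mut sleep: impl FnMut(Duration),
) -> Result<T, Status> {
    let mut backoff = Duration::from_millis(50);
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(err) if is_transient(&err) && attempt < max_attempts => {
                sleep(backoff);
                backoff = (backoff * 2).min(Duration::from_secs(5));
                attempt += 1;
            }
            // Terminal errors, or transient ones past the attempt budget.
            Err(err) => return Err(err),
        }
    }
}
```

Matching on the message substring keeps the check narrow: other Unknown-coded statuses remain terminal.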

Verified against a live broker: a deliberately-stalled append is aborted
with the underflow, classified transient, retried, and commits — where
before it hard-failed.

Both the V1 `runtime` and `runtime-next` container startup paths mount a
`flow-connector-init` binary into each connector container, preferring a
locally-built copy (via `locate_bin`) and otherwise extracting one from a
pinned image. That pin still pointed at `ghcr.io/estuary/flow:v0.5.24-30-...`,
but the `ghcr.io/estuary/flow` image is no longer published — its newest tag
predates the `projection_constraints` Validated field by ~6 months. An
older `flow-connector-init` re-decodes each connector's Validated response
with a proto that lacks that field and silently drops it, so a materialization
whose connector emits `projection_constraints` (e.g. current materialize-sql
connectors) comes back with no field constraints and fails validation with a
spurious "group-by key field has no constraint" error.
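
The field drop described above can be illustrated with a toy model. This is not the protobuf wire format or the real `flow-connector-init` code: a message is modeled as a map from field tag to payload, and the tag numbers are hypothetical.

```rust
use std::collections::BTreeMap;

/// A decoded message modeled as field tag -> raw payload.
/// Illustrative only; real protobuf decoding is more involved.
pub type Fields = BTreeMap<u32, Vec<u8>>;

/// Re-decodes `input` through a schema that only knows `known_tags`.
/// Fields with any other tag are silently dropped, which is the
/// behavior attributed to the older binary above.
pub fn redecode(input: &Fields, known_tags: &[u32]) -> Fields {
    input
        .iter()
        .filter(|(tag, _)| known_tags.contains(*tag))
        .map(|(tag, payload)| (*tag, payload.clone()))
        .collect()
}
```

With a schema that lacks the newer tag, the response survives the round trip but loses that field, and nothing signals the loss.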

Repoint both pins at the reactor image, which bundles the same
statically-linked `flow-connector-init` at the same path and tracks current
flow. This unblocks local validation / `flowctl raw preview-next` of
connectors using the list-form constraint response.

The soak `views` materialization pinned `materialize-postgres:13946b2`, a
branch build carrying the synchronous StartedCommit change the V2 runtime's
remote-authoritative checkpoint path requires. That change has since merged
(connectors "second round of synchronous StartedCommit") and ships in the tip
`:dev` tag, so drop the branch pin and track `:dev` like the other tasks.

The materialize leader FSM returned `Action::Idle` when reaching a clean
transaction boundary while stopping (both `HeadIdle` and
`HeadStartCommit`). `Idle` parks the actor loop for a full 60s
`ACTOR_TICK_INTERVAL` before it re-polls and notices `Head::Stop`, so
tear-down hung for a minute awaiting the tick.

Return `Action::PollAgain` instead, which the actor loop maps to
`Duration::ZERO` and re-polls immediately, exiting `while !Head::Stop`
without delay. This matches the capture / derive sibling FSMs, which already
return `(PollAgain, Head::Stop)` at their stop boundaries.
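
A rough sketch of why the returned action matters at the stop boundary. Only `Action::Idle`, `Action::PollAgain`, `Head::Stop`, and `ACTOR_TICK_INTERVAL` come from the description above; `Head::Stopping`, `poll_delay`, and `total_parked` are hypothetical names, and the loop is simplified to a list of precomputed steps.

```rust
use std::time::Duration;

pub const ACTOR_TICK_INTERVAL: Duration = Duration::from_secs(60);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    Idle,
    PollAgain,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Head {
    /// Hypothetical placeholder for any non-terminal head state.
    Stopping,
    Stop,
}

/// How long the actor loop parks before re-polling the FSM.
pub fn poll_delay(action: Action) -> Duration {
    match action {
        Action::Idle => ACTOR_TICK_INTERVAL,
        Action::PollAgain => Duration::ZERO,
    }
}

/// Walks the FSM outputs in order and sums the time spent parked
/// before `Head::Stop` is observed.
pub fn total_parked(steps: &[(Action, Head)]) -> Duration {
    let mut parked = Duration::ZERO;
    for (action, head) in steps {
        if *head == Head::Stop {
            break;
        }
        parked += poll_delay(*action);
    }
    parked
}
```

Under this model, an `Idle` returned one step before the stop costs a full tick, while `PollAgain` costs nothing.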
…tories

Replace the prior per-seam openers with three uniform, monomorphized host
seams, each a factory the leader and shard `Service`s are generic over:

- publish: rename `PublisherOpener` / `RealPublisher(Opener)` to
  `PublisherFactory` / `JournalPublisher(Factory)`. The publisher now carries
  only document output; its former observation calls move to the Logger seam.
- leader: replace the `CheckpointSource` / `CheckpointOpener` + `ShuffleOpener`
  pair with `ShuffleSession` / `ShuffleSessionFactory` (static RPITIT, not
  `dyn`) plus the standard `ShuffleServiceFactory`, merging
  `leader/checkpoint.rs` into `leader/shuffle.rs`.
- log: new `Logger` / `LoggerFactory` seam — the task's out-of-band log and
  event stream. A `Logger` sinks both the connector's own logs (`log`) and
  structured runtime `LogEvent`s (`event`): a `#[non_exhaustive]` enum
  (persist / applied / inferred-schema / container lifecycle) whose variants
  carry borrowed, verbatim payloads so a host may intercept them structurally.
  Its canonical `LogEvent::to_log` flattens each event into the ops-log line it
  replaced, and `event` defaults to that flattening — so a host overrides only
  to intercept, delegating the rest. Production shards install `FnLoggerFactory`
  (sinking to the task-log writer); leaders and tests install
  `TracingLoggerFactory`.
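
The "override only to intercept" shape of the `Logger` seam might look roughly like the sketch below. The trait signatures, the two event variants' payload fields, and the flattened line format are assumptions for illustration, not the actual runtime-next definitions; `CollectLogger` and `InterceptPersist` are hypothetical hosts.

```rust
use std::cell::RefCell;

/// Simplified event enum; the payload fields are hypothetical.
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub enum LogEvent<'a> {
    Persist { state: &'a str },
    Applied { spec: &'a str },
}

impl<'a> LogEvent<'a> {
    /// Flattens the event into a plain log line (format is made up here).
    pub fn to_log(&self) -> String {
        match self {
            LogEvent::Persist { state } => format!("persist: {}", state),
            LogEvent::Applied { spec } => format!("applied: {}", spec),
        }
    }
}

pub trait Logger {
    /// Sinks one line of the task's log stream.
    fn log(&self, line: &str);

    /// Defaults to flattening, so most hosts never override it.
    fn event(&self, event: LogEvent<'_>) {
        self.log(&event.to_log());
    }
}

/// A host that collects every line and relies on the default `event`.
#[derive(Default)]
pub struct CollectLogger {
    pub lines: RefCell<Vec<String>>,
}

impl Logger for CollectLogger {
    fn log(&self, line: &str) {
        self.lines.borrow_mut().push(line.to_string());
    }
}

/// A host that intercepts `Persist` structurally and delegates the rest.
#[derive(Default)]
pub struct InterceptPersist {
    pub inner: CollectLogger,
    pub states: RefCell<Vec<String>>,
}

impl Logger for InterceptPersist {
    fn log(&self, line: &str) {
        self.inner.log(line);
    }

    fn event(&self, event: LogEvent<'_>) {
        match event {
            LogEvent::Persist { state } => self.states.borrow_mut().push(state.to_string()),
            other => self.inner.event(other),
        }
    }
}
```

Because `event` has a default body, adding a variant later leaves existing hosts compiling and flattening it like any other event.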

Retire the now-unnecessary `Task.preview` / `preview_state` / `preview_apply`
controls and `initial_state_json` from the runtime proto: how a task publishes
and logs is a host concern decided by the installed factories, not a protocol
flag. Add `seed_initial_connector_state` / `read_final_connector_state` so a
harness can seed shard zero's RocksDB by path and read back its final reduced
state, and re-expose `shuffle::log::BlockMeta` for harness segment writers.

Drive `flowctl raw preview-next` through runtime-next's three factory seams
instead of the retired combined publisher opener:

- publish: an output-capturing `PreviewPublisherFactory` writes captured /
  derived documents to stdout as NDJSON.
- log: a `PreviewLoggerFactory` sinks the task's log stream and powers
  `--output-state` / `--output-apply` (the legacy `connectorState` / `applied`
  lines) by intercepting runtime-next's `LogEvent::Persist` / `LogEvent::Applied`
  structurally — decoding state deltas from its tab-framed patch wire form — and
  flattening every other event through `LogEvent::to_log` into the same log
  handler as the connector's own logs. The `--output-state` run-end final reduced
  state is emitted by re-opening shard zero's RocksDB (`read_final_connector_state`)
  once the session loop has released it.
- shuffle: a `PreviewShuffleFactory` enum selects per run between a live
  in-process journal-reading shuffle Session and a `--fixture` replay
  (`FixtureOpener`, a `ShuffleSessionFactory` that writes log segments and feeds
  synthetic checkpoint Frontiers).
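
Selecting between two factories with an enum, rather than `dyn`, could be sketched as below. Everything here is a simplified assumption: the synchronous `SessionFactory` trait, the `Frontier` newtype, the stub factories, and `select` are hypothetical and stand in for the real async seams.

```rust
/// Hypothetical, simplified stand-in for a checkpoint frontier.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Frontier(pub u64);

/// Simplified, synchronous stand-in for a session factory.
pub trait SessionFactory {
    type Session;
    fn open(&self) -> Self::Session;
}

/// Stub for the live, journal-reading path; it yields nothing here.
pub struct LiveStub;

impl SessionFactory for LiveStub {
    type Session = Vec<Frontier>;
    fn open(&self) -> Vec<Frontier> {
        Vec::new()
    }
}

/// Stub for fixture replay: feeds back preloaded synthetic frontiers.
pub struct FixtureReplay {
    pub frontiers: Vec<Frontier>,
}

impl SessionFactory for FixtureReplay {
    type Session = Vec<Frontier>;
    fn open(&self) -> Vec<Frontier> {
        self.frontiers.clone()
    }
}

/// Enum dispatch: one concrete type, chosen per run, with no trait objects.
pub enum PreviewShuffle<L, F> {
    Live(L),
    Fixture(F),
}

impl<L, F> SessionFactory for PreviewShuffle<L, F>
where
    L: SessionFactory,
    F: SessionFactory<Session = L::Session>,
{
    type Session = L::Session;
    fn open(&self) -> Self::Session {
        match self {
            PreviewShuffle::Live(live) => live.open(),
            PreviewShuffle::Fixture(fixture) => fixture.open(),
        }
    }
}

/// Picks the fixture path when fixture frontiers are supplied.
pub fn select(fixture: Option<Vec<Frontier>>) -> PreviewShuffle<LiveStub, FixtureReplay> {
    match fixture {
        Some(frontiers) => PreviewShuffle::Fixture(FixtureReplay { frontiers }),
        None => PreviewShuffle::Live(LiveStub),
    }
}
```

The enum implements the same trait as its variants, so code generic over the factory stays monomorphized whichever path a run takes.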

`--initial-state` seeds shard zero's RocksDB by path via
`runtime_next::seed_initial_connector_state`, replacing the retired
`Task.initial_state_json` field. `--delay`, eager/streaming fixtures, and
`--sessions` planning are unchanged.

`flowctl preview` now runs on the runtime-next + shuffle stack that was
developed on this branch as `flowctl raw preview-next`, which the prior
commits verified to be behaviorally compatible with the legacy preview.

Move the `raw/preview_next/` module into `preview/`, delete the old
V1-runtime `preview/{mod,journal_reader}.rs`, and drop the `raw
preview-next` command wiring. The CLI flag surface, stdout/stderr line
shapes, and exit-code contract that the estuary/connectors CI harnesses
depend on are preserved; `--shards` and `--debug-port` are additive.

The old preview was the sole consumer of `runtime::harness` (and its
private `exchange` combinator), so remove that now-dead module too.

Update the soak README to the new command name, and delete the
runtime-v2 preview harness guide: a closed, point-in-time document
describing `raw preview-next` as it stood during bring-up.