
feat: add NominalAvroWriter primitive for file-only avro output #231

Open
drake-nominal wants to merge 1 commit into main from feat/nominal-avro-writer-pr

Conversation

drake-nominal (Contributor) commented Apr 23, 2026

Summary

Introduces a pure-Rust AvroWriter primitive in nominal-streaming plus a thin NominalAvroWriter PyO3 facade in py-nominal-streaming. Both wrap NominalDatasetStream and target callers who want a single avro file on disk (optionally rotated). Crate users get an idiomatic Rust API; Python callers get the same wheel surface as before.

On top of the base writer the branch carries three performance / correctness improvements:

  1. Batched write_dataframe — all columns of a polars frame are accumulated into one (ChannelDescriptor, PointsType) batch and handed to NominalDatasetStream::enqueue_batch. One buffer-lock acquisition per frame instead of one per column.
  2. Parallel avro Record build — a new ParallelAvroFileConsumer fans out the per-series Points → Value tree → Record build across std::thread::scope workers before the serial writer.lock().extend(records). Stdlib only, zero new deps. The core streaming path's AvroFileConsumer is untouched, so other callers are unaffected.
  3. Exact null handling — polars nulls skip cleanly (no sentinel values). NaN / ±Infinity in Float64 columns round-trip unchanged. Struct fields with non-finite floats emit JSON null instead of silently dropping the field.

Plus a .stats() pipeline-timing hook and a py-nominal-streaming/benchmarks/ directory with parameterized bench tooling for future regression tracking.

Why not just a thin wrapper over NominalDatasetStream?

A caller could already write one avro file today by pointing a stream at a path via .to_file(path) and driving it directly — the head-to-head benchmarks (section 1b) show that idiom within ±7% of this branch. Throughput isn't the reason a separate writer exists. The reasons are:

  1. File lifecycle. NominalDatasetStream is network-first; it has no concept of "the output file." The writer owns truncate-on-open (fresh file per writer instance), fsync-on-close, idempotent close() via OnceLock, and drop-safety across clones (drop_mutex + Arc::strong_count). A wrapper that replicates this state machine is not thin.

  2. Rotation. max_points_per_file with filename template <stem>_<index:03d><suffix> drains the current stream, fsyncs, opens a new stream pointed at the next path, and atomically swaps — tracking file_index, points_in_current, and finalized_paths across the transition. Rotation-aware row slicing is baked into write_dataframe and write_batch so a single call can straddle a rotation boundary. (The naming scheme is sketched after this list.)

  3. Error latching. Consumer errors inside NominalDatasetStream die on the dispatcher thread. An ErrorLatchingConsumer wraps the consumer and captures the first error via Arc<OnceLock<Arc<AvroWriterError>>>, then surfaces it on every subsequent write_* / flush / sync / close. A write-to-disk API has to tell the caller their data didn't land; a best-effort streaming API doesn't.

  4. Consumer choice + write_dataframe. The writer installs a ParallelAvroFileConsumer (scoped-thread Record build before the serial writer.extend; +10–12% at 200M+ points) and exposes a single write_dataframe(frame, "ts") that handles per-column dispatch, rotation-aware slicing, and documented null/NaN/Inf semantics (table below). The hand-rolled stream-plus-loop idiom has to reimplement each of these — and every downstream caller gets them subtly wrong in a different way.

  5. Typed .stats() hook. PipelineStats carries named atomic counters specific to the file-writer pipeline (df_handoff_ns, column_build_ns, enqueue_batch_ns, consumer_consume_ns, write_dataframe_total_ns) — not part of the stream's surface. Makes bottleneck attribution a one-line writer.stats() call.
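
As a concrete sketch of the rotation naming in item 2, the <stem>_<index:03d><suffix> template expands like this (a Python restatement of the core's path_for_index helper in helpers.rs, for illustration only; whether the first file carries an index suffix is not shown):

from pathlib import Path

def path_for_index(base: Path, index: int) -> Path:
    # Illustrative restatement of the Rust helper:
    # data.avro -> data_000.avro, data_001.avro, ... (zero-padded index).
    return base.with_name(f"{base.stem}_{index:03d}{base.suffix}")

assert path_for_index(Path("out/data.avro"), 7) == Path("out/data_007.avro")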

The value is in file-lifecycle guarantees + dataframe-oriented ergonomics layered on top of the stream. The stream remains the right primitive for network-first, multi-consumer, long-lived workloads; the writer is the right primitive for "I want N avro files on disk."

Architecture

The branch splits across both crates: the pure-Rust core lives in nominal-streaming, the PyO3 facade in py-nominal-streaming. Pure-Rust users depend only on nominal-streaming; Python callers consume the wheel exactly as they do today.

nominal-streaming (pure-Rust core)

File                          Role
src/avro_writer/mod.rs        Module entry + re-exports (AvroWriter, AvroWriterOpts, AvroWriterError, PipelineStats)
src/avro_writer/writer.rs     AvroWriter — generic write<T>, write_batch, close/flush/sync, rotation, Drop
src/avro_writer/opts.rs       AvroWriterOpts — plain builder (no #[pyclass])
src/avro_writer/state.rs      AvroWriterInner shared via Arc; holds stream, error latch, rotation counters
src/avro_writer/error.rs      AvroWriterError + From<ConsumerError>
src/avro_writer/consumer.rs   ParallelAvroFileConsumer (scoped-thread Record build) + ErrorLatchingConsumer wrapper
src/avro_writer/helpers.rs    Path templating, truncate-on-open, stream+consumer opening
src/avro_writer/stats.rs      PipelineStats atomic counters
src/avro_writer/polars.rs     impl AvroWriter { pub fn write_dataframe } — gated on the polars Cargo feature

write_dataframe is behind #[cfg(feature = "polars")]. Rust callers that only use write<T> / write_batch pay no polars dependency.

py-nominal-streaming (PyO3 facade)

File                                              Role
src/avro_writer/mod.rs                            Submodule decls + re-exports
src/avro_writer/writer.rs                         #[pyclass] NominalAvroWriter { inner: AvroWriter }
src/avro_writer/opts.rs                           #[pyclass] NominalAvroWriterOpts { inner: AvroWriterOpts }
src/avro_writer/pymethods.rs                      #[pymethods] block — extract Python args, py.detach() into the core
src/avro_writer/polars_ffi.rs                     Arrow C Data Interface (Python polars → Rust polars)
src/avro_writer/error_map.rs                      Arc<AvroWriterError> → PyErr
python/nominal_streaming/_nominal_streaming.pyi   IDE / mypy support

py-nominal-streaming/Cargo.toml pulls in the core with features = ["logging", "polars"] — this activates AvroWriter::write_dataframe inside the core crate (verified by removing polars from the feature list and watching cargo check error on the missing method). The facade does Python-side input validation (timestamp column present + non-null + correct dtype → PyValueError / PyTypeError) before delegating, preserving the existing Python error contract.
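
In spirit, that pre-delegation check looks like the following sketch (illustrative names only; the real validation lives in pymethods.rs, and the required timestamp dtype is assumed here to be nanosecond Int64):

import polars as pl

def validate_timestamp_column(df: pl.DataFrame, ts_col: str) -> None:
    # Sketch: mirrors the facade's Python error contract, not its code.
    if ts_col not in df.columns:
        raise ValueError(f"timestamp column {ts_col!r} not in frame")    # -> PyValueError
    series = df[ts_col]
    if series.null_count() > 0:
        raise ValueError(f"timestamp column {ts_col!r} contains nulls")  # -> PyValueError
    if series.dtype != pl.Int64:                                          # assumed dtype
        raise TypeError(f"timestamp column must be Int64, got {series.dtype}")  # -> PyTypeError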

Pipeline

The writer delegates the hot path to NominalDatasetStream — inheriting its primary/secondary buffer + batch-processor + request-dispatcher pipeline. Additions on top:

  • Generic write<T> API over IntoPoints — one method covers all six proto point types.
  • write_dataframe(df, ts_col, tags=None) — per-column dispatch for Float64, Int64, String, Struct, List/Array of scalar or string.
  • Rotation via max_points_per_file with filename template <stem>_<index:03d><suffix>. No separate RotatingAvroWriter class.
  • Error latching (first-wins) via ErrorLatchingConsumer wrapping AvroFileConsumer, surfaced on every subsequent write_* / flush / sync / close.
  • Idempotent close() via OnceLock; close_lock: RwLock serializes writes vs close; drop_mutex: Mutex makes concurrent drops race-free.
  • Explicit truncate-on-open in new() — fresh file per writer instance.
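
A minimal sketch of this surface from Python (method names follow the public API listed in the commit message below; the import path and the rotation opt's builder spelling are assumptions):

from nominal_streaming import NominalAvroWriter, NominalAvroWriterOpts

# Assumed builder spelling for the rotation threshold.
opts = NominalAvroWriterOpts().with_max_points_per_file(10_000_000)

with NominalAvroWriter("out/data.avro", opts) as w:
    w.write("engine.rpm", 1_700_000_000_000_000_000, 3150.0)
    w.write_batch(
        "engine.rpm",
        [1_700_000_000_000_000_000 + i for i in range(3)],
        [3150.0, 3151.5, 3149.8],
    )
print(w.written_files(), w.points_accepted())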

Null / NaN / Infinity semantics

Column dtype           Polars null                                   Float NaN           Float ±Infinity
Float64                skipped (no record)                           preserved           preserved
Int64                  skipped                                       n/a                 n/a
String                 skipped                                       n/a                 n/a
Struct(…)              row skipped                                   field → JSON null   field → JSON null
List(*) / Array(*, _)  row skipped (element preserved when inner)    element preserved   element preserved
timestamp column       hard error (null in timestamp column)         n/a                 n/a

The principle: polars null ⇒ "no data at this timestamp"; NaN / ±Inf ⇒ real IEEE-754 values that round-trip through avro. Struct non-finite floats encode as JSON null because spec JSON has no NaN / Inf tokens — indistinguishable from a polars null on read, but the containing object's shape is preserved (previously the field was silently dropped).
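
To make the table concrete, a small hedged example (the file path and values are illustrative, and the import path is an assumption):

import math
import polars as pl
from nominal_streaming import NominalAvroWriter

df = pl.DataFrame({
    "ts": [1, 2, 3, 4],                            # ns timestamps (Int64)
    "temp": [21.5, None, float("nan"), math.inf],  # Float64 with one null
})

with NominalAvroWriter("out/semantics.avro") as w:
    w.write_dataframe(df, timestamp_column="ts")

# Expected records for "temp", per the table above:
#   ts=1 -> 21.5
#   ts=2 -> no record (polars null means "no data at this timestamp")
#   ts=3 -> NaN, round-tripped as a real IEEE-754 value
#   ts=4 -> +Infinity, preserved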


Performance characterization

All numbers: Apple Silicon M-series, CPython 3.12, release wheel, snappy-compressed avro, local SSD. Reruns are within ±3% variance unless noted.

1. Peak and baseline comparison

Config                                                          pts/s     wall
Pure-Python fastavro + 8-thread pool                           ~3.1 M        —
This branch, default shape (1000 cols × 1M frame, 200M pts)    24.4 M     8.2s
This branch, peak (10 cols × 1M frame, 200M pts)               29.5 M     6.8s
1B-point run (serial record build, for A/B)                    21.2 M    47.1s
1B-point run (parallel record build)                           23.8 M    42.0s

Parallel record build lifts throughput by 12.1% at 1B scale, matching the +10% observed at 200M, so the gain holds across scale. At the default shape this branch is ~8× faster than the pure-Python fastavro fallback.

1b. Head-to-head vs PyNominalDatasetStream.to_file

The baseline is the idiom a Python caller could write with the existing pre-branch API — point a PyNominalDatasetStream at a file via .to_file(path), loop over the polars frames, and call enqueue_batch(channel, ts_list, values_list) once per column per frame. Both paths write the same avro schema in the same on-disk format; they differ only in how the caller drives them.

Run via benchmarks/head_to_head.py (200M total Float64 points, fsync_on_close=False on both sides for apples-to-apples):

cols   frame_pts   old wall   old pts/s   new wall   new pts/s   ratio
  10        100K      7.10s    28.2 M/s      7.03s    28.4 M/s   1.01×
  10          1M      3.37s    59.4 M/s      3.48s    57.4 M/s   0.97×
 100        100K      7.46s    26.8 M/s      7.19s    27.8 M/s   1.04×
 100          1M      6.71s    29.8 M/s      7.22s    27.7 M/s   0.93×
1000        100K      8.46s    23.6 M/s      8.69s    23.0 M/s   0.97×
1000          1M      8.25s    24.3 M/s      8.38s    23.9 M/s   0.98×
5000        100K     20.2s      9.9 M/s     19.8s     10.1 M/s   1.02×
5000          1M      9.19s    21.8 M/s      9.28s    21.6 M/s   0.99×

Within ±7% across the entire grid — the two idioms deliver equivalent throughput. The consumer-side avro encoder is the shared bottleneck for both; the per-column enqueue_batch path amortizes its per-frame FFI crossings well because each crossing already does genuine work (a whole column's worth of points at a time).

The win of write_dataframe is ergonomic, not raw speed. Callers swap a hand-rolled column-dispatch loop:

stream = PyNominalDatasetStream(opts).to_file(path)
stream.open()
try:
    for frame in frames:
        ts_list = frame["ts"].to_list()
        for col in frame.columns:
            if col == "ts": continue
            stream.enqueue_batch(col, ts_list, frame[col].to_list())
finally:
    stream.close()

…for a single write_dataframe call per frame:

with NominalAvroWriter(path, opts) as w:
    for frame in frames:
        w.write_dataframe(frame, timestamp_column="ts")

Plus write_dataframe bakes in documented null/NaN/Inf semantics (see section above), built-in rotation, pipeline observability via .stats(), and first-error latching — all of which the hand-rolled loop either skips or gets wrong.
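
The first-error latch is visible from Python as a sticky exception (error_map.rs maps the latched Arc<AvroWriterError> to a RuntimeError); a sketch of the contract, assuming an open writer w and a frame whose consumer hits a disk error:

# Sketch: once a consumer error is latched, every later call re-raises it.
try:
    w.write_dataframe(frame, timestamp_column="ts")  # first error surfaces here
except RuntimeError as first_error:
    try:
        w.flush()  # write_* / flush / sync / close all re-raise the latch
    except RuntimeError as latched:
        assert str(latched) == str(first_error)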

2. Pipeline attribution at default shape (1000 cols × 200M pts × 1M frames, Float64)

wall_clock:              8.20s   100.0%   (24.4 M pts/s)

Producer CPU phases (caller thread, inside write_dataframe):
  df_handoff (FFI)        234ms     2.9%
  extract timestamps        6ms     0.1%
  column_build            712ms     8.7%
    → producer CPU total  952ms    11.6%

Producer wall inside stream:
  enqueue_batch          6.87s    83.8%   ← blocked on buffer
                                           (consumer can't drain fast enough)

Consumer thread (parallel, single dispatcher):
  consumer.consume       7.80s    95.2%   ← SATURATED bottleneck
    ↳ points_to_avro     0.73s            (parallel; was 2.14s serial)
    ↳ writer.extend      7.07s            (serial avro encode + snappy + write)

Consumer saturation: 95.2% of wall → the downstream avro encoder+writer is the bottleneck; enqueue_batch_ns grows because the producer is backpressured on a full buffer. Producer CPU is only 12% of wall.

3. Frame-size sweep (1000 cols × 200M pts, Float64)

frame_pts    rows/frm   calls   elapsed      pts/s    consumer util
---------------------------------------------------------------------
   50,000          50    4000     8.48s     23.6 M          95.0%
  100,000         100    2000     8.35s     23.9 M          94.5%
  250,000         250     800     8.33s     24.0 M          95.8%
  500,000         500     400     8.33s     24.0 M          95.4%
1,000,000        1000     200     8.25s     24.2 M          96.1%
2,000,000        2000     100     7.98s     25.1 M          91.2%
5,000,000        5000      40     7.56s     26.5 M          76.5%   ← sweet spot
10,000,000      10000      20     7.71s     25.9 M          55.8%   ← producer-limited

Throughput is flat within 12% across a 200× range in frame size. Below 2M-point frames the consumer is saturated (≥94% util). At 5M-point frames the consumer has headroom and the producer becomes the limiter. Past 5M, per-frame allocation cost starts to erode gains.

4. Column-count sweep (200M pts × 1M frame, Float64)

n_cols    rows/frm   calls   elapsed      pts/s    writer.extend total
------------------------------------------------------------------------
    10    100K        200     6.74s     29.7 M          5.94s
    50     20K        200     6.89s     29.0 M          6.31s
   100     10K        200     7.08s     28.2 M          6.51s
   500      2K        200     7.93s     25.2 M          7.25s
  1000      1K        200     8.65s     23.1 M          7.86s
  2500    400         200     8.91s     22.5 M          8.17s
  5000    200         200     9.08s     22.0 M          8.27s

writer.extend grows monotonically with channel count because the avro encoder pays a per-Record overhead (schema lookup, tags map, array headers) that doesn't amortize over points inside the record. At 5000 cols each WriteRequest produces 5000 Records; at 10 cols only 10.

5. 2D surface (cols × frame_points, 200M pts, Float64)

Throughput (M pts/s):

cols \ frame_pts    250K      1M      5M
      10          29.4    29.5    26.0
     100          27.9    28.0    25.2
    1000          24.0    24.2    26.3    ← default shape
    5000          21.1    21.9    23.7

Consumer utilization (% of wall):

cols \ frame_pts    250K      1M      5M
      10          95.7%   95.6%   76.8%
     100          95.8%   95.6%   76.6%
    1000          95.9%   95.4%   76.5%
    5000          94.2%   94.2%   77.2%

Two regimes separated cleanly by frame size (not column count):

  • ≤1M-point frames → consumer-saturated (~95% util across all widths).
  • 5M-point frames → producer-limited (~77% util).

Diagonal trade-off: bigger frames help wide workloads but hurt narrow ones. 5000 cols goes 21.1 → 23.7 M (+13%) when frames grow 250K → 5M; 10 cols goes 29.4 → 26.0 (−12%) on the same move.

Overall spread across the 4×3 grid: 1.39× (worst 21.1 M, best 29.5 M).

6. Dtype matrix (100 & 1000 cols × 100K / 1M frames, 50M pts per cell)

dtype   n_cols  frame_pts    elapsed     pts/s    size on disk
----------------------------------------------------------------
float      100     100K        2.78s    18.0 M      533 MiB
float      100       1M        2.82s    17.8 M      572 MiB
float     1000     100K        3.16s    15.8 M      470 MiB
float     1000       1M        3.11s    16.1 M      525 MiB
string     100     100K        7.25s     6.9 M      333 MiB
string     100       1M        7.55s     6.6 M      370 MiB
string    1000     100K        7.68s     6.5 M      112 MiB    ← snappy compresses repeats
string    1000       1M        7.49s     6.7 M      167 MiB
array      100     100K       26.99s     1.85 M     822 MiB    ← List(Float64), 8 inner f64 per row
array      100       1M       27.46s     1.82 M     801 MiB
array     1000     100K       26.67s     1.87 M     804 MiB
array     1000       1M       26.60s     1.88 M     810 MiB
struct     100     100K       25.56s     1.96 M     566 MiB    ← JSON-serialized
struct     100       1M       25.71s     1.94 M     589 MiB
struct    1000     100K       29.08s     1.72 M     176 MiB
struct    1000       1M       26.30s     1.90 M     584 MiB

Relative ordering is stable across shape:

  • Float: 16–18 M pts/s. Simplest encoding path (one Value::Double per point); fastest.
  • String: 6–7 M pts/s, ~3× slower than float. Each point requires a per-row String::to_string() clone + variable-width avro encode.
  • Array: ~1.85 M pts/s, ~10× slower than float. List(Float64) is cast to Float64 per row then each element allocates a Value::Double; avro arrays have per-element framing.
  • Struct: ~1.9 M pts/s, ~10× slower than float. Per-row serde_json::to_string is the cost; struct is encoded as a JSON string inside avro's record union.

Raw per-point numbers understate string/array/struct throughput: array moves 8 floats per point, so at ~1.85 M pts/s the effective data rate is 1.85 M × 8 elements × 8 bytes ≈ 118 MB/s, plus avro/snappy overhead.

The 50M-point throughput is lower than the 200M-point throughput for the same shape (float 16 M vs 24 M) because buffer-fill + encoder warm-up are a bigger fraction of short runs.

7. Rotation overhead (1000 cols × 100M pts × 1M frames, Float64)

max_points_per_file    files   elapsed      pts/s    vs no-rotation
--------------------------------------------------------------------
no rotation               1     6.28s     15.9 M           1.00×
100M (effectively off)    1     6.27s     15.9 M           1.00×
10M                      10     6.64s     15.1 M           1.06×   ← +6%
1M (= frame size)       100    20.79s      4.8 M           3.31×   ← 3.3× slowdown
500K                    100    21.20s      4.7 M           3.38×
250K                    100    20.85s      4.8 M           3.32×

Rotation cost is small while each file holds many frames. Rotating on every frame is catastrophic — 3.3× wall-time penalty. Once max_points_per_file ≤ frame_points, every frame forces a rotation (drain stream, fsync, reopen stream). Three of the rows above show identical 3.3× penalty because they all collapse to "one rotation per frame."

Practical guidance: set max_points_per_file such that at least ~10 frames fit per file.
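
In code, that guidance is one line of arithmetic (values illustrative):

frame_points = 1_000_000
max_points_per_file = 10 * frame_points  # >= ~10 frames per file: ~6% overhead
# Avoid max_points_per_file <= frame_points: every frame then forces a
# drain + fsync + reopen, the 3.3x wall-time penalty in the table above.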

8. Scaling confirmation at 1B points (1000 cols × 1M frames, Float64)

                             wall     pts/s    points_to_avro phase
Serial baseline            47.08s    21.2 M    11.16s
Parallel (scoped threads)  42.00s    23.8 M     4.26s (2.6× faster)

The points_to_avro phase drops 2.6× with parallelization. Wall-time gain is smaller (+12%) because the consumer remains saturated — savings inside points_to_avro shift into the serialized writer.extend stage rather than removing wall.

Gaps — what was NOT tested

  • Int64 throughput: the parameterized bench script's --dtypes flag only covers float / string / array / struct; Int64 is expected to match Float64 qualitatively (same scalar arm, same point size) but not benchmarked directly.
  • Mixed-dtype frames: all sweeps use homogeneous columns. Per-column dispatch adds no shared state across dtypes, so qualitative shape should hold, but mixed frames are not measured.
  • Concurrent producers: write_dataframe is single-threaded on the caller side. Multi-producer throughput through enqueue is unit-tested for correctness but not benched.
  • Non-snappy codecs: everything uses apache_avro::Codec::Snappy. Deflate / Zstandard / null would shift the writer.extend breakdown.
  • Non-Apple-Silicon architectures: not benchmarked on x86-64.
  • CPython 3.10 / 3.11: all numbers above are 3.12. Earlier observation (from prior sessions) was ~60% throughput on 3.10 due to PyO3 boundary-crossing overhead.
  • I/O-bound disks: tests used a local SSD with plenty of free space. Disk backpressure would show up inside writer.extend.
  • Error-latched hot path: the fast-return-on-latched-error behavior is unit-tested but not benchmarked.
  • 1B-point runs beyond the single A/B above (only ran once per configuration at that scale).

Observability hook

NominalAvroWriter.stats() (and AvroWriter::stats() on the Rust side) returns a snapshot of nanosecond counters for pipeline-attribution:

  • df_handoff_ns — Arrow C Data Interface FFI (only populated by write_dataframe)
  • extract_ts_ns — timestamp column extraction
  • column_build_ns — per-column polars-chunk extraction + point-struct building
  • enqueue_batch_ns — producer wall into the stream (includes blocked-on-buffer)
  • consumer_consume_ns / _calls — dispatcher-side avro encode + write
  • write_dataframe_calls / _total_ns — call count and end-to-end wall time for write_dataframe itself

Rule of thumb: consumer_consume_ns ≈ wall_clock → consumer bottleneck. producer_cpu_ns ≈ wall_clock → producer bottleneck.
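
For example (assuming the Python snapshot exposes the counters as attributes; wall-clock is measured by the caller, and w is an open NominalAvroWriter):

import time

t0 = time.perf_counter()
# ... drive writer w ...
wall_ns = (time.perf_counter() - t0) * 1e9

s = w.stats()
consumer_util = s.consumer_consume_ns / wall_ns
producer_cpu = (s.df_handoff_ns + s.extract_ts_ns + s.column_build_ns) / wall_ns
if consumer_util > 0.9:
    print("consumer-bound: serial avro encode + write is the ceiling")
elif producer_cpu > 0.9:
    print("producer-bound: column build / FFI dominates")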

Bench tooling

Four scripts under py-nominal-streaming/benchmarks/:

  • bench_nominal_avro_writer.py — parameterized CSV sweep (dtype × cols × total × frame).
  • pipeline_profile.py — single-run flame-graph-style attribution using stats().
  • sweep.py — 1D / 2D characterization matrix.
  • head_to_head.py — NominalAvroWriter.write_dataframe vs PyNominalDatasetStream.to_file + per-column enqueue_batch. Use this whenever referencing the old-idiom comparison.

Dependencies + build

  • Core crate gains an optional polars Cargo feature; enabling it activates AvroWriter::write_dataframe. Pure-Rust callers who skip the feature pull no polars dependency.
  • py-nominal-streaming depends on the core with features = ["logging", "polars"] and keeps polars + polars-arrow as direct deps for the Arrow C Data Interface FFI.
  • polars>=1,<2 as a hard Python runtime dep (write_dataframe uses it directly via the Arrow C Data Interface).
  • Test extras: fastavro + cramjam (for reading snappy-encoded output; see the read-back sketch after this list).
  • extension-module is a Cargo feature on py-nominal-streaming that forwards to pyo3/extension-module. Maturin enables it via [tool.maturin].features; cargo test runs without it.
  • crate-type = ["cdylib", "rlib"] so cargo test / cargo doc work alongside the wheel build.
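
The fastavro + cramjam extras make a read-back check trivial, e.g. a record count (the record fields are whatever the Nominal AvroStream schema defines, so this sketch stays schema-agnostic):

from fastavro import reader  # cramjam supplies the snappy codec

def count_records(path: str) -> int:
    # Iterate the writer's snappy-compressed avro output.
    with open(path, "rb") as f:
        return sum(1 for _ in reader(f))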

Test plan

  • Rust unit tests in nominal-streaming (25 tests: 9 stream + 1 consumer + 15 writer): roundtrip all 6 value types, batch boundaries, multi-producer, error latching + Io variant preservation, drop-without-close, close idempotency, flush+sync, concurrent write/close race, simultaneous-drop race, truncate-on-reopen, rotation, straddle-across-rotation.
  • Python tests (15 total): roundtrip, rotation, write_dataframe dispatch, written_files observability, context manager, flush/sync best-effort, float null-skip + NaN/Inf round-trip, int/string null-skip, all-null column, struct non-finite → JSON null, struct null-row skip, timestamp-null hard error.
  • just check passes (ruff, mypy, cargo fmt).
  • Benchmarks above reproducible via the four scripts under py-nominal-streaming/benchmarks/.

drake-nominal force-pushed the feat/nominal-avro-writer-pr branch 6 times, most recently from c88d4a6 to f5a4d1a on April 23, 2026 at 21:54
drake-nominal force-pushed the feat/nominal-avro-writer-pr branch 14 times, most recently from 243f958 to 176237b on April 24, 2026 at 20:57
…acade

Introduces two tightly-paired primitives for writing avro files in the
Nominal AvroStream schema:

 1. `nominal_streaming::avro_writer::AvroWriter` — a pure-Rust,
    idiomatically-shaped file writer (no `#[pyclass]`, no Python deps).
    Lives alongside `NominalDatasetStream` in the core crate. Feature-
    gates polars support behind a `polars` Cargo feature, so pure-Rust
    users who don't want a DataFrame dependency don't pay for one.
 2. `py_nominal_streaming::avro_writer::NominalAvroWriter` — a thin
    `#[pyclass]` facade that composes `AvroWriter` + handles Python↔Rust
    conversion. No pipeline logic lives here.

The two crates together deliver: single-file avro output, rotation via a
filename template, first-error latching (via `OnceLock`), idempotent
`close()` with cached result, truncate-on-open contract, and a
high-throughput polars DataFrame ingest path with documented null / NaN /
Infinity semantics that align with the Nominal Core ingest backend's
filtering.

Core crate — `nominal-streaming/src/avro_writer/`:
  mod.rs        — module doc + re-exports (AvroWriter, AvroWriterOpts,
                  AvroWriterError, PipelineStats)
  opts.rs       — plain struct + `::new()` + `with_*` builder chain
  error.rs      — AvroWriterError enum + From<ConsumerError>, documents the
                  Arc-wrapping contract for sticky latched errors
  stats.rs      — PipelineStats (public AtomicU64 fields)
  consumer.rs   — ParallelAvroFileConsumer (scoped-thread Record build) +
                  ErrorLatchingConsumer
  state.rs      — AvroWriterInner shared state behind Arc
  helpers.rs    — path_for_index, ensure_parent_and_truncate (io::Result),
                  open_stream, open_error_latching_consumer
  writer.rs     — AvroWriter struct + pub methods + Drop + 16 unit tests
  polars.rs     — feature-gated write_dataframe + anyvalue_to_json helpers

Facade crate — `py-nominal-streaming/src/avro_writer/`:
  mod.rs        — submodule decls + re-exports (NominalAvroWriter,
                  NominalAvroWriterOpts)
  writer.rs     — 32-line pyclass wrapping `AvroWriter`
  opts.rs       — pyclass wrapping `AvroWriterOpts`, getter/with_* mirror
                  for Python
  pymethods.rs  — #[pymethods] block; every method is Python extraction +
                  delegate to inner `AvroWriter`
  polars_ffi.rs — Arrow C Data Interface FFI (Python polars → Rust polars);
                  stays here because it's inherently Python-specific
  error_map.rs  — Arc<AvroWriterError> → PyRuntimeError mapper

Public Rust API (`nominal_streaming::avro_writer`):
  AvroWriter::new(path, opts) -> io::Result<Self>
  AvroWriter::write<T>(&ChannelDescriptor, Vec<T>) where Vec<T>: IntoPoints
  AvroWriter::write_batch(Vec<(ChannelDescriptor, PointsType)>)
  AvroWriter::write_dataframe(&DataFrame, ts_col, tags)   [polars feature]
  AvroWriter::close() / flush() / sync()
  AvroWriter::path() / finalized_paths() / written_files() / points_accepted()
  AvroWriter::stats() -> &PipelineStats

Public Python API (unchanged from earlier iterations):
  NominalAvroWriter(path, opts=None)
  w.write(channel, ts_ns, value, tags=None)
  w.write_batch(channel, ts_ns_list, values, tags=None)
  w.write_from_dict(ts_ns, channel_values, tags=None)
  w.write_struct(channel, ts_ns, value, tags=None)
  w.write_float_array / write_string_array (channel, ts_ns, value, tags=None)
  w.write_dataframe(df, timestamp_column, tags=None)
  w.close() / flush() / sync() / path() / finalized_paths() /
    written_files() / points_accepted() / stats()
  context-manager support (__enter__/__exit__)

Null / NaN / Infinity semantics (documented end-to-end, aligned with
backend filters in `DirectClickHouseFileWriterV2`):

 - Polars null → row skipped (no avro record emitted at that timestamp)
 - Float64 NaN / ±Infinity → preserved as IEEE-754 bit patterns
 - Struct fields with non-finite floats → emitted as JSON null (not
   literal `NaN`/`Infinity` tokens — the backend's JSON column silently
   accepts those at insert but fails at query time, a silent data-loss
   trap)
 - Null in the timestamp column → hard error

Cargo feature wiring:
  nominal-streaming exposes optional `polars` feature
  py-nominal-streaming enables it via `features = ["logging", "polars"]`
  on its `nominal-streaming` dep, guaranteeing the facade's
  `write_dataframe` call target always resolves (compile-time enforced —
  if the feature is ever dropped, the py crate fails to build).

Benchmark tooling under `py-nominal-streaming/benchmarks/`:
  bench_nominal_avro_writer.py  — parameterized CSV sweep
  pipeline_profile.py           — single-run flame-graph attribution
  sweep.py                      — 1D / 2D characterization matrix
  head_to_head.py               — NominalAvroWriter vs the idiomatic
                                   `PyNominalDatasetStream.to_file` +
                                   per-column `enqueue_batch` loop
All four scripts refuse to run under Python < 3.12 (PyO3 boundary
crossings are ~60% faster on 3.12+, so cross-version comparisons
mislead).

Build + dependencies:
  - polars is a hard runtime dep of the py crate (used by write_dataframe
    callers); the Rust FFI lives in `polars_ffi.rs` and imports via the
    Arrow C Data Interface (no pyarrow required)
  - extension-module is a Cargo feature on the py crate forwarded to
    pyo3/extension-module; maturin enables it for wheel builds, cargo
    test runs without it
  - crate-type = ["cdylib", "rlib"] so cargo test + cargo doc work
    alongside the wheel build

Performance (Apple Silicon, CPython 3.12, release wheel):
  - Peak: ~29 M pts/s at 10 cols × 1M-point frames
  - Default shape (1000 cols × 1M frames × 200M points): ~24 M pts/s
  - At parity with the pre-branch `PyNominalDatasetStream.to_file` +
    hand-rolled per-column `enqueue_batch` loop (within ±7% across the
    cols × frame_points grid); the `write_dataframe` surface wins on
    ergonomics, not raw speed
  - ~8× faster than a pure-Python `fastavro` + 8-thread pool alternative
  - Avro encoder (single-threaded `writer.extend`) is the ceiling at
    95%+ consumer utilization for all frame sizes ≤ 1M points

Tests:
  - 25 Rust tests pass (9 pre-existing `nominal-streaming` stream tests
    + 1 consumer test + 15 AvroWriter tests covering rotation, error
    latching, multi-producer, close race, drop race, truncate-on-reopen,
    rotation-split across boundary, etc.)
  - 15 Python tests pass (roundtrip, context manager, flush/sync,
    write_dataframe dispatch + null/NaN/Inf handling for all dtypes,
    timestamp-null hard error)
  - `just check` clean (cargo fmt --nightly, mypy, ruff format, ruff
    check)
  - `cargo check -p nominal-streaming` (core standalone, no features)
    and `cargo check -p nominal-streaming --features polars` both clean
drake-nominal force-pushed the feat/nominal-avro-writer-pr branch from 176237b to 4cf1814 on April 24, 2026 at 21:05