feat: add NominalAvroWriter primitive for file-only avro output (#231)
drake-nominal wants to merge 1 commit into
Introduces two tightly-paired primitives for writing avro files in the
Nominal AvroStream schema:
1. `nominal_streaming::avro_writer::AvroWriter` — a pure-Rust,
idiomatically-shaped file writer (no `#[pyclass]`, no Python deps).
Lives alongside `NominalDatasetStream` in the core crate. Feature-
gates polars support behind a `polars` Cargo feature, so pure-Rust
users who don't want a DataFrame dependency don't pay for one.
2. `py_nominal_streaming::avro_writer::NominalAvroWriter` — a thin
`#[pyclass]` facade that composes `AvroWriter` + handles Python↔Rust
conversion. No pipeline logic lives here.
The two crates together deliver: single-file avro output, rotation via a
filename template, first-error latching (via `OnceLock`), idempotent
`close()` with cached result, truncate-on-open contract, and a
high-throughput polars DataFrame ingest path with documented null / NaN /
Infinity semantics that align with the Nominal Core ingest backend's
filtering.
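A minimal Python sketch of the first-error-latching behavior described above (the Rust side uses `OnceLock`; the `ErrorLatch` class and its method names here are hypothetical stand-ins):

```python
import threading


class ErrorLatch:
    """First error wins; every later operation re-raises it (sticky latch)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._err: Exception | None = None

    def latch(self, err: Exception) -> None:
        # Only the first error is stored, mirroring OnceLock::set semantics.
        with self._lock:
            if self._err is None:
                self._err = err

    def check(self) -> None:
        # Called at the top of every write/flush/sync/close.
        if self._err is not None:
            raise self._err


latch = ErrorLatch()
latch.latch(RuntimeError("disk full"))
latch.latch(RuntimeError("later error"))  # ignored: the first error is sticky
```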
Core crate — `nominal-streaming/src/avro_writer/`:
mod.rs — module doc + re-exports (AvroWriter, AvroWriterOpts,
AvroWriterError, PipelineStats)
opts.rs — plain struct + `::new()` + `with_*` builder chain
error.rs — AvroWriterError enum + From<ConsumerError>, documents the
Arc-wrapping contract for sticky latched errors
stats.rs — PipelineStats (public AtomicU64 fields)
consumer.rs — ParallelAvroFileConsumer (scoped-thread Record build) +
ErrorLatchingConsumer
state.rs — AvroWriterInner shared state behind Arc
helpers.rs — path_for_index, ensure_parent_and_truncate (io::Result),
open_stream, open_error_latching_consumer
writer.rs — AvroWriter struct + pub methods + Drop + 16 unit tests
polars.rs — feature-gated write_dataframe + anyvalue_to_json helpers
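The scoped-thread fan-out in `consumer.rs` can be sketched in Python terms — a `ThreadPoolExecutor` analogue of Rust's `std::thread::scope`, where `build_record` stands in for the per-series `Points → Value tree → Record` build (names and record shape are illustrative, not the real types):

```python
from concurrent.futures import ThreadPoolExecutor


def build_record(series):
    # Stand-in for the per-series Points -> Value tree -> Record build.
    channel, points = series
    return {"channel": channel, "values": [p * 2 for p in points]}


def parallel_build(series_list, max_workers=4):
    # Fan the per-series builds out across worker threads, preserving input
    # order; the caller then hands the finished records to the serial
    # writer.extend stage in one batch.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(build_record, series_list))


records = parallel_build([("a", [1, 2]), ("b", [3])])
```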
Facade crate — `py-nominal-streaming/src/avro_writer/`:
mod.rs — submodule decls + re-exports (NominalAvroWriter,
NominalAvroWriterOpts)
writer.rs — 32-line pyclass wrapping `AvroWriter`
opts.rs — pyclass wrapping `AvroWriterOpts`, getter/with_* mirror
for Python
pymethods.rs — #[pymethods] block; every method is Python extraction +
delegate to inner `AvroWriter`
polars_ffi.rs — Arrow C Data Interface FFI (Python polars → Rust polars);
stays here because it's inherently Python-specific
error_map.rs — Arc<AvroWriterError> → PyRuntimeError mapper
Public Rust API (`nominal_streaming::avro_writer`):
AvroWriter::new(path, opts) -> io::Result<Self>
AvroWriter::write<T>(&ChannelDescriptor, Vec<T>) where Vec<T>: IntoPoints
AvroWriter::write_batch(Vec<(ChannelDescriptor, PointsType)>)
AvroWriter::write_dataframe(&DataFrame, ts_col, tags) [polars feature]
AvroWriter::close() / flush() / sync()
AvroWriter::path() / finalized_paths() / written_files() / points_accepted()
AvroWriter::stats() -> &PipelineStats
Public Python API (unchanged from earlier iterations):
NominalAvroWriter(path, opts=None)
w.write(channel, ts_ns, value, tags=None)
w.write_batch(channel, ts_ns_list, values, tags=None)
w.write_from_dict(ts_ns, channel_values, tags=None)
w.write_struct(channel, ts_ns, value, tags=None)
w.write_float_array / write_string_array (channel, ts_ns, value, tags=None)
w.write_dataframe(df, timestamp_column, tags=None)
w.close() / flush() / sync() / path() / finalized_paths() /
written_files() / points_accepted() / stats()
context-manager support (__enter__/__exit__)
Null / NaN / Infinity semantics (documented end-to-end, aligned with
backend filters in `DirectClickHouseFileWriterV2`):
- Polars null → row skipped (no avro record emitted at that timestamp)
- Float64 NaN / ±Infinity → preserved as IEEE-754 bit patterns
- Struct fields with non-finite floats → emitted as JSON null (not
literal `NaN`/`Infinity` tokens — the backend's JSON column silently
accepts those at insert but fails at query time, a silent data-loss
trap)
- Null in the timestamp column → hard error
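The struct-field rule can be sketched with a small Python helper (illustrative only — the real implementation lives on the Rust side; the point is that non-finite floats must become JSON `null` to stay spec-compliant while preserving object shape):

```python
import json
import math


def json_safe(value):
    # Non-finite floats become JSON null so the output stays spec-compliant
    # JSON (no NaN/Infinity tokens); the containing object's shape survives.
    if isinstance(value, float) and not math.isfinite(value):
        return None
    if isinstance(value, dict):
        return {k: json_safe(v) for k, v in value.items()}
    if isinstance(value, list):
        return [json_safe(v) for v in value]
    return value


encoded = json.dumps(json_safe({"x": float("nan"), "y": 1.5}))
```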
Cargo feature wiring:
nominal-streaming exposes optional `polars` feature
py-nominal-streaming enables it via `features = ["logging", "polars"]`
on its `nominal-streaming` dep, guaranteeing the facade's
`write_dataframe` call target always resolves (compile-time enforced —
if the feature is ever dropped, the py crate fails to build).
Benchmark tooling under `py-nominal-streaming/benchmarks/`:
bench_nominal_avro_writer.py — parameterized CSV sweep
pipeline_profile.py — single-run flame-graph attribution
sweep.py — 1D / 2D characterization matrix
head_to_head.py — NominalAvroWriter vs the idiomatic
`PyNominalDatasetStream.to_file` +
per-column `enqueue_batch` loop
All four scripts refuse to run under Python < 3.12 (PyO3 boundary
crossings are ~60% faster on 3.12+, so cross-version comparisons
mislead).
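The guard might look like this (a sketch; the scripts' actual wording and exit mechanism may differ):

```python
import sys


def require_python(minimum=(3, 12)):
    # Benchmarks refuse to run on older interpreters so cross-version
    # numbers are never mixed into one comparison.
    if sys.version_info < minimum:
        raise SystemExit(
            f"benchmarks require Python >= {minimum[0]}.{minimum[1]}, "
            f"found {sys.version_info.major}.{sys.version_info.minor}"
        )


require_python((3, 0))  # passes on any modern interpreter
```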
Build + dependencies:
- polars is a hard runtime dep of the py crate (used by write_dataframe
callers); the Rust FFI lives in `polars_ffi.rs` and imports via the
Arrow C Data Interface (no pyarrow required)
- extension-module is a Cargo feature on the py crate forwarded to
pyo3/extension-module; maturin enables it for wheel builds, cargo
test runs without it
- crate-type = ["cdylib", "rlib"] so cargo test + cargo doc work
alongside the wheel build
Performance (Apple Silicon, CPython 3.12, release wheel):
- Peak: ~29 M pts/s at 10 cols × 1M-point frames
- Default shape (1000 cols × 1M frames × 200M points): ~24 M pts/s
- At parity with the pre-branch `PyNominalDatasetStream.to_file` +
hand-rolled per-column `enqueue_batch` loop (within ±7% across the
cols × frame_points grid); the `write_dataframe` surface wins on
ergonomics, not raw speed
- ~8× faster than a pure-Python `fastavro` + 8-thread pool alternative
- Avro encoder (single-threaded `writer.extend`) is the ceiling at
95%+ consumer utilization for all frame sizes ≤ 1M points
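That utilization figure falls out of the `stats()` counters; a hedged sketch of the attribution arithmetic (counter names as documented elsewhere in this PR, but the 0.9 cutoff and the function itself are illustrative assumptions, not shipped code):

```python
def bottleneck(consumer_consume_ns: int, producer_cpu_ns: int, wall_ns: int) -> str:
    # Rule of thumb: whichever side's time tracks the wall clock is the
    # limiter. 0.9 is an illustrative cutoff, not a shipped constant.
    consumer_util = consumer_consume_ns / wall_ns
    producer_util = producer_cpu_ns / wall_ns
    if consumer_util >= 0.9:
        return "consumer"
    if producer_util >= 0.9:
        return "producer"
    return "mixed"


# 95.2% consumer utilization, 12% producer CPU -> the avro encoder is the ceiling
verdict = bottleneck(consumer_consume_ns=952, producer_cpu_ns=120, wall_ns=1000)
```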
Tests:
- 25 Rust tests pass (9 pre-existing `nominal-streaming` stream tests
+ 1 consumer test + 15 AvroWriter tests covering rotation, error
latching, multi-producer, close race, drop race, truncate-on-reopen,
rotation-split across boundary, etc.)
- 15 Python tests pass (roundtrip, context manager, flush/sync,
write_dataframe dispatch + null/NaN/Inf handling for all dtypes,
timestamp-null hard error)
- `just check` clean (cargo fmt --nightly, mypy, ruff format, ruff
check)
- `cargo check -p nominal-streaming` (core standalone, no features)
and `cargo check -p nominal-streaming --features polars` both clean
## Summary
Introduces a pure-Rust `AvroWriter` primitive in `nominal-streaming` plus a thin `NominalAvroWriter` PyO3 facade in `py-nominal-streaming`. Both wrap `NominalDatasetStream` and target callers who want a single avro file on disk (optionally rotated). Crate users get an idiomatic Rust API; Python callers get the same wheel surface as before.

On top of the base writer the branch carries three performance / correctness improvements:
- `write_dataframe` — all columns of a polars frame are accumulated into one `(ChannelDescriptor, PointsType)` batch and handed to `NominalDatasetStream::enqueue_batch`. One buffer-lock acquisition per frame instead of one per column.
- Parallel `Record` build — a new `ParallelAvroFileConsumer` fans out the per-series `Points → Value tree → Record` build across `std::thread::scope` workers before the serial `writer.lock().extend(records)`. Stdlib only, zero new deps. The core streaming path's `AvroFileConsumer` is untouched, so other callers are unaffected.
- Struct fields containing non-finite floats are encoded as JSON `null` instead of silently dropping the field.
Plus a `.stats()` pipeline-timing hook and a `py-nominal-streaming/benchmarks/` directory with parameterized bench tooling for future regression tracking.

## Why not just a thin wrapper over `NominalDatasetStream`?

A caller could already write one avro file today by pointing a stream at a path via
`.to_file(path)` and driving it directly — the head-to-head benchmarks (section 1b) show that idiom within ±7% of this branch. Throughput isn't the reason a separate writer exists. The reasons are:

**File lifecycle.** `NominalDatasetStream` is network-first; it has no concept of "the output file." The writer owns truncate-on-open (fresh file per writer instance), fsync-on-close, idempotent `close()` via `OnceLock`, and drop-safety across clones (`drop_mutex` + `Arc::strong_count`). A wrapper that replicates this state machine is not thin.

**Rotation.** `max_points_per_file` with filename template `<stem>_<index:03d><suffix>` drains the current stream, fsyncs, opens a new stream pointed at the next path, and atomically swaps — tracking `file_index`, `points_in_current`, and `finalized_paths` across the transition. Rotation-aware row slicing is baked into `write_dataframe` and `write_batch` so a single call can straddle a rotation boundary.

**Error latching.** Consumer errors inside `NominalDatasetStream` die on the dispatcher thread. An `ErrorLatchingConsumer` wraps the consumer and captures the first error via `Arc<OnceLock<Arc<AvroWriterError>>>`, then surfaces it on every subsequent `write_*`/`flush`/`sync`/`close`. A write-to-disk API has to tell the caller their data didn't land; a best-effort streaming API doesn't.

**Consumer choice + `write_dataframe`.** The writer installs a `ParallelAvroFileConsumer` (scoped-thread Record build before the serial `writer.extend`; +10–12% at 200M+ points) and exposes a single `write_dataframe(frame, "ts")` that handles per-column dispatch, rotation-aware slicing, and documented null/NaN/Inf semantics (table below). The hand-rolled stream-plus-loop idiom has to reimplement each of these — and every downstream caller gets them subtly wrong in a different way.

**Typed `.stats()` hook.** `PipelineStats` carries named atomic counters specific to the file-writer pipeline (`df_handoff_ns`, `column_build_ns`, `enqueue_batch_ns`, `consumer_consume_ns`, `write_dataframe_total_ns`) — not part of the stream's surface. Makes bottleneck attribution a one-line `writer.stats()` call.

The value is in file-lifecycle guarantees + dataframe-oriented ergonomics layered on top of the stream. The stream remains the right primitive for network-first, multi-consumer, long-lived workloads; the writer is the right primitive for "I want N avro files on disk."
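The `path_for_index` helper behind the rotation template can be sketched in Python (a sketch of the documented `<stem>_<index:03d><suffix>` scheme; whether the very first file is also numbered is an assumption here, not something this PR pins down):

```python
from pathlib import Path


def path_for_index(base: str, index: int) -> str:
    # data.avro, index 2 -> data_002.avro  (<stem>_<index:03d><suffix>)
    p = Path(base)
    return str(p.with_name(f"{p.stem}_{index:03d}{p.suffix}"))


paths = [path_for_index("run/data.avro", i) for i in range(3)]
```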
## Architecture
The branch splits across both crates: the pure-Rust core lives in `nominal-streaming`, the PyO3 facade in `py-nominal-streaming`. Pure-Rust users depend only on `nominal-streaming`; Python callers consume the wheel exactly as they do today.

`nominal-streaming` (pure-Rust core):
- `src/avro_writer/mod.rs` — module doc + re-exports (`AvroWriter`, `AvroWriterOpts`, `AvroWriterError`, `PipelineStats`)
- `src/avro_writer/writer.rs` — `AvroWriter` — generic `write<T>`, `write_batch`, `close`/`flush`/`sync`, rotation, `Drop`
- `src/avro_writer/opts.rs` — `AvroWriterOpts` — plain builder (no `#[pyclass]`)
- `src/avro_writer/state.rs` — `AvroWriterInner` shared via `Arc`; holds stream, error latch, rotation counters
- `src/avro_writer/error.rs` — `AvroWriterError` + `From<ConsumerError>`
- `src/avro_writer/consumer.rs` — `ParallelAvroFileConsumer` (scoped-thread Record build) + `ErrorLatchingConsumer` wrapper
- `src/avro_writer/helpers.rs` — `path_for_index`, `ensure_parent_and_truncate`, `open_stream`, `open_error_latching_consumer`
- `src/avro_writer/stats.rs` — `PipelineStats` atomic counters
- `src/avro_writer/polars.rs` — `impl AvroWriter { pub fn write_dataframe }` — gated on the `polars` Cargo feature

`write_dataframe` is behind `#[cfg(feature = "polars")]`. Rust callers that only use `write<T>`/`write_batch` pay no `polars` dependency.

`py-nominal-streaming` (PyO3 facade):
- `src/avro_writer/mod.rs` — submodule decls + re-exports
- `src/avro_writer/writer.rs` — `#[pyclass] NominalAvroWriter { inner: AvroWriter }`
- `src/avro_writer/opts.rs` — `#[pyclass] NominalAvroWriterOpts { inner: AvroWriterOpts }`
- `src/avro_writer/pymethods.rs` — `#[pymethods]` block — extract Python args, `py.detach()` into the core
- `src/avro_writer/polars_ffi.rs` — Arrow C Data Interface FFI (Python polars → Rust polars); inherently Python-specific
- `src/avro_writer/error_map.rs` — `Arc<AvroWriterError> → PyErr`
- `python/nominal_streaming/_nominal_streaming.pyi` — type stubs

`py-nominal-streaming/Cargo.toml` pulls in the core with `features = ["logging", "polars"]` — this activates `AvroWriter::write_dataframe` inside the core crate (verified by removing `polars` from the feature list and watching `cargo check` error on the missing method). The facade does Python-side input validation (timestamp column present + non-null + correct dtype → `PyValueError`/`PyTypeError`) before delegating, preserving the existing Python error contract.

## Pipeline
The writer delegates the hot path to `NominalDatasetStream` — inheriting its primary/secondary buffer + batch-processor + request-dispatcher pipeline. Additions on top:
- Generic `write<T>` API over `IntoPoints` — one method covers all six proto point types.
- `write_dataframe(df, ts_col, tags=None)` — per-column dispatch for Float64, Int64, String, Struct, List/Array of scalar or string.
- Rotation via `max_points_per_file` with filename template `<stem>_<index:03d><suffix>`. No separate `RotatingAvroWriter` class.
- First-error latching — an `ErrorLatchingConsumer` wrapping `AvroFileConsumer`, with the latched error surfaced on every subsequent `write_*`/`flush`/`sync`/`close`.
- Idempotent `close()` via `OnceLock`; `close_lock: RwLock` serializes writes vs close; `drop_mutex: Mutex` makes concurrent drops race-free.
- Truncate-on-open in `new()` — fresh file per writer instance.

## Null / NaN / Infinity semantics
- Float64 — null → row skipped; NaN / ±Infinity preserved as IEEE-754 bit patterns
- Int64 — null → row skipped
- String — null → row skipped
- Struct(…) — null → row skipped; non-finite float fields emitted as JSON null
- List(*) / Array(*, _) — null → row skipped
- null in the timestamp column → hard error

The principle: polars null ⇒ "no data at this timestamp"; NaN / ±Inf ⇒ real IEEE-754 values that round-trip through avro. Struct non-finite floats encode as JSON `null` because spec JSON has no NaN / Inf tokens — indistinguishable from a polars null on read, but the containing object's shape is preserved (previously the field was silently dropped).

## Performance characterization
All numbers: Apple Silicon M-series, CPython 3.12, release wheel, snappy-compressed avro, local SSD. Reruns are within ±3% variance unless noted.
### 1. Peak and baseline comparison
Parallel record-build lifts throughput by +12.1% at 1B scale, matching the +10% observed at 200M — consistent across scale. ~8× faster than the pure-Python `fastavro` + 8-thread-pool fallback at the default shape.

### 1b. Head-to-head vs `PyNominalDatasetStream.to_file`
PyNominalDatasetStreamat a file via.to_file(path), loop the polars frame, callenqueue_batch(channel, ts_list, values_list)once per column per frame. Both paths produce the same avro schema to the same on-disk format; they differ only in how the caller drives them.Run via
benchmarks/head_to_head.py(200M total Float64 points,fsync_on_close=Falseon both sides for apples-to-apples):Within ±7% across the entire grid — the two idioms deliver equivalent throughput. The consumer-side avro encoder is the shared bottleneck for both; the per-column
`enqueue_batch` path amortizes its per-frame FFI crossings well because each crossing already does genuine work (a whole column's worth of points at a time).

The win of `write_dataframe` is ergonomic, not raw speed. Callers swap a hand-rolled per-column dispatch loop for one line: `writer.write_dataframe(df, "ts")`.
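The two idioms side by side, with stand-in types (the real objects are `PyNominalDatasetStream` and `NominalAvroWriter`; this sketch only shows the call shape, not the real API):

```python
# Stand-in "frame": column name -> values; a polars DataFrame in reality.
frame = {"ts": [1, 2, 3], "temp": [20.0, 21.0, 22.0], "rpm": [900.0, 905.0, 910.0]}


class FakeStream:
    """Records enqueue_batch calls: the hand-rolled idiom crosses once per column."""

    def __init__(self):
        self.calls = []

    def enqueue_batch(self, channel, ts_list, values_list):
        self.calls.append(channel)


# Hand-rolled idiom: one enqueue_batch per column per frame.
stream = FakeStream()
for name, values in frame.items():
    if name == "ts":
        continue
    stream.enqueue_batch(name, frame["ts"], values)

# write_dataframe idiom: one call per frame; per-column dispatch, rotation
# slicing, and null/NaN handling all happen inside the writer:
#   writer.write_dataframe(df, "ts")
```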
Plus `write_dataframe` bakes in documented null/NaN/Inf semantics (see section above), built-in rotation, pipeline observability via `.stats()`, and first-error latching — all of which the hand-rolled loop either skips or gets wrong.

### 2. Pipeline attribution at default shape (1000 cols × 200M pts × 1M frames, Float64)
Consumer saturation: 95.2% of wall → the downstream avro encoder+writer is the bottleneck;
`enqueue_batch_ns` grows because the producer is backpressured on a full buffer. Producer CPU is only 12% of wall.

### 3. Frame-size sweep (1000 cols × 200M pts, Float64)
Throughput is flat within 12% across a 200× range in frame size. Below 2M-point frames the consumer is saturated (≥94% util). At 5M-point frames the consumer has headroom and the producer becomes the limiter. Past 5M, per-frame allocation cost starts to erode gains.
### 4. Column-count sweep (200M pts × 1M frame, Float64)
`writer.extend` grows monotonically with channel count because the avro encoder pays a per-Record overhead (schema lookup, tags map, array headers) that doesn't amortize over points inside the record. At 5000 cols each `WriteRequest` produces 5000 Records; at 10 cols only 10.

### 5. 2D surface (cols × frame_points, 200M pts, Float64)
Throughput (M pts/s):
Consumer utilization (% of wall):
Two regimes separated cleanly by frame size (not column count):
Diagonal trade-off: bigger frames help wide workloads but hurt narrow ones. 5000 cols goes 21.1 → 23.7 M (+13%) when frames grow 250K → 5M; 10 cols goes 29.4 → 26.0 (−12%) on the same move.
Overall spread across the 4×3 grid: 1.39× (worst 21.1 M, best 29.5 M).
### 6. Dtype matrix (100 & 1000 cols × 100K / 1M frames, 50M pts per cell)
Relative ordering is stable across shape:
- Float64 — one `Value::Double` per point; fastest.
- String — `to_string()` clone + variable-width avro encode.
- Array — `List(Float64)` is cast to `Float64` per row, then each element allocates a `Value::Double`; avro arrays have per-element framing.
- Struct — `serde_json::to_string` is the cost; the struct is encoded as a JSON string inside avro's record union.
The 50M-point throughput is lower than the 200M-point throughput for the same shape (float 16 M vs 24 M) because buffer-fill + encoder warm-up are a bigger fraction of short runs.
### 7. Rotation overhead (1000 cols × 100M pts × 1M frames, Float64)
Rotation cost is small while each file holds many frames. Rotating on every frame is catastrophic — 3.3× wall-time penalty. Once
`max_points_per_file ≤ frame_points`, every frame forces a rotation (drain stream, fsync, reopen stream). Three of the rows above show an identical 3.3× penalty because they all collapse to "one rotation per frame."

Practical guidance: set `max_points_per_file` such that at least ~10 frames fit per file.

### 8. Scaling confirmation at 1B points (1000 cols × 1M frames, Float64)

The `points_to_avro` phase drops 2.6× with parallelization. Wall-time gain is smaller (+12%) because the consumer remains saturated — savings inside `points_to_avro` shift into the serialized `writer.extend` stage rather than removing wall time.

## Gaps — what was NOT tested
- The `--dtypes` flag only covers `float / string / array / struct`; Int64 is expected to match Float64 qualitatively (same scalar arm, same point size) but was not benchmarked directly.
- `write_dataframe` is single-threaded on the caller side. Multi-producer throughput through `enqueue` is unit-tested for correctness but not benched.
- All runs used `apache_avro::Codec::Snappy`. Deflate / Zstandard / null would shift the `writer.extend` breakdown.

## Observability hook
`NominalAvroWriter.stats()` (and `AvroWriter::stats()` on the Rust side) returns a snapshot of nanosecond counters for pipeline attribution:
- `df_handoff_ns` — Arrow C Data Interface FFI (only populated by `write_dataframe`)
- `extract_ts_ns` — timestamp column extraction
- `column_build_ns` — per-column polars-chunk extraction + point-struct building
- `enqueue_batch_ns` — producer wall into the stream (includes blocked-on-buffer)
- `consumer_consume_ns` / `_calls` — dispatcher-side avro encode + write
- `write_dataframe_calls` / `_total_ns` — meta

Rule of thumb: `consumer_consume_ns ≈ wall_clock` → consumer bottleneck. `producer_cpu_ns ≈ wall_clock` → producer bottleneck.

## Bench tooling
Four scripts under
`py-nominal-streaming/benchmarks/`:
- `bench_nominal_avro_writer.py` — parameterized CSV sweep (dtype × cols × total × frame).
- `pipeline_profile.py` — single-run flame-graph-style attribution using `stats()`.
- `sweep.py` — 1D / 2D characterization matrix.
- `head_to_head.py` — `NominalAvroWriter.write_dataframe` vs `PyNominalDatasetStream.to_file` + per-column `enqueue_batch`. Use this whenever referencing the old-idiom comparison.

## Dependencies + build
- `nominal-streaming` exposes an optional `polars` Cargo feature; enabling it activates `AvroWriter::write_dataframe`. Pure-Rust callers who skip the feature pull no `polars` dependency.
- `py-nominal-streaming` depends on the core with `features = ["logging", "polars"]` and keeps `polars` + `polars-arrow` as direct deps for the Arrow C Data Interface FFI.
- `polars>=1,<2` is a hard Python runtime dep (`write_dataframe` uses it directly via the Arrow C Data Interface).
- `extension-module` is a Cargo feature on `py-nominal-streaming` that forwards to `pyo3/extension-module`. Maturin enables it via `[tool.maturin].features`; `cargo test` runs without it.
- `crate-type = ["cdylib", "rlib"]` so `cargo test` / `cargo doc` work alongside the wheel build.

## Test plan
nominal-streaming(25 tests: 9 stream + 1 consumer + 15 writer): roundtrip all 6 value types, batch boundaries, multi-producer, error latching + Io variant preservation, drop-without-close, close idempotency, flush+sync, concurrent write/close race, simultaneous-drop race, truncate-on-reopen, rotation, straddle-across-rotation.just checkpasses (ruff, mypy, cargo fmt).py-nominal-streaming/benchmarks/.