Parallelize chunk reads for atlas & other nd-array formats#264
Open
robinskil wants to merge 2 commits into
Open
Parallelize chunk reads for atlas & other nd-array formats#264robinskil wants to merge 2 commits into
robinskil wants to merge 2 commits into
Conversation
Read a dataset's chunks concurrently across tokio worker threads instead of serially, and let a single atlas store be processed by multiple DataFusion partitions. Part A (beacon-nd-array): thread an explicit `concurrency` parameter through `any_dataset_as_record_batch_stream` / `dataset_as_record_batch_stream` and the ragged path, and replace the sequential `.then(...)` chunk loop with `map -> tokio::spawn -> .buffered(N)`. Each chunk's read + broadcast + Arrow conversion now runs on a worker thread (mirroring BBF's per-file task fan-out) while emission order is preserved and in-flight work is bounded to N. Benefits atlas, netcdf, zarr, and tiff at once. Part B (beacon-arrow-atlas): implement `AtlasSource::repartitioned` to replicate the marker across `target_partitions` file groups that share the source's stream_cache/SegQueue, so partitions work-steal datasets (drain-once preserved). netcdf/tiff document why single-file repartitioning is intentionally deferred. Adds nd-array tests for concurrent overlap, serial mode, error propagation, and metrics interleaving-independence, plus atlas repartition unit + end-to-end tests.
Contributor
There was a problem hiding this comment.
Pull request overview
This PR improves throughput for nd-array-backed formats by introducing bounded, ordered concurrent chunk (and ragged cast) reads in beacon-nd-array, and adds DataFusion-level repartitioning for atlas so a single store can be processed by multiple partitions.
Changes:
- Add a
concurrencyparameter (plusdefault_chunk_concurrency()) to nd-array → ArrowRecordBatchstreaming APIs and switch chunk/cast iteration to concurrent.buffered(N)execution. - Update downstream format integrations (atlas/netcdf/zarr/tiff) to pass a default concurrency value into the shared stream constructors.
- Implement
AtlasSource::repartitionedto replicate marker files across partitions and work-steal datasets via a shared queue/cache, with accompanying tests.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| beacon-nd-array/src/arrow/batch.rs | Adds concurrency controls and tokio-spawned buffered chunk/cast reading; expands tests for ordering/overlap/errors/metrics. |
| beacon-nd-array/README.md | Documents new stream function signatures and how to choose concurrency. |
| beacon-arrow-zarr/src/reader.rs | Updates tests to include the new concurrency argument. |
| beacon-arrow-zarr/src/datafusion/source.rs | Uses default_chunk_concurrency() when creating the dataset stream. |
| beacon-arrow-tiff/src/datafusion/source.rs | Uses default_chunk_concurrency() and documents why repartitioning is not implemented. |
| beacon-arrow-netcdf/src/reader.rs | Updates tests to include the new concurrency argument. |
| beacon-arrow-netcdf/src/datafusion/source.rs | Uses default_chunk_concurrency() and documents why repartitioning is not implemented. |
| beacon-arrow-atlas/src/datafusion/source.rs | Implements repartitioned and uses default_chunk_concurrency() in the opener. |
| beacon-arrow-atlas/src/datafusion/mod.rs | Adds an end-to-end test to ensure repartitioned scans preserve all rows. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
213
to
221
| Ok((ragged, schema, obs_dims, offsets, passing_indices, ranges, metrics)) => { | ||
| futures::stream::iter(ranges) | ||
| .then(move |range| { | ||
| .map(move |range| { | ||
| let ragged = ragged.clone(); | ||
| let schema = schema.clone(); | ||
| let obs_dims = obs_dims.clone(); | ||
| let offsets = offsets.clone(); | ||
| let passing_indices = passing_indices.clone(); | ||
| let metrics = metrics.clone(); |
Comment on lines
+264
to
271
| match tokio::runtime::Handle::try_current() { | ||
| Ok(_) => futures::future::Either::Left(async move { | ||
| tokio::spawn(fut).await.unwrap_or_else(|e| { | ||
| Err(anyhow::anyhow!("ragged cast read task failed: {e}")) | ||
| }) | ||
| }), | ||
| Err(_) => futures::future::Either::Right(fut), | ||
| } |
Comment on lines
+604
to
611
| match tokio::runtime::Handle::try_current() { | ||
| Ok(_) => futures::future::Either::Left(async move { | ||
| tokio::spawn(fut).await.unwrap_or_else(|e| { | ||
| Err(anyhow::anyhow!("chunk read task failed: {e}")) | ||
| }) | ||
| }), | ||
| Err(_) => futures::future::Either::Right(fut), | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
Atlas files are read lazily and harmonized into a stream of Arrow
RecordBatches through the sharedbeacon-nd-arraycrate, as are netcdf, zarr, and tiff. Until now that path read a dataset's chunks strictly sequentially (stream::iter(subsets).then(...)), so each chunk's storage read + Arrow conversion blocked the next. The native beacon binary format (BBF), by contrast, polls a single file across many tokio tasks.This PR gives atlas — and generically every nd-array format — the same multi-threaded "polled from multiple threads" behavior, at two stacking levels.
Part A — generic concurrent chunk reading (
beacon-nd-array)concurrency: usizeparameter toany_dataset_as_record_batch_stream,dataset_as_record_batch_stream, and the ragged variant, plus adefault_chunk_concurrency()helper (core-count default).map → tokio::spawn → .buffered(N):tokio::spawndistributes each chunk's read + broadcast +ndarray_to_arrow_arrayacross runtime worker threads (true multi-core), mirroring BBF's task fan-out..buffered(N)preserves emission order (existing positional tests stay valid) and bounds in-flight work toN, giving the same bounded-memory backpressure as BBF's semaphore + bounded channel.Handle::try_current()Eitherguard keeps the stream correct if ever polled off-runtime.Part B — DataFusion repartitioning for atlas (
beacon-arrow-atlas)AtlasSource::repartitionedto replicate the marker acrosstarget_partitionsfile groups that share the source'sstream_cache/SegQueue. The N partition openers then work-steal datasets off the shared queue (drain-once invariant preserved), so a single atlas store is processed by multiple partitions whose downstream operators parallelize — on top of Part A's per-dataset chunk concurrency.Tests
beacon-nd-array: new tests for concurrent overlap (max_in_flight > 1),concurrency=1serial, error propagation, and metrics interleaving-independence — plus all existing positional chunk/ragged tests still pass, proving ordering is preserved.beacon-arrow-atlas: a directrepartitionedunit test and an end-to-endtarget_partitions=4scan confirming no rows lost or duplicated.beacon-nd-array,beacon-arrow-{atlas,netcdf,zarr,tiff};beacon-formatsbuilds cleanly downstream.Notes
default_chunk_concurrency()defaults to core count. If atlas oversubscription (in-openerbuffer_unordered(8)× chunk concurrency × partitions) ever bites under load, that is the knob to lower.