
Parallelize chunk reads for atlas & other nd-array formats#264

Open
robinskil wants to merge 2 commits into main from claude/frosty-grothendieck-522b5f


@robinskil

Collaborator

Motivation

Atlas files are read lazily and harmonized into a stream of Arrow RecordBatches through the shared beacon-nd-array crate, as are netcdf, zarr, and tiff. Until now that path read a dataset's chunks strictly sequentially (stream::iter(subsets).then(...)), so each chunk's storage read + Arrow conversion blocked the next. The native beacon binary format (BBF), by contrast, polls a single file across many tokio tasks.

This PR gives atlas, and generically every nd-array format, the same behavior of being polled from multiple threads, at two levels that stack on top of each other.

Part A — generic concurrent chunk reading (beacon-nd-array)

  • Adds an explicit concurrency: usize parameter to any_dataset_as_record_batch_stream, dataset_as_record_batch_stream, and the ragged variant, plus a default_chunk_concurrency() helper (core-count default).
  • Replaces the sequential chunk loop (regular and ragged paths) with map → tokio::spawn → .buffered(N):
    • tokio::spawn distributes each chunk's read + broadcast + ndarray_to_arrow_array across runtime worker threads (true multi-core), mirroring BBF's task fan-out.
    • ordered .buffered(N) preserves emission order (existing positional tests stay valid) and bounds in-flight work to N, giving the same bounded-memory backpressure as BBF's semaphore + bounded channel.
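    • A Handle::try_current() check wrapped in Either keeps the stream correct if it is ever polled outside a tokio runtime: with no runtime handle available, the chunk future runs inline instead of being spawned.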
  • Benefits atlas, netcdf, zarr, tiff with one change.
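
The ordering and bounding properties described above can be illustrated without tokio. The sketch below is a std-thread analogue, not the code in this PR: `ordered_bounded_map` and `join_in_order` are hypothetical names, and OS threads stand in for spawned tasks. It keeps at most `limit` tasks in flight, yields results in input order the way `.buffered(N)` does, and turns a panicked task into an `Err`, similar in spirit to mapping a failed join to an error.

```rust
use std::collections::VecDeque;
use std::thread::{self, JoinHandle};

/// Runs `work` on each item with at most `limit` threads in flight and
/// returns the results in input order.
fn ordered_bounded_map<T, R, F>(items: Vec<T>, limit: usize, work: F) -> Vec<Result<R, String>>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(T) -> Result<R, String> + Send + Clone + 'static,
{
    let limit = limit.max(1);
    let mut in_flight: VecDeque<JoinHandle<Result<R, String>>> = VecDeque::new();
    let mut out = Vec::with_capacity(items.len());

    for item in items {
        // Window is full: wait for the oldest task before starting another.
        if in_flight.len() == limit {
            out.push(join_in_order(in_flight.pop_front().unwrap()));
        }
        let work = work.clone();
        in_flight.push_back(thread::spawn(move || work(item)));
    }
    // Drain the remaining tasks, still oldest first.
    while let Some(handle) = in_flight.pop_front() {
        out.push(join_in_order(handle));
    }
    out
}

/// A panicked task is reported as an `Err` instead of propagating the panic.
fn join_in_order<R>(handle: JoinHandle<Result<R, String>>) -> Result<R, String> {
    handle
        .join()
        .unwrap_or_else(|_| Err("chunk read task failed".to_string()))
}

fn main() {
    let chunks: Vec<u64> = (0..8).collect();
    let results = ordered_bounded_map(chunks, 3, |c| {
        if c == 5 {
            Err(format!("chunk {} unreadable", c))
        } else {
            Ok(c * 10)
        }
    });
    for r in &results {
        println!("{:?}", r);
    }
}
```

Passing `limit = 1` degenerates to the old serial behavior, which is the same property the concurrency=1 test in this PR checks.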

Part B — DataFusion repartitioning for atlas (beacon-arrow-atlas)

  • Implements AtlasSource::repartitioned to replicate the marker across target_partitions file groups that share the source's stream_cache/SegQueue. The N partition openers then work-steal datasets off the shared queue (drain-once invariant preserved), so a single atlas store is processed by multiple partitions whose downstream operators parallelize — on top of Part A's per-dataset chunk concurrency.
  • netcdf/tiff document why single-file repartitioning is intentionally deferred (Part A covers intra-file parallelism; ListingTable covers cross-file).
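
As a rough illustration of the work-stealing and drain-once idea, the sketch below uses only the standard library. It is an analogue under stated assumptions, not the PR's implementation: `scan_with_partitions` is a hypothetical name, a `Mutex<VecDeque>` stands in for the shared SegQueue, and plain integers stand in for datasets.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Each partition pops dataset ids off one shared queue until it is empty.
/// Returns, per partition, the ids that partition processed.
fn scan_with_partitions(datasets: Vec<u32>, partitions: usize) -> Vec<Vec<u32>> {
    // Stand-in for the shared queue: one queue, one handle per partition.
    let queue = Arc::new(Mutex::new(VecDeque::from(datasets)));

    let workers: Vec<_> = (0..partitions)
        .map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                let mut mine = Vec::new();
                loop {
                    // Hold the lock only for the pop, not for the processing.
                    let next = queue.lock().unwrap().pop_front();
                    match next {
                        Some(id) => mine.push(id),
                        None => break,
                    }
                }
                mine
            })
        })
        .collect();

    workers.into_iter().map(|w| w.join().unwrap()).collect()
}

fn main() {
    let per_partition = scan_with_partitions((0..20).collect(), 4);
    // Drain-once: every dataset appears in exactly one partition's output.
    let mut all: Vec<u32> = per_partition.iter().flatten().copied().collect();
    all.sort_unstable();
    assert_eq!(all, (0..20).collect::<Vec<u32>>());
    println!("{:?}", per_partition);
}
```

How the datasets split across partitions varies from run to run; only the union is fixed, which is what the end-to-end "no rows lost or duplicated" test asserts.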

Tests

  • beacon-nd-array: new tests for concurrent overlap (max_in_flight > 1), concurrency=1 serial, error propagation, and metrics interleaving-independence — plus all existing positional chunk/ragged tests still pass, proving ordering is preserved.
  • beacon-arrow-atlas: a direct repartitioned unit test and an end-to-end target_partitions=4 scan confirming no rows lost or duplicated.
  • Full suites pass for beacon-nd-array, beacon-arrow-{atlas,netcdf,zarr,tiff}; beacon-formats builds cleanly downstream.
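
One way to measure overlap of the kind the max_in_flight test checks is a pair of atomic counters. The sketch below is an assumption about how such a measurement could look, not the test added in this PR: `peak_in_flight` is a hypothetical helper, and a barrier is used so the observed peak is deterministic and does not depend on timing.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;

/// Runs `workers` tasks at once and returns the peak number seen in flight.
fn peak_in_flight(workers: usize) -> usize {
    let current = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    // Every task waits here while counted as in flight, so all of them
    // overlap before any one of them finishes.
    let barrier = Arc::new(Barrier::new(workers));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let (current, peak, barrier) = (current.clone(), peak.clone(), barrier.clone());
            thread::spawn(move || {
                let now = current.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                barrier.wait();
                current.fetch_sub(1, Ordering::SeqCst);
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}

fn main() {
    println!("serial peak: {}", peak_in_flight(1));
    println!("concurrent peak: {}", peak_in_flight(4));
}
```

A peak of 1 corresponds to the serial case, and any peak above 1 shows that reads genuinely overlapped.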

Notes

  • default_chunk_concurrency() defaults to core count. If atlas oversubscription (in-opener buffer_unordered(8) × chunk concurrency × partitions) ever bites under load, that is the knob to lower.
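
To make the oversubscription product in the note concrete, here is a small sketch. Both function names are hypothetical: `core_count_default` only mirrors the "core-count default" described above using the standard library and is not the crate's `default_chunk_concurrency()`, and `worst_case_in_flight` simply multiplies the three factors the note lists.

```rust
use std::num::NonZeroUsize;
use std::thread;

/// Assumed shape of a core-count default: available parallelism, at least 1.
fn core_count_default() -> usize {
    thread::available_parallelism()
        .map(NonZeroUsize::get)
        .unwrap_or(1)
}

/// Product named in the note: opener buffer x chunk concurrency x partitions.
fn worst_case_in_flight(opener_buffer: usize, chunk_concurrency: usize, partitions: usize) -> usize {
    opener_buffer
        .saturating_mul(chunk_concurrency)
        .saturating_mul(partitions)
}

fn main() {
    let chunk_concurrency = core_count_default();
    // 8 is the in-opener buffer_unordered(8) from the note; 4 partitions is an example value.
    let bound = worst_case_in_flight(8, chunk_concurrency, 4);
    println!("chunk concurrency = {}, worst-case in-flight chunk tasks = {}", chunk_concurrency, bound);
}
```

On a hypothetical 16-core machine with 4 partitions the product is 8 × 16 × 4 = 512, which shows why lowering the chunk concurrency is the most direct lever.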

Read a dataset's chunks concurrently across tokio worker threads instead
of serially, and let a single atlas store be processed by multiple
DataFusion partitions.

Part A (beacon-nd-array): thread an explicit `concurrency` parameter
through `any_dataset_as_record_batch_stream` / `dataset_as_record_batch_stream`
and the ragged path, and replace the sequential `.then(...)` chunk loop
with `map -> tokio::spawn -> .buffered(N)`. Each chunk's read + broadcast
+ Arrow conversion now runs on a worker thread (mirroring BBF's per-file
task fan-out) while emission order is preserved and in-flight work is
bounded to N. Benefits atlas, netcdf, zarr, and tiff at once.

Part B (beacon-arrow-atlas): implement `AtlasSource::repartitioned` to
replicate the marker across `target_partitions` file groups that share
the source's stream_cache/SegQueue, so partitions work-steal datasets
(drain-once preserved). netcdf/tiff document why single-file
repartitioning is intentionally deferred.

Adds nd-array tests for concurrent overlap, serial mode, error
propagation, and metrics interleaving-independence, plus atlas
repartition unit + end-to-end tests.
Copilot AI review requested due to automatic review settings June 16, 2026 09:49

Copilot AI left a comment

Contributor


Pull request overview

This PR improves throughput for nd-array-backed formats by introducing bounded, ordered concurrent chunk (and ragged cast) reads in beacon-nd-array, and adds DataFusion-level repartitioning for atlas so a single store can be processed by multiple partitions.

Changes:

  • Add a concurrency parameter (plus default_chunk_concurrency()) to nd-array → Arrow RecordBatch streaming APIs and switch chunk/cast iteration to concurrent .buffered(N) execution.
  • Update downstream format integrations (atlas/netcdf/zarr/tiff) to pass a default concurrency value into the shared stream constructors.
  • Implement AtlasSource::repartitioned to replicate marker files across partitions and work-steal datasets via a shared queue/cache, with accompanying tests.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| beacon-nd-array/src/arrow/batch.rs | Adds concurrency controls and tokio-spawned buffered chunk/cast reading; expands tests for ordering/overlap/errors/metrics. |
| beacon-nd-array/README.md | Documents new stream function signatures and how to choose concurrency. |
| beacon-arrow-zarr/src/reader.rs | Updates tests to include the new concurrency argument. |
| beacon-arrow-zarr/src/datafusion/source.rs | Uses default_chunk_concurrency() when creating the dataset stream. |
| beacon-arrow-tiff/src/datafusion/source.rs | Uses default_chunk_concurrency() and documents why repartitioning is not implemented. |
| beacon-arrow-netcdf/src/reader.rs | Updates tests to include the new concurrency argument. |
| beacon-arrow-netcdf/src/datafusion/source.rs | Uses default_chunk_concurrency() and documents why repartitioning is not implemented. |
| beacon-arrow-atlas/src/datafusion/source.rs | Implements repartitioned and uses default_chunk_concurrency() in the opener. |
| beacon-arrow-atlas/src/datafusion/mod.rs | Adds an end-to-end test to ensure repartitioned scans preserve all rows. |


Comment on lines 213 to 221
  Ok((ragged, schema, obs_dims, offsets, passing_indices, ranges, metrics)) => {
      futures::stream::iter(ranges)
-         .then(move |range| {
+         .map(move |range| {
              let ragged = ragged.clone();
              let schema = schema.clone();
              let obs_dims = obs_dims.clone();
              let offsets = offsets.clone();
              let passing_indices = passing_indices.clone();
              let metrics = metrics.clone();
Comment on lines +264 to 271
    match tokio::runtime::Handle::try_current() {
        Ok(_) => futures::future::Either::Left(async move {
            tokio::spawn(fut).await.unwrap_or_else(|e| {
                Err(anyhow::anyhow!("ragged cast read task failed: {e}"))
            })
        }),
        Err(_) => futures::future::Either::Right(fut),
    }
Comment on lines +604 to 611
    match tokio::runtime::Handle::try_current() {
        Ok(_) => futures::future::Either::Left(async move {
            tokio::spawn(fut).await.unwrap_or_else(|e| {
                Err(anyhow::anyhow!("chunk read task failed: {e}"))
            })
        }),
        Err(_) => futures::future::Either::Right(fut),
    }

2 participants