Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ _build/*
.fleet
.zed
.ipynb_checkpoints/
.venv-dask/
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ polars = { version = "0.51", features = [
"pivot"
] }
ndarray = { version = "0.16", features = ["rayon"] }
rayon = "1"
nalgebra-sparse = "0.11.0"
anyhow = "1.0.86"
log = "0.4.27"
Expand Down
102 changes: 102 additions & 0 deletions demo/OOC_BENCHMARK.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Out-of-core preprocessing — memory vs scanpy

> **Tier-1 OOC pipeline is complete.** All of QC, `normalize_total`, `log1p`, a fused
> `preprocess`, highly variable genes (Seurat), and PCA run disk-backed in bounded memory, exposed
> as `sr_ooc <cmd>` and `sr.pp.*` (drop-in for `sc.pp.*`). Each step is unit-tested against its
> in-memory / dense counterpart: log1p & normalize match scanpy's X exactly; QC matches
> (totals/mito/top-N/var); HVG selects the **same** genes as SingleRust's in-memory HVG; PCA
> matches a dense covariance-PCA reference (variance ratios exact, embedding norms match, up to
> per-component sign). End-to-end vs scanpy, the PCA *spectrum* tracks scanpy but the exact axes
> differ because SingleRust's Seurat HVG picks a different gene set than scanpy's (a pre-existing
> in-memory difference, not introduced by the OOC path).

The benchmark below covers the QC + normalize_total + log1p portion.

## Deterministic parallelism

The compute-bound passes — QC accumulation, HVG sum/sum-of-squares, and PCA's gene×gene Gram
matrix — are parallelized with rayon. Naive parallel floating-point reduction is **not**
reproducible (FP addition isn't associative and rayon's work-stealing varies the summation order),
so all reductions use a **fixed-block, ordered-merge** scheme (`backed::processing::det`): rows are
cut into fixed-size blocks at fixed indices, each block is folded sequentially, and partials are
merged in block order. The partition and merge order depend only on the data size and a constant
block size — never on thread count or scheduling — so results are bit-identical on every run.

Verified two ways:
- **Unit tests** run QC / HVG / preprocess / PCA under rayon pools of 1 vs 8 threads and assert
bit-identical metrics, masks, embeddings, and variance ratios.
- **At scale (50k cells, real data):** the full pipeline (`preprocess → hvg → pca`) run with
`RAYON_NUM_THREADS=1` vs `=18` produced **bit-identical** X, obs QC columns, var HVG columns,
`obsm["X_pca"]`, and `uns` variance ratios.

(The cheap elementwise transforms — normalize/log1p — have no cross-row reduction and are
deterministic by construction; PCA projection is per-cell independent, also order-free.)


Same work in both lanes (QC + `normalize_total(1e4)` + `log1p`) on the same `.h5ad`, each run as
a subprocess under `/usr/bin/time -l` to capture **peak RSS** and wall time.

```bash
cargo build --release --example sr_ooc
python demo/bench_ooc.py data/bench_input_500k.h5ad 20000
```

## Result (500,000 cells × 48,788 genes, 48 GB / 18-core)

| lane | time | peak RSS |
|---|---:|---:|
| scanpy in-memory | 13.5 s (compute) | 13.6 GB |
| scanpy + Dask OOC | 35.4 s (compute) | 14.5 GB |
| **SingleRust OOC (fused)** | 45.0 s (wall, incl. I/O) | **2.5 GB** |

**SingleRust OOC uses ~5.4× less peak memory than scanpy in-memory and ~5.8× less than
scanpy+Dask**, and that footprint is chunk-bounded — it stays roughly flat as the cell count
grows, whereas scanpy in-memory scales linearly and eventually OOMs (≈46 GB at 2M cells, over a
48 GB machine). That is the point of out-of-core: process data that does not fit in RAM.

The standout finding is the **Dask lane**: its sparse out-of-core path gives essentially **no
memory benefit** — peak RSS lands at/above scanpy in-memory (it has measured 7.5–14.5 GB across
runs, i.e. ≥ in-memory) while still costing 2.6× the runtime. This is exactly the Dask-sparse
immaturity scanpy's own issues describe. Native Rust streaming is the only lane with truly bounded
memory (2.5 GB).

The SingleRust lane is a **single fused command** (`sr_ooc preprocess` = QC + normalize_total +
log1p in one job; the per-cell total computed for QC is reused as the normalization row-sum, so
it's computed once). Fusing cut its wall time from 128 s (three separate ops + a working copy) to
45 s — now in the same ballpark as Dask on time, at ~1/6th the memory.

### Caveats / honest reading

- **Time is not apples-to-apples.** scanpy's number is compute-only (excludes its initial load);
the Dask lane includes a lazy read + chunked write-out; SingleRust's wall time includes a 6 GB
working-copy plus two full read/write passes to disk as *separate* CLI processes. A fused
single-pass pipeline (+ chunk-size tuning, parallel chunk processing) would cut SingleRust's time
substantially. **Peak RSS is the fair, durable signal** and it is immune to thermal/throttling.
- SingleRust OOC RSS includes one chunk (here 20k rows) plus small per-cell / per-gene
accumulators; smaller chunks lower it further.
- Both scanpy lanes are run with the newer `.venv-dask` stack (anndata 0.12, scanpy 1.12, dask)
for consistency; SingleRust is the `sr_ooc` release binary.

### Projection to 2M cells (not run — memory safety)

At ~2M cells the count matrix is ≈46 GB dense-equivalent; scanpy in-memory would exceed the 48 GB
machine (OOM/heavy swap), and the Dask lane's ~0.6× memory ratio still lands near the ceiling.
SingleRust OOC stays ~2 GB (chunk-bounded) and is the only lane that completes comfortably. We
did **not** run 2M here to avoid destabilizing the machine; it needs a ~32 GB base dataset and a
box with headroom.

## Reproducing the Dask lane

The Dask lane needs `anndata>=0.11` (`experimental.read_elem_lazy`) + dask + xarray, which require
Python ≥3.10 — kept in an isolated venv so the main 3.9 demo env is untouched:

```bash
python3.12 -m venv .venv-dask
. .venv-dask/bin/activate
pip install "anndata>=0.11" "scanpy>=1.11" "dask[array]" xarray hdf5plugin h5py scipy numpy
```

`bench_ooc.py` auto-uses `.venv-dask/bin/python` for both scanpy lanes (override with `SR_PY`).
Relevant scanpy issues: [#4095](https://github.com/scverse/scanpy/issues/4095),
[dask#11880](https://github.com/dask/dask/issues/11880),
[pydata/sparse#860](https://github.com/pydata/sparse/issues/860).
Binary file added demo/__pycache__/singlerust.cpython-312.pyc
Binary file not shown.
65 changes: 65 additions & 0 deletions demo/_scanpy_dask_pp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""scanpy OUT-OF-CORE via Dask: read only X lazily as a dask array (obs/var eager + small), run
dask-enabled normalize_total + log1p (and attempt qc), then stream the result to disk to force
chunked execution.

Requires anndata>=0.11 + dask + xarray. Run with the isolated .venv-dask. Prints
STEP_SECONDS=<compute wall> and QC_DASK=ok|err:<msg>. Peak RSS is captured by the caller via
/usr/bin/time -l; if Dask's sparse path densifies/materializes, RSS spikes — itself the finding.

python demo/_scanpy_dask_pp.py <file.h5ad> [chunk_rows]
"""
import sys
import time
import tempfile
import os

import hdf5plugin # noqa: F401
import h5py
import anndata as ad
import scanpy as sc
from anndata.experimental import read_elem_lazy
from anndata.io import read_elem


def main():
path = sys.argv[1]
chunk = int(sys.argv[2]) if len(sys.argv) > 2 else 20000

# Keep the file open for the whole run — the lazy dask array reads from it on compute.
f = h5py.File(path, "r")
try:
X = read_elem_lazy(f["X"], chunks=(chunk, -1)) # dask array, streamed in row blocks
obs = read_elem(f["obs"]) # eager, small
var = read_elem(f["var"])
adata = ad.AnnData(X=X, obs=obs, var=var)

t = time.perf_counter()

qc_status = "ok"
try:
adata.var["mito"] = adata.var_names.str.upper().str.startswith("MT-")
sc.pp.calculate_qc_metrics(
adata, qc_vars=["mito"], percent_top=[50, 100, 200, 500], log1p=True, inplace=True
)
except Exception as e:
qc_status = f"err:{type(e).__name__}:{str(e)[:120]}"

sc.pp.normalize_total(adata, target_sum=1e4)
sc.pp.log1p(adata)

# Force streaming execution by writing the transformed X out (chunked).
out = tempfile.NamedTemporaryFile(suffix=".h5ad", delete=False).name
try:
adata.write_h5ad(out)
finally:
if os.path.exists(out):
os.remove(out)

print(f"STEP_SECONDS={time.perf_counter() - t}")
print(f"QC_DASK={qc_status}")
finally:
f.close()


if __name__ == "__main__":
main()
29 changes: 29 additions & 0 deletions demo/_scanpy_inmem_pp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""scanpy IN-MEMORY qc + normalize_total + log1p on an .h5ad (loads the whole matrix).

Used by the OOC benchmark as the baseline lane. Prints STEP_SECONDS=<total compute>.
Peak RSS (captured by the caller via /usr/bin/time -l) scales with the dataset — that's the
point of the comparison against SingleRust's bounded-memory OOC lane.
"""
import sys
import time

import hdf5plugin # noqa: F401
import scanpy as sc
import anndata as ad


def main():
path = sys.argv[1]
adata = ad.read_h5ad(path) # full load into memory
adata.var["mito"] = adata.var_names.str.upper().str.startswith("MT-")
t = time.perf_counter()
sc.pp.calculate_qc_metrics(
adata, qc_vars=["mito"], percent_top=[50, 100, 200, 500], log1p=True, inplace=True
)
sc.pp.normalize_total(adata, target_sum=1e4)
sc.pp.log1p(adata)
print(f"STEP_SECONDS={time.perf_counter() - t}")


if __name__ == "__main__":
main()
110 changes: 110 additions & 0 deletions demo/bench_ooc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""Out-of-core memory benchmark: scanpy in-memory vs SingleRust OOC.

Same work in both lanes — QC + normalize_total(1e4) + log1p — on the same .h5ad. Each lane runs
as a subprocess under /usr/bin/time -l so we capture **peak RSS** (the headline) and compute time:

* scanpy in-memory : loads the whole matrix; peak RSS scales with the dataset.
* SingleRust OOC : streams the file in chunks; peak RSS stays ~bounded.

Usage:
python demo/bench_ooc.py <file.h5ad> [chunk_size]

(A scanpy+Dask out-of-core lane needs anndata>=0.11's read_elem_as_dask; see notes in the
performance roadmap. This harness leaves a hook for it.)
"""
import os
import re
import subprocess
import sys
import pathlib

ROOT = pathlib.Path(__file__).resolve().parent.parent
BIN = ROOT / "target" / "release" / "examples" / "sr_ooc"
ENV = dict(os.environ)
ENV["PATH"] = "/opt/homebrew/bin:" + str(pathlib.Path.home() / ".cargo" / "bin") + ":" + ENV.get("PATH", "")
ENV.setdefault("HDF5_USE_FILE_LOCKING", "FALSE")

# Python with the newer stack (anndata>=0.11 + dask) for both scanpy lanes; override with SR_PY.
PY = os.environ.get("SR_PY", str(ROOT / ".venv-dask" / "bin" / "python"))


def run_timed(cmd):
"""Run under /usr/bin/time -l; return (compute_seconds|None, peak_rss_gb)."""
p = subprocess.run(["/usr/bin/time", "-l", *cmd], cwd=ROOT, env=ENV, capture_output=True, text=True)
if p.returncode:
sys.stderr.write(p.stdout[-1000:] + "\n" + p.stderr[-2000:] + "\n")
raise RuntimeError(f"failed: {' '.join(map(str, cmd))}")
secs = None
for line in p.stdout.splitlines():
if line.startswith("STEP_SECONDS="):
secs = float(line.split("=", 1)[1])
m = re.search(r"(\d+)\s+maximum resident set size", p.stderr)
rss_gb = int(m.group(1)) / 1e9 if m else float("nan") # macOS: bytes
w = re.search(r"([\d.]+)\s+real", p.stderr) # wall time from /usr/bin/time -l
wall = float(w.group(1)) if w else float("nan")
return secs if secs is not None else wall, rss_gb


def scanpy_inmem(path):
return run_timed([PY, str(ROOT / "demo" / "_scanpy_inmem_pp.py"), str(path)])


def scanpy_dask(path, chunk):
args = [PY, str(ROOT / "demo" / "_scanpy_dask_pp.py"), str(path)]
if chunk:
args.append(str(chunk))
return run_timed(args)


def singlerust_ooc(path, chunk):
# Single fused pass (qc + normalize_total + log1p), reading source -> temp output. No 6 GB
# copy and one process, so wall time and peak RSS are a single clean /usr/bin/time -l measure.
out = path.with_suffix(".ooc_out.h5ad")
chunk_args = ["--chunk", str(chunk)] if chunk else []
t, r = run_timed([str(BIN), "preprocess", str(path), "--out", str(out),
"--target-sum", "10000.0", *chunk_args])
out.unlink(missing_ok=True)
return t, r


def main():
if len(sys.argv) < 2:
print("usage: python demo/bench_ooc.py <file.h5ad> [chunk_size]")
sys.exit(2)
path = pathlib.Path(sys.argv[1])
chunk = int(sys.argv[2]) if len(sys.argv) > 2 else None
if not BIN.exists():
raise SystemExit(f"build the CLI first: cargo build --release --example sr_ooc ({BIN} missing)")

import h5py
with h5py.File(path) as f:
shape = list(f["X"].attrs.get("shape", []))
print(f"dataset: {path.name} shape={shape} chunk={chunk or 'default'}\n")

s_t, s_rss = scanpy_inmem(path)
print(f"scanpy in-memory : {s_t:7.2f}s compute | peak RSS {s_rss:6.2f} GB")

if pathlib.Path(PY).exists():
try:
d_t, d_rss = scanpy_dask(path, chunk)
print(f"scanpy + Dask OOC: {d_t:7.2f}s compute | peak RSS {d_rss:6.2f} GB")
except Exception as e:
d_rss = None
print(f"scanpy + Dask OOC: FAILED ({e})")
else:
d_rss = None
print(f"scanpy + Dask OOC: skipped (no {PY}; create .venv-dask with anndata>=0.11 + dask)")

r_t, r_rss = singlerust_ooc(path, chunk)
print(f"SingleRust OOC : {r_t:7.2f}s wall | peak RSS {r_rss:6.2f} GB")

print()
if s_rss and r_rss:
print(f"memory: SingleRust OOC uses {s_rss / r_rss:.1f}× less peak RAM than scanpy in-memory")
if d_rss and r_rss:
print(f"memory: SingleRust OOC uses {d_rss / r_rss:.1f}× less peak RAM than scanpy+Dask "
f"(Dask's sparse path barely beats in-memory)")


if __name__ == "__main__":
main()
Loading