[dataloader] Decouple transform execution batch size from read batch_size #624

Open
ShreyeshArangath wants to merge 1 commit into linkedin:main from ShreyeshArangath:bug/df-batch-size
@ShreyeshArangath ShreyeshArangath commented May 30, 2026

Summary

Fixes a dataloader perf regression where a small read batch_size (e.g. 128) makes table transforms run many times slower on large-row tables. Observed in training as 15s+ read_batch stalls, a starved prefetch queue (queue=0/4), and jittery s/step (vs. a steady ~0.39 s/step without the override).

Root cause

#568 propagated the dataloader's read batch_size into datafusion.execution.batch_size. That setting does not coalesce rows into larger batches: DataFusion's BatchSplitStream slices every batch entering an operator down to execution.batch_size, and ProjectionExec invokes the transform's UDFs once per batch. So:

# UDF invocations  :=  input_rows / execution.batch_size

Setting it to 128 turns one UDF call over an 8192-row batch into 64 calls over 128-row slices. The per-row work is identical, but the fixed per-batch / per-UDF-call overhead (Python<->Arrow boundary, allocation, poll cycle) is paid 64 times instead of once.
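The arithmetic above can be sketched in a few lines. This is only an illustration of the formula, not code from the PR: the helper name `udf_invocations` is hypothetical, and rounding up for a trailing partial slice is an assumption (the description only covers sizes that divide 8192 evenly).

```python
def udf_invocations(input_rows: int, execution_batch_size: int) -> int:
    """Estimate UDF calls for one input batch sliced to execution_batch_size rows.

    Hypothetical helper: assumes a trailing partial slice still costs one call.
    """
    if input_rows < 0 or execution_batch_size <= 0:
        raise ValueError("input_rows must be >= 0 and execution_batch_size > 0")
    # Integer ceiling division avoids floating-point rounding.
    return (input_rows + execution_batch_size - 1) // execution_batch_size


# Same sweep as in the description: input held at 8192 rows.
calls_by_size = {size: udf_invocations(8192, size) for size in (8192, 1024, 256, 128, 64)}
print(calls_by_size)
```

For the sizes swept in this PR the result matches the "# batches = UDF calls" column: 1, 8, 32, 64, and 128 calls.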

Confirmed empirically by holding the input at 8192 rows and sweeping only execution.batch_size:

| execution.batch_size | # batches = UDF calls | identity UDF | UDF w/ per-call cost |
|---|---|---|---|
| 8192 | 1 | 0.115 ms | 0.397 ms |
| 1024 | 8 | 0.332 ms | 2.511 ms |
| 256 | 32 | 0.881 ms | 9.709 ms |
| 128 | 64 | 1.609 ms | 19.399 ms |
| 64 | 128 | 2.981 ms | 38.178 ms |

Time scales roughly linearly with the number of slices. The "fast" run from before #568 used DataFusion's 8192 default; the "slow" run used 128.
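As a quick sanity check on the linear-scaling claim, the "UDF w/ per-call cost" column can be divided by the number of calls. The sketch below only re-uses the numbers reported above; the variable and function names are illustrative and not part of the PR.

```python
# (execution.batch_size, UDF calls, total ms) copied from the
# "UDF w/ per-call cost" column of the sweep above.
measurements = [
    (8192, 1, 0.397),
    (1024, 8, 2.511),
    (256, 32, 9.709),
    (128, 64, 19.399),
    (64, 128, 38.178),
]


def ms_per_call(rows):
    """Return (calls, total_ms / calls) for each measurement."""
    return [(calls, total_ms / calls) for _, calls, total_ms in rows]


per_call = ms_per_call(measurements)
for calls, cost in per_call:
    print(f"{calls:>4} calls -> {cost:.3f} ms per call")

# Slowdown of the 128-row setting relative to the 8192 default.
totals = {size: total_ms for size, _, total_ms in measurements}
slowdown_128 = totals[128] / totals[8192]
print(f"128 vs 8192: {slowdown_128:.1f}x slower")
```

The per-call cost stays between roughly 0.30 and 0.40 ms across the sweep, which is what a fixed overhead per UDF call would look like; total time is then dominated by the call count.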

Why a tunable instead of a hard clamp

A pure clamp silently overrides intent. Exposing transform_batch_size (defaulting to the safe "never below 8192" behavior) fixes the regression out of the box and answers the original request to control the DataFusion execution batch size — without leaking the engine name into the public API.

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

OpenHouseDataLoader / DataLoaderSplit gain an optional transform_batch_size. _create_transform_session resolves the effective datafusion.execution.batch_size via new _resolve_execution_batch_size(batch_size, transform_batch_size): explicit transform_batch_size wins; otherwise only raise above the 8192 default to honor a large batch_size; never lower it.
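The resolution rule described above could look roughly like the following. This is a minimal sketch inferred from the description, not the PR's actual implementation: the function name, the handling of `batch_size=None`, and the validation of non-positive values are all assumptions.

```python
from typing import Optional

# DataFusion's default execution batch size, as stated in this PR.
DEFAULT_EXECUTION_BATCH_SIZE = 8192


def resolve_execution_batch_size_sketch(
    batch_size: Optional[int],
    transform_batch_size: Optional[int] = None,
) -> int:
    """Hypothetical stand-in for _resolve_execution_batch_size."""
    # 1. An explicit transform_batch_size always wins.
    if transform_batch_size is not None:
        if transform_batch_size <= 0:
            raise ValueError("transform_batch_size must be positive")
        return transform_batch_size
    # 2. No read batch_size given (assumed possible): keep the default.
    if batch_size is None:
        return DEFAULT_EXECUTION_BATCH_SIZE
    # 3. Raise above the default for a large batch_size; never lower it.
    return max(DEFAULT_EXECUTION_BATCH_SIZE, batch_size)


print(resolve_execution_batch_size_sketch(128))        # small read batch
print(resolve_execution_batch_size_sketch(16384))      # large read batch
print(resolve_execution_batch_size_sketch(128, 256))   # explicit override
```

Under these assumptions a read batch_size of 128 resolves to 8192, a read batch_size of 16384 resolves to 16384, and an explicit transform_batch_size of 256 is used as-is even though it is below the default.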

Testing Done

  • Manually Tested on local docker setup. Please include commands ran, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

New tests/test_transform_batch_size_regression.py: regression guard (fails without the fix at 64 UDF calls, passes at 1); parametrized _resolve_execution_batch_size; transform_batch_size honored / overrides default; end-to-end wiring through DataLoaderSplit. Full suite green: make verify — ruff + mypy + 276 passed (includes #568's test).

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

Backward compatible: transform_batch_size defaults to None. The only behavior change is that a read batch_size below 8192 no longer shrinks the transform execution batch (the regression being fixed).

@ShreyeshArangath changed the title from "[DataLoader] Decouple transform execution batch size from read batch_size" to "[dataloader] Decouple transform execution batch size from read batch_size" May 30, 2026
…size

linkedin#568 propagated the dataloader's read batch_size into datafusion.execution.batch_size.
When a transform is active, a small batch_size then forces DataFusion to slice each
transform input into batch_size-row pieces and invoke the (expensive) UDFs once per
piece -- the per-batch overhead collapses throughput on large rows. This is the perf
regression observed with batch_size=128 on embedding-sized rows (15s read_batch,
starved prefetch queue, jittery s/step).

- _create_transform_session now only RAISES execution.batch_size to honor a large
  requested batch_size (the case linkedin#568 fixed); unless transform_batch_size is
  set, it never LOWERS it below DataFusion's 8192 default.
- Adds a tunable, engine-agnostic transform_batch_size parameter so callers can set
  the transform execution batch size explicitly (e.g. trade throughput for memory).
- Adds regression + resolution + end-to-end wiring tests.
@ShreyeshArangath ShreyeshArangath marked this pull request as ready for review May 30, 2026 18:15