[dataloader] Decouple transform execution batch size from read batch_size #624
Open
ShreyeshArangath wants to merge 1 commit into
…size
linkedin#568 propagated the dataloader's read batch_size into datafusion.execution.batch_size. When a transform is active, a small batch_size then forces DataFusion to slice each transform input into batch_size-row pieces and invoke the (expensive) UDFs once per piece -- the per-batch overhead collapses throughput on large rows. This is the perf regression observed with batch_size=128 on embedding-sized rows (15s read_batch, starved prefetch queue, jittery s/step).
- _create_transform_session now only RAISES execution.batch_size to honor a large requested batch_size (the case linkedin#568 fixed); it never LOWERS it below DataFusion's 8192 default by default.
- Adds a tunable, engine-agnostic transform_batch_size parameter so callers can set the transform execution batch size explicitly (e.g. trade throughput for memory).
- Adds regression + resolution + end-to-end wiring tests.
Summary
Fixes a dataloader perf regression where a small read batch_size (e.g. 128) makes table transforms run many times slower on large-row tables. Observed in training as 15s+ read_batch stalls, a starved prefetch queue (queue=0/4), and jittery s/step (vs. a steady ~0.39 s/step without the override).
- batch_size no longer lowers DataFusion's internal execution.batch_size below its 8192 default; it is only ever raised to honor a large requested batch (the case fixed by #568, "[DataLoader] Fix for batch sizes greater than 8192 not being honored when a table transformer is used").
- Adds a transform_batch_size param so callers can set the transform execution batch size explicitly.
Root cause
#568 propagated the dataloader's read batch_size into datafusion.execution.batch_size. That config does not gather rows up: DataFusion's BatchSplitStream slices every batch entering an operator down to execution.batch_size, and ProjectionExec invokes the transform's UDFs once per batch. So setting it to 128 turns one UDF call over an 8192-row batch into 64 calls over 128-row slices. The per-row work is identical; you just pay the fixed per-batch / per-UDF-call overhead (Python<->Arrow boundary, allocation, poll cycle) 64x more.
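The call-count arithmetic above can be illustrated with a small pure-Python model. This is only a sketch of the slicing behavior described here, not DataFusion's actual code path: `run_transform` and `double_all` are hypothetical names, and plain lists stand in for Arrow record batches.

```python
from math import ceil


def run_transform(rows, execution_batch_size, udf):
    """Slice `rows` into pieces of at most `execution_batch_size` rows and
    call `udf` once per piece. Hypothetical stand-in for the slicing
    described above; not DataFusion code."""
    out = []
    calls = 0
    for start in range(0, len(rows), execution_batch_size):
        piece = rows[start:start + execution_batch_size]
        out.extend(udf(piece))  # one UDF invocation per slice
        calls += 1
    return out, calls


def double_all(batch):
    # Placeholder for an expensive transform UDF.
    return [2 * x for x in batch]


rows = list(range(8192))
result_small, calls_small = run_transform(rows, 128, double_all)
result_default, calls_default = run_transform(rows, 8192, double_all)

# Same per-row work and same output, but 64 UDF calls instead of 1.
assert result_small == result_default
assert calls_small == ceil(8192 / 128) == 64
assert calls_default == 1
```

The output is identical in both runs; only the number of invocations changes, which is where the fixed per-call overhead multiplies.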
Confirmed empirically by holding the input at 8192 rows and sweeping only execution.batch_size. Time scales linearly with the number of slices. The "fast" pre-commit run was DataFusion's 8192 default; the "slow" run was 128.
Why a tunable instead of a hard clamp
A pure clamp silently overrides intent. Exposing transform_batch_size (defaulting to the safe "never below 8192" behavior) fixes the regression out of the box and answers the original request to control the DataFusion execution batch size, without leaking the engine name into the public API.
Changes
- OpenHouseDataLoader / DataLoaderSplit gain an optional transform_batch_size.
- _create_transform_session resolves the effective datafusion.execution.batch_size via new _resolve_execution_batch_size(batch_size, transform_batch_size): explicit transform_batch_size wins; otherwise only raise above the 8192 default to honor a large batch_size; never lower it.
Testing Done
New tests/test_transform_batch_size_regression.py: regression guard (fails without the fix at 64 UDF calls, passes at 1); parametrized _resolve_execution_batch_size; transform_batch_size honored / overrides default; end-to-end wiring through DataLoaderSplit. Full suite green with make verify: ruff + mypy + 276 passed (includes #568's test).
Additional Information
Backward compatible: transform_batch_size defaults to None. The only behavior change is that a read batch_size <= 8192 no longer shrinks the transform execution batch (the regression being fixed).
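The resolution rule listed under Changes (explicit transform_batch_size wins; otherwise only raise above the 8192 default; never lower it) might look roughly like the sketch below. This is an illustration of the stated rule, not the PR's actual _resolve_execution_batch_size: the constant name, the handling of batch_size=None, and the absence of input validation are all assumptions.

```python
from typing import Optional

# Assumed name for DataFusion's default execution batch size (8192 per the PR text).
DATAFUSION_DEFAULT_BATCH_SIZE = 8192


def resolve_execution_batch_size(
    batch_size: Optional[int],
    transform_batch_size: Optional[int] = None,
) -> int:
    """Hypothetical sketch of the resolution rule described in this PR."""
    # 1. An explicit transform_batch_size always wins, even below the default.
    if transform_batch_size is not None:
        return transform_batch_size
    # 2. No read batch_size given: keep the engine default (assumed behavior).
    if batch_size is None:
        return DATAFUSION_DEFAULT_BATCH_SIZE
    # 3. Only raise above the default to honor a large batch_size; never lower it.
    return max(batch_size, DATAFUSION_DEFAULT_BATCH_SIZE)


# A small read batch_size no longer shrinks the transform execution batch.
assert resolve_execution_batch_size(128) == 8192
# A large read batch_size is still honored (the case #568 fixed).
assert resolve_execution_batch_size(16384) == 16384
# An explicit override is respected in either direction.
assert resolve_execution_batch_size(128, transform_batch_size=512) == 512
```

Under this sketch, callers who want to trade throughput for memory pass transform_batch_size explicitly, while everyone else keeps the default behavior.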