
Implement streaming window functions in cudf-polars #22191

Open
Matt711 wants to merge 65 commits into rapidsai:main from Matt711:fea/polars/streaming-over

Conversation

Contributor

@Matt711 Matt711 commented Apr 17, 2026

Description

over() is, at its heart, a grouped aggregation followed by a broadcast back to the shape of the input. For each group g defined by the partition-by keys, evaluate the expression, then map the result back to every row that belongs to g.

import polars as pl

df = pl.LazyFrame(
    {
        "g": [1, 1, 2, 2, 2, 1],
        "x": [1, 2, 3, 4, 5, 6],
        "g2": ["a", "b", "a", "b", "a", "b"],
        "g_null": [1, None, 1, None, 2, 1],
        "s": [6, 5, 4, 3, 2, 1],
    }
)
print(df.select(pl.col("x").sum().over("g_null")).collect())
shape: (6, 1)
┌─────┐
│ x   │
│ --- │
│ i64 │
╞═════╡
│ 10  │
│ 6   │
│ 10  │
│ 6   │
│ 5   │
│ 10  │
└─────┘

Polars represents this with a WindowMapping enum. This PR adds support for the group_to_rows mapping in the RapidsMPF streaming executor (the variant where the output has the same number of rows as the input and each row receives the value computed for its group). The entry point is a new over_actor that selects one of three execution strategies at runtime based on the incoming channel metadata and expression shape.

The over_actor: three strategies

1. Chunkwise (already partitioned)

If the incoming channel metadata shows the data is already hash-partitioned on the over() keys (or any prefix of them; being partitioned on ('a',) is sufficient for over('a', 'b'), since every group is contained within one rank), the window function is trivially correct on each chunk in isolation. We evaluate chunkwise with no coordination at all.
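
For concreteness, the prefix check looks roughly like the sketch below; partitioned_on and over_keys are illustrative names, not the actual channel-metadata fields used in the PR.

def already_partitioned(
    partitioned_on: tuple[str, ...], over_keys: tuple[str, ...]
) -> bool:
    """True if the data is already hash-partitioned on a prefix of the over() keys."""
    if not partitioned_on:
        return False
    # Being partitioned on ('a',) is sufficient for over('a', 'b'): every
    # ('a', 'b') group is contained within a single ('a',) partition.
    return partitioned_on == over_keys[: len(partitioned_on)]


assert already_partitioned(("a",), ("a", "b"))
assert not already_partitioned(("b",), ("a", "b"))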

2. Scalar aggregations: AllGather + broadcast

When every GroupedWindow in the expression is a scalar aggregation (sum, mean, count, etc.), we exploit the fact that these are decomposable: each worker computes partial aggregates chunkwise, an AllGather collects all workers' partial results, a single reduction produces the global aggregate per group, and then each original chunk has those results broadcast back into its row positions via a hash join on the partition keys.
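
A single-process polars sketch of that algebra (polars standing in for the pylibcudf tables the actor actually operates on; the chunk contents are made up):

import polars as pl

chunks = [
    pl.DataFrame({"g": [1, 1, 2], "x": [1, 2, 3]}),
    pl.DataFrame({"g": [2, 2, 1], "x": [4, 5, 6]}),
]

# 1. Partial aggregates, computed chunkwise on each worker.
partials = [c.group_by("g").agg(pl.col("x").sum().alias("x_sum")) for c in chunks]

# 2. Concatenation stands in for the AllGather; a single reduction then
#    produces the global aggregate per group.
global_agg = pl.concat(partials).group_by("g").agg(pl.col("x_sum").sum())

# 3. Broadcast the results back into each chunk's row positions via a
#    join on the partition keys.
outputs = [c.join(global_agg, on="g", how="left") for c in chunks]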

3. Non-scalar aggregations: forward-shuffle + return-shuffle

Functions like rank are not decomposable; they require every row in the group to be visible at once. We hash-shuffle by the partition keys so that all rows belonging to group g land in the same rank for evaluation. The challenge is then twofold: putting rows back in the right order, and getting them back to the rank that owns the corresponding output chunk in the first place. Output channels are rank-local, so only the rank that received an input chunk is wired up to emit it, and the hash shuffle scatters rows by group with no regard for where they originated. We need an explicit return trip.
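
The two trips amount to two different routing rules. Here is a toy sketch of those rules, with Python's built-in hash standing in for the real hash partitioner and the function names purely illustrative:

def forward_partition(partition_key: tuple, modulus: int) -> int:
    # Forward trip: hash-partition on the over() keys, so all rows of a
    # group land in the same shuffle partition (and hence on one rank).
    return hash(partition_key) % modulus


def return_partition(origin_rank: int) -> int:
    # Return trip: route each evaluated row back to the rank that ingested
    # it, using the origin stamp described in the next section.
    return origin_rank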

Preserving full order

A lot of the implementation exists purely to put output chunks back in the same sequence-number order as the input. Getting this right across both strategies is where most of the complexity lives.

Scalar aggregation path. We can't produce any output until the global aggregate is known, so we buffer incoming chunks while simultaneously computing partial aggregates over them. Once the AllGather + final reduction completes, we iterate over the buffer and evaluate each chunk against the global aggregate, emitting results with their original sequence numbers. Order preservation falls out naturally: the buffer is in receive order and we never reorder it.

Non-scalar shuffle path. Each row is stamped with three pieces of origin metadata before it enters the forward shuffle: an origin_rank (which rank ingested it), a chunk_index (a rank-local 0-based counter, not the upstream message sequence number, which can collide when the input is the output of a prior shuffle), and a position within that input chunk. After the forward shuffle, each rank holds a mix of rows from every origin, but each row knows where it came from. We evaluate the window function on each local forward partition (so each rank sees every row in the group), then route the results through a return shuffle keyed on origin_rank. The return shuffle uses num_partitions = nranks and PartitionAssignment.CONTIGUOUS, so partition i lives on rank i, and every row goes back to the rank that originally received it. Each rank then sorts the returned rows by (chunk_index, position), splits at chunk-index transitions, drops the stamp columns, and emits one output chunk per input chunk in input order.
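
A single-process polars sketch of the reassembly on one rank, assuming the returned rows still carry the stamp columns (the _chunk_index/_position names are illustrative):

import polars as pl

# Rows this rank got back from the return shuffle, in arbitrary order.
returned = pl.DataFrame(
    {
        "_chunk_index": [1, 0, 1, 0],
        "_position": [1, 0, 0, 1],
        "result": [40, 10, 30, 20],
    }
)

# Sort by (chunk_index, position), split at chunk-index transitions, drop
# the stamp columns, and emit one output chunk per input chunk in order.
ordered = returned.sort(["_chunk_index", "_position"])
output_chunks = [
    part.drop(["_chunk_index", "_position"])
    for part in ordered.partition_by("_chunk_index", maintain_order=True)
]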

To avoid buffering every input chunk just to size the forward shuffle, the actor samples a small number of chunks up front (_choose_modulus), AllGathers a size estimate, picks the modulus, and then replays the sampled chunks back through a fresh channel via replay_buffered_channel. The forward-insert phase reads from that replay channel and streams rows into the shuffle as they arrive, never holding more than the shuffle's own internal buffering.
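
The replay step's control flow looks roughly like the generator below; it is a toy stand-in for replay_buffered_channel, which does the same thing over rapidsmpf channels rather than Python iterables.

from collections.abc import Iterable, Iterator


def replay_then_stream(sampled: list, remaining: Iterable) -> Iterator:
    # The few chunks consumed while choosing the shuffle modulus are buffered
    # and yielded first; the rest of the input is then streamed through
    # untouched, so the forward-insert phase sees one uninterrupted sequence
    # of chunks and never buffers more than the sample.
    yield from sampled
    yield from remaining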

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

@Matt711 Matt711 requested a review from a team as a code owner April 17, 2026 14:10
@Matt711 Matt711 requested review from galipremsagar and vyasr April 17, 2026 14:10
@Matt711 Matt711 added the feature request (New feature or request) and non-breaking (Non-breaking change) labels Apr 17, 2026

copy-pr-bot Bot commented Apr 17, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@github-actions github-actions Bot added the Python (Affects Python cuDF API) and cudf-polars (Issues specific to cudf-polars) labels Apr 17, 2026
@GPUtester GPUtester moved this to In Progress in cuDF Python Apr 17, 2026
Contributor Author

@Matt711 Matt711 left a comment


Review Guide: I recommend reading the PR description first, then looking over the tests, then the three execution strategies (and the corresponding tests), and finally the full-order preservation logic.

If you think we should split up the PR, that's okay. Additionally, if you think logic should be shared (especially in the scalar-aggs/groupby case), we can discuss the specifics in your review. I abstracted some logic into helper functions like _make_hash_shuffle_metadata, but in general I avoided doing so (in the groupby case) because it made the code harder to understand IMO.

[False, True],
ids=["same_rank", "cross_rank"],
)
def test_over_multirank(
Contributor Author


I tested this using rrun

Details
(rapids) coder ➜ ~/cudf $ rrun -n 2 python -m pytest python/cudf_polars/tests/experimental/test_spmd.py::test_over_multirank -x -v

[rrun] All ranks launched. Waiting for completion...

============================= test session starts ==============================
platform linux -- Python 3.14.4, pytest-9.0.3, pluggy-1.6.0 -- /home/coder/.conda/envs/rapids/bin/python
============================= test session starts ==============================
platform linux -- Python 3.14.4, pytest-9.0.3, pluggy-1.6.0 -- /home/coder/.conda/envs/rapids/bin/python
cachedir: .pytest_cache
hypothesis profile 'default'
benchmark: 5.2.3 (defaults: timer=time.perf_counter disable_gc=False min_rounds=5 min_time=0.000005 max_time=1.0 calibration_precision=10 warmup=False warmup_iterations=100000)
rootdir: /home/coder/cudf/python/cudf_polars
configfile: pyproject.toml
plugins: cases-3.10.1, anyio-4.13.0, hypothesis-6.151.13, cov-7.1.0, xdist-3.8.0, benchmark-5.2.3, pytest_httpserver-1.1.5, rerunfailures-16.1
cachedir: .pytest_cache
hypothesis profile 'default'
benchmark: 5.2.3 (defaults: timer=time.perf_counter disable_gc=False min_rounds=5 min_time=0.000005 max_time=1.0 calibration_precision=10 warmup=False warmup_iterations=100000)
rootdir: /home/coder/cudf/python/cudf_polars
configfile: pyproject.toml
plugins: cases-3.10.1, anyio-4.13.0, hypothesis-6.151.13, cov-7.1.0, xdist-3.8.0, benchmark-5.2.3, pytest_httpserver-1.1.5, rerunfailures-16.1
collecting ... collected 4 items
collecting ... collected 4 items


python/cudf_polars/tests/experimental/test_spmd.py::test_over_multirank[same_rank-scalar_sum] PASSED [ 25%]
python/cudf_polars/tests/experimental/test_spmd.py::test_over_multirank[same_rank-scalar_sum] PASSED [ 25%]
python/cudf_polars/tests/experimental/test_spmd.py::test_over_multirank[same_rank-nonscalar_rank] PASSED [ 50%]
python/cudf_polars/tests/experimental/test_spmd.py::test_over_multirank[same_rank-nonscalar_rank] PASSED [ 50%]
python/cudf_polars/tests/experimental/test_spmd.py::test_over_multirank[cross_rank-scalar_sum] PASSED [ 75%]
python/cudf_polars/tests/experimental/test_spmd.py::test_over_multirank[cross_rank-scalar_sum] PASSED [ 75%]
python/cudf_polars/tests/experimental/test_spmd.py::test_over_multirank[cross_rank-nonscalar_rank] XFAIL [100%]
python/cudf_polars/tests/experimental/test_spmd.py::test_over_multirank[cross_rank-nonscalar_rank] XFAIL [100%]


========================= 3 passed, 1 xfailed in 3.45s =========================
========================= 3 passed, 1 xfailed in 3.46s =========================

@Matt711 Matt711 force-pushed the fea/polars/streaming-over branch from 594810d to 696bbc9 Compare April 17, 2026 14:23
@Matt711 Matt711 changed the title Implement streaming window functions in cudf-polars [WIP] Implement streaming window functions in cudf-polars Apr 17, 2026
Contributor Author

Matt711 commented Apr 17, 2026

Moved this to WIP while I work through the rapidsmpf test failures: https://github.com/rapidsai/cudf/actions/runs/24570156132/job/71845610400?pr=22191

@Matt711 Matt711 requested a review from a team as a code owner April 17, 2026 17:03
@Matt711 Matt711 removed the request for review from galipremsagar April 17, 2026 17:04
Contributor Author

Matt711 commented Apr 17, 2026

/ok to test 0d6d6b1

@Matt711 Matt711 force-pushed the fea/polars/streaming-over branch from 9d48123 to bc584d7 Compare May 6, 2026 20:54
Contributor Author

Matt711 commented May 6, 2026

/ok to test bc584d7

galipremsagar pushed a commit to galipremsagar/cudf that referenced this pull request May 6, 2026
- This is a follow-up to rapidsai#21796
- This (hopefully) simplifies some code in rapidsai#22191

**Problem statement**: We currently translate `HStack` nodes with non-pointwise expressions to the equivalent `Select` node at lowering time. This is because all our non-pointwise `Expr`-decomposition logic is specific to `Select`. Before this PR, this translation was skipped whenever the underlying `HStack` was completely overwriting its original columns. The problem with this case is that we lose "anchor" columns that tell the `Select` how to broadcast scalar-aggregation results.

**Proposed solution**: We add a temporary "anchor" column to the translated `HStack` so that broadcasting works correctly in the `Select` node.

**Motivation**:
- We can handle all `over()` expression decomposition within `Select` if we know **all** non-pointwise HStack operations are lowered to `Select` anyway.
- We don't "fall back" for other non-`over` `HStack` corner cases either.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Matthew Murray (https://github.com/Matt711)

URL: rapidsai#22353
Contributor Author

Matt711 commented May 6, 2026

/ok to test 338757a

@Matt711 Matt711 requested a review from rjzamora May 7, 2026 01:56
Contributor Author

Matt711 commented May 7, 2026

/ok to test 5b25eea

@Matt711 Matt711 requested a review from wence- May 7, 2026 03:51
Contributor

@wence- wence- left a comment


A first go. I think there is opportunity to find some more abstraction here, since it seems we are recreating many concepts sui generis for this particular implementation. I did not get to the shuffle-based implementation yet.

I think a useful signpost would be a module-level docstring that describes the algorithmic aspects of what is going on, without reaching into the implementation details.

Comment thread python/cudf_polars/cudf_polars/experimental/rapidsmpf/collectives/common.py Outdated
Comment thread python/cudf_polars/cudf_polars/experimental/rapidsmpf/core.py Outdated
Comment thread python/cudf_polars/cudf_polars/experimental/rapidsmpf/tracing.py Outdated
Comment thread python/cudf_polars/cudf_polars/experimental/rapidsmpf/utils.py
Comment thread python/cudf_polars/cudf_polars/experimental/over.py Outdated
Comment on lines +180 to +181
@dataclass(frozen=True)
class OriginStamps:
Contributor


I think all of this is sui generis sort_by_key with the key being a tuple of (rank, chunk_index, position). Can we reuse the infrastructure for sort to do that for us?

I guess that doesn't necessarily give you everything with a given rank as its key on the input rank?

Contributor Author


Yeah the sort infrastructure doesn't give us "everything with a given rank as its key on the input rank"

Comment thread python/cudf_polars/cudf_polars/experimental/rapidsmpf/over.py Outdated
Comment thread python/cudf_polars/cudf_polars/experimental/rapidsmpf/over.py Outdated
Comment thread python/cudf_polars/cudf_polars/experimental/rapidsmpf/over.py Outdated
Comment thread python/cudf_polars/cudf_polars/experimental/rapidsmpf/over.py Outdated
@Matt711 Matt711 requested a review from wence- May 7, 2026 15:22
@Matt711 Matt711 force-pushed the fea/polars/streaming-over branch from 9cb0834 to 397d50f Compare May 7, 2026 15:44
Contributor Author

Matt711 commented May 7, 2026

/ok to test f7687f1


Labels

  • cudf-polars: Issues specific to cudf-polars
  • feature request: New feature or request
  • non-breaking: Non-breaking change
  • Python: Affects Python cuDF API

Projects

Status: In Progress

Development

Successfully merging this pull request may close these issues.

[FEA] Support .over() in streaming cuDF-Polars

4 participants