feat: Implement R Shiny-style concurrency model (#2182)
Conversation
Replace the global reactive lock with per-session locks and a cycle start action queue, enabling concurrent execution across sessions and within sessions. Key changes:

- Remove global ReactiveEnvironment lock; add per-session `_reactive_lock`
- Run effects concurrently via `asyncio.gather()` instead of sequentially
- Add `_cycle_start_action` queue to defer input processing while effects are busy, keeping input values stable during execution
- Deprecate `reactive.lock()` (returns a no-op lock for backward compat)
- Update `invalidate_later` to use the per-session lock and cycle queue
- Remove ExtendedTask's dependency on the global lock
- Remove `_request_flush()` (was already a no-op)
- Wrap Effect `on_flush_cb` in try/finally for busy-count safety

The concurrency model matches R Shiny: sessions are independent, effects run concurrently within a session, and input changes are queued until all effects complete. Dispatch messages (uploadEnd, makeRequest, etc.) run immediately without deferral, matching R Shiny's behavior.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
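The per-session pattern the commit describes can be sketched in a few lines. This is an illustrative model only, not py-shiny source; the names (`_reactive_lock`, `_busy_count`, `_cycle_start_action_queue`) mirror the PR's terminology but everything else is invented for the sketch.

```python
import asyncio


class Session:
    def __init__(self) -> None:
        self._reactive_lock = asyncio.Lock()  # serializes reactive work per session
        self._busy_count = 0                  # effects still running this cycle
        self._cycle_start_action_queue = []   # deferred input-processing actions
        self.log: list[str] = []

    async def cycle_start_action(self, action):
        # While effects are busy, defer the action so input values stay
        # stable for the rest of the current cycle.
        if self._busy_count > 0:
            self._cycle_start_action_queue.append(action)
        else:
            async with self._reactive_lock:
                await action()

    async def run_effects(self, effects):
        async with self._reactive_lock:
            self._busy_count += len(effects)

            async def run(effect):
                try:
                    await effect()
                finally:
                    self._busy_count -= 1

            # Effects run concurrently *within* the session.
            await asyncio.gather(*(run(e) for e in effects))
        # Cycle complete: drain the deferred actions in FIFO order.
        while self._cycle_start_action_queue:
            await self._cycle_start_action_queue.pop(0)()


async def main() -> list[str]:
    s = Session()

    async def set_input():
        s.log.append("input updated")

    async def effect(name):
        s.log.append(f"start {name}")
        await asyncio.sleep(0)
        await s.cycle_start_action(set_input)  # deferred: effects are busy
        s.log.append(f"end {name}")

    await s.run_effects([lambda: effect("a"), lambda: effect("b")])
    return s.log


print(asyncio.run(main()))
```

Both effects request the input update mid-cycle; it only lands after every effect has finished, which is the "input values stay stable during execution" property the commit message claims.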
I considered other concurrency schemes besides the one we designed for Shiny for R, but in the end I think those were a really nice set of tradeoffs, so this PR ended up in a very similar place.
Change _flush_concurrent() from batch-draining the priority queue into gather() to dequeuing one context at a time with create_task() + sleep(0) between each. This ensures a high-priority effect's synchronous portion runs before lower-priority effects are dequeued, allowing newly-invalidated effects to enter the queue in correct priority order—matching R Shiny's flushReact() behavior. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
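The one-at-a-time draining this commit describes can be sketched as follows. This is illustrative only; `Flusher` and its methods are invented names, not the py-shiny API.

```python
import asyncio
import heapq
import itertools


class Flusher:
    def __init__(self) -> None:
        self._q = []
        self._counter = itertools.count()  # tie-breaker for stable ordering

    def enqueue(self, priority, coro_fn) -> None:
        heapq.heappush(self._q, (priority, next(self._counter), coro_fn))

    async def flush(self) -> None:
        tasks = []
        while self._q:
            _, _, coro_fn = heapq.heappop(self._q)
            tasks.append(asyncio.create_task(coro_fn()))
            # Yield so the task's synchronous portion runs (and can enqueue
            # newly-invalidated contexts) before we dequeue the next one.
            await asyncio.sleep(0)
        await asyncio.gather(*tasks)


order = []


async def demo() -> None:
    f = Flusher()

    async def make(name):
        order.append(name)

    async def high():
        order.append("high")
        # The synchronous portion invalidates a new high-priority context;
        # with one-at-a-time draining it jumps ahead of the pending
        # low-priority context, as in R Shiny's flushReact().
        f.enqueue(0, lambda: make("high-child"))

    f.enqueue(0, high)
    f.enqueue(1, lambda: make("low"))
    await f.flush()


asyncio.run(demo())
print(order)
```

With a batch drain into `gather()`, "high-child" would only be seen after the whole batch was dequeued; dequeuing one at a time lets it enter the heap before "low" is popped.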
shiny/session/_session.py
Outdated
    # TODO: Remove this call to reactive_flush() once https://github.com/posit-dev/py-shiny/issues/1889 is fixed
    # Workaround: Any `on_flushed()` calls from bookmark's `on_restored()` will be flushed here
    if self.bookmark.store != "disable":
        await reactive_flush()
Can this be removed? You're doing the reactive flush every time.
    if self._cycle_start_action_queue:
        asyncio.create_task(self._start_cycle())
This cadence is important. I'd like a helper method for it.
Currently, `py-shiny/tests/playwright/shiny/session/flush/test_on_flush.py` (lines 11 to 16 in 299bdbe):
Please have claude get
    per-session. This function returns a no-op lock for backward compatibility.
    """
    return _reactive_environment.lock
    warnings.warn(
Did a search of GitHub and found these Shiny apps that rely on reactive.lock.
- https://github.com/posit-dev/shiny-assistant/blob/cae561f9f94cfda0988f8da3a83411be0962b607/shinyapp/app.py#L311
- https://github.com/jcheng5/py-sidebot/blob/55d13900b8a7b50c327609830df4877080fad19d/app.py#L311
- https://github.com/parlance-labs/langfree/blob/6aee32f08b8b71b2af077249c1e9dc47f35e0b6d/langfree/shiny.py#L93
- https://github.com/wch/chatstream/blob/ef48867ec15ff5946552d1c887cc42763fced876/chatstream/__init__.py#L458
- https://github.com/amcrabtree/cell-sidebot/blob/3c58c7aa931e5f6da24008448c3deca543762720/app.py#L239
- https://github.com/viksrc/tickplot/blob/917427ae2c73e1fe903fb68cbfec545026440592/app.py#L277
- https://github.com/matheus-rech/shiny-test/blob/6292b9f5885a4a2eb559fabec2a3bda2dda08fa8/app.py#L260
- https://github.com/zyf0717/pi-pulse/blob/4a04697abc50a08dbcaef2f42ada9b09e5752568/server.py#L61
    async def _do_invalidate() -> None:
        nonlocal cancellable
        async with root_session._reactive_lock:
What if we add Session.reactive_lock() so callers don't have to do the walking up the scopes themselves? (The express stub session should no-op or throw.)
    ctx.invalidate()
    await flush()

    await root_session._cycle_start_action(_do_invalidate)
I don't like that this uses a private. I think the philosophy should be that 3rd parties could implement something just like invalidate_later themselves, without accessing any private members.
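A public surface satisfying this philosophy could let third parties build `invalidate_later` themselves. The sketch below is purely hypothetical: `on_cycle_start` is an invented public name (the PR as written only has the private `_cycle_start_action`), and the `Session` here is a toy stand-in.

```python
import asyncio


class Session:
    """Toy stand-in with a hypothetical public queue-an-action API."""

    def __init__(self) -> None:
        self._queue = []

    def on_cycle_start(self, action) -> None:
        # Public wrapper a third party could call instead of a private member.
        self._queue.append(action)

    async def drain(self) -> None:
        # Stands in for the next cycle start running queued actions.
        while self._queue:
            await self._queue.pop(0)()


def invalidate_later(session: Session, delay: float, invalidate) -> None:
    # A third-party invalidate_later: sleep, then queue the invalidation
    # for the next cycle start, using only public API.
    async def timer():
        await asyncio.sleep(delay)
        session.on_cycle_start(invalidate)

    asyncio.get_running_loop().create_task(timer())


async def demo() -> list:
    s = Session()
    fired = []

    async def invalidate():
        fired.append("invalidated")

    invalidate_later(s, 0.01, invalidate)
    await asyncio.sleep(0.05)  # let the timer fire and queue the action
    await s.drain()            # the next cycle start runs the invalidation
    return fired


print(asyncio.run(demo()))
```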
    if self._cycle_start_action_queue:
        asyncio.create_task(self._start_cycle())

    async def _cycle_start_action(
This should be renamed to include a more active verb; as is, it sounds like "start" is the verb and it's not.
**Conversation with Claude about possible redundancy between `session._reactive_lock` and `cycle_start_action`**

❯ Is it possible that the `session._reactive_lock` introduced by this PR is entirely redundant with the `_busy_count`/`_cycle_start_action_queue` (or vice versa)?

⏺ Short answer: they overlap heavily, but neither is fully redundant with the other.

The overlap: for the common case (serializing input-update-triggered reactive flushes) they do the same thing. `reactive_flush()` (specifically `_flush_concurrent`) runs all effects to completion (including cascading invalidations) via `asyncio.gather()` + recursion. If your only concern were serializing input updates against each other, the lock alone would be sufficient.

What the lock adds that the queue doesn't: dispatch messages (`uploadEnd`, `makeRequest`, etc.) bypass the queue entirely and go straight through the lock (line 722). Without the lock, a dispatch message …

What the queue adds that the lock doesn't: …

Could you collapse them into one? Theoretically yes, but each direction loses something.

Bottom line: they both serialize reactive operations against a session, and for the happy path they're doing the same work. But the queue provides scheduling policy …

❯ Is it possible some of these different behaviors are bugs, not intentional?

⏺ Yes, absolutely. Reconsidering each "difference" with fresh eyes:

- Dispatch blocking the websocket loop: almost certainly a bug. Dispatches acquire the lock inline (line 722), which means `conn.recv()` can't be called again until the dispatch finishes.
- Dispatch "priority" over input updates: this isn't a designed priority system, it's just a side effect of dispatches waiting on the lock (in asyncio's FIFO order).
- HTTP requests deferring input updates: almost certainly an unintended interaction. The `_increment_busy_count` in `_handle_request` (line 872) predates this PR.
- `invalidate_later` going through the queue: worth scrutinizing. In R Shiny, `invalidateLater` schedules a `later()` callback that fires at the next event-loop tick …

The overall picture: the honest answer to the original question is that the two mechanisms are largely doing the same job, and the behavioral … If you wanted to clean this up, the question to answer is: which mechanism should be the primary serialization strategy, with the other either …

❯ Your claim about invalidateLater is incorrect; it does use cycleStartAction.

⏺ You're right, I apologize for the incorrect claim. If R Shiny's invalidateLater also routes through cycleStartAction, then the Python implementation on line … That narrows the list of likely-accidental behaviors to: …
    self._task = None

    async def _impl():
        async with lock():
Make sure it's OK that we removed this. Ultrathink.
    self._has_run_session_ended_tasks = True

    # Clear any queued cycle start actions and reset busy count
    self._cycle_start_action_queue.clear()
Is this the only cleanup we need to do? Make sure we don't leak.
    # Clear any queued cycle start actions and reset busy count
    self._cycle_start_action_queue.clear()
    self._busy_count = 0
Should this be set to 0? Why? Calls to increment/decrement might still be outstanding?
It does affect whether _decrement_busy_count() calls _start_cycle, so proceed carefully.
Better yet, _start_cycle could check if we're ended (just in case).
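The suggested guard could be as simple as checking a closed flag at the top of `_start_cycle`. Hypothetical sketch only: the `_closed` attribute and the toy class around it are assumptions, not py-shiny code.

```python
import asyncio


class Session:
    def __init__(self) -> None:
        self._closed = False
        self._cycle_start_action_queue = []
        self.cycles_started = 0

    async def _start_cycle(self) -> None:
        if self._closed:
            # Just in case: a straggling _decrement_busy_count (or a task
            # scheduled before close) must not start work on an ended session.
            return
        self.cycles_started += 1
        while self._cycle_start_action_queue:
            await self._cycle_start_action_queue.pop(0)()


async def demo() -> int:
    s = Session()

    async def queued_action() -> None:
        pass

    s._cycle_start_action_queue.append(queued_action)
    s._closed = True        # session ends with an action still queued
    await s._start_cycle()  # guard trips: the queued action never runs
    return s.cycles_started


print(asyncio.run(demo()))
```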
    next_invocation()
    await flush()

    asyncio.create_task(_impl())
I think this might need to be `asyncio.create_task(session._cycle_start_action(_impl()))`. I think that's the equivalent to `async with reactive.lock()` now??
Or session._reactive_lock(), feels like one or the other.
Maybe... reactive.lock() should be deprecated but ALSO call get_current_session()._reactive_lock()???
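A shim along the lines floated here might look like the following. Hypothetical sketch: `get_current_session()` is a stand-in for the real session accessor, and falling back to a no-op outside a session is an assumption, not decided behavior.

```python
import asyncio
import warnings
from contextlib import asynccontextmanager


def get_current_session():
    # Stand-in for the real session accessor; returns None outside a session.
    return None


@asynccontextmanager
async def lock():
    # Warn about the deprecation, but still delegate to the current session's
    # real per-session lock so `async with reactive.lock():` keeps roughly the
    # semantics existing apps expect.
    warnings.warn(
        "reactive.lock() is deprecated. The reactive lock is now per-session.",
        DeprecationWarning,
        stacklevel=2,
    )
    session = get_current_session()
    if session is None:
        yield  # no session context: degrade to a no-op lock
    else:
        async with session._reactive_lock:
            yield
```

This keeps the eight apps listed earlier in the thread working while still steering users away from the global-lock mental model.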
    if message_obj["method"] == "init":
        # Init must run immediately (session setup)
        async with self._reactive_lock:
The update method does BOTH self._reactive_lock and _cycle_start_action; should we change this to be consistent?? Or do we just know it doesn't matter because the session is just starting??
    # handling for things like ExtendedTask invocations and
    # data frame cell edits via makeRequest.
    message_other = typing.cast(ClientMessageOther, message_obj)
    async with self._reactive_lock:
Are we sure this needs a reactive lock (and/or cycle_start_action)?
    raise ProtocolError(
        f"Unrecognized method {message_obj['method']}"
    )
Question unrelated to this line of code: do download handlers also need to be locked?
It ended up being a bit more complicated than this; it's now locked for the handler's synchronous execution, but if it returns a generator, the lock is released between iterations. We'll need to document this.
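The behavior described here (lock held for the handler's synchronous execution, then re-acquired per chunk when a generator is returned) can be modeled roughly as follows. This is an illustrative sketch, not the actual py-shiny implementation; the names are invented.

```python
import asyncio


async def stream_download(session_lock: asyncio.Lock, handler):
    chunks = []
    async with session_lock:
        # The handler's synchronous portion runs under the reactive lock.
        result = handler()
    if hasattr(result, "__aiter__"):
        it = result.__aiter__()
        while True:
            # For streaming results, the lock is held only while producing
            # each chunk, so reactive flushes can interleave with a
            # long-running download.
            async with session_lock:
                try:
                    chunk = await it.__anext__()
                except StopAsyncIteration:
                    break
            chunks.append(chunk)
    else:
        chunks.append(result)
    return chunks


async def demo():
    lock = asyncio.Lock()

    def handler():
        async def gen():
            for i in range(3):
                yield f"chunk-{i}"
        return gen()

    return await stream_download(lock, handler)


print(asyncio.run(demo()))
```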
    ctx.invalidate()
    await flush()

    await root_session._cycle_start_action(_do_invalidate)
IMPORTANT: As written, invalidate_later has no higher priority than inputs. Claude claims that in Shiny for R, invalidate_later is not blocked by the start cycle, so effectively it's higher priority than inputs.
Wrap the download handler in `async with self._reactive_lock` to ensure downloads don't run concurrently with reactive flushes. The streaming iterators also independently acquire the lock since they execute lazily after the initial handler returns. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Document that the reactive lock is acquired/released/reacquired per chunk during streaming downloads, and add a TODO suggesting we consider only holding the lock through the first chunk, since subsequent chunks are typically pure I/O. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
    raise ProtocolError(
        f"Unrecognized method {message_obj['method']}"
    )
Also related to download handlers: the browser doesn't prompt for a download destination until the iterator is finished, I found that surprising.
Add OutBoundMessageQueues.is_empty() and use it to avoid sending empty messages to the client during flush. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
    # TODO: Consider whether we need to hold the reactive lock for
    # the entire streaming iteration, or only for the first chunk.
    # The common pattern is: do some reactive/async computation
    # before the first yield, then stream pure I/O after that. If
    # so, we could acquire the lock once, yield the first chunk,
    # then release it and stream the rest without the lock. This
    # would reduce contention for long-running downloads.
Avoids potential stack overflow from deep recursion when many async effects repeatedly invalidate new effects. Semantics are unchanged. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Joe Cheng <joe@posit.co>
…nto concurrency
    """
    return _reactive_environment.lock
    warnings.warn(
        "reactive.lock() is deprecated. The reactive lock is now per-session. "
From IRL convo, maybe this should also perform a real session lock as that is what the user is expecting.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Update the asyncness example apps for R and Python to better demonstrate concurrency. Changes include: update UI copy to explicitly mention "Shiny for R/Python app", speed up the fast clock refresh (R: invalidateLater 1000 -> 100 ms; Python: invalidate_later 1 -> 0.1 s), and replace the single slow task with three indexed slow tasks (R: create three observeEvent handlers via lapply; Python: add make_effect and loop to create three reactive effects). Also comment out a redundant fast observer in the R example. These changes make it easier to observe async behavior when the app is opened in multiple tabs.
    "inputMessages": omq.input_messages,
    "errors": omq.errors,
    }
    if not omq.is_empty():
This is NOT correct, IIRC sometimes empty messages need to be sent to clear out "recalculating" state. (I think req with cancel_output is one?)
We actually already know which outputs need to clear out their recalculating state; they're tracked using the {"recalculating":{"name":"foo","status":"recalculated"}} message, but the CSS changes are not actually carried out until the next {"errors":{},"values":{},"inputMessages":[]} style message is received. We have {"busy":"idle"} which could be a more reliable trigger. It's currently being sent before the e/v/iM, so maybe it would be weird for the CSS to clear before the outputs are rendered; we could swap the order perhaps.
Summary

- Replace the global reactive lock with a per-session `_reactive_lock` so sessions run independently
- Run effects concurrently via `asyncio.gather()` instead of sequentially
- Defer input processing while effects are busy (matching R Shiny's `cycleStartAction`/`busyCount` pattern)
- `reactive.lock()`: Returns a no-op lock with deprecation warning for backward compatibility
- `_request_flush()`: Was already a no-op; py-shiny's per-session message loops don't need a global flush registry

Detailed implementation notes and comparison against R Shiny source are in `plans/CONCURRENCY_IMPLEMENTATION_NOTES.md`.

Test plan

- Example apps (`examples/asyncness/`)

🤖 Generated with Claude Code