
feat: Implement R Shiny-style concurrency model #2182

Draft
jcheng5 wants to merge 17 commits into main from concurrency

Conversation


@jcheng5 jcheng5 commented Feb 24, 2026

Summary

  • Remove global reactive lock: Replace with per-session _reactive_lock so sessions run independently
  • Concurrent effect execution: Effects within a session now run via asyncio.gather() instead of sequentially
  • Cycle start action queue: Defer input processing while effects are busy, keeping input values stable (R Shiny-style cycleStartAction / busyCount pattern)
  • Deprecate reactive.lock(): Returns a no-op lock with deprecation warning for backward compatibility
  • Remove _request_flush(): Was already a no-op; py-shiny's per-session message loops don't need a global flush registry

Detailed implementation notes and comparison against R Shiny source are in plans/CONCURRENCY_IMPLEMENTATION_NOTES.md.
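For reviewers unfamiliar with the R Shiny pattern, here is a minimal, self-contained sketch of the busyCount / cycleStartAction idea (all names here are hypothetical stand-ins, not py-shiny's actual API): actions that arrive while effects are busy are deferred until the busy count returns to zero, which is what keeps input values stable mid-cycle.

```python
import asyncio
from typing import Awaitable, Callable

Action = Callable[[], Awaitable[None]]

class CycleQueue:
    """Hypothetical sketch of the busyCount/cycleStartAction pattern:
    actions arriving while effects are running are deferred until the
    busy count drops back to zero."""

    def __init__(self) -> None:
        self._busy_count = 0
        self._queue: list[Action] = []

    async def cycle_start_action(self, action: Action) -> None:
        if self._busy_count == 0:
            await self._run(action)
        else:
            # Effects are busy: defer, keeping input values stable.
            self._queue.append(action)

    async def _run(self, action: Action) -> None:
        self._busy_count += 1
        try:
            await action()
        finally:
            self._busy_count -= 1
            if self._busy_count == 0 and self._queue:
                await self._run(self._queue.pop(0))

async def demo() -> list[str]:
    q = CycleQueue()
    log: list[str] = []

    async def second() -> None:
        log.append("second")

    async def first() -> None:
        log.append("first-start")
        # An "input update" arriving mid-cycle is deferred, not interleaved:
        await q.cycle_start_action(second)
        log.append("first-end")

    await q.cycle_start_action(first)
    return log
```

Running demo() logs first-start, first-end, second: the deferred action waits for the whole first cycle rather than running at the await point inside it.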

Test plan

  • All 392 unit tests pass
  • Pyright type checking passes with 0 errors
  • Formatting passes
  • Playwright end-to-end tests
  • Manual testing with example apps (including examples/asyncness/)

🤖 Generated with Claude Code

Replace the global reactive lock with per-session locks and a cycle start
action queue, enabling concurrent execution across sessions and within
sessions.

Key changes:
- Remove global ReactiveEnvironment lock; add per-session _reactive_lock
- Run effects concurrently via asyncio.gather() instead of sequentially
- Add _cycle_start_action queue to defer input processing while effects
  are busy, keeping input values stable during execution
- Deprecate reactive.lock() (returns no-op lock for backward compat)
- Update invalidate_later to use per-session lock and cycle queue
- Remove ExtendedTask's dependency on the global lock
- Remove _request_flush() (was already a no-op)
- Wrap Effect on_flush_cb in try/finally for busy count safety

The concurrency model matches R Shiny: sessions are independent, effects
run concurrently within a session, and input changes are queued until all
effects complete. Dispatch messages (uploadEnd, makeRequest, etc.) run
immediately without deferral, matching R Shiny's behavior.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
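The sequential-vs-concurrent difference for effects can be seen in a tiny sketch (plain asyncio, no py-shiny APIs):

```python
import asyncio

async def demo() -> list[str]:
    log: list[str] = []

    async def effect(name: str) -> None:
        log.append(f"{name}-start")
        await asyncio.sleep(0)  # any await point: I/O, an extended task, etc.
        log.append(f"{name}-end")

    # Sequential (old behavior): each effect runs to completion in turn.
    await effect("a")
    await effect("b")

    # Concurrent (new behavior): effects interleave at await points.
    await asyncio.gather(effect("c"), effect("d"))
    return log
```

The sequential calls log a-start, a-end, b-start, b-end, while the gathered effects interleave as c-start, d-start, c-end, d-end.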

jcheng5 commented Feb 24, 2026

The apps in examples/asyncness and the plans/ documents don't have to stay, but I wanted them available for the PR review.

I considered other concurrency schemes besides the one we designed for Shiny for R, but in the end I think that design struck a really nice set of tradeoffs, so this PR ended up in a very similar place.

Change _flush_concurrent() from batch-draining the priority queue into
gather() to dequeuing one context at a time with create_task() + sleep(0)
between each. This ensures a high-priority effect's synchronous portion
runs before lower-priority effects are dequeued, allowing newly-invalidated
effects to enter the queue in correct priority order—matching R Shiny's
flushReact() behavior.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
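A rough illustration of why one-at-a-time dequeuing matters (a hypothetical flush loop, not the real _flush_concurrent): create_task() plus sleep(0) lets each effect's synchronous prefix run, and possibly enqueue newly invalidated effects, before the next dequeue.

```python
import asyncio
import heapq

async def flush(pq: list) -> None:
    """Dequeue one context at a time; sleep(0) yields to the event loop so
    the just-started task runs its synchronous portion (which may push new,
    correctly prioritized entries) before we dequeue again."""
    tasks = []
    while pq:
        _prio, _seq, fn = heapq.heappop(pq)
        tasks.append(asyncio.create_task(fn()))
        await asyncio.sleep(0)
    await asyncio.gather(*tasks)

async def demo() -> list[str]:
    log: list[str] = []
    pq: list = []

    async def medium() -> None:
        log.append("medium")

    async def high() -> None:
        log.append("high")
        # high's synchronous portion invalidates a medium-priority effect
        heapq.heappush(pq, (1, 1, medium))

    async def low() -> None:
        log.append("low")

    heapq.heappush(pq, (0, 0, high))  # lower number = higher priority
    heapq.heappush(pq, (2, 2, low))
    await flush(pq)
    return log
```

Batch-draining the queue into gather() up front would start high and low before medium ever entered the queue; the incremental loop runs high, medium, low.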
Comment on lines 692 to 695
# TODO: Remove this call to reactive_flush() once https://github.com/posit-dev/py-shiny/issues/1889 is fixed
# Workaround: Any `on_flushed()` calls from bookmark's `on_restored()` will be flushed here
if self.bookmark.store != "disable":
await reactive_flush()
Collaborator

Can this be removed? You're doing the reactive flush every time

Comment on lines +1122 to +1123
if self._cycle_start_action_queue:
asyncio.create_task(self._start_cycle())
Collaborator

This cadence is important. I'd like a helper method

@schloerke
Collaborator

Test plan

  • Playwright end-to-end tests

Currently,

controller.OutputTextVerbatim(page, "all_txt").expect_value(
"('a-3-flush', 'bx-3-first-flush', 'by-3-first-flush', 'bx-3-second-flush', "
"'by-3-second-flush', 'c-3-flush', 'a-3-flushed', 'bx-3-first-flushed', "
"'by-3-first-flushed', 'bx-3-second-flushed', 'by-3-second-flushed', "
"'c-3-flushed')"
)
is failing

@schloerke
Collaborator

Please have Claude get uv run make check-fix to run successfully.

per-session. This function returns a no-op lock for backward compatibility.
"""
return _reactive_environment.lock
warnings.warn(

async def _do_invalidate() -> None:
nonlocal cancellable
async with root_session._reactive_lock:
Collaborator Author

What if we add Session.reactive_lock() so callers don't have to do the walking up the scopes themselves? (The express stub session should no-op or throw.)

ctx.invalidate()
await flush()

await root_session._cycle_start_action(_do_invalidate)
Collaborator Author

I don't like that this uses a private. I think the philosophy should be that 3rd parties could implement something just like invalidate_later themselves, without accessing any private members.

if self._cycle_start_action_queue:
asyncio.create_task(self._start_cycle())

async def _cycle_start_action(
Collaborator Author

This should be renamed to include a more active verb; as is, it sounds like "start" is the verb and it's not.


jcheng5 commented Mar 3, 2026

Conversation with Claude about possible redundancy between session._reactive_lock and cycle_start_action

❯ Is it possible that the session._reactive_lock introduced by this PR, is entirely redundant with the _busy_count/_cycle_start_action_queue (or vice
versa)? Is it possible that both play exactly one role, which is to serialize actions against a single session?

⏺ Let me examine both mechanisms to understand their roles.

⏺ Explore(Explore reactive lock and busy count)
⎿  Done (25 tool uses · 56.0k tokens · 1m 28s)
(ctrl+o to expand)

⏺ Great question. Let me verify the exact timing relationship between the lock and busy_count by reading the critical code paths.

⏺ Read 2 files (ctrl+o to expand)

⏺ Now let me check whether reactive_flush() runs all effects to completion inline (which would mean busy_count always returns to 0 before the lock is
released).

⏺ Searched for 1 pattern, read 1 file (ctrl+o to expand)

⏺ Let me also check invalidate_later to see how it interacts.

⏺ Searched for 1 pattern, read 1 file (ctrl+o to expand)

⏺ Now I have a complete picture. Here's my analysis:

Short answer: They overlap heavily, but neither is fully redundant with the other.

The overlap

For the common case — serializing input-update-triggered reactive flushes — they do the same thing. Here's why:

reactive_flush() (specifically _flush_concurrent) runs all effects to completion (including cascading invalidations) via asyncio.gather() + recursive
re-flush before returning. Since the lock is held for the entire manage_inputs → flush cycle, busy_count is always back to 0 by the time the lock is
released. So the queue's "wait until effects settle" behavior is identical to the lock's "wait until the lock is free" behavior.

If your only concern were serializing input updates against each other, the lock alone would be sufficient.

What the lock adds that the queue doesn't

Dispatch messages (uploadEnd, makeRequest, etc.) bypass the queue entirely and go straight through the lock (line 722). Without the lock, a dispatch message
could interleave with a queued input update at any await point during reactive_flush().

What the queue adds that the lock doesn't

  1. Dispatch priority over input updates. Dispatches acquire the lock directly, so they can jump ahead of queued input updates. Without the queue
    (lock-only), everything would be FIFO in the lock's waiter list. This is the R Shiny behavior: dispatches (like ExtendedTask invocations) run promptly,
    while input updates wait for the current cycle to finish.
  2. Non-blocking websocket loop. Input updates return immediately after appending to the queue (_cycle_start_action doesn't await the lock). Without the
    queue, the websocket loop would block on the lock for each input update, preventing it from receiving further messages until the current flush completes.
  3. Deferred inputs during HTTP requests. _handle_request (line 872) increments busy_count without holding the lock. So during a file upload, the queue
    defers input updates even though the lock is free. Without the queue, input updates would sail through the lock unimpeded during an upload.
  4. Client busy/idle signaling. The busy_count drives the {"busy": "busy"}/{"busy": "idle"} messages to the client. This is orthogonal to serialization but
    coupled to the mechanism.

Could you collapse them into one?

Theoretically yes, but each direction loses something:

  • Queue only (drop the lock): You'd need to route dispatches through the queue too, losing their prompt-execution semantics. Or you'd need a different
    serialization mechanism for dispatches.
  • Lock only (drop the queue): You'd lose dispatch priority, websocket loop responsiveness, the HTTP-request deferral, and client busy/idle signaling. The
    first two are the most material.

Bottom line

They both serialize reactive operations against a session, and for the happy path they're doing the same work. But the queue provides scheduling policy
(what order things run, what gets deferred) while the lock provides mutual exclusion (nothing runs concurrently). Those are complementary concerns that
happen to produce the same outcome in the simple case.

✻ Brewed for 5m 12s

❯ Is it possible some of these different behaviors are bugs, not intentional?

⏺ Yes, absolutely. Let me reconsider each "difference" with fresh eyes:

Dispatch blocking the websocket loop — Almost certainly a bug. Dispatches acquire the lock inline (line 722), which means conn.recv() can't be called again
until the lock is available and the dispatch+flush completes. If a long-running flush is in progress, the websocket loop is frozen. In R Shiny this isn't an
issue because the message-receive path is never blocked—R's single-threaded event loop processes messages that are already received and queued.

Dispatch "priority" over input updates — This isn't a designed priority system, it's just a side effect of dispatches waiting on the lock (in asyncio's FIFO
waiter list) while input updates wait in the separate action queue. The ordering falls out accidentally from using two different queueing mechanisms. In R
Shiny, dispatches run "immediately" only in the sense that they're not routed through startCycle—but there's no preemption story because everything is
naturally serialized.

HTTP requests deferring input updates — Almost certainly an unintended interaction. The _increment_busy_count in _handle_request (line 872) predates this PR
and existed purely for client busy/idle signaling. Now that busy_count also gates the action queue, a file upload inadvertently blocks all input
processing. In R Shiny, file uploads don't defer input updates this way.

invalidate_later going through the queue — Worth scrutinizing. In R Shiny, invalidateLater schedules a later() callback that fires at the next event-loop
iteration. It does not go through startCycle. In the Python code (line 435), it goes through _cycle_start_action, which means it queues behind any pending
input updates. If three input updates are queued and then an invalidate_later timer fires, the invalidation waits for all three to process first. That's
different from R, where the later() callback would fire at the top of the event loop.

The overall picture: I think the honest answer to your original question is that the two mechanisms are largely doing the same job, and the behavioral
differences between them aren't intentional design choices but emergent consequences of having two overlapping serialization systems. The lock was likely
added for safety ("we're in async Python, we need mutual exclusion") and the queue was added for R Shiny semantics ("we need startCycle/busy_count"),
without fully reckoning with the fact that they interact in subtle, untested ways.

If you wanted to clean this up, I think the question to answer is: which mechanism should be the primary serialization strategy, with the other either
removed or demoted to a narrow supporting role?

✻ Cooked for 1m 17s

❯ Your claim about invalidateLater is incorrect--it does use cycleStartAction

⏺ You're right, I apologize for the incorrect claim. If R Shiny's invalidateLater also routes through cycleStartAction, then the Python implementation on line
435 is faithful to R's behavior and that one is correct.

That narrows the list of likely-accidental behaviors to:

  1. Dispatch blocking the websocket receive loop — Still suspect. The async with self._reactive_lock on line 722 freezes the message loop until the lock is
    free and the dispatch+flush completes.
  2. HTTP requests deferring input updates — Still suspect. _handle_request bumping busy_count now has a side effect it didn't used to have.
  3. Dispatch "priority" — Still just a side effect of two different queueing mechanisms, not a designed ordering guarantee.

self._task = None

async def _impl():
async with lock():
Collaborator Author

Make sure it's OK that we removed this. Ultrathink.

self._has_run_session_ended_tasks = True

# Clear any queued cycle start actions and reset busy count
self._cycle_start_action_queue.clear()
Collaborator Author

Is this the only cleanup we need to do? Make sure we don't leak.


# Clear any queued cycle start actions and reset busy count
self._cycle_start_action_queue.clear()
self._busy_count = 0
Collaborator Author

Should this be set to 0? Why? Calls to increment/decrement might still be outstanding?

Collaborator Author

It does affect whether _decrement_busy_count() calls _start_cycle, so proceed carefully.

Collaborator Author

Better yet, _start_cycle could check if we're ended (just in case).

next_invocation()
await flush()

asyncio.create_task(_impl())
Collaborator Author

I think this might need to be

        asyncio.create_task(session._cycle_start_action(_impl()))

I think that's the equivalent to async with reactive.lock() now??

Collaborator Author

Or session._reactive_lock(), feels like one or the other.

Collaborator Author

Maybe... reactive.lock() should be deprecated but ALSO call get_current_session()._reactive_lock()???

if message_obj["method"] == "init":
if message_obj["method"] == "init":
# Init must run immediately (session setup)
async with self._reactive_lock:
Collaborator Author

The update method does BOTH self._reactive_lock and _cycle_start_action; should we change this to be consistent?? Or do we just know it doesn't matter because the session is just starting??

# handling for things like ExtendedTask invocations and
# data frame cell edits via makeRequest.
message_other = typing.cast(ClientMessageOther, message_obj)
async with self._reactive_lock:
Collaborator Author

Are we sure this needs a reactive lock (and/or cycle_start_action)?

raise ProtocolError(
f"Unrecognized method {message_obj['method']}"
)

Collaborator Author

Question unrelated to this line of code: do download handlers also need to be locked?

Collaborator Author

Barret says: yes

Collaborator Author

It ended up being a bit more complicated than this; it's now locked for the handler's synchronous execution, but if it returns a generator, the lock is released between iterations. We'll need to document this.

ctx.invalidate()
await flush()

await root_session._cycle_start_action(_do_invalidate)
Collaborator Author

IMPORTANT: As written, invalidate_later has no higher priority than inputs. Claude claims that in Shiny for R, invalidate_later is not blocked by the start cycle, so effectively it's higher priority than inputs.

jcheng5 and others added 2 commits March 6, 2026 11:35
Wrap the download handler in `async with self._reactive_lock` to ensure
downloads don't run concurrently with reactive flushes. The streaming
iterators also independently acquire the lock since they execute lazily
after the initial handler returns.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Document that the reactive lock is acquired/released/reacquired per
chunk during streaming downloads, and add a TODO suggesting we consider
only holding the lock through the first chunk, since subsequent chunks
are typically pure I/O.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
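The per-chunk locking described above can be sketched as follows (illustrative names only; the real code wraps py-shiny's download handlers):

```python
import asyncio
from typing import AsyncIterator, Callable, Iterable

async def stream_download(
    handler: Callable[[], Iterable[str]],
    lock: asyncio.Lock,
) -> AsyncIterator[str]:
    # Hold the lock for the handler's synchronous execution...
    async with lock:
        gen = iter(handler())
    # ...but release and reacquire it per chunk while streaming, so a
    # long download doesn't starve reactive flushes on the session.
    while True:
        async with lock:
            try:
                chunk = next(gen)
            except StopIteration:
                break
        yield chunk

async def demo() -> list[str]:
    lock = asyncio.Lock()

    def handler() -> Iterable[str]:
        return (f"chunk-{i}" for i in range(3))

    return [chunk async for chunk in stream_download(handler, lock)]
```

Because each chunk is pulled under its own lock acquisition, another coroutine can interleave a reactive flush between chunks.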
raise ProtocolError(
f"Unrecognized method {message_obj['method']}"
)

Collaborator Author

It ended up being a bit more complicated than this; it's now locked for the handler's synchronous execution, but if it returns a generator, the lock is released between iterations. We'll need to document this.

raise ProtocolError(
f"Unrecognized method {message_obj['method']}"
)

Collaborator Author

Also related to download handlers: the browser doesn't prompt for a download destination until the iterator is finished, I found that surprising.

schloerke and others added 2 commits March 6, 2026 16:30
Add OutBoundMessageQueues.is_empty() and use it to avoid sending
empty messages to the client during flush.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Comment on lines +955 to +961
# TODO: Consider whether we need to hold the reactive lock for
# the entire streaming iteration, or only for the first chunk.
# The common pattern is: do some reactive/async computation
# before the first yield, then stream pure I/O after that. If
# so, we could acquire the lock once, yield the first chunk,
# then release it and stream the rest without the lock. This
# would reduce contention for long-running downloads.
Collaborator

Needs to be addressed

schloerke and others added 5 commits March 6, 2026 16:41
Avoids potential stack overflow from deep recursion when many async
effects repeatedly invalidate new effects. Semantics are unchanged.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Joe Cheng <joe@posit.co>
"""
return _reactive_environment.lock
warnings.warn(
"reactive.lock() is deprecated. The reactive lock is now per-session. "
Collaborator

From IRL convo, maybe this should also perform a real session lock as that is what the user is expecting.

schloerke and others added 6 commits March 6, 2026 17:52
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Update the asyncness example apps for R and Python to better demonstrate concurrency:
- Update UI copy to explicitly mention "Shiny for R/Python app"
- Speed up the fast clock refresh (R: invalidateLater 1000 -> 100 ms; Python: invalidate_later 1 -> 0.1 s)
- Replace the single slow task with three indexed slow tasks (R: create three observeEvent handlers via lapply; Python: add make_effect and loop to create three reactive effects)
- Comment out a redundant fast observer in the R example

These changes make it easier to observe async behavior when the app is opened in multiple tabs.
"inputMessages": omq.input_messages,
"errors": omq.errors,
}
if not omq.is_empty():
Collaborator Author

This is NOT correct, IIRC sometimes empty messages need to be sent to clear out "recalculating" state. (I think req with cancel_output is one?)

Collaborator Author

We actually already know which outputs need to clear out their recalculating state; they're tracked using the {"recalculating":{"name":"foo","status":"recalculated"}} message, but the CSS changes are not actually carried out until the next {"errors":{},"values":{},"inputMessages":[]} style message is received. We have {"busy":"idle"} which could be a more reliable trigger. It's currently being sent before the e/v/iM, so maybe it would be weird for the CSS to clear before the outputs are rendered; we could swap the order perhaps.
