Skip to content

SSE executor worker #56

Merged
mvaled merged 31 commits into
masterfrom
sse-executor-worker
May 25, 2026
Merged

SSE executor worker #56
mvaled merged 31 commits into
masterfrom
sse-executor-worker

Conversation

@mvaled
Copy link
Copy Markdown
Collaborator

@mvaled mvaled commented May 22, 2026

No description provided.

mvaled and others added 19 commits May 22, 2026 11:56
The plan combines two converging refactors:

- A dedicated SSE render executor so PostgreSQL connection count is
  bounded by a small pool of long-lived worker threads instead of
  growing with SSE stream count.
- A single command pipeline shared by HTTP and SSE so SSE handlers can
  yield the full djhtmx command graph (notably `Emit`), and the
  duplicate command match in `sse.py` disappears.

The two are framed as phases of the same architectural arc: Phase 1
stabilizes the sync/async boundary so Phase 2 can change what runs on
the sync side without disturbing how it gets there.  The plan covers
deployment topology (dedicated Granian SSE process behind Caddy
`/_sse/*`), sizing in terms of `WEB_DB_POOL_MAX_SIZE`, the
psycopg-pool `max_lifetime` interaction, connection health under PG
restarts, backpressure, tail-latency math, and a phased implementation
order interleaving both phases.
Move the `Destroy`, `Redirect`, `Open`, `Focus`, `ScrollIntoView`,
`DispatchDOMEvent`, `SkipRender`, `BuildAndRender`, `Render`, `Emit`,
`Signal`, and `Execute` command dataclasses (plus the `Command` union)
from `component.py` into the existing `commands.py` module, alongside
`SendHtml`, `PushURL`, and `ReplaceURL`.

`component.py` now contains only `HtmxComponent`, `Triggers`,
`SSEEventRouter`, and component-machinery helpers, which keeps that
module focused on the component-base-class concerns it has always
owned.  All internal imports (`consumer.py`, `testing.py`, `urls.py`,
`command_queue.py`, tests) point to `djhtmx.commands`, and `__all__`
on `commands.py` is filled out so downstream code has a stable import
target.

No behavior change.
A new `djhtmx.sse_executor` module hosts a long-lived
`ThreadPoolExecutor` with `DJHTMX_SSE_RENDER_WORKERS` worker threads
(default 8).  Each worker keeps its own Django DB connection across
renders, so PostgreSQL connection count is bounded by pool size
instead of growing one-per-stream as it did under the previous
sticky-thread `sync_to_async` model.

The render path in `sse.py` now dispatches through `submit_sse_render`
which:

- Propagates `contextvars.Context` so Sentry scope and tracing spans
  reach the worker thread (custom executors don't get this from
  `loop.run_in_executor` automatically).
- Rotates each worker's DB connection every
  `DJHTMX_SSE_RENDER_ROTATE_EVERY` renders so psycopg-pool's
  `max_lifetime` can actually retire aged-out connections.
- Runs `is_usable()` every `DJHTMX_SSE_RENDER_HEALTHCHECK_EVERY`
  renders and closes on `OperationalError`/`InterfaceError` so PG
  restarts and `pg_terminate_backend` don't poison a worker.
- Falls back to thread-sensitive `sync_to_async` under
  `utils.runtime.is_testing()`, which routes the render back to the
  test thread so `TestCase`-style transactional tests still see
  uncommitted data.

The per-render `connections.close()` stopgap in
`_render_consumer_sse_events` is removed; the executor's connection
lifecycle replaces it.  The acute Sentry KAIKO-JH leak remains fixed
and the design also amortizes the connection handshake across many
renders instead of paying it on every drain.
The command loop that drives an event through its yielded commands
moves out of `Repository._run_command` / `_process_emited_commands`
and into a dedicated `djhtmx.command_processor.CommandProcessor`.

`Repository.dispatch_event` and `adispatch_event` are now ~5-line
wrappers that hand a list of root commands to
`CommandProcessor(self).process(...)`.  `Repository` keeps the
responsibilities tied to component state — loading, storing,
building, rendering HTML, session storage, query parameter patching —
i.e. the *state* a command consults.  The *decisions* about what each
command means now live in one place.

HTTP behavior is byte-for-byte identical (existing tests untouched
beyond a one-line `from djhtmx.command_processor import
CommandProcessor` in `test_session.py` where the old
`repo._run_command` was called directly).

This sets up Phase 2 of the SSE-generalized-worker plan: SSE renders
will eventually run the same `CommandProcessor.process(...)` against a
session-scoped `Repository`, picking up `Emit`, `Signal`, render
coalescing, and the full browser command set without the duplicate
match in `sse.py`.
A new `djhtmx.command_response` module owns the single place that maps
a `ProcessedCommand` stream to wire effects.  `CommandBatch` is the
transport-neutral accumulator (HTML fragments, browser commands,
optional URL state); `to_http_response` turns it into an `HttpResponse`
with the same HX headers, trigger headers, and OOB delete fragments
the previous `urls.endpoint` loop produced.

`urls.endpoint` collapses from a ~70-line per-iteration command match
into four lines: build the batch from `repo.dispatch_event(...)` and
hand it to `to_http_response`.

Externally visible HTTP behavior is unchanged, including the rule
that `HX-Redirect` suppresses `HX-Push-Url` and `HX-Replace-Url`.

This sets up Phase 2.4 of the SSE-generalized-worker plan: the SSE
renderer will dispatch through the same `CommandProcessor` and
serialize through a sibling SSE serializer that consumes the same
`CommandBatch`, eliminating the duplicate command match currently in
`sse.py`.
Two related updates to docs/plans/sse-generalized-worker.md:

1. Add a Progress section after Status that lists all eight phases with
   `[x]`/`[ ]` markers.  Phase 1, 2.1, and 2.5 are done; 2.2, 2.3, 2.4,
   2.6, and 2.7 remain.  Lets the plan act as the live progress
   tracker for the refactor rather than relying on commit archaeology.

2. Tighten the "Emit from SSE" subsection to state explicitly that
   `Emit` is always session-local.  No opt-in cross-session republish
   path; `emit_sse_event` remains the only cross-session mechanism.
   The previous wording left room for "session-local + opt-in
   republish" which would blur the producer contract and complicate
   failure-mode reasoning.
Three explicit unions in `djhtmx.commands` make each command's role
clear:

- `Command` — handler-yieldable.  Public API surface for event
  handlers (`Render`, `BuildAndRender`, `Destroy`, `Emit`,
  `SkipRender`, `Open`, `Focus`, `ScrollIntoView`, `Redirect`,
  `DispatchDOMEvent`, `PushURL`, `ReplaceURL`, `Execute`).
- `InternalCommand` — queue-only, never yielded by handlers.
  `Signal` and the new `HandleSSEEvents`.
- `ProcessedCommand` — wire-effect subset that `CommandBatch` and
  the transport serializers consume.  Re-exported from `repo.py` so
  existing `from djhtmx.repo import ProcessedCommand` keeps working.

Previously the legacy `Command` union conflated handler-yieldable
with queue-internal (`Signal` was in it).  Pyright now catches
handlers that accidentally yield `Signal` / `HandleSSEEvents` —
they're no longer assignable to `Command`.  `Execute` stays in
`Command` because user code legitimately yields it to chain into
another handler.

`HandleSSEEvents` is the new SSE entry-point command.  Its case in
`CommandProcessor._run_command` loads the consumer's component,
iterates envelopes through `_handle_sse_events`, wraps handler
exceptions as `Emit(HtmxUnhandledError(...))`, and routes yields
through `_process_emited_commands(during_execute=False)`.  Phase
2.4 will switch `sse.py` to enqueue this instead of running its
ad-hoc command match.

`CommandPriority` (NamedTuple) replaces the bare
`tuple[int, str, int]` return of `_priority`, making the sort key
self-documenting at the call sites.

Every command dataclass now carries a substantive docstring covering
whether it is handler-yieldable / internal / transport-output, the
semantics, and when to use it from application code.  `Execute` vs
`DispatchDOMEvent` is explicitly clarified as "server-side method
dispatch" vs "browser-side DOM event fire".

The small `case _ as unreachable` → `case unreachable` simplification
in `consumer.py` and the matching coverage exclude in
`pyproject.toml` follow the same pattern already used in
`command_queue.py`.
Sub-phase 2.2 in the plan now covers the full scope landed in the
preceding commit:

- The three-way type split between `Command`, `InternalCommand`, and
  `ProcessedCommand`, with the rationale for keeping `Execute` in
  `Command` (user code yields it to chain handlers).
- A short comparison of `Execute` vs `DispatchDOMEvent` for the
  recurring "what's the difference?" question.
- The `HandleSSEEvents` introduction (previously the entire scope of
  2.2; now sub-section 2.2.b).
- A docstring pass on every command dataclass.

Progress checklist updated: Phase 2.2 is `[x]`.
The Unreleased section had grown into a refactor diary — references to
phases, `CommandProcessor`, `CommandBatch`, `HandleSSEEvents`, the
`connections.close()` stopgap.  Readers of the next release don't need
the path we took; they need to know what changed.

Trimmed to three audience-relevant items:

- **Added**: the SSE channel and the new SSE sizing settings.
- **Changed**: command classes now live in `djhtmx.commands` (the only
  source-level change downstream consumers must adapt to), and the
  narrower handler-yield typing (`Signal`/`SendHtml` no longer
  yieldable).
- **Fixed**: the PG connection exhaustion under SSE load, described
  as the symptom and the resolution without the internal mechanism.
After the Phase 2.2 type split, `Command` excludes `None` and the
yield type is `Iterable[Command] | None`.  At runtime today, a yielded
`None` from an HTTP handler crashes in `CommandQueue._priority`'s
`assert_never`; only the legacy SSE ad-hoc loop had explicit
`case None:` handling.  Aligning the spec with that reality:

- `docs/plans/sse-spec.md`: removed the "`yield None` means default
  render" semantic.  Default render is triggered by an empty yield
  (no commands) or a bare `return`/`pass`.  Updated the `PDFButton`
  example accordingly.
- `docs/plans/sse-generalized-worker.md`: Phase 2.3 rewritten to
  describe normalisation against the new contract — Rule 3 explicitly
  states handlers must not yield `None`, and no special case is added
  in the normaliser.  Test-plan and yield-type lines updated to match.
Three small changes round out the handler-return contract that Phase
2.2 established at the type level:

- Drop the `c for c in yielded if c is not None` defensive filter in
  the new `HandleSSEEvents` case.  `Command` excludes `None`, so a
  yielded `None` is undefined behaviour — let it surface via the
  downstream `assert_never` instead of silently swallowing.
- Rename `_process_emited_commands` → `_process_emitted_commands` and
  the local `emited_commands` variables likewise.  The typo had been
  duplicated across all three handler entry points.
- Document the normalisation rules on the helper.  All three entry
  points (`Execute`, `Emit` fan-out, `HandleSSEEvents`) feed the same
  function, so the rules live in one docstring covering: implicit
  default render when nothing is yielded; `SkipRender(self)`
  suppressing only that default; `during_execute=True` forcing
  non-lazy partial renders; query-patcher signal/`ReplaceURL` emission.

Behaviour is unchanged for code that doesn't rely on the
under-specified `yield None` shortcut (which would have crashed on
the HTTP path anyway).  Plan progress checklist now marks Phase 2.3
done.
`render_sse_event_fragments` and `render_sse_heartbeat_fragments` no
longer call `_handle_sse_events` directly through an ad-hoc match.
They group pending envelopes by consumer, build one `HandleSSEEvents`
command per consumer, and submit a single sync `_drain_sse_session`
job to the render executor.  That job constructs one `Repository`
for the session, runs all consumers' work through one
`CommandProcessor.process(...)` (one `CommandQueue`, one render
coalescing pass, one `Emit` fan-out cascade), and serialises the
resulting `ProcessedCommand` stream via the new `to_sse_fragments`.

Consequences for SSE handlers:

- `yield Emit(...)` now wakes `_handle_event` listeners in the same
  session — page-global notifications (`FeedbackMessages`) work
  without any special case.
- `yield Render`, `BuildAndRender`, `Destroy`, `Open` all flow
  through the shared processor and shared serialiser.
- `Focus`, `ScrollIntoView`, `DispatchDOMEvent`, `Redirect`,
  `PushURL`, `ReplaceURL` from an SSE handler are logged as
  "unsupported" until Phase 2.6 generalises the browser command sink.

`_render_consumer_sse_events` and `_render_open_command` are deleted
from `sse.py`; `to_sse_fragments` + a new in-module
`_render_open_command` live in `djhtmx.command_response` so the
serialiser stays next to its sibling `to_http_response`.
`sse_command_sink_id` is removed (it had no callers outside the
deleted helper).

The `HandleSSEEvents` case in `_run_command` silently skips when
`get_component_by_id` returns `Destroy` (stale consumer record).
This matches the deleted helper's behaviour and avoids double-emitting
OOB deletes; Phase 2.8 will tighten the lifecycle so the case is
cold defence-in-depth instead of routine.

The `TodoCounter`/`TodoItem` fixtures used the now-removed
`yield None` shortcut; they're updated to use `pass` / `return` for
the default-render case.

The `Unreleased` CHANGELOG is collapsed back to release-notes scale:
one `Added` for the experimental SSE channel + its settings, one
`Changed` for the command-class import paths and the narrower
`Command` union.  All SSE-specific behaviour has no prior release to
contrast against, so its details are part of the new feature
description, not separate entries.
- Phase 2.4 marked `[x]` in the progress checklist.
- New sub-phase 2.8: "Clean up SSE consumer records on Destroy".  The
  Phase 2.4 SSE rewrite surfaced a pre-existing lifecycle gap — when
  a component is destroyed, the SSE consumer record in Redis and its
  topic/type index memberships aren't cleaned up.  Producers keep
  enqueuing events for the orphaned consumer until TTL; the drain
  silently skips them via the `case Destroy(): return` defence in
  `CommandProcessor._run_command`.  Phase 2.8 adds a symmetric
  `unregister_component` to `sse.py` and wires it into the destroy
  cascade so the silent-skip becomes cold code.
- Linked the comment in `command_processor.py` to the new sub-phase
  so future readers know the leak is tracked.
`register_component` had no symmetric inverse, so destroying a
component left its SSE consumer record (`consumer_key`,
`consumer_indexes_key`) and topic/type index memberships behind.
Producers (`emit_sse_event`) kept enqueuing events for the orphaned
consumer until TTL; the drain swallowed them via the
`case Destroy(): return` guard in `CommandProcessor`'s
`HandleSSEEvents` case.  All wasted work.

`unregister_consumer(session_id, component_id)` in `sse.py` is the
symmetric inverse: srem the consumer id from every index it was a
member of, delete `consumer_key`, delete the consumer's indexes set,
srem from the session consumers set.

`Repository.unregister_component` diffs `session.unregistered`
before/after the Session call and runs `unregister_consumer` for each
newly-destroyed id (the explicit component plus any children
cascaded by `Session.unregister_component`).  One call per destroyed
component.

The `case Destroy(): return` defence in
`command_processor._run_command` stays as cold-path safety net for
TTL/race scenarios where cleanup hasn't completed yet.
Two halves of the same wire-protocol change land together; splitting
would leave an intermediate state where the server emits a format the
browser can't read.

### Phase 2.6 — Generalized SSE command sink

The session-scoped sink (`<div data-djhtmx-sse-command-sink="...">`
rendered by `SSEEventRouter`) now carries the full browser command
set, not just `Open`.  `_render_sink_command(session_id, command)` in
`command_response.py` encodes each command as:

    <div hx-swap-oob="beforeend: #djhtmx-sse-commands-<hash>">
      <template data-djhtmx-browser-command
                data-session="<hash>"
                data-payload="<base64url(json(dataclasses.asdict(command)))>"></template>
    </div>

The `<template>` wrapper keeps the browser from rendering or scanning
the inert command content; only data attributes are read by the
MutationObserver-driven processor.  Base64url-encoded JSON sidesteps
HTML-attribute escaping for arbitrary `DispatchDOMEvent.detail`
payloads.

`to_sse_fragments` now routes `Focus`, `ScrollIntoView`, `Open`,
`DispatchDOMEvent`, `Redirect`, `PushURL`, and `ReplaceURL` through
the sink.  No more "unsupported from SSE handlers" log lines.
`BrowserSinkCommand` union added to type the helper.

### Phase 2.7 — Unified JS executor

`django.js` reorganises around `executeBrowserCommand(commandData)`
that switches on `commandData.command`:
`open-tab`, `focus`, `scroll_into_view`, `redirect`, `push_url`,
`replace_url`, `dispatch_dom_event`, `destroy`.

Three transports feed it:

- SSE sink: MutationObserver picks up
  `[data-djhtmx-browser-command]` templates, base64url-decodes the
  payload, dispatches.
- HTMX `HX-Trigger-After-Settle` events (`hxFocus`,
  `hxScrollIntoView`, `hxOpenURL`, `hxDispatchDOMEvent`): per-event
  handlers convert event detail to command payloads and feed the
  executor.
- WebSocket JSON messages: `htmx:wsBeforeMessage` parses and
  dispatches.  The legacy WS `dispatch_event` name is normalised to
  `dispatch_dom_event`.  `send_state` is preserved as a WS-only
  side door (not a browser command).

Same-origin and target-name validation for `open-tab`; same-origin
validation for `redirect`.

Net effect: SSE handlers can now emit any browser command and have
the browser act on it; the three transports share one safety-checked
executor instead of three near-duplicates.
Progress checklist is now fully ticked.  The SSE-generalized-worker
plan that started as the unified blueprint for the PG-connection
hotfix follow-up has landed end-to-end on this branch:

- Phase 1 — render executor (deployed).
- Phase 2.1 — `CommandProcessor` extracted from `Repository`.
- Phase 2.2 — command type split, `HandleSSEEvents`, docstrings.
- Phase 2.3 — handler-return normalisation, `yield None` removed.
- Phase 2.4 — SSE rewrites to dispatch through `CommandProcessor`.
- Phase 2.5 — transport-neutral `CommandBatch` + HTTP serialiser.
- Phase 2.6 — generalised SSE command sink.
- Phase 2.7 — unified browser-side command executor.
- Phase 2.8 — consumer cleanup on Destroy.

What this enables for SSE handlers, from one branch:

- `yield Emit(...)` for session-local fan-out.
- The full browser command set through the session-scoped sink.
- Bounded PG connections via the render executor.
- No consumer-record leaks on destroy.

The next branch can ship a release that surfaces these in the
CHANGELOG.
`TodoList._handle_sse_events`'s cross-session `TodoItemAdded` branch
yielded `BuildAndRender.append("#todo-list", TodoItem, ...)` to insert
the new item in place, but didn't suppress the implicit default
`Render(TodoList)`.  Both ended up in the queue:

- `Render(TodoList, oob="true")`            — bucket 70, key starts "hx-…"
- `Render(TodoItem, oob="beforeend: #todo-list")` — bucket 70, key starts "item-id-…"

Within the bucket, sort-by-key put TodoList first (`"h" < "i"`), so
the SSE fragments reached Browser B as:

1. OOB `true` replaces the TodoList outer div — the fresh `#todo-list`
   already contains the new item (DB has it by now).
2. OOB `beforeend: #todo-list` appends another copy.

Browser B then showed the new item twice.  (Browser A wasn't affected
because the HTTP-side handler in `ListHeader.add` only does the
targeted append; it doesn't re-render `TodoList`.)

Fix: explicitly `yield SkipRender(self)` in the cross-session-add
branch.  TodoList is handing off the DOM update to the targeted
`BuildAndRender.append(...)`, so the full default render is wrong and
creates the race.
Sentry/Logfire spans for the two paths most likely to show contention
under load:

- `djhtmx.CommandProcessor.process` wraps every dispatch.  Tagged with
  `session` (hashed) and `roots` (count of root commands in the
  initial queue, so an HTTP `Execute` is `1`, an SSE drain with N
  consumers is N).  Time inside this span is the full
  load-component → handler → fan-out → render cycle for one event
  source.
- `djhtmx.sse.iteration` wraps each turn of the SSE endpoint's
  `while True` loop, with nested spans for the heartbeat drain
  (`djhtmx.sse.heartbeat_drain`, tagged with the firing paces), the
  event drain (`djhtmx.sse.event_drain`), and the pub/sub wait
  (`djhtmx.sse.wait`, tagged with the timeout used).  The wait span
  sits outside the iteration span so an idle stream shows long waits
  between short iterations rather than long iterations.

All spans tag `session` with `compact_hash(session_id)` so the raw id
doesn't leak to traces.  Spans are no-ops if Sentry and Logfire are
unavailable or disabled via `DJHTMX_ENABLE_*_TRACING`.
sse-spec.md: rewrite end-to-end to describe what currently exists.

- Drop "first version" / "MVP" / "later versions" framing.
- Correct `_handle_sse_events`: `Emit` is *not* rejected.  The full command set
  (Render, Destroy, Focus, ScrollIntoView, Open, DispatchDOMEvent, Redirect,
  PushURL, ReplaceURL, Emit) is supported over SSE.
- Replace the imaginary central in-process broker with the real per-task
  pubsub topology (each SSE task owns its own session wake-channel
  subscription).
- Replace the "unconditional close per drain" DB story with the actual
  dedicated ThreadPoolExecutor and its DJHTMX_SSE_RENDER_* settings.
- Fix `SSEHeartbeat` shape (BaseModel, not @DataClass).
- Correct browser command sink markup: `<template data-djhtmx-browser-command
  data-session=... data-payload=base64url(json)>`, not the per-attribute
  `data-command=open` form.
- Note that each OOB fragment is sent as its own `event: djhtmx` message.
- Drop the `SSEModelEvent` section — not implemented.

Module headers and docstrings: strip references to historical phases and to
docs/plans/sse-generalized-worker.md, and replace "matching the previous
behavior" phrasings with statements of what the code does now.  Affected:
commands.py, command_processor.py, command_response.py, settings.py, sse.py,
sse_executor.py.

sse-generalized-worker.md: reflowed for markdown conventions (no paragraph
wrapping, two spaces after sentence-ending periods).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@mvaled mvaled force-pushed the sse-executor-worker branch from 982b1e4 to 0821a02 Compare May 24, 2026 10:34
mvaled added 2 commits May 24, 2026 13:28
The SSE async loop's `pubsub.get_message` wait was hard-coded to 15s in
`urls.py` while the `SSE_HEARTBEAT_DEFAULT_TIMEOUT` setting (default 30)
existed unused.  Finish the integration: rename the setting to
`SSE_HEARTBEAT_TIMEOUT` (its role is the cap on any computed heartbeat
wait, not a default), wire `urls.py` through it, and refresh the
adjacent comment so the worst-case recovery window for lost pub/sub
wakes is described against the live setting instead of the obsolete
literal.

Document the same semantics in the SSE spec: the cap shortens when a
heartbeat tick is sooner, becomes the wait when no heartbeats are
scheduled, and bounds how long a lost wake can delay event delivery.
The previous diagram drew the "other" route leaving the SSE Granian
box, which read as the SSE process forwarding non-SSE traffic to the
HTTP process.  In reality Caddy is the fan-out: it routes `/_sse*` to
the SSE Granian process and everything else to the HTTP process, and
the two upstreams share nothing beyond Redis and Postgres.

Redraw both upstreams as siblings under Caddy and add a one-line
caption stating the fan-out explicitly so the diagram cannot be
misread again.
@mvaled mvaled marked this pull request as ready for review May 24, 2026 11:33
@mvaled mvaled mentioned this pull request May 24, 2026
@mvaled
Copy link
Copy Markdown
Collaborator Author

mvaled commented May 24, 2026

Code review

No issues found. Checked for bugs and CLAUDE.md compliance.

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

mvaled and others added 5 commits May 24, 2026 15:35
Add a thin dual-backend metric helper layer (`metric_incr`,
`metric_distribution`) in `djhtmx.tracing`, mirroring the existing
`tracing_span` pattern: each call publishes through Sentry's metrics
API and Logfire's instruments when those backends are enabled, and is a
no-op otherwise.  Logfire instruments are cached because its factories
return per-instrument objects that must be reused — recreating them per
call would double-register the metric.

Use the helpers from the SSE render executor to surface what's
otherwise invisible behind the worker pool: `queue_depth` and
`queue_wait_ms` distributions track backpressure shape, `duration_ms`
tracks per-render cost, and counters cover `drops` (queue-cap hits),
`rotations` (`SSE_RENDER_ROTATE_EVERY` cycles), `healthcheck_closes`
(`is_usable()` rejections), and `broken_connection_closes`
(`OperationalError`/`InterfaceError` reseats).  Operators tuning
`DJHTMX_SSE_RENDER_*` need these to see whether the pool is sized
correctly without instrumenting downstream services.
…stream

When `DJHTMX_SSE_RENDER_QUEUE_MAX` is set, `submit_sse_render` raises
`SSERenderQueueFull` as the back-pressure signal once the executor
backlog reaches the cap.  The async SSE stream in `sse_endpoint` had
only a bare `except Exception:` that logged and re-raised, so the
exception propagated out of the generator and tore the connection
down.  HTMX's auto-reconnect then immediately re-opened a stream that
hit the same saturated queue and crashed again — converting "overload"
into "reconnect storm at Caddy/Granian/Redis," which is precisely the
runaway-resource symptom the cap was added to prevent.

The documented intent in `docs/plans/sse-spec.md` and
`docs/plans/sse-generalized-worker.md` is "log and drop, continue."
Catch `SSERenderQueueFull` at the two call sites in the stream loop
(heartbeat drain and event drain), log a warning, and substitute an
empty fragment list so the iteration falls through to the next
`pubsub.get_message` without yielding anything.  The events that
triggered the queue-full are lost in either path (they're already
removed from the Redis list before submission), so the only behavioural
difference is whether the connection survives — and surviving is
strictly the better outcome under overload.  Drops are counted at the
raise site via `djhtmx.sse.render.drops`, so the silent path remains
observable.
`render_sse_event_fragments` reads the session events list with
`LRANGE 0 -1` and then issues a separate `DELETE`.  A concurrent
`emit_sse_event` that RPUSHes between the two commands has its
envelope wiped by the `DELETE` and is never delivered — the consumer
silently misses events under load.

Mark the spot with a TODO that names the two viable atomic
replacements (`LPOP COUNT` on Redis 6.2+, or a `MULTI`/`EXEC` block)
so the fix is unambiguous when someone picks it up.  No behaviour
change in this commit.
`render_sse_event_fragments` used to read the events list with a bare
`LRANGE 0 -1` and then issue a separate `DELETE`.  A concurrent
`emit_sse_event` that `RPUSH`ed an envelope between the two commands had
its event wiped by the `DELETE` and never delivered — the consumer
silently missed events under load.  The previous commit only marked the
spot with a TODO; this one fixes it.

Wrap the read-and-clear in a `MULTI/EXEC` pipeline so the two commands
execute as a single atomic unit.  Any `RPUSH` that arrives during the
transaction either lands before (and is drained) or after (and remains
queued for the next iteration) — no event can be silently dropped.

`async_lrange` was only used at this one site and is now unreachable;
remove it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`Session.flush` previously issued an `HDEL`, then up to three `HSET`s,
then an `EXPIRE` as separate Redis commands.  A concurrent
`_ensure_read` calling `HGETALL` on the same session key could observe a
half-written state — for example freshly updated component states paired
with a stale `__subs__` / `__children__` from the previous flush — and
build a Session whose subscription graph did not match its component
states.

Wrap all four mutations and the `EXPIRE` in a transactional pipeline so
readers either see the old session in full or the new one in full.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
mvaled and others added 5 commits May 24, 2026 19:04
`register_component` and `unregister_consumer` both mutate the consumer
record, the per-consumer indexes set, and the shared event/topic index
sets across many separate Redis commands.  Concurrent calls for the
*same* consumer (a quick re-mount, or a destroy that races a re-render)
interleave their SREMs and SADDs against the shared sets.  Each writer
sees a stale `old_indexes` snapshot and leaves orphan `id_` entries in
shared sets it did not know it had to clean up.  Those orphans persist
for the lifetime of any peer subscriber to the same event/topic, causing
spurious `emit_sse_event` deliveries and wasted render-pool cycles.

Wrap both functions in an optimistic transaction: `WATCH` the
per-consumer indexes set, snapshot it, then queue every mutation inside
`MULTI/EXEC`.  Because both functions watch the same key, concurrent
register/unregister calls arbitrate via `WatchError` and retry against
fresh state instead of interleaving.  The retry budget is bounded by
the new `DJHTMX_SSE_REGISTER_MAX_ATTEMPTS` setting (default 8); the
function raises if it exhausts, surfacing pathological contention
instead of silently busy-looping.

The transactional pipeline also collapses the previous ~13 round-trips
on the happy path down to 3 (WATCH, SMEMBERS, EXEC), so the fix pays for
itself on Redis latency even when there is no contention.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wrap `register_component` and `unregister_consumer` in `tracing_span` so
the WATCH/EXEC retry latency shows up directly in Sentry/Logfire, and so
the `RuntimeError` raised on exhausted contention attaches to a span
that already carries the session and component tags.

Span names follow the existing `djhtmx.sse.*` convention used by the
streaming loop in `urls.py`.  Tags use `compact_hash` for the
session_id; `register_component` tags the human-readable `hx_name`,
while `unregister_consumer` only has the component_id available so it
hashes that too.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This is a left over from the initial commit 81811d0 'feat(sse): add
router smoke endpoint'.
@mvaled mvaled force-pushed the sse-executor-worker branch from 9de853d to 8ceb609 Compare May 25, 2026 15:33
@mvaled mvaled merged commit 0b34e88 into master May 25, 2026
2 of 3 checks passed
@mvaled mvaled deleted the sse-executor-worker branch May 25, 2026 15:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant