SSE executor worker #56
Merged
Merged
Conversation
The plan combines two converging refactors: - A dedicated SSE render executor so PostgreSQL connection count is bounded by a small pool of long-lived worker threads instead of growing with SSE stream count. - A single command pipeline shared by HTTP and SSE so SSE handlers can yield the full djhtmx command graph (notably `Emit`), and the duplicate command match in `sse.py` disappears. The two are framed as phases of the same architectural arc: Phase 1 stabilizes the sync/async boundary so Phase 2 can change what runs on the sync side without disturbing how it gets there. The plan covers deployment topology (dedicated Granian SSE process behind Caddy `/_sse/*`), sizing in terms of `WEB_DB_POOL_MAX_SIZE`, the psycopg-pool `max_lifetime` interaction, connection health under PG restarts, backpressure, tail-latency math, and a phased implementation order interleaving both phases.
Move the `Destroy`, `Redirect`, `Open`, `Focus`, `ScrollIntoView`, `DispatchDOMEvent`, `SkipRender`, `BuildAndRender`, `Render`, `Emit`, `Signal`, and `Execute` command dataclasses (plus the `Command` union) from `component.py` into the existing `commands.py` module, alongside `SendHtml`, `PushURL`, and `ReplaceURL`. `component.py` now contains only `HtmxComponent`, `Triggers`, `SSEEventRouter`, and component-machinery helpers, which keeps that module focused on the component-base-class concerns it has always owned. All internal imports (`consumer.py`, `testing.py`, `urls.py`, `command_queue.py`, tests) point to `djhtmx.commands`, and `__all__` on `commands.py` is filled out so downstream code has a stable import target. No behavior change.
A new `djhtmx.sse_executor` module hosts a long-lived `ThreadPoolExecutor` with `DJHTMX_SSE_RENDER_WORKERS` worker threads (default 8). Each worker keeps its own Django DB connection across renders, so PostgreSQL connection count is bounded by pool size instead of growing one-per-stream as it did under the previous sticky-thread `sync_to_async` model. The render path in `sse.py` now dispatches through `submit_sse_render` which: - Propagates `contextvars.Context` so Sentry scope and tracing spans reach the worker thread (custom executors don't get this from `loop.run_in_executor` automatically). - Rotates each worker's DB connection every `DJHTMX_SSE_RENDER_ROTATE_EVERY` renders so psycopg-pool's `max_lifetime` can actually retire aged-out connections. - Runs `is_usable()` every `DJHTMX_SSE_RENDER_HEALTHCHECK_EVERY` renders and closes on `OperationalError`/`InterfaceError` so PG restarts and `pg_terminate_backend` don't poison a worker. - Falls back to thread-sensitive `sync_to_async` under `utils.runtime.is_testing()`, which routes the render back to the test thread so `TestCase`-style transactional tests still see uncommitted data. The per-render `connections.close()` stopgap in `_render_consumer_sse_events` is removed; the executor's connection lifecycle replaces it. The acute Sentry KAIKO-JH leak remains fixed and the design also amortizes the connection handshake across many renders instead of paying it on every drain.
The command loop that drives an event through its yielded commands moves out of `Repository._run_command` / `_process_emited_commands` and into a dedicated `djhtmx.command_processor.CommandProcessor`. `Repository.dispatch_event` and `adispatch_event` are now ~5-line wrappers that hand a list of root commands to `CommandProcessor(self).process(...)`. `Repository` keeps the responsibilities tied to component state — loading, storing, building, rendering HTML, session storage, query parameter patching — i.e. the *state* a command consults. The *decisions* about what each command means now live in one place. HTTP behavior is byte-for-byte identical (existing tests untouched beyond a one-line `from djhtmx.command_processor import CommandProcessor` in `test_session.py` where the old `repo._run_command` was called directly). This sets up Phase 2 of the SSE-generalized-worker plan: SSE renders will eventually run the same `CommandProcessor.process(...)` against a session-scoped `Repository`, picking up `Emit`, `Signal`, render coalescing, and the full browser command set without the duplicate match in `sse.py`.
A new `djhtmx.command_response` module owns the single place that maps a `ProcessedCommand` stream to wire effects. `CommandBatch` is the transport-neutral accumulator (HTML fragments, browser commands, optional URL state); `to_http_response` turns it into an `HttpResponse` with the same HX headers, trigger headers, and OOB delete fragments the previous `urls.endpoint` loop produced. `urls.endpoint` collapses from a ~70-line per-iteration command match into four lines: build the batch from `repo.dispatch_event(...)` and hand it to `to_http_response`. Externally visible HTTP behavior is unchanged, including the rule that `HX-Redirect` suppresses `HX-Push-Url` and `HX-Replace-Url`. This sets up Phase 2.4 of the SSE-generalized-worker plan: the SSE renderer will dispatch through the same `CommandProcessor` and serialize through a sibling SSE serializer that consumes the same `CommandBatch`, eliminating the duplicate command match currently in `sse.py`.
Two related updates to docs/plans/sse-generalized-worker.md: 1. Add a Progress section after Status that lists all eight phases with `[x]`/`[ ]` markers. Phase 1, 2.1, and 2.5 are done; 2.2, 2.3, 2.4, 2.6, and 2.7 remain. Lets the plan act as the live progress tracker for the refactor rather than relying on commit archaeology. 2. Tighten the "Emit from SSE" subsection to state explicitly that `Emit` is always session-local. No opt-in cross-session republish path; `emit_sse_event` remains the only cross-session mechanism. The previous wording left room for "session-local + opt-in republish" which would blur the producer contract and complicate failure-mode reasoning.
Three explicit unions in `djhtmx.commands` make each command's role clear: - `Command` — handler-yieldable. Public API surface for event handlers (`Render`, `BuildAndRender`, `Destroy`, `Emit`, `SkipRender`, `Open`, `Focus`, `ScrollIntoView`, `Redirect`, `DispatchDOMEvent`, `PushURL`, `ReplaceURL`, `Execute`). - `InternalCommand` — queue-only, never yielded by handlers. `Signal` and the new `HandleSSEEvents`. - `ProcessedCommand` — wire-effect subset that `CommandBatch` and the transport serializers consume. Re-exported from `repo.py` so existing `from djhtmx.repo import ProcessedCommand` keeps working. Previously the legacy `Command` union conflated handler-yieldable with queue-internal (`Signal` was in it). Pyright now catches handlers that accidentally yield `Signal` / `HandleSSEEvents` — they're no longer assignable to `Command`. `Execute` stays in `Command` because user code legitimately yields it to chain into another handler. `HandleSSEEvents` is the new SSE entry-point command. Its case in `CommandProcessor._run_command` loads the consumer's component, iterates envelopes through `_handle_sse_events`, wraps handler exceptions as `Emit(HtmxUnhandledError(...))`, and routes yields through `_process_emited_commands(during_execute=False)`. Phase 2.4 will switch `sse.py` to enqueue this instead of running its ad-hoc command match. `CommandPriority` (NamedTuple) replaces the bare `tuple[int, str, int]` return of `_priority`, making the sort key self-documenting at the call sites. Every command dataclass now carries a substantive docstring covering whether it is handler-yieldable / internal / transport-output, the semantics, and when to use it from application code. `Execute` vs `DispatchDOMEvent` is explicitly clarified as "server-side method dispatch" vs "browser-side DOM event fire". The small `case _ as unreachable` → `case unreachable` simplification in `consumer.py` and the matching coverage exclude in `pyproject.toml` follow the same pattern already used in `command_queue.py`.
Sub-phase 2.2 in the plan now covers the full scope landed in the preceding commit: - The three-way type split between `Command`, `InternalCommand`, and `ProcessedCommand`, with the rationale for keeping `Execute` in `Command` (user code yields it to chain handlers). - A short comparison of `Execute` vs `DispatchDOMEvent` for the recurring "what's the difference?" question. - The `HandleSSEEvents` introduction (previously the entire scope of 2.2; now sub-section 2.2.b). - A docstring pass on every command dataclass. Progress checklist updated: Phase 2.2 is `[x]`.
The Unreleased section had grown into a refactor diary — references to phases, `CommandProcessor`, `CommandBatch`, `HandleSSEEvents`, the `connections.close()` stopgap. Readers of the next release don't need the path we took; they need to know what changed. Trimmed to three audience-relevant items: - **Added**: the SSE channel and the new SSE sizing settings. - **Changed**: command classes now live in `djhtmx.commands` (the only source-level change downstream consumers must adapt to), and the narrower handler-yield typing (`Signal`/`SendHtml` no longer yieldable). - **Fixed**: the PG connection exhaustion under SSE load, described as the symptom and the resolution without the internal mechanism.
After the Phase 2.2 type split, `Command` excludes `None` and the yield type is `Iterable[Command] | None`. At runtime today, a yielded `None` from an HTTP handler crashes in `CommandQueue._priority`'s `assert_never`; only the legacy SSE ad-hoc loop had explicit `case None:` handling. Aligning the spec with that reality: - `docs/plans/sse-spec.md`: removed the "`yield None` means default render" semantic. Default render is triggered by an empty yield (no commands) or a bare `return`/`pass`. Updated the `PDFButton` example accordingly. - `docs/plans/sse-generalized-worker.md`: Phase 2.3 rewritten to describe normalisation against the new contract — Rule 3 explicitly states handlers must not yield `None`, and no special case is added in the normaliser. Test-plan and yield-type lines updated to match.
Three small changes round out the handler-return contract that Phase 2.2 established at the type level: - Drop the `c for c in yielded if c is not None` defensive filter in the new `HandleSSEEvents` case. `Command` excludes `None`, so a yielded `None` is undefined behaviour — let it surface via the downstream `assert_never` instead of silently swallowing. - Rename `_process_emited_commands` → `_process_emitted_commands` and the local `emited_commands` variables likewise. The typo had been duplicated across all three handler entry points. - Document the normalisation rules on the helper. All three entry points (`Execute`, `Emit` fan-out, `HandleSSEEvents`) feed the same function, so the rules live in one docstring covering: implicit default render when nothing is yielded; `SkipRender(self)` suppressing only that default; `during_execute=True` forcing non-lazy partial renders; query-patcher signal/`ReplaceURL` emission. Behaviour is unchanged for code that doesn't rely on the under-specified `yield None` shortcut (which would have crashed on the HTTP path anyway). Plan progress checklist now marks Phase 2.3 done.
`render_sse_event_fragments` and `render_sse_heartbeat_fragments` no longer call `_handle_sse_events` directly through an ad-hoc match. They group pending envelopes by consumer, build one `HandleSSEEvents` command per consumer, and submit a single sync `_drain_sse_session` job to the render executor. That job constructs one `Repository` for the session, runs all consumers' work through one `CommandProcessor.process(...)` (one `CommandQueue`, one render coalescing pass, one `Emit` fan-out cascade), and serialises the resulting `ProcessedCommand` stream via the new `to_sse_fragments`. Consequences for SSE handlers: - `yield Emit(...)` now wakes `_handle_event` listeners in the same session — page-global notifications (`FeedbackMessages`) work without any special case. - `yield Render`, `BuildAndRender`, `Destroy`, `Open` all flow through the shared processor and shared serialiser. - `Focus`, `ScrollIntoView`, `DispatchDOMEvent`, `Redirect`, `PushURL`, `ReplaceURL` from an SSE handler are logged as "unsupported" until Phase 2.6 generalises the browser command sink. `_render_consumer_sse_events` and `_render_open_command` are deleted from `sse.py`; `to_sse_fragments` + a new in-module `_render_open_command` live in `djhtmx.command_response` so the serialiser stays next to its sibling `to_http_response`. `sse_command_sink_id` is removed (it had no callers outside the deleted helper). The `HandleSSEEvents` case in `_run_command` silently skips when `get_component_by_id` returns `Destroy` (stale consumer record). This matches the deleted helper's behaviour and avoids double-emitting OOB deletes; Phase 2.8 will tighten the lifecycle so the case is cold defence-in-depth instead of routine. The `TodoCounter`/`TodoItem` fixtures used the now-removed `yield None` shortcut; they're updated to use `pass` / `return` for the default-render case. The `Unreleased` CHANGELOG is collapsed back to release-notes scale: one `Added` for the experimental SSE channel + its settings, one `Changed` for the command-class import paths and the narrower `Command` union. All SSE-specific behaviour has no prior release to contrast against, so its details are part of the new feature description, not separate entries.
- Phase 2.4 marked `[x]` in the progress checklist. - New sub-phase 2.8: "Clean up SSE consumer records on Destroy". The Phase 2.4 SSE rewrite surfaced a pre-existing lifecycle gap — when a component is destroyed, the SSE consumer record in Redis and its topic/type index memberships aren't cleaned up. Producers keep enqueuing events for the orphaned consumer until TTL; the drain silently skips them via the `case Destroy(): return` defence in `CommandProcessor._run_command`. Phase 2.8 adds a symmetric `unregister_component` to `sse.py` and wires it into the destroy cascade so the silent-skip becomes cold code. - Linked the comment in `command_processor.py` to the new sub-phase so future readers know the leak is tracked.
`register_component` had no symmetric inverse, so destroying a component left its SSE consumer record (`consumer_key`, `consumer_indexes_key`) and topic/type index memberships behind. Producers (`emit_sse_event`) kept enqueuing events for the orphaned consumer until TTL; the drain swallowed them via the `case Destroy(): return` guard in `CommandProcessor`'s `HandleSSEEvents` case. All wasted work. `unregister_consumer(session_id, component_id)` in `sse.py` is the symmetric inverse: srem the consumer id from every index it was a member of, delete `consumer_key`, delete the consumer's indexes set, srem from the session consumers set. `Repository.unregister_component` diffs `session.unregistered` before/after the Session call and runs `unregister_consumer` for each newly-destroyed id (the explicit component plus any children cascaded by `Session.unregister_component`). One call per destroyed component. The `case Destroy(): return` defence in `command_processor._run_command` stays as cold-path safety net for TTL/race scenarios where cleanup hasn't completed yet.
Two halves of the same wire-protocol change land together; splitting
would leave an intermediate state where the server emits a format the
browser can't read.
### Phase 2.6 — Generalized SSE command sink
The session-scoped sink (`<div data-djhtmx-sse-command-sink="...">`
rendered by `SSEEventRouter`) now carries the full browser command
set, not just `Open`. `_render_sink_command(session_id, command)` in
`command_response.py` encodes each command as:
<div hx-swap-oob="beforeend: #djhtmx-sse-commands-<hash>">
<template data-djhtmx-browser-command
data-session="<hash>"
data-payload="<base64url(json(dataclasses.asdict(command)))>"></template>
</div>
The `<template>` wrapper keeps the browser from rendering or scanning
the inert command content; only data attributes are read by the
MutationObserver-driven processor. Base64url-encoded JSON sidesteps
HTML-attribute escaping for arbitrary `DispatchDOMEvent.detail`
payloads.
`to_sse_fragments` now routes `Focus`, `ScrollIntoView`, `Open`,
`DispatchDOMEvent`, `Redirect`, `PushURL`, and `ReplaceURL` through
the sink. No more "unsupported from SSE handlers" log lines.
`BrowserSinkCommand` union added to type the helper.
### Phase 2.7 — Unified JS executor
`django.js` reorganises around `executeBrowserCommand(commandData)`
that switches on `commandData.command`:
`open-tab`, `focus`, `scroll_into_view`, `redirect`, `push_url`,
`replace_url`, `dispatch_dom_event`, `destroy`.
Three transports feed it:
- SSE sink: MutationObserver picks up
`[data-djhtmx-browser-command]` templates, base64url-decodes the
payload, dispatches.
- HTMX `HX-Trigger-After-Settle` events (`hxFocus`,
`hxScrollIntoView`, `hxOpenURL`, `hxDispatchDOMEvent`): per-event
handlers convert event detail to command payloads and feed the
executor.
- WebSocket JSON messages: `htmx:wsBeforeMessage` parses and
dispatches. The legacy WS `dispatch_event` name is normalised to
`dispatch_dom_event`. `send_state` is preserved as a WS-only
side door (not a browser command).
Same-origin and target-name validation for `open-tab`; same-origin
validation for `redirect`.
Net effect: SSE handlers can now emit any browser command and have
the browser act on it; the three transports share one safety-checked
executor instead of three near-duplicates.
Progress checklist is now fully ticked. The SSE-generalized-worker plan that started as the unified blueprint for the PG-connection hotfix follow-up has landed end-to-end on this branch: - Phase 1 — render executor (deployed). - Phase 2.1 — `CommandProcessor` extracted from `Repository`. - Phase 2.2 — command type split, `HandleSSEEvents`, docstrings. - Phase 2.3 — handler-return normalisation, `yield None` removed. - Phase 2.4 — SSE rewrites to dispatch through `CommandProcessor`. - Phase 2.5 — transport-neutral `CommandBatch` + HTTP serialiser. - Phase 2.6 — generalised SSE command sink. - Phase 2.7 — unified browser-side command executor. - Phase 2.8 — consumer cleanup on Destroy. What this enables for SSE handlers, from one branch: - `yield Emit(...)` for session-local fan-out. - The full browser command set through the session-scoped sink. - Bounded PG connections via the render executor. - No consumer-record leaks on destroy. The next branch can ship a release that surfaces these in the CHANGELOG.
`TodoList._handle_sse_events`'s cross-session `TodoItemAdded` branch
yielded `BuildAndRender.append("#todo-list", TodoItem, ...)` to insert
the new item in place, but didn't suppress the implicit default
`Render(TodoList)`. Both ended up in the queue:
- `Render(TodoList, oob="true")` — bucket 70, key starts "hx-…"
- `Render(TodoItem, oob="beforeend: #todo-list")` — bucket 70, key starts "item-id-…"
Within the bucket, sort-by-key put TodoList first (`"h" < "i"`), so
the SSE fragments reached Browser B as:
1. OOB `true` replaces the TodoList outer div — the fresh `#todo-list`
already contains the new item (DB has it by now).
2. OOB `beforeend: #todo-list` appends another copy.
Browser B then showed the new item twice. (Browser A wasn't affected
because the HTTP-side handler in `ListHeader.add` only does the
targeted append; it doesn't re-render `TodoList`.)
Fix: explicitly `yield SkipRender(self)` in the cross-session-add
branch. TodoList is handing off the DOM update to the targeted
`BuildAndRender.append(...)`, so the full default render is wrong and
creates the race.
Sentry/Logfire spans for the two paths most likely to show contention under load: - `djhtmx.CommandProcessor.process` wraps every dispatch. Tagged with `session` (hashed) and `roots` (count of root commands in the initial queue, so an HTTP `Execute` is `1`, an SSE drain with N consumers is N). Time inside this span is the full load-component → handler → fan-out → render cycle for one event source. - `djhtmx.sse.iteration` wraps each turn of the SSE endpoint's `while True` loop, with nested spans for the heartbeat drain (`djhtmx.sse.heartbeat_drain`, tagged with the firing paces), the event drain (`djhtmx.sse.event_drain`), and the pub/sub wait (`djhtmx.sse.wait`, tagged with the timeout used). The wait span sits outside the iteration span so an idle stream shows long waits between short iterations rather than long iterations. All spans tag `session` with `compact_hash(session_id)` so the raw id doesn't leak to traces. Spans are no-ops if Sentry and Logfire are unavailable or disabled via `DJHTMX_ENABLE_*_TRACING`.
sse-spec.md: rewrite end-to-end to describe what currently exists. - Drop "first version" / "MVP" / "later versions" framing. - Correct `_handle_sse_events`: `Emit` is *not* rejected. The full command set (Render, Destroy, Focus, ScrollIntoView, Open, DispatchDOMEvent, Redirect, PushURL, ReplaceURL, Emit) is supported over SSE. - Replace the imaginary central in-process broker with the real per-task pubsub topology (each SSE task owns its own session wake-channel subscription). - Replace the "unconditional close per drain" DB story with the actual dedicated ThreadPoolExecutor and its DJHTMX_SSE_RENDER_* settings. - Fix `SSEHeartbeat` shape (BaseModel, not @DataClass). - Correct browser command sink markup: `<template data-djhtmx-browser-command data-session=... data-payload=base64url(json)>`, not the per-attribute `data-command=open` form. - Note that each OOB fragment is sent as its own `event: djhtmx` message. - Drop the `SSEModelEvent` section — not implemented. Module headers and docstrings: strip references to historical phases and to docs/plans/sse-generalized-worker.md, and replace "matching the previous behavior" phrasings with statements of what the code does now. Affected: commands.py, command_processor.py, command_response.py, settings.py, sse.py, sse_executor.py. sse-generalized-worker.md: reflowed for markdown conventions (no paragraph wrapping, two spaces after sentence-ending periods). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
982b1e4 to
0821a02
Compare
The SSE async loop's `pubsub.get_message` wait was hard-coded to 15s in `urls.py` while the `SSE_HEARTBEAT_DEFAULT_TIMEOUT` setting (default 30) existed unused. Finish the integration: rename the setting to `SSE_HEARTBEAT_TIMEOUT` (its role is the cap on any computed heartbeat wait, not a default), wire `urls.py` through it, and refresh the adjacent comment so the worst-case recovery window for lost pub/sub wakes is described against the live setting instead of the obsolete literal. Document the same semantics in the SSE spec: the cap shortens when a heartbeat tick is sooner, becomes the wait when no heartbeats are scheduled, and bounds how long a lost wake can delay event delivery.
The previous diagram drew the "other" route leaving the SSE Granian box, which read as the SSE process forwarding non-SSE traffic to the HTTP process. In reality Caddy is the fan-out: it routes `/_sse*` to the SSE Granian process and everything else to the HTTP process, and the two upstreams share nothing beyond Redis and Postgres. Redraw both upstreams as siblings under Caddy and add a one-line caption stating the fan-out explicitly so the diagram cannot be misread again.
Closed
Collaborator
Author
Code reviewNo issues found. Checked for bugs and CLAUDE.md compliance. 🤖 Generated with Claude Code - If this code review was useful, please react with 👍. Otherwise, react with 👎. |
Add a thin dual-backend metric helper layer (`metric_incr`, `metric_distribution`) in `djhtmx.tracing`, mirroring the existing `tracing_span` pattern: each call publishes through Sentry's metrics API and Logfire's instruments when those backends are enabled, and is a no-op otherwise. Logfire instruments are cached because its factories return per-instrument objects that must be reused — recreating them per call would double-register the metric. Use the helpers from the SSE render executor to surface what's otherwise invisible behind the worker pool: `queue_depth` and `queue_wait_ms` distributions track backpressure shape, `duration_ms` tracks per-render cost, and counters cover `drops` (queue-cap hits), `rotations` (`SSE_RENDER_ROTATE_EVERY` cycles), `healthcheck_closes` (`is_usable()` rejections), and `broken_connection_closes` (`OperationalError`/`InterfaceError` reseats). Operators tuning `DJHTMX_SSE_RENDER_*` need these to see whether the pool is sized correctly without instrumenting downstream services.
…stream When `DJHTMX_SSE_RENDER_QUEUE_MAX` is set, `submit_sse_render` raises `SSERenderQueueFull` as the back-pressure signal once the executor backlog reaches the cap. The async SSE stream in `sse_endpoint` had only a bare `except Exception:` that logged and re-raised, so the exception propagated out of the generator and tore the connection down. HTMX's auto-reconnect then immediately re-opened a stream that hit the same saturated queue and crashed again — converting "overload" into "reconnect storm at Caddy/Granian/Redis," which is precisely the runaway-resource symptom the cap was added to prevent. The documented intent in `docs/plans/sse-spec.md` and `docs/plans/sse-generalized-worker.md` is "log and drop, continue." Catch `SSERenderQueueFull` at the two call sites in the stream loop (heartbeat drain and event drain), log a warning, and substitute an empty fragment list so the iteration falls through to the next `pubsub.get_message` without yielding anything. The events that triggered the queue-full are lost in either path (they're already removed from the Redis list before submission), so the only behavioural difference is whether the connection survives — and surviving is strictly the better outcome under overload. Drops are counted at the raise site via `djhtmx.sse.render.drops`, so the silent path remains observable.
`render_sse_event_fragments` reads the session events list with `LRANGE 0 -1` and then issues a separate `DELETE`. A concurrent `emit_sse_event` that RPUSHes between the two commands has its envelope wiped by the `DELETE` and is never delivered — the consumer silently misses events under load. Mark the spot with a TODO that names the two viable atomic replacements (`LPOP COUNT` on Redis 6.2+, or a `MULTI`/`EXEC` block) so the fix is unambiguous when someone picks it up. No behaviour change in this commit.
`render_sse_event_fragments` used to read the events list with a bare `LRANGE 0 -1` and then issue a separate `DELETE`. A concurrent `emit_sse_event` that `RPUSH`ed an envelope between the two commands had its event wiped by the `DELETE` and never delivered — the consumer silently missed events under load. The previous commit only marked the spot with a TODO; this one fixes it. Wrap the read-and-clear in a `MULTI/EXEC` pipeline so the two commands execute as a single atomic unit. Any `RPUSH` that arrives during the transaction either lands before (and is drained) or after (and remains queued for the next iteration) — no event can be silently dropped. `async_lrange` was only used at this one site and is now unreachable; remove it. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`Session.flush` previously issued an `HDEL`, then up to three `HSET`s, then an `EXPIRE` as separate Redis commands. A concurrent `_ensure_read` calling `HGETALL` on the same session key could observe a half-written state — for example freshly updated component states paired with a stale `__subs__` / `__children__` from the previous flush — and build a Session whose subscription graph did not match its component states. Wrap all four mutations and the `EXPIRE` in a transactional pipeline so readers either see the old session in full or the new one in full. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`register_component` and `unregister_consumer` both mutate the consumer record, the per-consumer indexes set, and the shared event/topic index sets across many separate Redis commands. Concurrent calls for the *same* consumer (a quick re-mount, or a destroy that races a re-render) interleave their SREMs and SADDs against the shared sets. Each writer sees a stale `old_indexes` snapshot and leaves orphan `id_` entries in shared sets it did not know it had to clean up. Those orphans persist for the lifetime of any peer subscriber to the same event/topic, causing spurious `emit_sse_event` deliveries and wasted render-pool cycles. Wrap both functions in an optimistic transaction: `WATCH` the per-consumer indexes set, snapshot it, then queue every mutation inside `MULTI/EXEC`. Because both functions watch the same key, concurrent register/unregister calls arbitrate via `WatchError` and retry against fresh state instead of interleaving. The retry budget is bounded by the new `DJHTMX_SSE_REGISTER_MAX_ATTEMPTS` setting (default 8); the function raises if it exhausts, surfacing pathological contention instead of silently busy-looping. The transactional pipeline also collapses the previous ~13 round-trips on the happy path down to 3 (WATCH, SMEMBERS, EXEC), so the fix pays for itself on Redis latency even when there is no contention. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wrap `register_component` and `unregister_consumer` in `tracing_span` so the WATCH/EXEC retry latency shows up directly in Sentry/Logfire, and so the `RuntimeError` raised on exhausted contention attaches to a span that already carries the session and component tags. Span names follow the existing `djhtmx.sse.*` convention used by the streaming loop in `urls.py`. Tags use `compact_hash` for the session_id; `register_component` tags the human-readable `hx_name`, while `unregister_consumer` only has the component_id available so it hashes that too. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This is a left over from the initial commit 81811d0 'feat(sse): add router smoke endpoint'.
9de853d to
8ceb609
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.