feat: Wire event dispatcher for ctx.events.dispatch#157
Merged
Conversation
Add a real implementation behind the placeholder `EventDispatcher`
introduced earlier. Each `RpcService` now stands up a per-instance
event dispatcher core: a publish channel and a `DashSet` of declared
`{source}.events` topic exchanges. Exchanges are declared lazily the
first time a given source emits.
`EventDispatcher::dispatch<T: Serialize>(source, event_type, &payload)`
is `pub async`. Per delivery, the consumer derives a dispatcher
stamped with the inbound delivery's headers via `with_parent_headers`,
so `nameko.call_id_stack` propagates through to emitted events.
`nameko.AMQP_URI` is always stamped. Messages use durable delivery
(mode 2), JSON content-type, and the routing key is the event type.
The new `examples/src/event_emitter.rs` demonstrates a service that
emits `user_created` on the `users.events` exchange.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
ⓘ You've reached your Qodo monthly free-tier limit. Reviews pause until next month — upgrade your plan to continue now, or link your paid account if you already have one. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Closes the third and final piece of the async-handler refactor — events.
girolle/src/events.rs—EventDispatcherCoreowns a publish channel and aDashSet<String>cache of declared{source}.eventstopic exchanges (durable: true). Exchanges are declared lazily the first time a given source emits, so a single dispatcher can fan out events for multiple sources.EventDispatchernow holdsOption<Arc<EventDispatcherCore>>plus the parent delivery'sFieldTable.pub async fn dispatch<T: Serialize>(source_service, event_type, &payload)serializes the payload to JSON and publishes with persistent delivery, JSON content-type, andevent_typeas the routing key.nameko.AMQP_URIis stamped onto headers and the parent'snameko.call_id_stack(when present) is propagated through.RpcServicestartup callsEventDispatcherCore::new(&conn, conf, id)once and wires the resulting dispatcher intoSharedData. Per delivery the consumer doeswith_parent_headers(inbound_headers.clone())for bothrpcandevents.examples/src/event_emitter.rs— a service that emits auser_createdevent onusers.eventsfrom inside a handler.Acceptance
With this merged, a handler can:
RpcContextcarrying inbound metadatactx.rpc.call(...)ctx.events.dispatch(...)…which closes out the original async-handler issue's acceptance criteria.
Known limitations (deferred)
call_id_stackpropagated as-is from the parent (matches the RPC caller's behavior; see feat: Wire in-service RPC core for ctx.rpc.call #156).Test plan
🤖 Generated with Claude Code