fix(core): acquire _lock in EventBus.publish() to serialize shared-state mutations#109
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (2)
✅ Files skipped from review due to trivial changes (1)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughThe ChangesEventBus Concurrent Publish Synchronization
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
|
Hi @sreerevanth, a gentle follow-up on this PR. It has been 1 day since filing. All checks are passing and there are no conflicts. Please review when you get a chance. Happy to make any adjustments. |
…ate mutations _lock was created in __init__ but never acquired in publish(), subscribe_fn(), or unsubscribe(). Two concurrent coroutines calling publish() simultaneously could both append to _event_log and increment _stats counters without synchronization, causing lost stat increments and non-deterministic log truncation. Changes: - Wrap the _event_log append, length trim, and _stats increments in 'async with self._lock' inside publish(). - Snapshot matching handler registrations inside the lock, then release the lock before dispatching. This prevents slow or I/O-bound handlers from blocking concurrent publish calls while still serializing all writes to shared state. - subscribe() and subscribe_fn() and unsubscribe() run synchronously in asyncio (no yield points between dict writes), so no lock is needed there; a comment explains this. Closes sreerevanth#100
…est_adapter_crewai
5a24383 to
c12d525
Compare
|
Branch rebased onto current `main` and lint failures resolved. The `Test & lint` CI was failing because:
All checks should be green on the new push. Please re-review when you get a chance. |
Description
EventBus._lockwas created in__init__but never acquired inpublish(),subscribe_fn(), orunsubscribe(). Two concurrent coroutines callingpublish()simultaneously could both append to_event_logand increment_statscounters without synchronization, causing lost stat increments and non-deterministic log truncation.Related Issue
Closes #100
Type of Change
Root Cause
The
asyncio.Lockwas allocated but no call site ever usedasync with self._lock. All mutations to_event_logand_statsran outside any lock, so coroutines that yielded between the append and the length trim (or between reads and increments of_stats) could observe inconsistent state.Changes Made
agentwatch/core/event_bus.py_event_logappend, length trim, and_statsincrements inasync with self._lockinsidepublish()agentwatch/core/event_bus.pypublish()callsThe lock only guards the shared-state mutations. Handler dispatch runs outside it, which preserves the existing concurrency of
asyncio.gatherfor handler fan-out.Screenshots or Demo
Not applicable (no UI change).
Testing Done
ruff checkandruff format --checkpass.pytest tests/test_core.py -vto confirm existing event bus tests pass.bus.publish(event)500 times concurrently on the sameEventBus, then assertbus.stats()["total_published"] == 1000. Without the lock this assertion fails intermittently.Checklist
NSSoC Label Request
@sreerevanth could you please add the appropriate NSoC 26' label to this PR? It helps with tracking and scoring under NSoC '26. Thank you!
Summary by CodeRabbit