
fix(core): acquire _lock in EventBus.publish() to serialize shared-state mutations #109

Merged
sreerevanth merged 2 commits into sreerevanth:main from anshul23102:fix/100-eventbus-lock
Jun 1, 2026

Conversation

@anshul23102
Contributor

@anshul23102 anshul23102 commented May 30, 2026

Description

EventBus._lock was created in __init__ but never acquired in publish(), subscribe_fn(), or unsubscribe(). Two concurrent coroutines calling publish() simultaneously could both append to _event_log and increment _stats counters without synchronization, causing lost stat increments and non-deterministic log truncation.

Related Issue

Closes #100

Type of Change

  • Bug fix

Root Cause

The asyncio.Lock was allocated but no call site ever used async with self._lock. All mutations to _event_log and _stats ran outside any lock, so coroutines that yielded between the append and the length trim (or between reads and increments of _stats) could observe inconsistent state.

Changes Made

| File | Change |
| --- | --- |
| agentwatch/core/event_bus.py | Wrapped _event_log append, length trim, and _stats increments in async with self._lock inside publish() |
| agentwatch/core/event_bus.py | Snapshotted matching handler registrations inside the lock, then released the lock before dispatching, so slow or I/O-bound handlers do not block concurrent publish() calls |

The lock only guards the shared-state mutations. Handler dispatch runs outside it, which preserves the existing concurrency of asyncio.gather for handler fan-out.
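
The shape of that change can be sketched as follows. This is a minimal illustration, not the actual agentwatch code: `MiniEventBus`, `_handlers`, `_max_log`, and the predicate-based `subscribe_fn` signature are hypothetical stand-ins, and only `_lock`, `_event_log`, `_stats`, `publish()`, and `stats()` are names taken from this PR.

```python
import asyncio
from collections import defaultdict


class MiniEventBus:
    """Hypothetical stand-in for EventBus; not the agentwatch implementation."""

    def __init__(self, max_log=100):
        self._lock = asyncio.Lock()
        self._event_log = []
        self._stats = defaultdict(int)
        self._handlers = []      # assumed layout: (predicate, async handler) pairs
        self._max_log = max_log  # assumed name for the log length cap

    def subscribe_fn(self, predicate, handler):
        # Synchronous: there is no await here, so no other coroutine
        # can run in the middle of this write.
        self._handlers.append((predicate, handler))

    async def publish(self, event):
        async with self._lock:
            # Shared-state mutations are serialized under the lock.
            self._event_log.append(event)
            overflow = len(self._event_log) - self._max_log
            if overflow > 0:
                del self._event_log[:overflow]
            self._stats["total_published"] += 1
            # Snapshot the matching handlers while still holding the lock.
            matching = [h for pred, h in self._handlers if pred(event)]
        # The lock is released here, so slow handlers do not block
        # other publish() calls.
        if matching:
            await asyncio.gather(
                *(h(event) for h in matching), return_exceptions=True
            )

    def stats(self):
        return dict(self._stats)
```

Because the handler list is copied inside the critical section, a subscription change made after the snapshot only affects later publish() calls.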

Screenshots or Demo

Not applicable (no UI change).

Testing Done

  • ruff check and ruff format --check pass.
  • Run pytest tests/test_core.py -v to confirm existing event bus tests pass.
  • To reproduce the race: create two tasks that each call bus.publish(event) 500 times concurrently on the same EventBus, then assert bus.stats()["total_published"] == 1000. Without the lock this assertion fails intermittently.
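
The reproduction described above can be sketched in isolation. This is a toy model, not the real EventBus: `Counter`, `_bump`, and `hammer` are hypothetical names, and the `await asyncio.sleep(0)` is an assumed yield point between the read and the write, since a lost update in asyncio needs a suspension point inside the read-modify-write.

```python
import asyncio


class Counter:
    """Toy model of a stats counter with an optional asyncio.Lock."""

    def __init__(self, use_lock):
        self.total = 0
        self._lock = asyncio.Lock() if use_lock else None

    async def _bump(self):
        current = self.total
        await asyncio.sleep(0)  # assumed yield point between read and write
        self.total = current + 1

    async def publish(self):
        if self._lock is None:
            await self._bump()
        else:
            async with self._lock:
                await self._bump()


async def hammer(use_lock, tasks=2, calls=500):
    # Build the counter inside the running loop so the lock binds to it.
    counter = Counter(use_lock)

    async def worker():
        for _ in range(calls):
            await counter.publish()

    await asyncio.gather(*(worker() for _ in range(tasks)))
    return counter.total


if __name__ == "__main__":
    print("with lock:   ", asyncio.run(hammer(use_lock=True)))
    print("without lock:", asyncio.run(hammer(use_lock=False)))
```

With the lock, the total reaches tasks * calls. Without it, increments are lost and the total falls short of 1000.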

Checklist

  • I have read the CONTRIBUTING.md and followed its guidelines
  • My code follows the style and formatting of this project
  • I have tested my changes locally and they work as expected
  • There are no merge conflicts with the base branch
  • All CI checks are passing
  • This PR is linked to the correct issue
  • I have not used any AI-generated content in this PR

NSoC Label Request

@sreerevanth could you please add the appropriate NSoC 26' label to this PR? It helps with tracking and scoring under NSoC '26. Thank you!

Summary by CodeRabbit

  • Refactor
    • Improved event publishing performance under concurrent load by reducing contention and minimizing time spent in critical sections during dispatch.
  • Tests
    • Minor test cleanup and import formatting adjustments.

@coderabbitai

coderabbitai Bot commented May 30, 2026


No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 9bcd25bf-e59d-4e27-929e-57e181b8c9ae

📥 Commits

Reviewing files that changed from the base of the PR and between 5a24383 and c12d525.

📒 Files selected for processing (2)
  • agentwatch/core/event_bus.py
  • tests/test_adapter_crewai.py
✅ Files skipped from review due to trivial changes (1)
  • tests/test_adapter_crewai.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • agentwatch/core/event_bus.py

📝 Walkthrough


The publish() method in EventBus was refactored to acquire self._lock while mutating shared state (_event_log, _stats) and while snapshotting handler registrations. Lock release occurs before task dispatch, allowing handler I/O to proceed concurrently without blocking other publishers.

Changes

EventBus Concurrent Publish Synchronization

| Layer / File(s) | Summary |
| --- | --- |
| publish() lock acquisition and handler snapshot (agentwatch/core/event_bus.py, tests/test_adapter_crewai.py) | publish acquires self._lock to serialize mutations to _event_log trimming and _stats updates, snapshots handler registrations with inline EventFilter.matches while locked, then releases the lock before constructing and dispatching handler tasks outside the critical section. Test import formatting cleanup included. |

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

A rabbit hops around the lock,
keeping handlers in their flock.
Snapshots taken, threads untied,
dispatches dance with synchronized pride! 🐰✨

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately describes the main change: adding lock acquisition to EventBus.publish() to serialize shared-state mutations, which is the primary objective of this PR. |
| Linked Issues check | ✅ Passed | The PR meets the core coding requirements from issue #100: acquire _lock in publish() to serialize _event_log and _stats mutations, snapshot handler registrations under lock, and release lock before dispatching handlers to avoid blocking concurrent calls. |
| Out of Scope Changes check | ✅ Passed | The PR includes the required lock acquisition in publish(), plus a minor cleanup of unused imports in test_adapter_crewai.py and import formatting. The test file change is a minor housekeeping task compatible with the main objective. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |


@anshul23102
Contributor Author

Hi @sreerevanth, a gentle follow-up on this PR. It has been 1 day since filing. All checks are passing and there are no conflicts. Please review when you get a chance. Happy to make any adjustments.

…ate mutations

_lock was created in __init__ but never acquired in publish(), subscribe_fn(),
or unsubscribe(). Two concurrent coroutines calling publish() simultaneously
could both append to _event_log and increment _stats counters without
synchronization, causing lost stat increments and non-deterministic log
truncation.

Changes:
- Wrap the _event_log append, length trim, and _stats increments in
  'async with self._lock' inside publish().
- Snapshot matching handler registrations inside the lock, then release
  the lock before dispatching. This prevents slow or I/O-bound handlers
  from blocking concurrent publish calls while still serializing all
  writes to shared state.
- subscribe() and subscribe_fn() and unsubscribe() run synchronously in
  asyncio (no yield points between dict writes), so no lock is needed
  there; a comment explains this.

Closes sreerevanth#100
@anshul23102 anshul23102 force-pushed the fix/100-eventbus-lock branch from 5a24383 to c12d525 Compare June 1, 2026 15:30
@anshul23102
Contributor Author

Branch rebased onto current `main` and lint failures resolved.

The `Test & lint` CI was failing because:

  1. The branch was based on an older commit that predates the CI permissions fix in PR #104 ("fix: enforce mandatory API key in production (fail-closed)"). After rebasing, the workflow has the correct `issues: write` permission to post results.
  2. `ruff check` was failing on unused `import pytest` (F401) and an unsorted import block (I001) in `tests/test_adapter_crewai.py`. Both are now fixed.

All checks should be green on the new push. Please re-review when you get a chance.

@sreerevanth sreerevanth merged commit 6b09636 into sreerevanth:main Jun 1, 2026
8 checks passed
@sreerevanth sreerevanth added the bug, help wanted, level: advanced, NSoC 26', backend, infra, level3, and high-priority labels Jun 1, 2026

Labels

  • backend (Backend related)
  • bug (Something isn't working)
  • help wanted (Extra attention is needed)
  • high-priority
  • infra (Infrastructure related)
  • level: advanced
  • level3
  • NSoC 26'

Projects

None yet

2 participants