Skip to content

fix: support cmux 0.64.12 cmux-events protocol (events were dropped + stream flapped)#2

Open
fractionasian wants to merge 2 commits into
NewTurn2017:mainfrom
fractionasian:fix/cmux-events-protocol-0.64.12
Open

fix: support cmux 0.64.12 cmux-events protocol (events were dropped + stream flapped)#2
fractionasian wants to merge 2 commits into
NewTurn2017:mainfrom
fractionasian:fix/cmux-events-protocol-0.64.12

Conversation

@fractionasian
Copy link
Copy Markdown

@fractionasian fractionasian commented Jun 3, 2026

Problem

Against current cmux (tested on 0.64.12), the relay drops all events from cmux, the events.stream connection flaps attach/detach in a tight loop, and every attach pays a ~5s delay. The dispatch path (workspace/surface RPCs) is unaffected.

Root cause

cmux 0.64.12 emits events via the cmux-events protocol: each frame carries its own id (a per-boot sequence) and a "protocol": "cmux-events" tag, with no type discriminator. In CMUXClient.deliver(), the "try RPCResponse first" branch decodes these (because of the id), finds no matching pending request, and silently drops them — so they never reach the PushFrame/event path.

Separately, EventStream.start() subscribes via call(), but cmux acks the subscribe with a subscription envelope that has no matching RPC id, so the call always waits the full request timeout before the stream is treated as attached.

Fix

  • Detect cmux-events frames in deliver() (by the protocol tag) and route them to the push handler before the RPC-response check. Frames with category+name become events; the subscription envelope/heartbeats are ignored.
  • Add CMUXClient.send() (fire-and-forget) and use it for the events.stream subscribe, removing the per-attach timeout.
  • The legacy type:event push-frame path is left intact for backwards compatibility.

Tests

Adds testForwardsCmuxEventsProtocol covering the new wire shape. Note: I built with the Swift 6.3 command-line toolchain (no Xcode), which can't compile the swift-testing test targets, so I verified the fix by live integration against cmux 0.64.12 — a real notification event decodes through the new path, attach is immediate, and the stream stays stable.

Heads-up (not addressed here)

cmux 0.64.12 also gates its control socket to in-tree processes (automation.socketControlMode = "cmuxOnly"): a launchd-spawned relay (as in scripts/install-launchd.sh) is denied with ERROR: Access denied — only processes started inside cmux can connect. Running the relay inside a cmux pane works. Might be worth a docs note or a socket-password path in the installer — happy to follow up if useful.

Summary by CodeRabbit

  • New Features

    • Added a non-blocking send to improve event subscription startup and streaming responsiveness
  • Bug Fixes

    • Properly routes cmux-events protocol frames so they are delivered as events rather than misdecoded
    • Write failures during subscription are now logged and handled to avoid silent failures
  • Tests

    • Added a test ensuring cmux-events frames are forwarded to the event sink correctly

cmux 0.64.12 emits events via the cmux-events protocol: frames carry their
own id (a per-boot sequence) and a protocol tag, with no type discriminator.
deliver() decoded these as RPCResponses, found no matching pending request,
and silently dropped every event. Route cmux-events frames to the push
handler before the RPC-response check, and make the events.stream subscribe
fire-and-forget (cmux acks with a subscription envelope that has no matching
RPC id, so call() always paid the full request timeout before attaching).

Note: cmux 0.64.12 also gates the control socket to in-tree processes
(automation.socketControlMode=cmuxOnly), so the relay must run inside a cmux
pane rather than via launchd.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Jun 3, 2026

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7d7d9b3f-857b-4d91-9915-f67782116048

📥 Commits

Reviewing files that changed from the base of the PR and between 2f0c9e0 and f93f98a.

📒 Files selected for processing (2)
  • Sources/CMUXClient/CMUXClient.swift
  • Sources/CMUXClient/EventStream.swift
🚧 Files skipped from review as they are similar to previous changes (2)
  • Sources/CMUXClient/EventStream.swift
  • Sources/CMUXClient/CMUXClient.swift

📝 Walkthrough

Walkthrough

This PR adds a fire-and-forget send(method:params:) API, decodes and routes top-level cmux 0.64.12 cmux-events frames to the event push handler, switches event subscriptions to use the fire-and-forget send, and adds a test verifying cmux-events routing.

Changes

cmux-events Frame Support

Layer / File(s) Summary
Fire-and-forget RPC send method
Sources/CMUXClient/CMUXClient.swift
New public send(method:params:) writes and flushes JSON-encoded requests without awaiting or registering a pending continuation, with failure handling that logs and closes the channel.
cmux-events frame decoding and routing
Sources/CMUXClient/CMUXClient.swift
Private CmuxEventsFrame Decodable added; deliver(line:) now attempts to decode frames with a top-level protocol discriminator, maps matching frames to EventFrame (payload .null if absent), dispatches via pushHandler, and returns early.
Event stream subscription with fire-and-forget send
Sources/CMUXClient/EventStream.swift
EventStream.start(categories:) uses the fire-and-forget send(...) to subscribe to events.stream, adds a logger, and logs a warning if the subscribe write fails.
cmux-events frame routing verification
Tests/CMUXClientTests/EventStreamTests.swift
New async test testForwardsCmuxEventsProtocol sends an envelope and a real cmux-events protocol frame and asserts the event sink receives exactly one EventFrame with expected category and name.

Sequence Diagram

sequenceDiagram
  participant Client
  participant EventStream
  participant CMUXClient
  participant pushHandler
  
  Client->>EventStream: start(categories:)
  EventStream->>CMUXClient: send(method: "events.stream", params:)
  CMUXClient->>CMUXClient: JSON encode, write, flush
  Note over CMUXClient: No pending continuation registered
  
  CMUXClient->>CMUXClient: Receive cmux-events frame
  CMUXClient->>CMUXClient: Decode CmuxEventsFrame
  CMUXClient->>CMUXClient: Check protocol == "cmux-events"
  CMUXClient->>CMUXClient: Map to EventFrame
  CMUXClient->>pushHandler: dispatch(EventFrame)
  pushHandler->>EventStream: Forward to sink
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐰 I stitched a send that hops away,
No wait, no block, just swift relay,
When frames declare the cmux song,
I route them right where they belong,
A twitch, a thump — the events play on.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 16.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly and specifically describes the main fix: support for cmux 0.64.12 cmux-events protocol, and concisely explains the impact (events were dropped + stream flapped).
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
Sources/CMUXClient/EventStream.swift (1)

15-16: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don’t mark stream started when subscribe fails.

started is set before subscribe and try? drops errors, so a failed subscribe can permanently lock this instance in a non-attached state.

Suggested change
 public func start(categories: [EventCategory]) async {
     guard !started else { return }
-    started = true
     let sink = self.sink
     await client.onEventStream { frame in
         if case .event(let ev) = frame {
             sink(ev)
         }
     }
     let cats: JSONValue = .array(categories.map { .string($0.rawValue) })
-    try? await client.send(method: "events.stream", params: .object(["categories": cats]))
+    do {
+        try await client.send(method: "events.stream", params: .object(["categories": cats]))
+        started = true
+    } catch {
+        started = false
+    }
 }

Also applies to: 28-28

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Sources/CMUXClient/EventStream.swift` around lines 15 - 16, The code sets
EventStream.started before calling subscribe and uses try? which swallows
errors, so a failed subscribe can leave the instance marked started while not
attached; change the flow in the EventStream start routine (and the other
occurrence at the second start site) to only set started = true after subscribe
completes successfully (or set started inside the success branch), remove the
try? around subscribe so errors are surfaced or catch and handle them, and if
you must keep an optimistic started assignment ensure you revert started = false
when subscribe fails.
🧹 Nitpick comments (1)
Tests/CMUXClientTests/EventStreamTests.swift (1)

81-88: ⚡ Quick win

Polling loop weakly verifies the "envelope is ignored" guarantee.

The loop exits as soon as sink.count() > 0, i.e. on the first delivered frame. Since the test's purpose is to confirm the subscription envelope is not delivered as an event, a regression that wrongly routed the envelope could be masked by timing: if the envelope is misrouted and arrives as a second event after the real one, the count == 1 read may race against the second append. The envelope is sent first over the ordered connection, so first?.category/first?.name offers partial protection, but the count == 1 assertion isn't a reliable guard.

Consider settling for a brief fixed window after the first event lands so a wrongly-delivered envelope frame has time to arrive before asserting the count.

♻️ Suggested settle step before asserting count
         for _ in 0..<20 where await sink.count() == 0 {
             try await Task.sleep(nanoseconds: 30_000_000)
         }
+        // Give a wrongly-routed envelope frame time to arrive so `count == 1`
+        // actually proves the envelope was ignored.
+        try await Task.sleep(nanoseconds: 90_000_000)
         let count = await sink.count()
         XCTAssertEqual(count, 1, "envelope should be ignored; exactly one event delivered")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Tests/CMUXClientTests/EventStreamTests.swift` around lines 81 - 88, The
polling loop that exits as soon as sink.count() > 0 is racy for the "envelope is
ignored" guarantee; change the test around the loop that uses sink.count(),
Task.sleep(...) and sink.first() so that once the first event is observed you
wait a short fixed settle window (e.g. another Task.sleep for ~50_000_000 ns)
before re-checking await sink.count() and asserting XCTAssertEqual(count, 1) and
validating first?.category/name; this ensures a misrouted envelope arriving
slightly later would be observed before the assertion.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@Sources/CMUXClient/CMUXClient.swift`:
- Around line 82-97: CMUXClient.send currently drops async write failures by
calling channel.writeAndFlush(..., promise: nil); change it to surface write
errors (either by returning/awaiting the EventLoopFuture from writeAndFlush or
making send async and awaiting the write) and attach a failure handler similar
to call (use the promise/future's whenFailure to set terminalError or rethrow an
appropriate CMUXClientError), so write failures between isActive and flush
aren't ignored. Update callers (notably EventStream.start where you do try?
await client.send(...)) to stop swallowing errors—propagate or handle the
failure so CmuxRelay's reconnect logic can run when the subscribe write fails.
Ensure you still check channel.isActive and keep existing identifiers
(CMUXClient.send, terminalError, CMUXClientError.channelClosed,
EventStream.start, CmuxRelay) so the change is localized.

---

Outside diff comments:
In `@Sources/CMUXClient/EventStream.swift`:
- Around line 15-16: The code sets EventStream.started before calling subscribe
and uses try? which swallows errors, so a failed subscribe can leave the
instance marked started while not attached; change the flow in the EventStream
start routine (and the other occurrence at the second start site) to only set
started = true after subscribe completes successfully (or set started inside the
success branch), remove the try? around subscribe so errors are surfaced or
catch and handle them, and if you must keep an optimistic started assignment
ensure you revert started = false when subscribe fails.

---

Nitpick comments:
In `@Tests/CMUXClientTests/EventStreamTests.swift`:
- Around line 81-88: The polling loop that exits as soon as sink.count() > 0 is
racy for the "envelope is ignored" guarantee; change the test around the loop
that uses sink.count(), Task.sleep(...) and sink.first() so that once the first
event is observed you wait a short fixed settle window (e.g. another Task.sleep
for ~50_000_000 ns) before re-checking await sink.count() and asserting
XCTAssertEqual(count, 1) and validating first?.category/name; this ensures a
misrouted envelope arriving slightly later would be observed before the
assertion.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ff09dd81-6d16-49b9-9ee6-d43c4ead220c

📥 Commits

Reviewing files that changed from the base of the PR and between 3573d44 and 2f0c9e0.

📒 Files selected for processing (3)
  • Sources/CMUXClient/CMUXClient.swift
  • Sources/CMUXClient/EventStream.swift
  • Tests/CMUXClientTests/EventStreamTests.swift

Comment thread Sources/CMUXClient/CMUXClient.swift
send() previously used writeAndFlush(promise: nil), dropping any write
failure. With no continuation to fail, a failed subscribe on an open
channel left the supervisor blocked in awaitClosed() with no events.
Now close the channel on write failure so closeFuture fires and the
supervisor re-attaches; log the synchronous pre-check path instead of
swallowing it with try?.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant