fix: support cmux 0.64.12 cmux-events protocol (events were dropped + stream flapped)#2
Conversation
cmux 0.64.12 emits events via the cmux-events protocol: frames carry their own id (a per-boot sequence) and a protocol tag, with no type discriminator. deliver() decoded these as RPCResponses, found no matching pending request, and silently dropped every event. Route cmux-events frames to the push handler before the RPC-response check, and make the events.stream subscribe fire-and-forget (cmux acks with a subscription envelope that has no matching RPC id, so call() always paid the full request timeout before attaching). Note: cmux 0.64.12 also gates the control socket to in-tree processes (automation.socketControlMode=cmuxOnly), so the relay must run inside a cmux pane rather than via launchd. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (2)
📝 WalkthroughWalkthroughThis PR adds a fire-and-forget Changescmux-events Frame Support
Sequence DiagramsequenceDiagram
participant Client
participant EventStream
participant CMUXClient
participant pushHandler
Client->>EventStream: start(categories:)
EventStream->>CMUXClient: send(method: "events.stream", params:)
CMUXClient->>CMUXClient: JSON encode, write, flush
Note over CMUXClient: No pending continuation registered
CMUXClient->>CMUXClient: Receive cmux-events frame
CMUXClient->>CMUXClient: Decode CmuxEventsFrame
CMUXClient->>CMUXClient: Check protocol == "cmux-events"
CMUXClient->>CMUXClient: Map to EventFrame
CMUXClient->>pushHandler: dispatch(EventFrame)
pushHandler->>EventStream: Forward to sink
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
Sources/CMUXClient/EventStream.swift (1)
15-16:⚠️ Potential issue | 🟠 Major | ⚡ Quick winDon’t mark stream started when subscribe fails.
startedis set before subscribe andtry?drops errors, so a failed subscribe can permanently lock this instance in a non-attached state.Suggested change
public func start(categories: [EventCategory]) async { guard !started else { return } - started = true let sink = self.sink await client.onEventStream { frame in if case .event(let ev) = frame { sink(ev) } } let cats: JSONValue = .array(categories.map { .string($0.rawValue) }) - try? await client.send(method: "events.stream", params: .object(["categories": cats])) + do { + try await client.send(method: "events.stream", params: .object(["categories": cats])) + started = true + } catch { + started = false + } }Also applies to: 28-28
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@Sources/CMUXClient/EventStream.swift` around lines 15 - 16, The code sets EventStream.started before calling subscribe and uses try? which swallows errors, so a failed subscribe can leave the instance marked started while not attached; change the flow in the EventStream start routine (and the other occurrence at the second start site) to only set started = true after subscribe completes successfully (or set started inside the success branch), remove the try? around subscribe so errors are surfaced or catch and handle them, and if you must keep an optimistic started assignment ensure you revert started = false when subscribe fails.
🧹 Nitpick comments (1)
Tests/CMUXClientTests/EventStreamTests.swift (1)
81-88: ⚡ Quick winPolling loop weakly verifies the "envelope is ignored" guarantee.
The loop exits as soon as
sink.count() > 0, i.e. on the first delivered frame. Since the test's purpose is to confirm the subscription envelope is not delivered as an event, a regression that wrongly routed the envelope could be masked by timing: if the envelope is misrouted and arrives as a second event after the real one, thecount == 1read may race against the second append. The envelope is sent first over the ordered connection, sofirst?.category/first?.nameoffers partial protection, but thecount == 1assertion isn't a reliable guard.Consider settling for a brief fixed window after the first event lands so a wrongly-delivered envelope frame has time to arrive before asserting the count.
♻️ Suggested settle step before asserting count
for _ in 0..<20 where await sink.count() == 0 { try await Task.sleep(nanoseconds: 30_000_000) } + // Give a wrongly-routed envelope frame time to arrive so `count == 1` + // actually proves the envelope was ignored. + try await Task.sleep(nanoseconds: 90_000_000) let count = await sink.count() XCTAssertEqual(count, 1, "envelope should be ignored; exactly one event delivered")🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@Tests/CMUXClientTests/EventStreamTests.swift` around lines 81 - 88, The polling loop that exits as soon as sink.count() > 0 is racy for the "envelope is ignored" guarantee; change the test around the loop that uses sink.count(), Task.sleep(...) and sink.first() so that once the first event is observed you wait a short fixed settle window (e.g. another Task.sleep for ~50_000_000 ns) before re-checking await sink.count() and asserting XCTAssertEqual(count, 1) and validating first?.category/name; this ensures a misrouted envelope arriving slightly later would be observed before the assertion.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@Sources/CMUXClient/CMUXClient.swift`:
- Around line 82-97: CMUXClient.send currently drops async write failures by
calling channel.writeAndFlush(..., promise: nil); change it to surface write
errors (either by returning/awaiting the EventLoopFuture from writeAndFlush or
making send async and awaiting the write) and attach a failure handler similar
to call (use the promise/future's whenFailure to set terminalError or rethrow an
appropriate CMUXClientError), so write failures between isActive and flush
aren't ignored. Update callers (notably EventStream.start where you do try?
await client.send(...)) to stop swallowing errors—propagate or handle the
failure so CmuxRelay's reconnect logic can run when the subscribe write fails.
Ensure you still check channel.isActive and keep existing identifiers
(CMUXClient.send, terminalError, CMUXClientError.channelClosed,
EventStream.start, CmuxRelay) so the change is localized.
---
Outside diff comments:
In `@Sources/CMUXClient/EventStream.swift`:
- Around line 15-16: The code sets EventStream.started before calling subscribe
and uses try? which swallows errors, so a failed subscribe can leave the
instance marked started while not attached; change the flow in the EventStream
start routine (and the other occurrence at the second start site) to only set
started = true after subscribe completes successfully (or set started inside the
success branch), remove the try? around subscribe so errors are surfaced or
catch and handle them, and if you must keep an optimistic started assignment
ensure you revert started = false when subscribe fails.
---
Nitpick comments:
In `@Tests/CMUXClientTests/EventStreamTests.swift`:
- Around line 81-88: The polling loop that exits as soon as sink.count() > 0 is
racy for the "envelope is ignored" guarantee; change the test around the loop
that uses sink.count(), Task.sleep(...) and sink.first() so that once the first
event is observed you wait a short fixed settle window (e.g. another Task.sleep
for ~50_000_000 ns) before re-checking await sink.count() and asserting
XCTAssertEqual(count, 1) and validating first?.category/name; this ensures a
misrouted envelope arriving slightly later would be observed before the
assertion.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ff09dd81-6d16-49b9-9ee6-d43c4ead220c
📒 Files selected for processing (3)
Sources/CMUXClient/CMUXClient.swiftSources/CMUXClient/EventStream.swiftTests/CMUXClientTests/EventStreamTests.swift
send() previously used writeAndFlush(promise: nil), dropping any write failure. With no continuation to fail, a failed subscribe on an open channel left the supervisor blocked in awaitClosed() with no events. Now close the channel on write failure so closeFuture fires and the supervisor re-attaches; log the synchronous pre-check path instead of swallowing it with try?. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Problem
Against current cmux (tested on 0.64.12), the relay drops all events from cmux, the
events.streamconnection flaps attach/detach in a tight loop, and every attach pays a ~5s delay. The dispatch path (workspace/surface RPCs) is unaffected.Root cause
cmux 0.64.12 emits events via the
cmux-eventsprotocol: each frame carries its ownid(a per-boot sequence) and a"protocol": "cmux-events"tag, with notypediscriminator. InCMUXClient.deliver(), the "try RPCResponse first" branch decodes these (because of theid), finds no matching pending request, and silently drops them — so they never reach thePushFrame/event path.Separately,
EventStream.start()subscribes viacall(), but cmux acks the subscribe with a subscription envelope that has no matching RPCid, so the call always waits the full request timeout before the stream is treated as attached.Fix
cmux-eventsframes indeliver()(by theprotocoltag) and route them to the push handler before the RPC-response check. Frames withcategory+namebecome events; the subscription envelope/heartbeats are ignored.CMUXClient.send()(fire-and-forget) and use it for theevents.streamsubscribe, removing the per-attach timeout.type:eventpush-frame path is left intact for backwards compatibility.Tests
Adds
testForwardsCmuxEventsProtocolcovering the new wire shape. Note: I built with the Swift 6.3 command-line toolchain (no Xcode), which can't compile theswift-testingtest targets, so I verified the fix by live integration against cmux 0.64.12 — a realnotificationevent decodes through the new path, attach is immediate, and the stream stays stable.Heads-up (not addressed here)
cmux 0.64.12 also gates its control socket to in-tree processes (
automation.socketControlMode = "cmuxOnly"): a launchd-spawned relay (as inscripts/install-launchd.sh) is denied withERROR: Access denied — only processes started inside cmux can connect. Running the relay inside a cmux pane works. Might be worth a docs note or a socket-password path in the installer — happy to follow up if useful.Summary by CodeRabbit
New Features
Bug Fixes
Tests