Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .claude/rules/broadcasting.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ Register via `Echo.addInterceptor()` or `driver.addInterceptor()` in a ServicePr

- Implements Pusher-compatible WebSocket protocol (Laravel Reverb, Soketi, etc.)
- Constructor DI: `channelFactory` overrides WebSocket creation, `authFactory` overrides HTTP auth call — both for testing
- Auto-reconnection: exponential backoff `min(500ms × 2^attempt, max_reconnect_delay)` — set `reconnect: false` to disable
- Auto-reconnection: exponential backoff with 30% random jitter — `base = 500ms × 2^attempt` (capped at `max_reconnect_delay`), then `delay = base + random(0..base×0.3)`. Jitter prevents thundering herd on server restart. Set `reconnect: false` to disable
- Activity monitor: client-side inactivity detection using Pusher protocol `activity_timeout` (from server handshake). After `activity_timeout` seconds of silence → sends `pusher:ping`. If no `pusher:pong` within 30s (`pongTimeout`) → closes socket, triggers reconnect. Timer resets on ANY inbound message
- Connection timeout: configurable via `connection_timeout` (default 15s). If server doesn't complete Pusher handshake within timeout → closes socket, schedules reconnect, throws `TimeoutException`
- Constructor DI: `pongTimeout` (Duration, default 30s) and `random` (Random) — both for testing determinism
- Reconnect resubscription: all channels re-subscribed with `await` after reconnect. Private/presence re-authenticate. `onReconnect` emits only after all resubscriptions complete
- Auth error handling: failures logged via `Log.error()` with channel name, routed through interceptor `onError()` chain. Per-channel try/catch — one failure doesn't block others
- Pusher error codes: 4000–4099 = fatal (no reconnect), 4100–4199 = immediate, 4200–4299 = backoff
Expand Down Expand Up @@ -95,6 +98,7 @@ final broadcastingConfig = {
'auth_endpoint': '/broadcasting/auth',
'reconnect': true,
'max_reconnect_delay': 30000,
'activity_timeout': 120,
'connection_timeout': 15,
'dedup_buffer_size': 100,
},
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

## [1.0.0-alpha.12] - 2026-04-09

### ✨ New Features
- **Broadcasting**: Client-side activity monitor — detects silent connection loss using Pusher protocol `activity_timeout` and `pusher:ping`/`pusher:pong`. Automatically reconnects when the server stops responding
- **Broadcasting**: Random jitter (up to 30%) on reconnection backoff delay — prevents thundering herd when many clients reconnect simultaneously after a server restart
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ final user = await User.find(1);
| 💾 | **Caching** | Memory and file drivers with TTL and `remember()` |
| 🌍 | **Localization** | JSON-based i18n with `:attribute` placeholders |
| 🎨 | **Wind UI** | Built-in Tailwind CSS-like styling with `className` syntax |
| 📡 | **Broadcasting** | Laravel Echo equivalent — real-time WebSocket channels via `Echo` facade, `ReverbBroadcastDriver`, presence channels, and `Echo.fake()` for testing |
| 📡 | **Broadcasting** | Laravel Echo equivalent — real-time WebSocket channels via `Echo` facade, `ReverbBroadcastDriver` with activity monitoring, reconnection jitter, connection timeout, presence channels, and `Echo.fake()` for testing |
| 🧪 | **Testing** | Laravel-style `Http.fake()`, `Auth.fake()`, `Cache.fake()`, `Vault.fake()`, `Log.fake()`, `Echo.fake()` — no mockito needed |
| 🧰 | **Magic CLI** | Artisan-style code generation: `magic make:model`, `magic make:controller` |

Expand Down
12 changes: 7 additions & 5 deletions lib/src/broadcasting/drivers/reverb_broadcast_driver.dart
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,11 @@ class ReverbBroadcastDriver implements BroadcastDriver {
bool _isConnected = false;
String? _socketId;

/// The activity timeout in seconds reported by the server.
/// The activity timeout in seconds.
///
/// Parsed from the `pusher:connection_established` frame. Used to determine
/// how frequently the server expects keepalive traffic.
int activityTimeout = 30;
/// Parsed from the `pusher:connection_established` frame if provided by the
/// server; otherwise falls back to the `activity_timeout` config key.
late int activityTimeout = _config['activity_timeout'] as int? ?? 120;
Completer<void>? _connectionCompleter;

/// Broadcast controller that re-exposes the single-subscription
Expand Down Expand Up @@ -448,7 +448,9 @@ class ReverbBroadcastDriver implements BroadcastDriver {
void _handleConnectionEstablished(Map<String, dynamic> json) {
final data = jsonDecode(json['data'] as String) as Map<String, dynamic>;
_socketId = data['socket_id'] as String;
activityTimeout = data['activity_timeout'] as int? ?? 30;
activityTimeout =
data['activity_timeout'] as int? ??
(_config['activity_timeout'] as int? ?? 120);
_isConnected = true;
_attempt = 0;

Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: magic
description: "A Laravel-inspired Flutter framework with Eloquent ORM, routing, and MVC architecture."
version: 1.0.0-alpha.11
version: 1.0.0-alpha.12
homepage: https://magic.fluttersdk.com
repository: https://github.com/fluttersdk/magic
issue_tracker: https://github.com/fluttersdk/magic/issues
Expand Down
2 changes: 1 addition & 1 deletion skills/magic-framework/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Use `configFactories` (not `configs`) when any value depends on `Env.get()`. The
| `Schema` | Migrations | `create()`, `drop()`, `hasTable()` |
| `Log` | Logging | `info()`, `error()`, `warning()`, `debug()` |
| `Event` | Events | `dispatch(event)` |
| `Echo` | Broadcasting | `channel()`, `private()`, `join()`, `listen()`, `leave()`, `connect()`, `disconnect()`, `socketId`, `fake()` |
| `Echo` | Broadcasting | `channel()`, `private()`, `join()`, `listen()`, `leave()`, `connect()`, `disconnect()`, `socketId`, `connectionState`, `onReconnect`, `fake()` |
| `MagicRoute` | Routing | `page()`, `group()`, `layout()`, `to()`, `back({fallback?})`, `replace()`, `push()`, `toNamed()` |
| `Gate` | Authorization | `allows()`, `denies()`, `define()`, `policy()` |
| `Lang` | Localization | `get()`, `locale()` |
Expand Down
12 changes: 11 additions & 1 deletion skills/magic-framework/references/secondary-systems.md
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ Register interceptors via `Echo.addInterceptor(interceptor)` or `driver.addInter

### ReverbBroadcastDriver (Pusher Protocol)

Handles the full Pusher protocol over WebSocket: connection handshake, ping/pong keepalive, public/private/presence subscriptions, event deduplication via ring buffer, and automatic reconnection with exponential backoff.
Handles the full Pusher protocol over WebSocket: connection handshake, ping/pong keepalive, public/private/presence subscriptions, event deduplication via ring buffer, automatic reconnection with exponential backoff + 30% random jitter, client-side activity monitoring, and configurable connection establishment timeout.

Config keys under `broadcasting.connections.reverb`:

Expand All @@ -894,11 +894,21 @@ Config keys under `broadcasting.connections.reverb`:
| `auth_endpoint` | `'/broadcasting/auth'` | HTTP endpoint for private/presence auth |
| `reconnect` | `true` | Auto-reconnect on disconnect |
| `max_reconnect_delay` | `30000` | Max backoff delay in ms |
| `activity_timeout` | `120` | Seconds of inactivity before ping is sent |
| `connection_timeout` | `15` | Seconds to wait for connection establishment |
| `dedup_buffer_size` | `100` | Ring buffer size for deduplication |

Constructor DI parameters for testing:
- `channelFactory` — overrides WebSocket creation (inject mock channels)
- `authFactory` — overrides the HTTP auth call for private/presence channels (inject mock auth responses)
- `pongTimeout` — override the 30-second pong deadline (use short durations in tests)
- `random` — inject a seeded `Random` for deterministic backoff jitter in tests

**Connection health**: Activity monitor detects silent connection loss using the Pusher protocol `activity_timeout` (provided by server in handshake; falls back to `activity_timeout` config key, default 120s). After `activity_timeout` seconds of inactivity → sends `pusher:ping`. If no `pusher:pong` within `pongTimeout` (30s default) → closes socket, triggers reconnect. Timer resets on ANY inbound message.

**Reconnection backoff**: Exponential backoff with 30% random jitter — `base = 500ms × 2^attempt` (capped at `max_reconnect_delay`), `delay = base + random(0..base×0.3)`. Jitter prevents thundering herd when many clients reconnect simultaneously.

**Connection timeout**: Configurable via `connection_timeout` (default 15s). If the server doesn't complete the Pusher handshake within this window → closes socket, schedules reconnect, throws `TimeoutException`.

Auth failures in `_authenticateAndSubscribe()` are logged via `Log.error()` and routed through the interceptor `onError()` chain. On reconnect, all channels are re-subscribed with `await` — `onReconnect` emits only after completion. Per-channel error handling ensures partial failures don't block other channels.

Expand Down
34 changes: 30 additions & 4 deletions test/broadcasting/drivers/reverb_broadcast_driver_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,16 @@ void _simulateConnectionEstablished(
_MockWebSocketChannel mock, {
String socketId = 'test-socket-id',
int activityTimeout = 30,
bool includeActivityTimeout = true,
}) {
Future<void>.delayed(Duration.zero, () {
final data = <String, dynamic>{'socket_id': socketId};
if (includeActivityTimeout) {
data['activity_timeout'] = activityTimeout;
}
mock.simulateMessage({
'event': 'pusher:connection_established',
'data': jsonEncode({
'socket_id': socketId,
'activity_timeout': activityTimeout,
}),
'data': jsonEncode(data),
});
});
}
Expand Down Expand Up @@ -1713,6 +1715,30 @@ void main() {
await driver.disconnect();
});

test(
'falls back to config activity_timeout when server omits it',
() async {
final mock = _MockWebSocketChannel();
final driver = ReverbBroadcastDriver(
_defaultConfig(overrides: {'activity_timeout': 1}),
channelFactory: (_) => mock,
pongTimeout: const Duration(seconds: 1),
);

// Handshake WITHOUT activity_timeout — driver should use config value.
_simulateConnectionEstablished(mock, includeActivityTimeout: false);
await driver.connect();

expect(
driver.activityTimeout,
equals(1),
reason: 'Should fall back to config activity_timeout',
);

await driver.disconnect();
},
);

test('closes socket when pong not received within timeout', () async {
final mock = _MockWebSocketChannel();
final driver = ReverbBroadcastDriver(
Expand Down