diff --git a/README.md b/README.md index 4863735f..3778aa16 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,7 @@ WiretapKMP can inspect **Server-Sent Events (SSE)** streams — log every connec ### Ktor -Install the SSE plugin and wrap your session: +Install the SSE plugin — sessions are wrapped automatically: ```kotlin @OptIn(ExperimentalWiretapSseApi::class) @@ -96,8 +96,7 @@ val client = HttpClient { } client.sse("https://api.example.com/stream") { - val session = this.wiretapped() - session.incoming.collect { event -> + incoming.collect { event -> println("Event: ${event.event} — ${event.data}") } } diff --git a/composeApp/src/commonMain/kotlin/dev/skymansandy/wiretapsample/di/SampleAppModule.kt b/composeApp/src/commonMain/kotlin/dev/skymansandy/wiretapsample/di/SampleAppModule.kt index bd3791b1..aff6c49e 100644 --- a/composeApp/src/commonMain/kotlin/dev/skymansandy/wiretapsample/di/SampleAppModule.kt +++ b/composeApp/src/commonMain/kotlin/dev/skymansandy/wiretapsample/di/SampleAppModule.kt @@ -23,7 +23,6 @@ import io.ktor.serialization.ContentConverter import kotlinx.serialization.json.Json import org.koin.core.module.dsl.viewModelOf import org.koin.dsl.module -import kotlin.time.Duration.Companion.seconds @OptIn(ExperimentalWiretapSseApi::class) val sampleAppModule = module { @@ -43,9 +42,7 @@ val sampleAppModule = module { single { HttpClient { install(HttpTimeout) - install(WebSockets) { - pingIntervalMillis = 5.seconds.inWholeMilliseconds - } + install(WebSockets) install(SSE) install(WiretapKtorWebSocketPlugin) diff --git a/composeApp/src/commonMain/kotlin/dev/skymansandy/wiretapsample/viewmodel/KtorSseViewModel.kt b/composeApp/src/commonMain/kotlin/dev/skymansandy/wiretapsample/viewmodel/KtorSseViewModel.kt index 8ff32e6c..ea16707c 100644 --- a/composeApp/src/commonMain/kotlin/dev/skymansandy/wiretapsample/viewmodel/KtorSseViewModel.kt +++ b/composeApp/src/commonMain/kotlin/dev/skymansandy/wiretapsample/viewmodel/KtorSseViewModel.kt @@ -4,8 +4,6 @@ import androidx.compose.runtime.mutableStateListOf import androidx.compose.runtime.snapshots.SnapshotStateList import androidx.lifecycle.ViewModel import androidx.lifecycle.viewModelScope -import dev.skymansandy.wiretap.helper.markers.ExperimentalWiretapSseApi -import dev.skymansandy.wiretap.plugin.sse.wiretapped import dev.skymansandy.wiretapsample.model.SampleMessage import dev.skymansandy.wiretapsample.model.SampleMessage.MessageType import dev.skymansandy.wiretapsample.model.SseSampleActions @@ -23,7 +21,6 @@ import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.launch -@OptIn(ExperimentalWiretapSseApi::class) class KtorSseViewModel( private val client: HttpClient, ) : ViewModel(), SseSampleActions { @@ -71,13 +68,12 @@ class KtorSseViewModel( headers.remove("Accept") headers.append("Accept", "text/event-stream") }) { - val session = this.wiretapped() _isConnected.value = true _isConnecting.value = false eventLog.add(SampleMessage(MessageType.System, "Connected!")) try { - session.incoming.collect { event -> + incoming.collect { event -> val text = buildString { event.event?.let { append("[$it] ") } append(event.data ?: "") diff --git a/composeApp/src/commonMain/kotlin/dev/skymansandy/wiretapsample/viewmodel/KtorWebSocketViewModel.kt b/composeApp/src/commonMain/kotlin/dev/skymansandy/wiretapsample/viewmodel/KtorWebSocketViewModel.kt index b29ecef8..70261130 100644 --- a/composeApp/src/commonMain/kotlin/dev/skymansandy/wiretapsample/viewmodel/KtorWebSocketViewModel.kt +++ b/composeApp/src/commonMain/kotlin/dev/skymansandy/wiretapsample/viewmodel/KtorWebSocketViewModel.kt @@ -4,13 +4,12 @@ import androidx.compose.runtime.mutableStateListOf import androidx.compose.runtime.snapshots.SnapshotStateList import androidx.lifecycle.ViewModel import androidx.lifecycle.viewModelScope -import dev.skymansandy.wiretap.plugin.ws.WiretapWebSocketSession -import dev.skymansandy.wiretap.plugin.ws.wiretapped import dev.skymansandy.wiretapsample.model.SampleMessage import dev.skymansandy.wiretapsample.model.SampleMessage.MessageType import dev.skymansandy.wiretapsample.model.WsSampleActions import dev.skymansandy.wiretapsample.model.wsServers import io.ktor.client.HttpClient +import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession import io.ktor.client.plugins.websocket.webSocket import io.ktor.websocket.CloseReason import io.ktor.websocket.Frame @@ -43,7 +42,7 @@ class KtorWebSocketViewModel( override val messageLog: SnapshotStateList = mutableStateListOf() private var wsUrl = wsServers[0].first - private var session: WiretapWebSocketSession? = null // nullable because it's only set during active connection + private var session: DefaultClientWebSocketSession? = null private var connectionJob: Job? = null private val exceptionHandler = CoroutineExceptionHandler { _, _ -> } @@ -69,14 +68,13 @@ class KtorWebSocketViewModel( connectionJob = viewModelScope.launch(Dispatchers.IO + exceptionHandler) { try { client.webSocket(wsUrl) { - val wrapped = this.wiretapped() - session = wrapped + session = this _isConnected.value = true _isConnecting.value = false messageLog.add(SampleMessage(MessageType.System, "Connected!")) try { - for (frame in wrapped.incoming) { + for (frame in incoming) { if (frame is Frame.Text) { val text = frame.readText() messageLog.add(SampleMessage(MessageType.Received, text)) diff --git a/docs/index.md b/docs/index.md index f3ae02d7..01e9c30f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -105,7 +105,7 @@ Full WebSocket lifecycle tracking: - Connection open/close/failure with status transitions - Close codes and reasons - Every sent and received message (text and binary) with timestamps and byte counts -- Ktor: wrap session with `wiretapped()` for automatic message interception +- Ktor: automatically intercepted by `WiretapKtorWebSocketPlugin` - OkHttp: wrap listener with `WiretapOkHttpWebSocketListener` for automatic event capture ## SSE (Server-Sent Events) Logging @@ -114,7 +114,7 @@ Full SSE connection lifecycle tracking: - Connection open/close/failure with status transitions - Every incoming event with event type, data payload, event ID, retry interval, and byte count -- Ktor: wrap SSE session with `wiretapped()` for automatic event interception +- Ktor: automatically intercepted by `WiretapKtorSsePlugin` - OkHttp: wrap listener with `WiretapOkHttpEventSourceListener` for automatic event capture ## API Mocking diff --git a/docs/ktor/api.md b/docs/ktor/api.md index 8805ca20..383e7523 100644 --- a/docs/ktor/api.md +++ b/docs/ktor/api.md @@ -39,40 +39,7 @@ HttpClient { val WiretapKtorWebSocketPlugin: ClientPlugin ``` -Intercepts WebSocket upgrades (101 responses) to log connections. - ---- - -## wiretapped() - -```kotlin -suspend fun DefaultClientWebSocketSession.wiretapped(): WiretapWebSocketSession? -``` - -Extension to wrap a Ktor WebSocket session for message logging. Returns `null` if `WiretapKtorWebSocketPlugin` is not installed. - ---- - -## WiretapWebSocketSession - -```kotlin -class WiretapWebSocketSession( - val delegate: DefaultClientWebSocketSession, -) -``` - -### Properties - -| Property | Type | Description | -|----------|------|-------------| -| `incoming` | `ReceiveChannel` | Incoming frames with automatic logging (all frame types) | - -### Methods - -| Method | Description | -|--------|-------------| -| `suspend fun send(frame: Frame)` | Logs the frame and sends via delegate | -| `suspend fun close(code: Short, reason: String?)` | Logs status as Closed and closes the delegate | +Intercepts WebSocket upgrades (101 responses) and automatically wraps sessions to log all sent and received frames. No extra calls needed — just use the session directly. --- @@ -96,35 +63,7 @@ class WiretapHttpConfig { val WiretapKtorSsePlugin: ClientPlugin ``` -Placeholder plugin for SSE inspection. SSE connection tracking is handled by the `wiretapped()` extension on `ClientSSESession`. - ---- - -## wiretapped() (SSE) - -```kotlin -suspend fun ClientSSESession.wiretapped(): WiretapSseSession -``` - -Extension to wrap a Ktor SSE session for event logging. Creates a connection entry in Wiretap and returns a `WiretapSseSession` that intercepts incoming events. - ---- - -## WiretapSseSession - -```kotlin -interface WiretapSseSession { - val call: HttpClientCall - val incoming: Flow -} -``` - -### Properties - -| Property | Type | Description | -|----------|------|-------------| -| `call` | `HttpClientCall` | The underlying HTTP call for this SSE connection | -| `incoming` | `Flow` | Incoming events with automatic logging | +SSE plugin that automatically wraps SSE sessions to log all incoming events. No extra calls needed — just use the session directly. --- diff --git a/docs/ktor/migration.md b/docs/ktor/migration.md index 103ed09f..7a0e8951 100644 --- a/docs/ktor/migration.md +++ b/docs/ktor/migration.md @@ -35,7 +35,7 @@ The `install` DSL is unchanged — only the config type name changed: ### SSE — new experimental API -SSE logging is new in RC10. Install the plugin and use `wiretapped()` on your SSE session: +SSE logging is new in RC10. Install the plugin and sessions are wrapped automatically: ```kotlin val client = HttpClient { @@ -46,9 +46,32 @@ val client = HttpClient { @OptIn(ExperimentalWiretapSseApi::class) client.sse("https://example.com/events") { - val session = this.wiretapped() - session.incoming.collect { event -> + incoming.collect { event -> println("Event: ${event.data}") } } ``` + +## RC11 → RC12 + +### `wiretapped()` removed for WebSocket and SSE + +`WiretapKtorWebSocketPlugin` and `WiretapKtorSsePlugin` now wrap sessions automatically. Remove all `wiretapped()` calls: + +```diff + client.webSocket("wss://example.com/ws") { +- val session = this.wiretapped() +- session?.send(Frame.Text("Hello!")) +- for (frame in (session?.incoming ?: incoming)) { ... } ++ send(Frame.Text("Hello!")) ++ for (frame in incoming) { ... } + } + + client.sse("https://example.com/events") { +- val session = this.wiretapped() +- session.incoming.collect { event -> ... } ++ incoming.collect { event -> ... } + } +``` + +The `wiretapped()` extension is now deprecated with `ERROR` level and simply returns `this`. diff --git a/docs/ktor/setup.md b/docs/ktor/setup.md index c66b13a7..710db9f1 100644 --- a/docs/ktor/setup.md +++ b/docs/ktor/setup.md @@ -68,12 +68,11 @@ val client = HttpClient { } ``` -Then wrap your SSE session with `wiretapped()`: +Sessions are wrapped automatically — just use the session directly: ```kotlin client.sse("https://example.com/events") { - val session = this.wiretapped() - session.incoming.collect { event -> + incoming.collect { event -> println("Event: ${event.event} — ${event.data}") } } diff --git a/docs/ktor/sse.md b/docs/ktor/sse.md index 77c2ba44..60ef79fc 100644 --- a/docs/ktor/sse.md +++ b/docs/ktor/sse.md @@ -16,23 +16,19 @@ val client = HttpClient { } ``` -## Session Wrapping +## Automatic Session Wrapping -Wrap your SSE session with `wiretapped()` to log incoming events. This creates a connection entry in Wiretap and returns a logging wrapper that intercepts all incoming events: +`WiretapKtorSsePlugin` wraps SSE sessions automatically — no extra calls needed. Just use the session directly and all incoming events are logged: ```kotlin @OptIn(ExperimentalWiretapSseApi::class) client.sse("https://example.com/events") { - val session = this.wiretapped() - - session.incoming.collect { event -> + incoming.collect { event -> println("Event: ${event.event} — ${event.data}") } } ``` -The `wiretapped()` extension is available on `ClientSSESession` — the standard Ktor SSE session type. - ## WiretapSseSession API | Property | Type | Description | @@ -42,8 +38,8 @@ The `wiretapped()` extension is available on `ClientSSESession` — the standard ## How It Works -1. **`wiretapped()`** creates a connection entry via `SseLogManager` with status `Open` -2. Returns a `LoggingSseSession` that wraps the Ktor `ClientSSESession` +1. **`WiretapKtorSsePlugin`** automatically wraps SSE sessions and creates a connection entry via `SseLogManager` with status `Open` +2. The wrapped `LoggingSseSession` intercepts the Ktor `ClientSSESession` 3. Every incoming event is logged as it flows through the `incoming` flow 4. When the flow completes (server close, cancellation, or error), the connection status is updated to `Closed` or `Failed` @@ -78,9 +74,7 @@ val client = HttpClient { @OptIn(ExperimentalWiretapSseApi::class) client.sse("https://api.example.com/stream") { - val session = this.wiretapped() - - session.incoming.collect { event -> + incoming.collect { event -> when (event.event) { "message" -> handleMessage(event.data) "heartbeat" -> { /* ignore */ } diff --git a/docs/ktor/websockets.md b/docs/ktor/websockets.md index 95bd9664..6fe7f06c 100644 --- a/docs/ktor/websockets.md +++ b/docs/ktor/websockets.md @@ -20,19 +20,17 @@ val client = HttpClient { } ``` -## Session Wrapping +## Automatic Session Wrapping -Wrap your WebSocket session with `wiretapped()` to log outgoing and incoming messages. Returns `null` if `WiretapKtorWebSocketPlugin` is not installed: +`WiretapKtorWebSocketPlugin` wraps WebSocket sessions automatically — no extra calls needed. All sent and received frames are logged: ```kotlin client.webSocket("wss://echo.websocket.org") { - val session = this.wiretapped() // null if plugin not installed - - // Send — automatically logged when session is available - session?.send(Frame.Text("Hello, server!")) + // Send — automatically logged + send(Frame.Text("Hello, server!")) // Receive — automatically logged as frames are consumed - for (frame in (session?.incoming ?: incoming)) { + for (frame in incoming) { when (frame) { is Frame.Text -> println("Received: ${frame.readText()}") is Frame.Binary -> println("Received ${frame.readBytes().size} bytes") @@ -59,7 +57,7 @@ client.webSocket("wss://echo.websocket.org") { 1. **`WiretapKtorWebSocketPlugin`** hooks into `onResponse` for 101 Switching Protocols responses 2. Creates a `SocketEntry` via the orchestrator with status `Open` 3. Stores the socket ID on request attributes -4. **`wiretapped()`** creates a `WiretapWebSocketSession` that intercepts `send()` and auto-logs `incoming` frames +4. The plugin automatically wraps the session to intercept `send()` and auto-log `incoming` frames 5. Connection close/failure is detected automatically via job completion ## What Gets Logged diff --git a/wiretap-ktor-noop/api/jvm/wiretap-ktor-noop.api b/wiretap-ktor-noop/api/jvm/wiretap-ktor-noop.api index 89e5986d..ea9bcc74 100644 --- a/wiretap-ktor-noop/api/jvm/wiretap-ktor-noop.api +++ b/wiretap-ktor-noop/api/jvm/wiretap-ktor-noop.api @@ -12,22 +12,14 @@ public abstract interface class dev/skymansandy/wiretap/plugin/sse/WiretapSseSes } public final class dev/skymansandy/wiretap/plugin/sse/WiretapWrapKt { - public static final fun wiretapped (Lio/ktor/client/plugins/sse/ClientSSESession;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun wiretapped (Lio/ktor/client/plugins/sse/ClientSSESession;)Lio/ktor/client/plugins/sse/ClientSSESession; } public final class dev/skymansandy/wiretap/plugin/ws/WiretapKtorWebSocketPluginKt { public static final fun getWiretapKtorWebSocketPlugin ()Lio/ktor/client/plugins/api/ClientPlugin; } -public abstract interface class dev/skymansandy/wiretap/plugin/ws/WiretapWebSocketSession : io/ktor/websocket/DefaultWebSocketSession { - public abstract fun getCall ()Lio/ktor/client/call/HttpClientCall; -} - -public final class dev/skymansandy/wiretap/plugin/ws/WiretapWebSocketSession$DefaultImpls { - public static fun send (Ldev/skymansandy/wiretap/plugin/ws/WiretapWebSocketSession;Lio/ktor/websocket/Frame;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; -} - public final class dev/skymansandy/wiretap/plugin/ws/WiretapWrapKt { - public static final fun wiretapped (Lio/ktor/client/plugins/websocket/DefaultClientWebSocketSession;)Ldev/skymansandy/wiretap/plugin/ws/WiretapWebSocketSession; + public static final fun wiretapped (Lio/ktor/client/plugins/websocket/DefaultClientWebSocketSession;)Lio/ktor/client/plugins/websocket/DefaultClientWebSocketSession; } diff --git a/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/DelegatingSseSession.kt b/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/DelegatingSseSession.kt index d56dfa65..be722cdd 100644 --- a/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/DelegatingSseSession.kt +++ b/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/DelegatingSseSession.kt @@ -10,10 +10,12 @@ import io.ktor.client.plugins.sse.ClientSSESession import io.ktor.sse.ServerSentEvent import kotlinx.coroutines.flow.Flow -/** - * No-op passthrough that just exposes the underlying session. - */ @OptIn(ExperimentalWiretapSseApi::class) +@Deprecated( + message = "WiretapKtorSsePlugin now wraps sessions automatically. Use ClientSSESession directly.", + level = DeprecationLevel.ERROR, +) +@Suppress("DEPRECATION_ERROR") internal class DelegatingSseSession( private val delegate: ClientSSESession, ) : WiretapSseSession { diff --git a/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapSseSession.kt b/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapSseSession.kt index ea8d1cab..82a60596 100644 --- a/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapSseSession.kt +++ b/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapSseSession.kt @@ -9,11 +9,11 @@ import io.ktor.client.call.HttpClientCall import io.ktor.sse.ServerSentEvent import kotlinx.coroutines.flow.Flow -/** - * Wraps an SSE session for Wiretap interception. - * In noop builds, delegates directly to the underlying session. - */ @ExperimentalWiretapSseApi +@Deprecated( + message = "WiretapKtorSsePlugin now wraps sessions automatically. Use ClientSSESession directly.", + level = DeprecationLevel.ERROR, +) interface WiretapSseSession { val call: HttpClientCall diff --git a/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapWrap.kt b/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapWrap.kt index bd3f0a8c..7c7cbfb5 100644 --- a/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapWrap.kt +++ b/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapWrap.kt @@ -7,8 +7,10 @@ package dev.skymansandy.wiretap.plugin.sse import dev.skymansandy.wiretap.helper.markers.ExperimentalWiretapSseApi import io.ktor.client.plugins.sse.ClientSSESession -/** - * No-op: returns a passthrough wrapper for API parity. - */ @ExperimentalWiretapSseApi -suspend fun ClientSSESession.wiretapped(): WiretapSseSession = DelegatingSseSession(this) +@Deprecated( + message = "WiretapKtorSsePlugin now wraps sessions automatically. Remove this call.", + replaceWith = ReplaceWith("this"), + level = DeprecationLevel.ERROR, +) +fun ClientSSESession.wiretapped(): ClientSSESession = this diff --git a/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/DelegatingWebSocketSession.kt b/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/DelegatingWebSocketSession.kt deleted file mode 100644 index 06b8e13f..00000000 --- a/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/DelegatingWebSocketSession.kt +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2026 skymansandy. All rights reserved. - */ - -package dev.skymansandy.wiretap.plugin.ws - -import io.ktor.client.call.HttpClientCall -import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession -import io.ktor.websocket.DefaultWebSocketSession - -/** - * Passthrough [WiretapWebSocketSession] that delegates directly - * to the underlying session without any logging. - */ -internal class DelegatingWebSocketSession( - delegate: DefaultClientWebSocketSession, -) : WiretapWebSocketSession, DefaultWebSocketSession by delegate { - - override val call: HttpClientCall = delegate.call -} diff --git a/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapWebSocketSession.kt b/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapWebSocketSession.kt deleted file mode 100644 index 537f372c..00000000 --- a/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapWebSocketSession.kt +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright (c) 2026 skymansandy. All rights reserved. - */ - -package dev.skymansandy.wiretap.plugin.ws - -import io.ktor.client.call.HttpClientCall -import io.ktor.websocket.DefaultWebSocketSession - -/** - * Wraps a WebSocket session for Wiretap interception. - * - * Exposes the full [DefaultWebSocketSession] surface (incoming, outgoing, - * send, flush, closeReason, etc.) plus [call], so it can be used as a - * drop-in replacement for `DefaultClientWebSocketSession`. - * - * In debug builds, logs all sent/received frames. - * In noop builds, delegates directly to the underlying session. - */ -interface WiretapWebSocketSession : DefaultWebSocketSession { - - val call: HttpClientCall -} diff --git a/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapWrap.kt b/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapWrap.kt index e74c513a..7c8c9fc5 100644 --- a/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapWrap.kt +++ b/wiretap-ktor-noop/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapWrap.kt @@ -6,11 +6,9 @@ package dev.skymansandy.wiretap.plugin.ws import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession -/** - * No-op — returns a passthrough [WiretapWebSocketSession] that delegates - * directly to the underlying session without any logging. - */ -fun DefaultClientWebSocketSession.wiretapped(): WiretapWebSocketSession { - val session = this - return DelegatingWebSocketSession(session) -} +@Deprecated( + message = "WiretapKtorWebSocketPlugin now wraps sessions automatically. Remove this call.", + replaceWith = ReplaceWith("this"), + level = DeprecationLevel.ERROR, +) +fun DefaultClientWebSocketSession.wiretapped(): DefaultClientWebSocketSession = this diff --git a/wiretap-ktor/README.md b/wiretap-ktor/README.md index b48973a4..82d369e9 100644 --- a/wiretap-ktor/README.md +++ b/wiretap-ktor/README.md @@ -102,7 +102,7 @@ Rules match on method, URL (exact/contains/regex), headers, and body. First matc `WiretapKtorWebSocketPlugin` intercepts WebSocket upgrade requests (status 101) and logs all sent/received frames. It works independently of `WiretapKtorHttpPlugin` — you can install either or both depending on your needs. 1. **On upgrade** — The plugin detects the 101 response and creates a `SocketConnection` entry with URL, request headers, status, and protocol. -2. **Session wrapping** — Call `wiretapped()` inside the `webSocket {}` block to get a `WiretapWebSocketSession` that logs all frames. +2. **Automatic session wrapping** — The plugin wraps WebSocket sessions automatically. All sent and received frames are logged without any extra calls. 3. **Auto-close detection** — The session monitors its coroutine Job and automatically updates status to Closed/Failed when the connection ends. ### 📦 Setup @@ -116,15 +116,15 @@ val client = HttpClient { ### 🔧 Usage +Sessions are wrapped automatically — just use the session directly: + ```kotlin client.webSocket("wss://example.com/ws") { - val session = this.wiretapped() // IMPORTANT: Returns a DelegatingWebSocketSession if plugin is not installed - // Send — automatically logged - session?.send(Frame.Text("Hello!")) + send(Frame.Text("Hello!")) // Receive — automatically logged as frames are consumed - for (frame in (session?.incoming ?: incoming)) { + for (frame in incoming) { if (frame is Frame.Text) { val text = frame.readText() // handle message @@ -161,9 +161,9 @@ Auto-close detection handles unexpected disconnections — no manual status mana ### ⚙️ How It Works -`WiretapKtorSsePlugin` is a placeholder plugin for API consistency. SSE connection tracking is handled by the `wiretapped()` extension on `ClientSSESession`, which creates a connection entry and returns a logging wrapper that intercepts all incoming events. +`WiretapKtorSsePlugin` wraps SSE sessions automatically. It creates a connection entry and intercepts all incoming events — no extra calls needed. -1. **Session wrapping** — Call `wiretapped()` inside the `sse {}` block to get a `WiretapSseSession` that logs all events. +1. **Automatic session wrapping** — The plugin wraps SSE sessions automatically, creating a `WiretapSseSession` that logs all events. 2. **Event logging** — Every incoming `ServerSentEvent` is logged with its event type, data, ID, retry interval, and byte count. 3. **Auto-close detection** — The session monitors flow completion and automatically updates status to Closed/Failed when the connection ends. @@ -178,11 +178,11 @@ val client = HttpClient { ### 🔧 Usage +Sessions are wrapped automatically — just use the session directly: + ```kotlin client.sse("https://example.com/events") { - val session = this.wiretapped() - - session.incoming.collect { event -> + incoming.collect { event -> println("Event: ${event.event} — ${event.data}") } } diff --git a/wiretap-ktor/api/jvm/wiretap-ktor.api b/wiretap-ktor/api/jvm/wiretap-ktor.api index 89e5986d..acd217e0 100644 --- a/wiretap-ktor/api/jvm/wiretap-ktor.api +++ b/wiretap-ktor/api/jvm/wiretap-ktor.api @@ -3,7 +3,11 @@ public final class dev/skymansandy/wiretap/plugin/http/WiretapKtorHttpPluginKt { } public final class dev/skymansandy/wiretap/plugin/sse/WiretapKtorSsePluginKt { - public static final fun getWiretapKtorSsePlugin ()Lio/ktor/client/plugins/api/ClientPlugin; + public static final fun getWiretapKtorSsePlugin ()Lio/ktor/client/plugins/HttpClientPlugin; +} + +public final class dev/skymansandy/wiretap/plugin/sse/WiretapSsePluginHandler { + public final fun getEnabled ()Z } public abstract interface class dev/skymansandy/wiretap/plugin/sse/WiretapSseSession { @@ -12,22 +16,18 @@ public abstract interface class dev/skymansandy/wiretap/plugin/sse/WiretapSseSes } public final class dev/skymansandy/wiretap/plugin/sse/WiretapWrapKt { - public static final fun wiretapped (Lio/ktor/client/plugins/sse/ClientSSESession;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun wiretapped (Lio/ktor/client/plugins/sse/ClientSSESession;)Lio/ktor/client/plugins/sse/ClientSSESession; } public final class dev/skymansandy/wiretap/plugin/ws/WiretapKtorWebSocketPluginKt { - public static final fun getWiretapKtorWebSocketPlugin ()Lio/ktor/client/plugins/api/ClientPlugin; + public static final fun getWiretapKtorWebSocketPlugin ()Lio/ktor/client/plugins/HttpClientPlugin; } -public abstract interface class dev/skymansandy/wiretap/plugin/ws/WiretapWebSocketSession : io/ktor/websocket/DefaultWebSocketSession { - public abstract fun getCall ()Lio/ktor/client/call/HttpClientCall; -} - -public final class dev/skymansandy/wiretap/plugin/ws/WiretapWebSocketSession$DefaultImpls { - public static fun send (Ldev/skymansandy/wiretap/plugin/ws/WiretapWebSocketSession;Lio/ktor/websocket/Frame;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +public final class dev/skymansandy/wiretap/plugin/ws/WiretapWrapKt { + public static final fun wiretapped (Lio/ktor/client/plugins/websocket/DefaultClientWebSocketSession;)Lio/ktor/client/plugins/websocket/DefaultClientWebSocketSession; } -public final class dev/skymansandy/wiretap/plugin/ws/WiretapWrapKt { - public static final fun wiretapped (Lio/ktor/client/plugins/websocket/DefaultClientWebSocketSession;)Ldev/skymansandy/wiretap/plugin/ws/WiretapWebSocketSession; +public final class dev/skymansandy/wiretap/plugin/ws/WiretapWsPluginHandler { + public final fun getEnabled ()Z } diff --git a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/DelegatingSseSession.kt b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/DelegatingSseSession.kt index d56dfa65..be722cdd 100644 --- a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/DelegatingSseSession.kt +++ b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/DelegatingSseSession.kt @@ -10,10 +10,12 @@ import io.ktor.client.plugins.sse.ClientSSESession import io.ktor.sse.ServerSentEvent import kotlinx.coroutines.flow.Flow -/** - * No-op passthrough that just exposes the underlying session. - */ @OptIn(ExperimentalWiretapSseApi::class) +@Deprecated( + message = "WiretapKtorSsePlugin now wraps sessions automatically. Use ClientSSESession directly.", + level = DeprecationLevel.ERROR, +) +@Suppress("DEPRECATION_ERROR") internal class DelegatingSseSession( private val delegate: ClientSSESession, ) : WiretapSseSession { diff --git a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/LoggingSseSession.kt b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/LoggingSseSession.kt index c0e043e5..0754973a 100644 --- a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/LoggingSseSession.kt +++ b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/LoggingSseSession.kt @@ -8,11 +8,10 @@ import dev.skymansandy.wiretap.domain.model.SseConnection import dev.skymansandy.wiretap.domain.model.SseEvent import dev.skymansandy.wiretap.domain.model.SseStatus import dev.skymansandy.wiretap.domain.orchestrator.SseLogManager -import dev.skymansandy.wiretap.helper.markers.ExperimentalWiretapSseApi import dev.skymansandy.wiretap.helper.util.currentTimeMillis -import io.ktor.client.call.HttpClientCall -import io.ktor.client.plugins.sse.ClientSSESession +import io.ktor.client.plugins.sse.SSESession import io.ktor.sse.ServerSentEvent +import io.ktor.utils.io.InternalAPI import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers @@ -22,21 +21,22 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import kotlin.coroutines.CoroutineContext /** - * [WiretapSseSession] implementation that logs all incoming SSE events. + * [SSESession] wrapper that logs all incoming SSE events via Wiretap. * - * Detects session completion via flow onCompletion (cancellation, server close, error) - * and updates the connection status accordingly. + * Intercepts the delegate's [incoming] flow with [onEach] / [onCompletion] + * operators so every event and session lifecycle change is recorded. */ -@OptIn(ExperimentalWiretapSseApi::class) internal class LoggingSseSession( - private val delegate: ClientSSESession, + private val delegate: SSESession, private val connectionId: Long, + private val url: String, private val sseLogManager: SseLogManager, -) : WiretapSseSession { +) : SSESession { - override val call: HttpClientCall = delegate.call + override val coroutineContext: CoroutineContext = delegate.coroutineContext private val logScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) @@ -44,8 +44,10 @@ internal class LoggingSseSession( .onEach { event -> logEvent(event) } .onCompletion { cause -> onSessionClosed(cause) } + @InternalAPI + override fun bodyBuffer(): ByteArray = delegate.bodyBuffer() + private fun onSessionClosed(cause: Throwable?) { - val url = delegate.call.request.url.toString() logScope.launch { if (cause != null && cause !is CancellationException) { sseLogManager.updateConnection( diff --git a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapKtorSsePlugin.kt b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapKtorSsePlugin.kt index 3fa509bd..63253f4d 100644 --- a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapKtorSsePlugin.kt +++ b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapKtorSsePlugin.kt @@ -4,18 +4,37 @@ package dev.skymansandy.wiretap.plugin.sse +import dev.skymansandy.wiretap.domain.model.SseConnection +import dev.skymansandy.wiretap.domain.model.SseStatus import dev.skymansandy.wiretap.domain.model.config.sse.WiretapSseConfig import dev.skymansandy.wiretap.helper.markers.ExperimentalWiretapSseApi -import io.ktor.client.plugins.api.createClientPlugin +import dev.skymansandy.wiretap.helper.util.currentTimeMillis +import dev.skymansandy.wiretap.plugin.sse.util.SsePluginDeps +import io.ktor.client.HttpClient +import io.ktor.client.plugins.HttpClientPlugin +import io.ktor.client.plugins.sse.SSESession +import io.ktor.client.request.HttpRequestPipeline +import io.ktor.client.statement.HttpResponseContainer +import io.ktor.client.statement.HttpResponsePipeline import io.ktor.util.AttributeKey internal val WiretapSseEnabledKey = AttributeKey("WiretapSseEnabled") /** - * Ktor client plugin for Wiretap SSE inspection. + * Configuration holder for the installed plugin. + */ +@ExperimentalWiretapSseApi +class WiretapSsePluginHandler internal constructor(val enabled: Boolean) + +/** + * Ktor client plugin that automatically intercepts SSE sessions + * to log connections and events via Wiretap. * - * SSE connection tracking is handled by the [wiretapped] extension on [ClientSSESession]. - * This plugin stores the [WiretapSseConfig] so that [wiretapped] can read it. + * All SSE sessions are wrapped transparently at the [SSESession] + * level — no manual [wiretapped] call is needed. The plugin intercepts + * at [HttpResponsePipeline.Parse] (before Ktor's SSE plugin wraps the + * session into [io.ktor.client.plugins.sse.ClientSSESession] at Transform), + * ensuring all event I/O is logged transparently. * * Usage: * ```kotlin @@ -28,10 +47,53 @@ internal val WiretapSseEnabledKey = AttributeKey("WiretapSseEnabled") * ``` */ @ExperimentalWiretapSseApi -val WiretapKtorSsePlugin = createClientPlugin("WiretapSsePlugin", ::WiretapSseConfig) { - val enabled = pluginConfig.enabled +val WiretapKtorSsePlugin = + object : HttpClientPlugin { + + override val key = AttributeKey("WiretapSsePlugin") + + override fun prepare(block: WiretapSseConfig.() -> Unit): WiretapSsePluginHandler { + val config = WiretapSseConfig().apply(block) + return WiretapSsePluginHandler(config.enabled) + } + + override fun install(plugin: WiretapSsePluginHandler, scope: HttpClient) { + val deps = SsePluginDeps() + + // Store enabled flag in request attributes + scope.requestPipeline.intercept(HttpRequestPipeline.State) { + context.attributes.put(WiretapSseEnabledKey, plugin.enabled) + proceed() + } + + // Wrap the raw SSESession BEFORE Ktor's SSE plugin processes it + // at Transform phase. This ensures all event I/O is logged + // transparently, without requiring an explicit wiretapped() call. + scope.responsePipeline.intercept(HttpResponsePipeline.Parse) { + val (info, body) = subject + val sseSession = body as? SSESession ?: return@intercept + if (!plugin.enabled) return@intercept + + val url = context.request.url.toString() + val requestHeaders = context.request.headers.entries() + .associate { (key, values) -> key to values.joinToString(", ") } + + val connectionId = deps.sseLogManager.createConnection( + SseConnection( + url = url, + requestHeaders = requestHeaders, + status = SseStatus.Open, + timestamp = currentTimeMillis(), + ), + ) - onRequest { request, _ -> - request.attributes.put(WiretapSseEnabledKey, enabled) + val wrappedSession = LoggingSseSession( + delegate = sseSession, + connectionId = connectionId, + url = url, + sseLogManager = deps.sseLogManager, + ) + proceedWith(HttpResponseContainer(info, wrappedSession)) + } + } } -} diff --git a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapSseSession.kt b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapSseSession.kt index ca505911..82a60596 100644 --- a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapSseSession.kt +++ b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapSseSession.kt @@ -9,13 +9,11 @@ import io.ktor.client.call.HttpClientCall import io.ktor.sse.ServerSentEvent import kotlinx.coroutines.flow.Flow -/** - * Wraps an SSE session for Wiretap interception. - * - * In debug builds, logs all incoming SSE events. - * In noop builds, delegates directly to the underlying session. - */ @ExperimentalWiretapSseApi +@Deprecated( + message = "WiretapKtorSsePlugin now wraps sessions automatically. Use ClientSSESession directly.", + level = DeprecationLevel.ERROR, +) interface WiretapSseSession { val call: HttpClientCall diff --git a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapWrap.kt b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapWrap.kt index e57d43b5..7c7cbfb5 100644 --- a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapWrap.kt +++ b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/sse/WiretapWrap.kt @@ -4,48 +4,13 @@ package dev.skymansandy.wiretap.plugin.sse -import dev.skymansandy.wiretap.domain.model.SseConnection -import dev.skymansandy.wiretap.domain.model.SseStatus import dev.skymansandy.wiretap.helper.markers.ExperimentalWiretapSseApi -import dev.skymansandy.wiretap.helper.util.currentTimeMillis -import dev.skymansandy.wiretap.plugin.sse.util.SsePluginDeps import io.ktor.client.plugins.sse.ClientSSESession -/** - * Extension to wrap a Ktor [ClientSSESession] for Wiretap logging. - * - * Creates an SSE connection entry in Wiretap and returns a logging wrapper - * that intercepts incoming events. - * - * ```kotlin - * client.sse("https://example.com/events") { - * val session = this.wiretapped() - * session.incoming.collect { event -> ... } - * } - * ``` - */ @ExperimentalWiretapSseApi -suspend fun ClientSSESession.wiretapped(): WiretapSseSession { - val enabled = call.request.attributes.getOrNull(WiretapSseEnabledKey) ?: true - if (!enabled) return DelegatingSseSession(this) - - val deps = SsePluginDeps() - val url = call.request.url.toString() - val requestHeaders = call.request.headers.entries() - .associate { (key, values) -> key to values.joinToString(", ") } - - val connectionId = deps.sseLogManager.createConnection( - SseConnection( - url = url, - requestHeaders = requestHeaders, - status = SseStatus.Open, - timestamp = currentTimeMillis(), - ), - ) - - return LoggingSseSession( - delegate = this, - connectionId = connectionId, - sseLogManager = deps.sseLogManager, - ) -} +@Deprecated( + message = "WiretapKtorSsePlugin now wraps sessions automatically. Remove this call.", + replaceWith = ReplaceWith("this"), + level = DeprecationLevel.ERROR, +) +fun ClientSSESession.wiretapped(): ClientSSESession = this diff --git a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/DelegatingWebSocketSession.kt b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/DelegatingWebSocketSession.kt deleted file mode 100644 index 06b8e13f..00000000 --- a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/DelegatingWebSocketSession.kt +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2026 skymansandy. All rights reserved. - */ - -package dev.skymansandy.wiretap.plugin.ws - -import io.ktor.client.call.HttpClientCall -import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession -import io.ktor.websocket.DefaultWebSocketSession - -/** - * Passthrough [WiretapWebSocketSession] that delegates directly - * to the underlying session without any logging. - */ -internal class DelegatingWebSocketSession( - delegate: DefaultClientWebSocketSession, -) : WiretapWebSocketSession, DefaultWebSocketSession by delegate { - - override val call: HttpClientCall = delegate.call -} diff --git a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/LoggingRawWebSocketSession.kt b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/LoggingRawWebSocketSession.kt new file mode 100644 index 00000000..05bcf37d --- /dev/null +++ b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/LoggingRawWebSocketSession.kt @@ -0,0 +1,236 @@ +/* + * Copyright (c) 2026 skymansandy. All rights reserved. + */ + +package dev.skymansandy.wiretap.plugin.ws + +import co.touchlab.stately.concurrency.AtomicBoolean +import dev.skymansandy.wiretap.domain.model.SocketConnection +import dev.skymansandy.wiretap.domain.model.SocketContentType +import dev.skymansandy.wiretap.domain.model.SocketMessage +import dev.skymansandy.wiretap.domain.model.SocketMessageType +import dev.skymansandy.wiretap.domain.model.SocketStatus +import dev.skymansandy.wiretap.domain.orchestrator.SocketLogManager +import dev.skymansandy.wiretap.helper.util.currentTimeMillis +import io.ktor.websocket.Frame +import io.ktor.websocket.WebSocketExtension +import io.ktor.websocket.WebSocketSession +import io.ktor.websocket.readBytes +import io.ktor.websocket.readText +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.IO +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.channels.ChannelIterator +import kotlinx.coroutines.channels.ChannelResult +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.launch +import kotlinx.coroutines.selects.SelectClause1 + +/** + * Wraps a raw [WebSocketSession] from the engine to log all frames + * passing through, **before** Ktor's `WebSockets` plugin wraps it + * in `DefaultWebSocketSession` / `DefaultClientWebSocketSession`. + * + * This enables transparent interception of libraries that manage + * their own WebSocket sessions internally (e.g. SignalRKore), + * without requiring an explicit `wiretapped()` call. + */ +internal class LoggingRawWebSocketSession( + private val delegate: WebSocketSession, + private val socketId: Long, + private val url: String, + private val socketLogManager: SocketLogManager, +) : WebSocketSession { + + override val coroutineContext = delegate.coroutineContext + override var masking: Boolean + get() = runCatching { delegate.masking }.getOrDefault(true) + set(value) { runCatching { delegate.masking = value } } + override var maxFrameSize: Long + get() = runCatching { delegate.maxFrameSize }.getOrDefault(Long.MAX_VALUE) + set(value) { runCatching { delegate.maxFrameSize = value } } + override val extensions: List> + get() = runCatching { delegate.extensions }.getOrDefault(emptyList()) + + private val logScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + private val closureReported = AtomicBoolean(false) + + private fun reportClosure(cause: Throwable?) { + if (!closureReported.compareAndSet(expected = false, new = true)) return + logScope.launch { + if (cause != null && cause !is CancellationException) { + socketLogManager.updateSocket( + SocketConnection( + id = socketId, + url = url, + status = SocketStatus.Failed, + failureMessage = cause.message ?: cause::class.simpleName ?: "Unknown error", + closedAt = currentTimeMillis(), + timestamp = currentTimeMillis(), + ), + ) + } else { + socketLogManager.updateSocket( + SocketConnection( + id = socketId, + url = url, + status = SocketStatus.Closed, + closeReason = if (cause is CancellationException) "Cancelled" else null, + closedAt = currentTimeMillis(), + timestamp = currentTimeMillis(), + ), + ) + } + } + } + + init { + delegate.coroutineContext[Job]?.invokeOnCompletion { cause -> + // Guard: if the outgoing channel is still open, the raw engine session's + // Job completed due to Ktor wrapping it into DefaultClientWebSocketSession, + // not because the actual WebSocket connection closed. + @OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class) + if (cause == null && !delegate.outgoing.isClosedForSend) return@invokeOnCompletion + reportClosure(cause) + } + } + + /** + * Wraps the delegate's incoming channel without launching any coroutines. + * This avoids scope/lifecycle issues — the channel stays open as long as + * the delegate's channel does, regardless of coroutine Job completion. + */ + override val incoming: ReceiveChannel = LoggingReceiveChannel( + delegate = delegate.incoming, + logAction = { frame -> logFrame(frame, SocketMessageType.Received) }, + onChannelClosed = { cause -> reportClosure(cause) }, + ) + + override val outgoing: SendChannel = LoggingSendChannel( + delegate = delegate.outgoing, + logAction = { frame -> logFrame(frame, SocketMessageType.Sent) }, + ) + + override suspend fun send(frame: Frame) { + logFrame(frame, SocketMessageType.Sent) + delegate.send(frame) + } + + override suspend fun flush() { + delegate.flush() + } + + @Deprecated("Use cancel() instead.", replaceWith = ReplaceWith("cancel()"), level = DeprecationLevel.ERROR) + override fun terminate() { + @Suppress("DEPRECATION_ERROR") + delegate.terminate() + } + + private fun logFrame(frame: Frame, direction: SocketMessageType) { + logScope.launch { + val (contentType, content, byteCount) = when (frame) { + is Frame.Text -> { + val text = frame.readText() + Triple(SocketContentType.Text, text, text.encodeToByteArray().size.toLong()) + } + + is Frame.Binary -> { + val bytes = frame.readBytes() + Triple(SocketContentType.Binary, "[Binary: ${bytes.size} bytes]", bytes.size.toLong()) + } + + is Frame.Ping -> Triple(SocketContentType.Ping, "", frame.data.size.toLong()) + is Frame.Pong -> Triple(SocketContentType.Pong, "", frame.data.size.toLong()) + is Frame.Close -> { + val bytes = frame.data + val closeContent = if (bytes.size >= 2) { + val closeCode = (bytes[0].toInt() and 0xFF shl 8) or (bytes[1].toInt() and 0xFF) + val closeReason = if (bytes.size > 2) bytes.decodeToString(2, bytes.size) else "" + if (closeReason.isNotEmpty()) "$closeCode $closeReason" else "$closeCode" + } else { + "" + } + Triple(SocketContentType.Close, closeContent, bytes.size.toLong()) + } + + else -> return@launch + } + + socketLogManager.logSocketMsg( + SocketMessage( + socketId = socketId, + direction = direction, + contentType = contentType, + content = content, + byteCount = byteCount, + timestamp = currentTimeMillis(), + ), + ) + } + } +} + +/** + * Wraps a [ReceiveChannel] to log frames as they are consumed. + * No coroutines are launched — logging is triggered inline on each receive. + */ +private class LoggingReceiveChannel( + private val delegate: ReceiveChannel, + private val logAction: (Frame) -> Unit, + private val onChannelClosed: (Throwable?) -> Unit, +) : ReceiveChannel by delegate { + + override suspend fun receive(): Frame { + return delegate.receive().also { logAction(it) } + } + + override suspend fun receiveCatching(): ChannelResult { + return delegate.receiveCatching().also { result -> + result.getOrNull()?.let { logAction(it) } + if (result.isClosed) onChannelClosed(result.exceptionOrNull()) + } + } + + override fun tryReceive(): ChannelResult { + return delegate.tryReceive().also { result -> + result.getOrNull()?.let { logAction(it) } + if (result.isClosed) onChannelClosed(result.exceptionOrNull()) + } + } + + override fun iterator(): ChannelIterator { + val delegateIterator = delegate.iterator() + return object : ChannelIterator { + override suspend fun hasNext(): Boolean { + return delegateIterator.hasNext().also { hasMore -> + if (!hasMore) onChannelClosed(null) + } + } + override fun next(): Frame = delegateIterator.next().also { logAction(it) } + } + } + + override val onReceive: SelectClause1 + get() = delegate.onReceive + + override val onReceiveCatching: SelectClause1> + get() = delegate.onReceiveCatching +} + +/** + * Wraps a [SendChannel] to log frames before they are sent. + */ +private class LoggingSendChannel( + private val delegate: SendChannel, + private val logAction: (Frame) -> Unit, +) : SendChannel by delegate { + + override suspend fun send(element: Frame) { + logAction(element) + delegate.send(element) + } +} diff --git a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/LoggingWebSocketSession.kt b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/LoggingWebSocketSession.kt deleted file mode 100644 index fcce9518..00000000 --- a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/LoggingWebSocketSession.kt +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Copyright (c) 2026 skymansandy. All rights reserved. - */ - -package dev.skymansandy.wiretap.plugin.ws - -import co.touchlab.stately.concurrency.AtomicBoolean -import dev.skymansandy.wiretap.domain.model.SocketConnection -import dev.skymansandy.wiretap.domain.model.SocketContentType -import dev.skymansandy.wiretap.domain.model.SocketMessage -import dev.skymansandy.wiretap.domain.model.SocketMessageType -import dev.skymansandy.wiretap.domain.model.SocketStatus -import dev.skymansandy.wiretap.domain.orchestrator.SocketLogManager -import dev.skymansandy.wiretap.helper.util.currentTimeMillis -import dev.skymansandy.wiretap.plugin.ws.util.toWebSocketUrl -import io.ktor.client.call.HttpClientCall -import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession -import io.ktor.websocket.CloseReason -import io.ktor.websocket.DefaultWebSocketSession -import io.ktor.websocket.Frame -import io.ktor.websocket.close -import io.ktor.websocket.readBytes -import io.ktor.websocket.readText -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.IO -import kotlinx.coroutines.Job -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.channels.produce -import kotlinx.coroutines.launch - -/** - * [WiretapWebSocketSession] implementation that logs all sent and received messages. - * - * Automatically detects session completion (timeout, server close, error) and - * updates the socket status accordingly — no manual `markClosed()`/`markFailed()` needed. - */ -internal class LoggingWebSocketSession( - private val delegate: DefaultClientWebSocketSession, - private val socketId: Long, - private val socketLogManager: SocketLogManager, -) : WiretapWebSocketSession, DefaultWebSocketSession by delegate { - - override val call: HttpClientCall = delegate.call - - @OptIn(ExperimentalCoroutinesApi::class) - override val incoming: ReceiveChannel = delegate.produce { - for (frame in delegate.incoming) { - logFrame(frame, SocketMessageType.Received) - send(frame) - } - } - - private val logScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) - private val statusUpdated = AtomicBoolean(false) - - init { - installAutoClose() - } - - @OptIn(ExperimentalCoroutinesApi::class) - private fun installAutoClose() { - delegate.coroutineContext[Job]?.invokeOnCompletion { cause -> - if (!statusUpdated.compareAndSet(expected = false, new = true)) return@invokeOnCompletion - - val url = delegate.call.request.url.toString().toWebSocketUrl() - logScope.launch { - if (cause != null && cause !is CancellationException) { - socketLogManager.updateSocket( - SocketConnection( - id = socketId, - url = url, - status = SocketStatus.Failed, - failureMessage = cause.message ?: cause::class.simpleName ?: "Unknown error", - closedAt = currentTimeMillis(), - timestamp = currentTimeMillis(), - ), - ) - } else { - val closeReason = runCatching { delegate.closeReason.getCompleted() }.getOrNull() - socketLogManager.updateSocket( - SocketConnection( - id = socketId, - url = url, - status = SocketStatus.Closed, - closeCode = closeReason?.code?.toInt(), - closeReason = closeReason?.message ?: if (cause is CancellationException) "Cancelled" else null, - closedAt = currentTimeMillis(), - timestamp = currentTimeMillis(), - ), - ) - } - } - } - } - - override suspend fun send(frame: Frame) { - logFrame(frame, SocketMessageType.Sent) - delegate.send(frame) - } - - suspend fun close(code: Short = 1000.toShort(), reason: String? = null) { - statusUpdated.value = true - - val url = delegate.call.request.url.toString().toWebSocketUrl() - socketLogManager.updateSocket( - SocketConnection( - id = socketId, - url = url, - status = SocketStatus.Closed, - closeCode = code.toInt(), - closeReason = reason, - closedAt = currentTimeMillis(), - timestamp = currentTimeMillis(), - ), - ) - - delegate.close(CloseReason(code, reason ?: "")) - } - - private suspend fun logFrame(frame: Frame, direction: SocketMessageType) { - val (contentType, content, byteCount) = when (frame) { - is Frame.Text -> { - val text = frame.readText() - Triple(SocketContentType.Text, text, text.encodeToByteArray().size.toLong()) - } - - is Frame.Binary -> { - val bytes = frame.readBytes() - Triple(SocketContentType.Binary, "[Binary: ${bytes.size} bytes]", bytes.size.toLong()) - } - - is Frame.Ping -> Triple(SocketContentType.Ping, "", frame.data.size.toLong()) - is Frame.Pong -> Triple(SocketContentType.Pong, "", frame.data.size.toLong()) - is Frame.Close -> { - val bytes = frame.data - val closeContent = if (bytes.size >= 2) { - val closeCode = (bytes[0].toInt() and 0xFF shl 8) or (bytes[1].toInt() and 0xFF) - val closeReason = if (bytes.size > 2) bytes.decodeToString(2, bytes.size) else "" - if (closeReason.isNotEmpty()) "$closeCode $closeReason" else "$closeCode" - } else { - "" - } - Triple(SocketContentType.Close, closeContent, bytes.size.toLong()) - } - - else -> return - } - - socketLogManager.logSocketMsg( - SocketMessage( - socketId = socketId, - direction = direction, - contentType = contentType, - content = content, - byteCount = byteCount, - timestamp = currentTimeMillis(), - ), - ) - } -} diff --git a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapKtorWebSocketPlugin.kt b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapKtorWebSocketPlugin.kt index 9349a645..1c6061c7 100644 --- a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapKtorWebSocketPlugin.kt +++ b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapKtorWebSocketPlugin.kt @@ -10,15 +10,31 @@ import dev.skymansandy.wiretap.domain.model.config.ws.WiretapWsConfig import dev.skymansandy.wiretap.helper.util.currentTimeMillis import dev.skymansandy.wiretap.plugin.ws.util.WsPluginDeps import dev.skymansandy.wiretap.plugin.ws.util.toWebSocketUrl -import io.ktor.client.plugins.api.createClientPlugin +import io.ktor.client.HttpClient +import io.ktor.client.plugins.HttpClientPlugin +import io.ktor.client.request.HttpRequestPipeline +import io.ktor.client.statement.HttpReceivePipeline +import io.ktor.client.statement.HttpResponseContainer +import io.ktor.client.statement.HttpResponsePipeline import io.ktor.util.AttributeKey +import io.ktor.websocket.WebSocketSession internal val WiretapSocketIdKey = AttributeKey("WiretapSocketId") internal val WiretapWsEnabledKey = AttributeKey("WiretapWsEnabled") /** - * Ktor client plugin that intercepts WebSocket sessions to log - * connections and messages via Wiretap. + * Configuration holder for the installed plugin. + */ +class WiretapWsPluginHandler internal constructor(val enabled: Boolean) + +/** + * Ktor client plugin that automatically intercepts WebSocket sessions + * to log connections and messages via Wiretap. + * + * All WebSocket sessions are wrapped transparently at the raw engine + * level — no manual [wiretapped] call is needed. This ensures libraries + * that manage their own WebSocket sessions internally (e.g. SignalRKore) + * are also captured. * * Usage: * ```kotlin @@ -29,40 +45,72 @@ internal val WiretapWsEnabledKey = AttributeKey("WiretapWsEnabled") * } * } * ``` - * - * Note: This plugin hooks into 101 Switching Protocols responses. - * For full outgoing message interception, use [WiretapWebSocketSession] - * to wrap your session. */ -val WiretapKtorWebSocketPlugin = createClientPlugin("WiretapWebSocketPlugin", ::WiretapWsConfig) { +val WiretapKtorWebSocketPlugin = + object : HttpClientPlugin { - val enabled = pluginConfig.enabled - val deps = WsPluginDeps() + override val key = AttributeKey("WiretapWebSocketPlugin") - onRequest { request, _ -> - request.attributes.put(WiretapWsEnabledKey, enabled) - } + override fun prepare(block: WiretapWsConfig.() -> Unit): WiretapWsPluginHandler { + val config = WiretapWsConfig().apply(block) + return WiretapWsPluginHandler(config.enabled) + } + + override fun install(plugin: WiretapWsPluginHandler, scope: HttpClient) { + val deps = WsPluginDeps() + + // Store enabled flag in request attributes + scope.requestPipeline.intercept(HttpRequestPipeline.State) { + context.attributes.put(WiretapWsEnabledKey, plugin.enabled) + proceed() + } + + // Detect 101 upgrade, create socket entry, store socketId + scope.receivePipeline.intercept(HttpReceivePipeline.After) { + val response = subject + if (response.status.value != 101 || !plugin.enabled) { + proceed() + return@intercept + } + + val url = response.call.request.url.toString().toWebSocketUrl() + val requestHeaders = response.call.request.headers.entries() + .associate { (key, values) -> key to values.joinToString(", ") } + + val socketId = deps.socketLogManager.createSocket( + SocketConnection( + url = url, + requestHeaders = requestHeaders, + status = SocketStatus.Open, + timestamp = currentTimeMillis(), + protocol = response.version.let { "${it.name}/${it.major}.${it.minor}" }, + ), + ) + + response.call.request.attributes.put(WiretapSocketIdKey, socketId) + proceed() + } + + // Wrap the raw WebSocketSession BEFORE Ktor's WebSockets plugin + // processes it at Transform phase. This ensures all frame I/O + // is logged transparently, even for libraries that manage their + // own sessions internally. + scope.responsePipeline.intercept(HttpResponsePipeline.Parse) { + val (info, body) = subject + val rawSession = body as? WebSocketSession ?: return@intercept + if (!plugin.enabled) return@intercept + + val socketId = context.request.attributes.getOrNull(WiretapSocketIdKey) + ?: return@intercept - onResponse { response -> - // Only intercept WebSocket upgrades (status 101) - if (response.status.value != 101) return@onResponse - if (!enabled) return@onResponse - - val url = response.call.request.url.toString().toWebSocketUrl() - val requestHeaders = response.call.request.headers.entries() - .associate { (key, values) -> key to values.joinToString(", ") } - - val socketId = deps.socketLogManager.createSocket( - SocketConnection( - url = url, - requestHeaders = requestHeaders, - status = SocketStatus.Open, - timestamp = currentTimeMillis(), - protocol = response.version.let { "${it.name}/${it.major}.${it.minor}" }, - ), - ) - - // Store socket ID for later use - response.call.request.attributes.put(WiretapSocketIdKey, socketId) + val url = context.request.url.toString().toWebSocketUrl() + val wrappedSession = LoggingRawWebSocketSession( + delegate = rawSession, + socketId = socketId, + url = url, + socketLogManager = deps.socketLogManager, + ) + proceedWith(HttpResponseContainer(info, wrappedSession)) + } + } } -} diff --git a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapWebSocketSession.kt b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapWebSocketSession.kt deleted file mode 100644 index 537f372c..00000000 --- a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapWebSocketSession.kt +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright (c) 2026 skymansandy. All rights reserved. - */ - -package dev.skymansandy.wiretap.plugin.ws - -import io.ktor.client.call.HttpClientCall -import io.ktor.websocket.DefaultWebSocketSession - -/** - * Wraps a WebSocket session for Wiretap interception. - * - * Exposes the full [DefaultWebSocketSession] surface (incoming, outgoing, - * send, flush, closeReason, etc.) plus [call], so it can be used as a - * drop-in replacement for `DefaultClientWebSocketSession`. - * - * In debug builds, logs all sent/received frames. - * In noop builds, delegates directly to the underlying session. - */ -interface WiretapWebSocketSession : DefaultWebSocketSession { - - val call: HttpClientCall -} diff --git a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapWrap.kt b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapWrap.kt index 9f4c2a02..7c8c9fc5 100644 --- a/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapWrap.kt +++ b/wiretap-ktor/src/commonMain/kotlin/dev/skymansandy/wiretap/plugin/ws/WiretapWrap.kt @@ -4,30 +4,11 @@ package dev.skymansandy.wiretap.plugin.ws -import dev.skymansandy.wiretap.plugin.ws.util.WsPluginDeps import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession -/** - * Extension to wrap a Ktor [DefaultClientWebSocketSession] for Wiretap logging. - * - * Requires [WiretapKtorWebSocketPlugin] to be installed in the HttpClient. - * Returns a passthrough if the plugin is not installed. - * - * ```kotlin - * client.webSocket("wss://example.com/ws") { - * val session = this.wiretapped() - * session.send(Frame.Text("hello")) - * for (frame in session.incoming) { ... } - * } - * ``` - */ -fun DefaultClientWebSocketSession.wiretapped(): WiretapWebSocketSession { - val socketId = call.request.attributes.getOrNull(WiretapSocketIdKey) ?: return DelegatingWebSocketSession(this) - if (socketId < 0) return DelegatingWebSocketSession(this) - - return LoggingWebSocketSession( - delegate = this, - socketId = socketId, - socketLogManager = WsPluginDeps().socketLogManager, - ) -} +@Deprecated( + message = "WiretapKtorWebSocketPlugin now wraps sessions automatically. Remove this call.", + replaceWith = ReplaceWith("this"), + level = DeprecationLevel.ERROR, +) +fun DefaultClientWebSocketSession.wiretapped(): DefaultClientWebSocketSession = this