From d2c5cd26f276d94ab11533378af7379c03295857 Mon Sep 17 00:00:00 2001 From: a10zn8 Date: Tue, 12 May 2026 17:13:29 +0300 Subject: [PATCH 1/2] Fix silent stalls in native_subscribe (Solana slotSubscribe, eth pending tx) When an upstream WebSocket subscription stops emitting events while the TCP/WS connection stays alive (common on Solana RPCs), the subscription Flux silently completed: `.timeout(85s, Mono.empty())` switched to an empty fallback (no error), and `.onErrorResume { Mono.empty() }` swallowed any real errors. The wrapping DurableFlux only reconnects on `onError`, so no resubscribe happened; clients kept receiving heartbeats from NativeSubscribe but no data. Surface stalls as TimeoutException and let real errors propagate so DurableFlux re-invokes createConnection() and issues a fresh subscribe with a new subId. Fixes both: - GenericSubscriptionConnect (Solana / Polkadot / Ripple / generic WS subs) - WebsocketPendingTxes (Ethereum newPendingTransactions) Co-Authored-By: Claude Opus 4.7 (1M context) --- .../subscribe/WebsocketPendingTxes.kt | 15 +++- .../generic/GenericIngressSubscription.kt | 18 ++--- .../subscribe/WebsocketPendingTxesSpec.groovy | 24 +++++++ .../generic/GenericSubscriptionConnectTest.kt | 71 +++++++++++++++++++ 4 files changed, 117 insertions(+), 11 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxes.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxes.kt index ddd71c83..b2097626 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxes.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxes.kt @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.time.Duration +import java.util.concurrent.TimeoutException class WebsocketPendingTxes( private val chain: Chain, @@ -35,6 +36,7 @@ class WebsocketPendingTxes( companion object { private val log = LoggerFactory.getLogger(WebsocketPendingTxes::class.java) + private val IDLE_TIMEOUT: Duration = Duration.ofSeconds(85) } override fun createConnection(): Flux { @@ -42,7 +44,15 @@ class WebsocketPendingTxes( return sub .data .flatMapMany { it.t2 } - .timeout(Duration.ofSeconds(85), Mono.empty()) + // Surface stalls as TimeoutException so the surrounding DurableFlux re-invokes + // createConnection() and issues a fresh eth_subscribe. Without this, a silent + // upstream pubsub leaves the shared Flux completed and clients stop receiving events. + .timeout( + IDLE_TIMEOUT, + Mono.error( + TimeoutException("No events from eth_subscribe newPendingTransactions in $IDLE_TIMEOUT, forcing resubscribe"), + ), + ) .map { // comes as a JS string, i.e., within quotes val value = ByteArray(it.size - 2) @@ -60,7 +70,6 @@ class WebsocketPendingTxes( log.info("unsubscribed from ${sub.subId.get()}") } } - .doOnError { t -> log.warn("Invalid pending transaction", t) } - .onErrorResume { Mono.empty() } + .doOnError { t -> log.warn("Error during pending tx subscription: {}", t.message) } } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt index b572b436..fb5d10c0 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt @@ -13,6 +13,7 @@ import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.time.Duration import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeoutException class GenericIngressSubscription( val chain: Chain, @@ -49,6 +50,7 @@ class GenericSubscriptionConnect( companion object { private val log = LoggerFactory.getLogger(GenericSubscriptionConnect::class.java) + private val IDLE_TIMEOUT: Duration = Duration.ofSeconds(85) } @Suppress("UNCHECKED_CAST") @@ -56,16 +58,16 @@ class GenericSubscriptionConnect( val sub = conn.subscribe(ChainRequest(topic, ListParams(getParams(params) as List))) return sub.data .flatMapMany { it.t2 } + // Some upstreams (notably Solana RPCs) silently stop delivering events on a subscription + // while keeping the WebSocket connection alive. Emit a TimeoutException so the surrounding + // DurableFlux re-invokes createConnection() and re-issues `subscribe` with a fresh subId. .timeout( - Duration.ofSeconds(85), - Mono.empty().doOnEach { - log.warn("Timeout during subscription to $topic after 85 seconds") - }, + IDLE_TIMEOUT, + Mono.error( + TimeoutException("No events from subscription to $topic in $IDLE_TIMEOUT, forcing resubscribe"), + ), ) - .onErrorResume { - log.error("Error during subscription to $topic", it) - Mono.empty() - } + .doOnError { log.warn("Error during subscription to $topic: {}", it.message) } .doFinally { if (unsubscribeMethod != "") { conn.unsubscribe( diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxesSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxesSpec.groovy index 68d453e0..f1dd5a02 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxesSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxesSpec.groovy @@ -22,10 +22,12 @@ import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.test.StepVerifier import reactor.util.function.Tuples import spock.lang.Specification import java.time.Duration +import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicReference class WebsocketPendingTxesSpec extends Specification { @@ -55,4 +57,26 @@ class WebsocketPendingTxesSpec extends Specification { "0x67f22a3b441ea312306f97694ca8159f8d6faaccf0f5ce6442c84b13991f1d23", ] } + + def "Emits TimeoutException when upstream goes silent past idle timeout"() { + // Pins the fix for the silent-stall bug: a healthy WS that simply stops delivering + // newPendingTransactions events must surface a TimeoutException so DurableFlux + // re-issues eth_subscribe. + setup: + def ws = Stub(WsSubscriptions) { + subscribe(new ChainRequest("eth_subscribe", new ListParams(["newPendingTransactions"]))) >> new WsSubscriptions.SubscribeData( + Mono.just(Tuples.of("sub-1", Flux.never())), "conn-1", new AtomicReference("sub-1") + ) + unsubscribe(_) >> Mono.empty() + } + def pending = new WebsocketPendingTxes(Chain.ETHEREUM__MAINNET, ws) + + expect: + StepVerifier.withVirtualTime { pending.createConnection() } + .expectSubscription() + .expectNoEvent(Duration.ofSeconds(84)) + .thenAwait(Duration.ofSeconds(2)) + .expectError(TimeoutException) + .verify(Duration.ofSeconds(5)) + } } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericSubscriptionConnectTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericSubscriptionConnectTest.kt index f8b04816..ecd98b5e 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericSubscriptionConnectTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericSubscriptionConnectTest.kt @@ -6,13 +6,16 @@ import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import org.junit.jupiter.api.Test import org.mockito.Mockito.verify +import org.mockito.kotlin.any import org.mockito.kotlin.doReturn import org.mockito.kotlin.mock +import org.mockito.kotlin.times import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.test.StepVerifier import reactor.util.function.Tuples import java.time.Duration +import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicReference class GenericSubscriptionConnectTest { @@ -36,4 +39,72 @@ class GenericSubscriptionConnectTest { verify(ws).subscribe(ChainRequest(topic, ListParams(param))) } + + @Test + fun `emits TimeoutException when upstream goes silent past idle timeout`() { + // Simulates the Solana-like failure: WS subscription is established, but upstream stops + // delivering events without closing the connection. The Flux must surface a TimeoutException + // so the surrounding DurableFlux re-subscribes. + val param: List = listOf("all") + val topic = "slotSubscribe" + val ws = mock { + on { subscribe(ChainRequest(topic, ListParams(param))) } doReturn + WsSubscriptions.SubscribeData( + Mono.just(Tuples.of("sub-id-1", Flux.never())), + "conn-1", + AtomicReference("sub-id-1"), + ) + } + + val genericSubscriptionConnect = GenericSubscriptionConnect(Chain.SOLANA__MAINNET, ws, topic, param, "") + + StepVerifier.withVirtualTime { genericSubscriptionConnect.createConnection() } + .expectSubscription() + .expectNoEvent(Duration.ofSeconds(84)) + .thenAwait(Duration.ofSeconds(2)) + .expectError(TimeoutException::class.java) + .verify(Duration.ofSeconds(5)) + } + + @Test + fun `idle timeout triggers resubscribe via DurableFlux retry`() { + // End-to-end on the GenericSubscriptionConnect: when the first subscription stalls, + // DurableFlux must call createConnection() again, which issues a fresh `subscribe` RPC. + val param: List = listOf("all") + val topic = "slotSubscribe" + + @Suppress("UNCHECKED_CAST") + val secondAttempt = WsSubscriptions.SubscribeData( + Mono.just(Tuples.of("sub-id-2", Flux.just("recovered".toByteArray()))), + "conn-1", + AtomicReference("sub-id-2"), + ) + @Suppress("UNCHECKED_CAST") + val firstAttempt = WsSubscriptions.SubscribeData( + Mono.just(Tuples.of("sub-id-1", Flux.never())), + "conn-1", + AtomicReference("sub-id-1"), + ) + + val ws = mock { + on { subscribe(ChainRequest(topic, ListParams(param))) } + .doReturn(firstAttempt, secondAttempt) + on { unsubscribe(any()) } doReturn Mono.empty() + } + + val genericSubscriptionConnect = GenericSubscriptionConnect(Chain.SOLANA__MAINNET, ws, topic, param, "slotUnsubscribe") + + // GenericPersistentConnect wraps createConnection() with DurableFlux + SharedFluxHolder. + // The first subscription stalls (Flux.never) → timeout (85s) → TimeoutException → + // DurableFlux schedules retry → second subscription delivers "recovered". + StepVerifier.withVirtualTime { genericSubscriptionConnect.connect(io.emeraldpay.dshackle.upstream.Selector.empty) } + .expectSubscription() + .thenAwait(Duration.ofSeconds(90)) + .expectNextMatches { it is ByteArray && String(it) == "recovered" } + .thenCancel() + .verify(Duration.ofSeconds(5)) + + // Verifies we re-subscribed (= sent slotSubscribe again on the same WS). + verify(ws, times(2)).subscribe(ChainRequest(topic, ListParams(param))) + } } From db651a43f19477621f13d2399645e5969db9dd6f Mon Sep 17 00:00:00 2001 From: a10zn8 Date: Tue, 12 May 2026 17:39:17 +0300 Subject: [PATCH 2/2] Fix ktlint: blank line between annotated declarations Co-Authored-By: Claude Opus 4.7 (1M context) --- .../dshackle/upstream/generic/GenericSubscriptionConnectTest.kt | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericSubscriptionConnectTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericSubscriptionConnectTest.kt index ecd98b5e..a7af6e93 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericSubscriptionConnectTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericSubscriptionConnectTest.kt @@ -73,13 +73,11 @@ class GenericSubscriptionConnectTest { val param: List = listOf("all") val topic = "slotSubscribe" - @Suppress("UNCHECKED_CAST") val secondAttempt = WsSubscriptions.SubscribeData( Mono.just(Tuples.of("sub-id-2", Flux.just("recovered".toByteArray()))), "conn-1", AtomicReference("sub-id-2"), ) - @Suppress("UNCHECKED_CAST") val firstAttempt = WsSubscriptions.SubscribeData( Mono.just(Tuples.of("sub-id-1", Flux.never())), "conn-1",