Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.TimeoutException

class WebsocketPendingTxes(
private val chain: Chain,
Expand All @@ -35,14 +36,23 @@ class WebsocketPendingTxes(

companion object {
private val log = LoggerFactory.getLogger(WebsocketPendingTxes::class.java)
private val IDLE_TIMEOUT: Duration = Duration.ofSeconds(85)
}

override fun createConnection(): Flux<TransactionId> {
val sub = wsSubscriptions.subscribe(ChainRequest("eth_subscribe", ListParams(EthereumEgressSubscription.METHOD_PENDING_TXES)))
return sub
.data
.flatMapMany { it.t2 }
.timeout(Duration.ofSeconds(85), Mono.empty())
// Surface stalls as TimeoutException so the surrounding DurableFlux re-invokes
// createConnection() and issues a fresh eth_subscribe. Without this, a silent
// upstream pubsub leaves the shared Flux completed and clients stop receiving events.
.timeout(
IDLE_TIMEOUT,
Mono.error(
TimeoutException("No events from eth_subscribe newPendingTransactions in $IDLE_TIMEOUT, forcing resubscribe"),
),
)
.map {
// comes as a JS string, i.e., within quotes
val value = ByteArray(it.size - 2)
Expand All @@ -60,7 +70,6 @@ class WebsocketPendingTxes(
log.info("unsubscribed from ${sub.subId.get()}")
}
}
.doOnError { t -> log.warn("Invalid pending transaction", t) }
.onErrorResume { Mono.empty() }
.doOnError { t -> log.warn("Error during pending tx subscription: {}", t.message) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeoutException

class GenericIngressSubscription(
val chain: Chain,
Expand Down Expand Up @@ -49,23 +50,24 @@ class GenericSubscriptionConnect(

companion object {
private val log = LoggerFactory.getLogger(GenericSubscriptionConnect::class.java)
private val IDLE_TIMEOUT: Duration = Duration.ofSeconds(85)
}

@Suppress("UNCHECKED_CAST")
override fun createConnection(): Flux<Any> {
val sub = conn.subscribe(ChainRequest(topic, ListParams(getParams(params) as List<Any>)))
return sub.data
.flatMapMany { it.t2 }
// Some upstreams (notably Solana RPCs) silently stop delivering events on a subscription
// while keeping the WebSocket connection alive. Emit a TimeoutException so the surrounding
// DurableFlux re-invokes createConnection() and re-issues `subscribe` with a fresh subId.
.timeout(
Duration.ofSeconds(85),
Mono.empty<ByteArray>().doOnEach {
log.warn("Timeout during subscription to $topic after 85 seconds")
},
IDLE_TIMEOUT,
Mono.error(
TimeoutException("No events from subscription to $topic in $IDLE_TIMEOUT, forcing resubscribe"),
),
)
.onErrorResume {
log.error("Error during subscription to $topic", it)
Mono.empty()
}
.doOnError { log.warn("Error during subscription to $topic: {}", it.message) }
.doFinally {
if (unsubscribeMethod != "") {
conn.unsubscribe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.test.StepVerifier
import reactor.util.function.Tuples
import spock.lang.Specification

import java.time.Duration
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicReference

class WebsocketPendingTxesSpec extends Specification {
Expand Down Expand Up @@ -55,4 +57,26 @@ class WebsocketPendingTxesSpec extends Specification {
"0x67f22a3b441ea312306f97694ca8159f8d6faaccf0f5ce6442c84b13991f1d23",
]
}

def "Emits TimeoutException when upstream goes silent past idle timeout"() {
// Pins the fix for the silent-stall bug: a healthy WS that simply stops delivering
// newPendingTransactions events must surface a TimeoutException so DurableFlux
// re-issues eth_subscribe.
setup:
def ws = Stub(WsSubscriptions) {
subscribe(new ChainRequest("eth_subscribe", new ListParams(["newPendingTransactions"]))) >> new WsSubscriptions.SubscribeData(
Mono.just(Tuples.of("sub-1", Flux.never())), "conn-1", new AtomicReference<String>("sub-1")
)
unsubscribe(_) >> Mono.empty()
}
def pending = new WebsocketPendingTxes(Chain.ETHEREUM__MAINNET, ws)

expect:
StepVerifier.withVirtualTime { pending.createConnection() }
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(84))
.thenAwait(Duration.ofSeconds(2))
.expectError(TimeoutException)
.verify(Duration.ofSeconds(5))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import org.junit.jupiter.api.Test
import org.mockito.Mockito.verify
import org.mockito.kotlin.any
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.mock
import org.mockito.kotlin.times
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.test.StepVerifier
import reactor.util.function.Tuples
import java.time.Duration
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicReference

class GenericSubscriptionConnectTest {
Expand All @@ -36,4 +39,70 @@ class GenericSubscriptionConnectTest {

verify(ws).subscribe(ChainRequest(topic, ListParams(param)))
}

@Test
fun `emits TimeoutException when upstream goes silent past idle timeout`() {
// Simulates the Solana-like failure: WS subscription is established, but upstream stops
// delivering events without closing the connection. The Flux must surface a TimeoutException
// so the surrounding DurableFlux re-subscribes.
val param: List<Any> = listOf("all")
val topic = "slotSubscribe"
val ws = mock<WsSubscriptions> {
on { subscribe(ChainRequest(topic, ListParams(param))) } doReturn
WsSubscriptions.SubscribeData(
Mono.just(Tuples.of("sub-id-1", Flux.never<ByteArray>())),
"conn-1",
AtomicReference("sub-id-1"),
)
}

val genericSubscriptionConnect = GenericSubscriptionConnect(Chain.SOLANA__MAINNET, ws, topic, param, "")

StepVerifier.withVirtualTime { genericSubscriptionConnect.createConnection() }
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(84))
.thenAwait(Duration.ofSeconds(2))
.expectError(TimeoutException::class.java)
.verify(Duration.ofSeconds(5))
}

@Test
fun `idle timeout triggers resubscribe via DurableFlux retry`() {
// End-to-end on the GenericSubscriptionConnect: when the first subscription stalls,
// DurableFlux must call createConnection() again, which issues a fresh `subscribe` RPC.
val param: List<Any> = listOf("all")
val topic = "slotSubscribe"

val secondAttempt = WsSubscriptions.SubscribeData(
Mono.just(Tuples.of("sub-id-2", Flux.just("recovered".toByteArray()))),
"conn-1",
AtomicReference("sub-id-2"),
)
val firstAttempt = WsSubscriptions.SubscribeData(
Mono.just(Tuples.of("sub-id-1", Flux.never<ByteArray>())),
"conn-1",
AtomicReference("sub-id-1"),
)

val ws = mock<WsSubscriptions> {
on { subscribe(ChainRequest(topic, ListParams(param))) }
.doReturn(firstAttempt, secondAttempt)
on { unsubscribe(any()) } doReturn Mono.empty()
}

val genericSubscriptionConnect = GenericSubscriptionConnect(Chain.SOLANA__MAINNET, ws, topic, param, "slotUnsubscribe")

// GenericPersistentConnect wraps createConnection() with DurableFlux + SharedFluxHolder.
// The first subscription stalls (Flux.never) → timeout (85s) → TimeoutException →
// DurableFlux schedules retry → second subscription delivers "recovered".
StepVerifier.withVirtualTime { genericSubscriptionConnect.connect(io.emeraldpay.dshackle.upstream.Selector.empty) }
.expectSubscription()
.thenAwait(Duration.ofSeconds(90))
.expectNextMatches { it is ByteArray && String(it) == "recovered" }
.thenCancel()
.verify(Duration.ofSeconds(5))

// Verifies we re-subscribed (= sent slotSubscribe again on the same WS).
verify(ws, times(2)).subscribe(ChainRequest(topic, ListParams(param)))
}
}
Loading