Fix silent stalls in native_subscribe WS subscriptions (Solana, eth pending tx)#827
Open
a10zn8 wants to merge 2 commits into
Open
Fix silent stalls in native_subscribe WS subscriptions (Solana, eth pending tx)#827a10zn8 wants to merge 2 commits into
a10zn8 wants to merge 2 commits into
Conversation
…ing tx)
When an upstream WebSocket subscription stops emitting events while the
TCP/WS connection stays alive (common on Solana RPCs), the subscription
Flux silently completed: `.timeout(85s, Mono.empty())` switched to an
empty fallback (no error), and `.onErrorResume { Mono.empty() }` swallowed
any real errors. The wrapping DurableFlux only reconnects on `onError`,
so no resubscribe happened; clients kept receiving heartbeats from
NativeSubscribe but no data.
Surface stalls as TimeoutException and let real errors propagate so
DurableFlux re-invokes createConnection() and issues a fresh subscribe
with a new subId. Fixes both:
- GenericSubscriptionConnect (Solana / Polkadot / Ripple / generic WS subs)
- WebsocketPendingTxes (Ethereum newPendingTransactions)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
TimeoutException(instead of silent completion) so the wrappingDurableFluxre-issuessubscribewith a fresh subId.onErrorResume { Mono.empty() }— let them propagate toDurableFlux, which already handles reconnect with exponential backoff.GenericSubscriptionConnect(SolanaslotSubscribe, plus all chains that go throughGenericIngressSubscription: Polkadot, Ripple, etc.) andWebsocketPendingTxes(EthereumnewPendingTransactions).Root cause
Solana RPC nodes commonly stop delivering events on a specific subscription while keeping the WS TCP/frame layer fully alive (no
ping/pongon our side either —WsConnectionImplruns withhandlePing(false)and no idle handler). The previous code inGenericSubscriptionConnect.createConnection()had:When the upstream stalled for 85 s:
.timeout(d, Mono.empty())switched to an empty fallback → Flux completed (no error).DurableFlux.connect()only retries ononError, notonComplete→ no resubscribe.SharedFluxHolderpropagatedonCompleteto all downstream subscribers and clearedcurrent.NativeSubscribe.nativeSubscribekept firing → the client connection looked healthy but never received another data event.WebsocketPendingTxes.createConnection()had the identical anti-pattern.The healthy reference is
GenericWsHead.listenNewHeads()which already uses.timeout(d, Mono.error(...))and resubscribes on the error.Test plan
GenericSubscriptionConnectTest.emits TimeoutException when upstream goes silent past idle timeout— virtual-time test pinning the new error signal.GenericSubscriptionConnectTest.idle timeout triggers resubscribe via DurableFlux retry— end-to-end: first subscription stalls (Flux.never), virtual time advances 90 s, secondsubscribe()delivers a "recovered" event; verifiessubscribe(...)was called twice.WebsocketPendingTxesSpec.Emits TimeoutException when upstream goes silent past idle timeout— same pattern for the Ethereum pending-tx path.Produces valuestest still passes (happy path unchanged)../gradlew compileKotlin compileTestKotlinclean.Follow-up (out of scope)
WsConnectionImplhas no WS-level ping/idle handler, so a half-open TCP connection still relies on OS TCP timeouts (~minutes). With this PR, the subscription layer at least heals on its own; the connection layer is a separate hardening task.🤖 Generated with Claude Code