Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 109 additions & 7 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1184,9 +1184,65 @@ func (s *Server) defaultForwardRequestWithBodyFunc(w http.ResponseWriter, ctx co
return err
}

// Liveness-detection knobs for the proxied WebSocket. Vars (not consts) so
// tests can shorten them.
//
// wsIdleTimeout is the maximum gap between any frames (data, ping, pong)
// received from the upstream before we declare the connection dead and tear
// it down.
//
// wsWriteTimeout caps any single forwarded data write so a slow receiving
// peer can't stall the proxy or starve the idle detector on the other leg.
//
// wsControlWriteTimeout is the deadline handed to WriteControl when a ping or
// pong frame is forwarded to the opposite peer.
var (
wsIdleTimeout = 90 * time.Second
wsWriteTimeout = 30 * time.Second
wsControlWriteTimeout = 10 * time.Second
)

// wsBackendIdleError tags an error as a read-deadline timeout observed on the
// backend leg of the proxied WebSocket. It exists so callers can tell
// "upstream went silent" apart from other timeouts (for example a write to a
// slow client), which are downstream backpressure and must not be blamed on
// the upstream endpoint.
type wsBackendIdleError struct {
	err error // underlying timeout error returned by the read
}

// Error returns the message of the wrapped error unchanged.
func (idle *wsBackendIdleError) Error() string {
	return idle.err.Error()
}

// Unwrap exposes the wrapped error so errors.Is and errors.As can see
// through this marker type.
func (idle *wsBackendIdleError) Unwrap() error {
	return idle.err
}

// installControlForwarders replaces the ping and pong handlers on src with
// handlers that relay each control frame, payload intact, to dst through
// WriteControl. With these handlers in place src no longer answers inbound
// pings itself, so the proxy is transparent for WS control frames: a client
// ping reaches the upstream RPC node and the upstream's pong travels back to
// the client.
//
// When bumpDeadline is true, every control frame received on src also pushes
// src's read deadline out by wsIdleTimeout. This is meant for the upstream
// side, so that ping/pong traffic from an otherwise quiet upstream keeps the
// idle backstop from firing.
//
// An error from WriteControl on dst is returned from the handler, which
// makes src.ReadMessage fail with that error and triggers the existing
// teardown path.
//
// NOTE(review): when bumpDeadline is true, a *timeout* from the forwarded
// write presumably surfaces as a timeout from the src read, and
// proxyWebSocketCopy tags any timeout on its bumpSrcDeadline leg as
// wsBackendIdleError. A slow dst could therefore be reported as an idle
// upstream — confirm against gorilla/websocket's handler error propagation.
func installControlForwarders(src, dst *websocket.Conn, bumpDeadline bool) {
	// relay builds the handler for one control-frame type; the ping and pong
	// handlers differ only in the frame type they write to dst.
	relay := func(frameType int) func(string) error {
		return func(payload string) error {
			if bumpDeadline {
				// Best effort: a failed deadline update is ignored.
				_ = src.SetReadDeadline(time.Now().Add(wsIdleTimeout))
			}
			writeBy := time.Now().Add(wsControlWriteTimeout)
			return dst.WriteControl(frameType, []byte(payload), writeBy)
		}
	}

	src.SetPingHandler(relay(websocket.PingMessage))
	src.SetPongHandler(relay(websocket.PongMessage))
}

// proxyWebSocketCopy copies messages from src to dst, forwarding close frames
// to the destination so both peers receive a proper WebSocket close handshake.
func proxyWebSocketCopy(src, dst *websocket.Conn) error {
// Each forwarded write is bounded by wsWriteTimeout so a slow peer can't
// stall the goroutine indefinitely. If bumpSrcDeadline is true, the src read
// deadline is reset to wsIdleTimeout AFTER every successful end-to-end
// forward, so a slow downstream write doesn't shorten the next read budget
// and cause a false "upstream idle" timeout.
func proxyWebSocketCopy(src, dst *websocket.Conn, bumpSrcDeadline bool) error {
for {
msgType, msg, err := src.ReadMessage()
if err != nil {
Expand All @@ -1202,11 +1258,26 @@ func proxyWebSocketCopy(src, dst *websocket.Conn) error {
_ = dst.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(code, closeErr.Text))
}
// Tag a backend-leg read-deadline timeout as the genuine
// upstream-idle signal. Write timeouts (or read timeouts on
// the client leg, which has no deadline today) must not be
// misattributed to the upstream — they are caught by the
// caller's isExpectedWSClose path and don't mark unhealthy.
if bumpSrcDeadline {
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
return &wsBackendIdleError{err: err}
}
}
return err
}
_ = dst.SetWriteDeadline(time.Now().Add(wsWriteTimeout))
if err := dst.WriteMessage(msgType, msg); err != nil {
return err
}
if bumpSrcDeadline {
_ = src.SetReadDeadline(time.Now().Add(wsIdleTimeout))
}
}
}

Expand Down Expand Up @@ -1307,15 +1378,30 @@ func (s *Server) defaultProxyWebSocket(w http.ResponseWriter, r *http.Request, b
backendConn.Close()
return err
}
// Proxy messages in both directions

// Forward WS control frames in both directions instead of letting
// gorilla auto-pong inbound pings on each side. This makes liveness
// checks transparent end-to-end: a client ping reaches the upstream RPC
// node, and the upstream pong reaches the client.
//
// On the backend conn we also bump the read deadline on every received
// frame and seed it now. If the upstream goes silent for wsIdleTimeout
// (no data, ping, or pong), ReadMessage returns a Timeout error and the
// teardown below propagates a close frame to the client so it knows to
// reconnect. The client conn intentionally has no read deadline — silent
// clients are fine; only upstream silence is the failure mode.
installControlForwarders(clientConn, backendConn, false)
installControlForwarders(backendConn, clientConn, true)
_ = backendConn.SetReadDeadline(time.Now().Add(wsIdleTimeout))

// Proxy messages in both directions. The backend leg also resets the
// backend read deadline on each received data frame.
errc := make(chan error, 2)
go func() {
err := proxyWebSocketCopy(clientConn, backendConn)
errc <- err
errc <- proxyWebSocketCopy(clientConn, backendConn, false)
}()
go func() {
err := proxyWebSocketCopy(backendConn, clientConn)
errc <- err
errc <- proxyWebSocketCopy(backendConn, clientConn, true)
}()
// Wait for one direction to fail/close, then immediately close both
// connections so the other goroutine unblocks and finishes cleanly.
Expand All @@ -1324,8 +1410,24 @@ func (s *Server) defaultProxyWebSocket(w http.ResponseWriter, r *http.Request, b
backendConn.Close()
<-errc // wait for the second goroutine to finish

// Mark endpoint as unhealthy for WS if error is not a normal closure
if err != nil {
// Backend leg's read deadline fired = upstream sent nothing for
// wsIdleTimeout. Only this specific error is attributed to the
// upstream. Other timeouts (downstream write blocking, control
// frame forwarding) are caught by isExpectedWSClose below and
// don't mark the endpoint unhealthy.
var idleErr *wsBackendIdleError
if errors.As(err, &idleErr) {
log.Warn().
Err(err).
Str("endpoint", helpers.RedactAPIKey(backendURL)).
Dur("idle_timeout", wsIdleTimeout).
Msg("WebSocket upstream idle timeout, marking endpoint unhealthy")
if chain, endpointID, found := s.findChainAndEndpointByURL(backendURL); found {
s.markEndpointUnhealthyProtocol(chain, endpointID, "ws")
}
return err
}
if isExpectedWSClose(err) {
if closeErr, ok := err.(*websocket.CloseError); ok && closeErr.Code == websocket.CloseAbnormalClosure {
log.Debug().Err(err).Str("endpoint", helpers.RedactAPIKey(backendURL)).Msg("WebSocket connection closed abnormally (1006), not counting as failure")
Expand Down
197 changes: 197 additions & 0 deletions internal/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -499,6 +500,202 @@ func TestIsExpectedWSClose(t *testing.T) {
}
}

// TestProxyWebSocket_UpstreamIdleTimeout verifies that when the upstream
// stops sending any frames (data, ping, or pong) for longer than
// wsIdleTimeout, the proxy tears down the client connection and marks the
// endpoint unhealthy. This is the regression test for the silent-subscription
// bug where eth_subscribe stops delivering newHeads but the client connection
// stays open indefinitely.
//
// Layout: client -> proxy (defaultProxyWebSocket) -> silent upstream, with
// wsIdleTimeout shortened so the test finishes quickly.
func TestProxyWebSocket_UpstreamIdleTimeout(t *testing.T) {
// Shrink the package-level idle timeout for this test and restore it after.
origTimeout := wsIdleTimeout
wsIdleTimeout = 300 * time.Millisecond
t.Cleanup(func() { wsIdleTimeout = origTimeout })

// Silent upstream: complete the WS handshake, then do nothing — no
// reads (so no auto-pong on inbound pings), no writes, no close frame.
upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }}
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
// Block until the request context is cancelled, then close.
<-r.Context().Done()
c.Close()
}))
t.Cleanup(upstream.Close)

// httptest URLs are http://...; swap the scheme prefix to get ws://...
upstreamWSURL := "ws" + strings.TrimPrefix(upstream.URL, "http")

// One chain with a single WS endpoint that starts out healthy.
cfg := &config.Config{
Endpoints: map[string]config.ChainEndpoints{
"chainX": {
"ep1": config.Endpoint{Provider: "ep1", WSURL: upstreamWSURL, Role: "primary", Type: "full"},
},
},
}
valkeyClient := store.NewMockValkeyClient()
valkeyClient.PopulateStatuses(map[string]*store.EndpointStatus{
"chainX:ep1": {HasWS: true, HealthyWS: true},
})
srv := NewServer(cfg, valkeyClient, createTestConfig())

// Front the server's WebSocket proxy path with a test HTTP server. The
// proxy's return value is deliberately ignored; the test observes the
// outcome from the client side and through the status store.
proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_ = srv.defaultProxyWebSocket(w, r, upstreamWSURL)
}))
t.Cleanup(proxy.Close)

proxyWSURL := "ws" + strings.TrimPrefix(proxy.URL, "http")
client, resp, err := websocket.DefaultDialer.Dial(proxyWSURL, nil)
if err != nil {
t.Fatalf("client dial failed: %v", err)
}
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
defer client.Close()

// Bound the client's own read with a generous deadline; we expect the
// proxy to hang up well before this fires.
_ = client.SetReadDeadline(time.Now().Add(wsIdleTimeout + 2*time.Second))
start := time.Now()
_, _, err = client.ReadMessage()
elapsed := time.Since(start)

if err == nil {
t.Fatalf("expected client ReadMessage to fail after upstream idle timeout, got nil after %s", elapsed)
}
// The teardown must not happen before the idle window has elapsed...
if elapsed < wsIdleTimeout {
t.Errorf("client torn down too early: elapsed=%s wsIdleTimeout=%s", elapsed, wsIdleTimeout)
}
// ...and must not take as long as the client's own read deadline above.
if elapsed > wsIdleTimeout+2*time.Second {
t.Errorf("client torn down too late: elapsed=%s wsIdleTimeout=%s", elapsed, wsIdleTimeout)
}

// The proxy should mark the upstream endpoint unhealthy. The status
// write happens just before defaultProxyWebSocket returns, after both
// goroutines have joined; allow a brief moment for it to land.
//
// NOTE(review): the error from GetEndpointStatus is discarded and status
// is dereferenced unconditionally — confirm the mock never returns a nil
// status here.
deadline := time.Now().Add(500 * time.Millisecond)
for {
status, _ := valkeyClient.GetEndpointStatus(context.Background(), "chainX", "ep1")
if !status.HealthyWS {
break
}
if time.Now().After(deadline) {
t.Errorf("expected HealthyWS=false after upstream idle timeout, still true")
break
}
time.Sleep(10 * time.Millisecond)
}
}

// TestProxyWebSocket_PingPongForwarded verifies that a client ping is
// forwarded all the way to the upstream and the upstream's pong is forwarded
// back to the client (rather than aetherlay auto-ponging at the proxy
// layer). Payload bytes must round-trip unchanged.
func TestProxyWebSocket_PingPongForwarded(t *testing.T) {
origTimeout := wsIdleTimeout
wsIdleTimeout = 5 * time.Second
t.Cleanup(func() { wsIdleTimeout = origTimeout })

pingReceived := make(chan string, 1)
upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }}
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
// Override default ping handler so we can capture the payload AND
// reply with a pong carrying the same payload (default behavior, but
// explicit here).
c.SetPingHandler(func(appData string) error {
select {
case pingReceived <- appData:
default:
}
return c.WriteControl(websocket.PongMessage,
[]byte(appData), time.Now().Add(time.Second))
})
// Drive the read loop so the ping handler actually fires.
for {
if _, _, err := c.ReadMessage(); err != nil {
return
}
}
}))
t.Cleanup(upstream.Close)

upstreamWSURL := "ws" + strings.TrimPrefix(upstream.URL, "http")

cfg := &config.Config{
Endpoints: map[string]config.ChainEndpoints{
"chainX": {
"ep1": config.Endpoint{Provider: "ep1", WSURL: upstreamWSURL, Role: "primary", Type: "full"},
},
},
}
valkeyClient := store.NewMockValkeyClient()
valkeyClient.PopulateStatuses(map[string]*store.EndpointStatus{
"chainX:ep1": {HasWS: true, HealthyWS: true},
})
srv := NewServer(cfg, valkeyClient, createTestConfig())

proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_ = srv.defaultProxyWebSocket(w, r, upstreamWSURL)
}))
t.Cleanup(proxy.Close)

proxyWSURL := "ws" + strings.TrimPrefix(proxy.URL, "http")
client, resp, err := websocket.DefaultDialer.Dial(proxyWSURL, nil)
if err != nil {
t.Fatalf("client dial failed: %v", err)
}
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
defer client.Close()

Comment thread
coderabbitai[bot] marked this conversation as resolved.
pongReceived := make(chan string, 1)
client.SetPongHandler(func(appData string) error {
select {
case pongReceived <- appData:
default:
}
return nil
})
// Read loop required to drive the pong handler.
go func() {
for {
if _, _, err := client.ReadMessage(); err != nil {
return
}
}
}()

payload := "abc-correlation-123"
if err := client.WriteControl(websocket.PingMessage,
[]byte(payload), time.Now().Add(time.Second)); err != nil {
t.Fatalf("client ping write failed: %v", err)
}

select {
case got := <-pingReceived:
if got != payload {
t.Errorf("upstream got ping payload %q, want %q", got, payload)
}
case <-time.After(2 * time.Second):
t.Fatal("upstream never received the forwarded ping (proxy is auto-ponging instead of forwarding)")
}

select {
case got := <-pongReceived:
if got != payload {
t.Errorf("client got pong payload %q, want %q", got, payload)
}
case <-time.After(2 * time.Second):
t.Fatal("client never received the forwarded pong from upstream")
}
}

// TestMarkEndpointUnhealthy_HTTP tests marking an endpoint unhealthy for HTTP.
func TestMarkEndpointUnhealthy_HTTP(t *testing.T) {
cfg := &config.Config{
Expand Down