From eb79dc6e960a91b96091815113f056adf731ff5e Mon Sep 17 00:00:00 2001 From: Larry Li Date: Thu, 26 Mar 2026 16:56:59 -0700 Subject: [PATCH 1/3] use metrics.RPCClientMetrics in multinode --- multinode/go.mod | 2 +- multinode/go.sum | 4 +- multinode/rpc_client_base.go | 47 +++++++++- multinode/rpc_client_base_test.go | 139 +++++++++++++++++++++++++++++- 4 files changed, 185 insertions(+), 7 deletions(-) diff --git a/multinode/go.mod b/multinode/go.mod index c18efc1..fb1b203 100644 --- a/multinode/go.mod +++ b/multinode/go.mod @@ -7,7 +7,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_model v0.6.2 github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bfc7 - github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2 + github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260326180413-c69f27e37a13 github.com/stretchr/testify v1.11.1 go.uber.org/zap v1.27.1 ) diff --git a/multinode/go.sum b/multinode/go.sum index 459485f..ee25ebf 100644 --- a/multinode/go.sum +++ b/multinode/go.sum @@ -80,8 +80,8 @@ github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bf github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bfc7/go.mod h1:0ghbAr7tRO0tT5ZqBXhOyzgUO37tNNe33Yn0hskauVM= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2 h1:ysZjKH+BpWlQhF93kr/Lc668UlCvT9NjfcsGdZT19I8= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2/go.mod h1:jo+cUqNcHwN8IF7SInQNXDZ8qzBsyMpnLdYbDswviFc= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260326180413-c69f27e37a13 h1:Homq1KxVUoL1rEtEv1N+BL0JJdMdQcDBnJw53vn+/qY= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260326180413-c69f27e37a13/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e h1:Hv9Mww35LrufCdM9wtS9yVi/rEWGI1UnjHbcKKU0nVY= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e/go.mod h1:T4zH9R8R8lVWKfU7tUvYz2o2jMv1OpGCdpY2j2QZXzU= github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d h1:LokA9PoCNb8mm8mDT52c3RECPMRsGz1eCQORq+J3n74= diff --git a/multinode/rpc_client_base.go b/multinode/rpc_client_base.go index b4a886c..20eb978 100644 --- a/multinode/rpc_client_base.go +++ b/multinode/rpc_client_base.go @@ -9,6 +9,14 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + frameworkmetrics "github.com/smartcontractkit/chainlink-framework/metrics" +) + +var errInvalidHead = errors.New("invalid head") + +const ( + rpcCallNameLatestBlock = "latest_block" + rpcCallNameLatestFinalizedBlock = "latest_finalized_block" ) type RPCClientBaseConfig interface { @@ -16,6 +24,12 @@ type RPCClientBaseConfig interface { FinalizedBlockPollInterval() time.Duration } +type RPCClientBaseMetricsConfig struct { + RPCClientMetrics frameworkmetrics.RPCClientMetrics + RPCURL string + IsSendOnly bool +} + // RPCClientBase is used to integrate multinode into chain-specific clients. // For new MultiNode integrations, we wrap the RPC client and inherit from the RPCClientBase // to get the required RPCClient methods and enable the use of MultiNode. @@ -46,14 +60,19 @@ type RPCClientBase[HEAD Head] struct { highestUserObservations ChainInfo // most recent chain info observed during current lifecycle latestChainInfo ChainInfo + + rpcMetrics frameworkmetrics.RPCClientMetrics + rpcURL string + isSendOnly bool } func NewRPCClientBase[HEAD Head]( cfg RPCClientBaseConfig, ctxTimeout time.Duration, log logger.Logger, latestBlock func(ctx context.Context) (HEAD, error), latestFinalizedBlock func(ctx context.Context) (HEAD, error), + rpcMetrics *RPCClientBaseMetricsConfig, ) *RPCClientBase[HEAD] { - return &RPCClientBase[HEAD]{ + base := &RPCClientBase[HEAD]{ cfg: cfg, log: log, ctxTimeout: ctxTimeout, @@ -62,6 +81,12 @@ func NewRPCClientBase[HEAD Head]( subs: make(map[Subscription]struct{}), lifeCycleCh: make(chan struct{}), } + if rpcMetrics != nil { + base.rpcMetrics = rpcMetrics.RPCClientMetrics + base.rpcURL = rpcMetrics.RPCURL + base.isSendOnly = rpcMetrics.IsSendOnly + } + return base } func (m *RPCClientBase[HEAD]) lenSubs() int { @@ -155,16 +180,20 @@ func (m *RPCClientBase[HEAD]) LatestBlock(ctx context.Context) (HEAD, error) { // capture lifeCycleCh to ensure we are not updating chainInfo with observations related to previous life cycle ctx, cancel, lifeCycleCh := m.AcquireQueryCtx(ctx, m.ctxTimeout) defer cancel() + start := time.Now() head, err := m.latestBlock(ctx) if err != nil { + m.recordRPCRequest(ctx, rpcCallNameLatestBlock, start, err) return head, err } if !head.IsValid() { - return head, errors.New("invalid head") + m.recordRPCRequest(ctx, rpcCallNameLatestBlock, start, errInvalidHead) + return head, errInvalidHead } + m.recordRPCRequest(ctx, rpcCallNameLatestBlock, start, nil) m.OnNewHead(ctx, lifeCycleCh, head) return head, nil } @@ -172,20 +201,32 @@ func (m *RPCClientBase[HEAD]) LatestBlock(ctx context.Context) (HEAD, error) { func (m *RPCClientBase[HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, error) { ctx, cancel, lifeCycleCh := m.AcquireQueryCtx(ctx, m.ctxTimeout) defer cancel() + start := time.Now() head, err := m.latestFinalizedBlock(ctx) if err != nil { + m.recordRPCRequest(ctx, rpcCallNameLatestFinalizedBlock, start, err) return head, err } if !head.IsValid() { - return head, errors.New("invalid head") + m.recordRPCRequest(ctx, rpcCallNameLatestFinalizedBlock, start, errInvalidHead) + return head, errInvalidHead } + m.recordRPCRequest(ctx, rpcCallNameLatestFinalizedBlock, start, nil) m.OnNewFinalizedHead(ctx, lifeCycleCh, head) return head, nil } +func (m *RPCClientBase[HEAD]) recordRPCRequest(ctx context.Context, callName string, startedAt time.Time, err error) { + if m.rpcMetrics == nil { + return + } + + m.rpcMetrics.RecordRequest(ctx, m.rpcURL, m.isSendOnly, callName, time.Since(startedAt), err) +} + func (m *RPCClientBase[HEAD]) OnNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) { if !head.IsValid() { return diff --git a/multinode/rpc_client_base_test.go b/multinode/rpc_client_base_test.go index 25afd4d..8bd3aa0 100644 --- a/multinode/rpc_client_base_test.go +++ b/multinode/rpc_client_base_test.go @@ -2,6 +2,7 @@ package multinode import ( "context" + "errors" "math/big" "testing" "time" @@ -11,6 +12,7 @@ import ( common "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + frameworkmetrics "github.com/smartcontractkit/chainlink-framework/metrics" "github.com/smartcontractkit/chainlink-framework/multinode/config" ) @@ -67,11 +69,35 @@ func newTestRPC(t *testing.T) *testRPC { } rpc := &testRPC{} - rpc.RPCClientBase = NewRPCClientBase[*testHead](cfg, requestTimeout, lggr, rpc.latestBlock, rpc.latestBlock) + rpc.RPCClientBase = NewRPCClientBase[*testHead](cfg, requestTimeout, lggr, rpc.latestBlock, rpc.latestBlock, nil) t.Cleanup(rpc.Close) return rpc } +type recordedRPCRequest struct { + rpcURL string + isSendOnly bool + callName string + latency time.Duration + err error +} + +type spyRPCClientMetrics struct { + requests []recordedRPCRequest +} + +var _ frameworkmetrics.RPCClientMetrics = (*spyRPCClientMetrics)(nil) + +func (s *spyRPCClientMetrics) RecordRequest(_ context.Context, rpcURL string, isSendOnly bool, callName string, latency time.Duration, err error) { + s.requests = append(s.requests, recordedRPCRequest{ + rpcURL: rpcURL, + isSendOnly: isSendOnly, + callName: callName, + latency: latency, + err: err, + }) +} + func TestAdapter_LatestBlock(t *testing.T) { t.Run("LatestBlock", func(t *testing.T) { rpc := newTestRPC(t) @@ -100,6 +126,117 @@ func TestAdapter_LatestBlock(t *testing.T) { }) } +func TestRPCClientBase_RecordsRPCMetrics(t *testing.T) { + requestTimeout := 5 * time.Second + lggr := logger.Test(t) + cfg := &config.MultiNodeConfig{ + MultiNode: config.MultiNode{ + Enabled: ptr(true), + PollFailureThreshold: ptr(uint32(5)), + PollInterval: common.MustNewDuration(15 * time.Second), + SelectionMode: ptr(NodeSelectionModePriorityLevel), + SyncThreshold: ptr(uint32(10)), + LeaseDuration: common.MustNewDuration(time.Minute), + NodeIsSyncingEnabled: ptr(false), + NewHeadsPollInterval: common.MustNewDuration(5 * time.Second), + FinalizedBlockPollInterval: common.MustNewDuration(5 * time.Second), + EnforceRepeatableRead: ptr(true), + DeathDeclarationDelay: common.MustNewDuration(20 * time.Second), + NodeNoNewHeadsThreshold: common.MustNewDuration(20 * time.Second), + NoNewFinalizedHeadsThreshold: common.MustNewDuration(20 * time.Second), + FinalityTagEnabled: ptr(true), + FinalityDepth: ptr(uint32(0)), + FinalizedBlockOffset: ptr(uint32(50)), + }, + } + + t.Run("records successful latest block requests", func(t *testing.T) { + spy := &spyRPCClientMetrics{} + rpc := NewRPCClientBase[*testHead]( + cfg, + requestTimeout, + lggr, + func(context.Context) (*testHead, error) { + return &testHead{blockNumber: 7}, nil + }, + func(context.Context) (*testHead, error) { + return &testHead{blockNumber: 8}, nil + }, + &RPCClientBaseMetricsConfig{ + RPCClientMetrics: spy, + RPCURL: "http://primary.test", + IsSendOnly: false, + }, + ) + + head, err := rpc.LatestBlock(tests.Context(t)) + require.NoError(t, err) + require.Equal(t, int64(7), head.BlockNumber()) + require.Len(t, spy.requests, 1) + require.Equal(t, "http://primary.test", spy.requests[0].rpcURL) + require.False(t, spy.requests[0].isSendOnly) + require.Equal(t, rpcCallNameLatestBlock, spy.requests[0].callName) + require.NoError(t, spy.requests[0].err) + require.Positive(t, spy.requests[0].latency) + }) + + t.Run("records failed finalized block requests", func(t *testing.T) { + spy := &spyRPCClientMetrics{} + expectedErr := errors.New("boom") + rpc := NewRPCClientBase[*testHead]( + cfg, + requestTimeout, + lggr, + func(context.Context) (*testHead, error) { + return &testHead{blockNumber: 7}, nil + }, + func(context.Context) (*testHead, error) { + return nil, expectedErr + }, + &RPCClientBaseMetricsConfig{ + RPCClientMetrics: spy, + RPCURL: "http://sendonly.test", + IsSendOnly: true, + }, + ) + + _, err := rpc.LatestFinalizedBlock(tests.Context(t)) + require.ErrorIs(t, err, expectedErr) + require.Len(t, spy.requests, 1) + require.Equal(t, "http://sendonly.test", spy.requests[0].rpcURL) + require.True(t, spy.requests[0].isSendOnly) + require.Equal(t, rpcCallNameLatestFinalizedBlock, spy.requests[0].callName) + require.ErrorIs(t, spy.requests[0].err, expectedErr) + require.Positive(t, spy.requests[0].latency) + }) + + t.Run("records invalid heads as failed requests", func(t *testing.T) { + spy := &spyRPCClientMetrics{} + rpc := NewRPCClientBase[*testHead]( + cfg, + requestTimeout, + lggr, + func(context.Context) (*testHead, error) { + return &testHead{}, nil + }, + func(context.Context) (*testHead, error) { + return &testHead{blockNumber: 8}, nil + }, + &RPCClientBaseMetricsConfig{ + RPCClientMetrics: spy, + RPCURL: "http://invalid.test", + IsSendOnly: false, + }, + ) + + _, err := rpc.LatestBlock(tests.Context(t)) + require.ErrorIs(t, err, errInvalidHead) + require.Len(t, spy.requests, 1) + require.Equal(t, rpcCallNameLatestBlock, spy.requests[0].callName) + require.ErrorIs(t, spy.requests[0].err, errInvalidHead) + }) +} + func TestAdapter_OnNewHeadFunctions(t *testing.T) { timeout := 10 * time.Second t.Run("OnNewHead and OnNewFinalizedHead updates chain info", func(t *testing.T) { From 77245f905242f63fed9122341fe77edcde869d41 Mon Sep 17 00:00:00 2001 From: Larry Li Date: Thu, 26 Mar 2026 17:06:54 -0700 Subject: [PATCH 2/3] update --- multinode/rpc_client_base_test.go | 33 +++++++++++++++---------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/multinode/rpc_client_base_test.go b/multinode/rpc_client_base_test.go index 8bd3aa0..02e38f7 100644 --- a/multinode/rpc_client_base_test.go +++ b/multinode/rpc_client_base_test.go @@ -11,7 +11,6 @@ import ( common "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" frameworkmetrics "github.com/smartcontractkit/chainlink-framework/metrics" "github.com/smartcontractkit/chainlink-framework/multinode/config" ) @@ -104,7 +103,7 @@ func TestAdapter_LatestBlock(t *testing.T) { latestChainInfo, highestChainInfo := rpc.GetInterceptedChainInfo() require.Equal(t, int64(0), latestChainInfo.BlockNumber) require.Equal(t, int64(0), highestChainInfo.BlockNumber) - head, err := rpc.LatestBlock(tests.Context(t)) + head, err := rpc.LatestBlock(t.Context()) require.NoError(t, err) require.True(t, head.IsValid()) latestChainInfo, highestChainInfo = rpc.GetInterceptedChainInfo() @@ -117,7 +116,7 @@ func TestAdapter_LatestBlock(t *testing.T) { latestChainInfo, highestChainInfo := rpc.GetInterceptedChainInfo() require.Equal(t, int64(0), latestChainInfo.FinalizedBlockNumber) require.Equal(t, int64(0), highestChainInfo.FinalizedBlockNumber) - finalizedHead, err := rpc.LatestFinalizedBlock(tests.Context(t)) + finalizedHead, err := rpc.LatestFinalizedBlock(t.Context()) require.NoError(t, err) require.True(t, finalizedHead.IsValid()) latestChainInfo, highestChainInfo = rpc.GetInterceptedChainInfo() @@ -169,7 +168,7 @@ func TestRPCClientBase_RecordsRPCMetrics(t *testing.T) { }, ) - head, err := rpc.LatestBlock(tests.Context(t)) + head, err := rpc.LatestBlock(t.Context()) require.NoError(t, err) require.Equal(t, int64(7), head.BlockNumber()) require.Len(t, spy.requests, 1) @@ -200,7 +199,7 @@ func TestRPCClientBase_RecordsRPCMetrics(t *testing.T) { }, ) - _, err := rpc.LatestFinalizedBlock(tests.Context(t)) + _, err := rpc.LatestFinalizedBlock(t.Context()) require.ErrorIs(t, err, expectedErr) require.Len(t, spy.requests, 1) require.Equal(t, "http://sendonly.test", spy.requests[0].rpcURL) @@ -229,7 +228,7 @@ func TestRPCClientBase_RecordsRPCMetrics(t *testing.T) { }, ) - _, err := rpc.LatestBlock(tests.Context(t)) + _, err := rpc.LatestBlock(t.Context()) require.ErrorIs(t, err, errInvalidHead) require.Len(t, spy.requests, 1) require.Equal(t, rpcCallNameLatestBlock, spy.requests[0].callName) @@ -247,7 +246,7 @@ func TestAdapter_OnNewHeadFunctions(t *testing.T) { require.Equal(t, int64(0), highestChainInfo.BlockNumber) require.Equal(t, int64(0), highestChainInfo.FinalizedBlockNumber) - ctx, cancel, lifeCycleCh := rpc.AcquireQueryCtx(tests.Context(t), timeout) + ctx, cancel, lifeCycleCh := rpc.AcquireQueryCtx(t.Context(), timeout) defer cancel() rpc.OnNewHead(ctx, lifeCycleCh, &testHead{blockNumber: 10}) rpc.OnNewFinalizedHead(ctx, lifeCycleCh, &testHead{blockNumber: 3}) @@ -269,7 +268,7 @@ func TestAdapter_OnNewHeadFunctions(t *testing.T) { require.Equal(t, int64(0), highestChainInfo.BlockNumber) require.Equal(t, int64(0), highestChainInfo.FinalizedBlockNumber) - healthCheckCtx := CtxAddHealthCheckFlag(tests.Context(t)) + healthCheckCtx := CtxAddHealthCheckFlag(t.Context()) ctx, cancel, lifeCycleCh := rpc.AcquireQueryCtx(healthCheckCtx, timeout) defer cancel() @@ -295,7 +294,7 @@ func TestAdapter_OnNewHeadFunctions(t *testing.T) { require.Equal(t, int64(0), highestChainInfo.BlockNumber) require.Equal(t, int64(0), highestChainInfo.FinalizedBlockNumber) - ctx, cancel, lifeCycleCh := rpc.AcquireQueryCtx(tests.Context(t), timeout) + ctx, cancel, lifeCycleCh := rpc.AcquireQueryCtx(t.Context(), timeout) defer cancel() rpc.CancelLifeCycle() @@ -317,11 +316,11 @@ func TestAdapter_OnNewHeadFunctions(t *testing.T) { func TestAdapter_HeadSubscriptions(t *testing.T) { t.Run("SubscribeToHeads", func(t *testing.T) { rpc := newTestRPC(t) - ch, sub, err := rpc.SubscribeToHeads(tests.Context(t)) + ch, sub, err := rpc.SubscribeToHeads(t.Context()) require.NoError(t, err) defer sub.Unsubscribe() - ctx, cancel := context.WithTimeout(tests.Context(t), time.Minute) + ctx, cancel := context.WithTimeout(t.Context(), time.Minute) defer cancel() select { case head := <-ch: @@ -334,11 +333,11 @@ func TestAdapter_HeadSubscriptions(t *testing.T) { t.Run("SubscribeToFinalizedHeads", func(t *testing.T) { rpc := newTestRPC(t) - finalizedCh, finalizedSub, err := rpc.SubscribeToFinalizedHeads(tests.Context(t)) + finalizedCh, finalizedSub, err := rpc.SubscribeToFinalizedHeads(t.Context()) require.NoError(t, err) defer finalizedSub.Unsubscribe() - ctx, cancel := context.WithTimeout(tests.Context(t), time.Minute) + ctx, cancel := context.WithTimeout(t.Context(), time.Minute) defer cancel() select { case finalizedHead := <-finalizedCh: @@ -351,10 +350,10 @@ func TestAdapter_HeadSubscriptions(t *testing.T) { t.Run("Remove Subscription on Unsubscribe", func(t *testing.T) { rpc := newTestRPC(t) - _, sub1, err := rpc.SubscribeToHeads(tests.Context(t)) + _, sub1, err := rpc.SubscribeToHeads(t.Context()) require.NoError(t, err) require.Equal(t, 1, rpc.lenSubs()) - _, sub2, err := rpc.SubscribeToFinalizedHeads(tests.Context(t)) + _, sub2, err := rpc.SubscribeToFinalizedHeads(t.Context()) require.NoError(t, err) require.Equal(t, 2, rpc.lenSubs()) @@ -366,10 +365,10 @@ func TestAdapter_HeadSubscriptions(t *testing.T) { t.Run("Ensure no deadlock on UnsubscribeAll", func(t *testing.T) { rpc := newTestRPC(t) - _, _, err := rpc.SubscribeToHeads(tests.Context(t)) + _, _, err := rpc.SubscribeToHeads(t.Context()) require.NoError(t, err) require.Equal(t, 1, rpc.lenSubs()) - _, _, err = rpc.SubscribeToFinalizedHeads(tests.Context(t)) + _, _, err = rpc.SubscribeToFinalizedHeads(t.Context()) require.NoError(t, err) require.Equal(t, 2, rpc.lenSubs()) rpc.UnsubscribeAllExcept() From 3bc35104e5a6fd96fb98da26314acac3df3585be Mon Sep 17 00:00:00 2001 From: Larry Li Date: Thu, 26 Mar 2026 22:08:59 -0700 Subject: [PATCH 3/3] update --- multinode/rpc_client_base_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/multinode/rpc_client_base_test.go b/multinode/rpc_client_base_test.go index 02e38f7..a81979a 100644 --- a/multinode/rpc_client_base_test.go +++ b/multinode/rpc_client_base_test.go @@ -37,7 +37,7 @@ type testHead struct { func (t *testHead) BlockNumber() int64 { return t.blockNumber } func (t *testHead) BlockDifficulty() *big.Int { return nil } func (t *testHead) GetTotalDifficulty() *big.Int { return nil } -func (t *testHead) IsValid() bool { return true } +func (t *testHead) IsValid() bool { return t != nil && t.blockNumber > 0 } func ptr[T any](t T) *T { return &t