diff --git a/CONFIG.md b/CONFIG.md index c5f7c0e55c..2396777fb1 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -390,6 +390,7 @@ Enabled = false # Default BlockTime = '10s' # Example CustomURL = 'https://example.api.io' # Example DualBroadcast = false # Example +ReadRequestsToMultipleNodes = false # Example Bundles = false # Example FastlaneAuctionRequestTimeout = '5s' # Example ``` @@ -419,6 +420,12 @@ DualBroadcast = false # Example ``` DualBroadcast enables DualBroadcast functionality. +### ReadRequestsToMultipleNodes +```toml +ReadRequestsToMultipleNodes = false # Example +``` +ReadRequestsToMultipleNodes controls whether txm chain client reads use multiplexed calls. + ### Bundles ```toml Bundles = false # Example diff --git a/pkg/client/chain_client.go b/pkg/client/chain_client.go index 20c11b8bf8..3d234ef279 100644 --- a/pkg/client/chain_client.go +++ b/pkg/client/chain_client.go @@ -2,6 +2,8 @@ package client import ( "context" + "errors" + "fmt" "math/big" "sync" "time" @@ -74,7 +76,9 @@ type Client interface { CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) + PendingNonceAtWithFallback(ctx context.Context, account common.Address) (uint64, error) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) + NonceAtWithFallback(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) TransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, error) TransactionByHashWithOpts(ctx context.Context, txHash common.Hash, opts evmtypes.TransactionByHashOpts) (*types.Transaction, error) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) @@ -260,6 +264,86 @@ func (c *chainClient) CallContext(ctx context.Context, result interface{}, metho return r.CallContext(ctx, result, method, args...) } +func (c *chainClient) PendingNonceAtWithFallback(parentCtx context.Context, account common.Address) (uint64, error) { + return c.nonceAtWithFallback(parentCtx, func(ctx context.Context, rpc *RPCClient) (uint64, error) { + n, err := rpc.PendingSequenceAt(ctx, account) + return uint64(n), err + }) +} + +func (c *chainClient) NonceAtWithFallback(parentCtx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + return c.nonceAtWithFallback(parentCtx, func(ctx context.Context, rpc *RPCClient) (uint64, error) { + return rpc.NonceAt(ctx, account, blockNumber) + }) +} + +// nonceAtWithFallback is a helper function that makes a call to the main node and if it fails, it makes a call to all other alive nodes in parallel. +func (c *chainClient) nonceAtWithFallback(parentCtx context.Context, call func(context.Context, *RPCClient) (uint64, error)) (uint64, error) { + // try main node + main, err := c.multiNode.SelectRPC(parentCtx) + if err != nil { + return 0, err + } + + nonce, err := call(parentCtx, main) + if err == nil { + return nonce, nil + } + // if main node fails, collect the error and continue with the fallback calls + mainErr := fmt.Errorf("%s: %w", main.Name(), err) + + ctx, cancel := context.WithCancel(parentCtx) + defer cancel() + + type result struct { + nonce uint64 + err error + } + results := make(chan result) + scheduled := 0 + + doFunc := func(ctx context.Context, rpc *RPCClient, isSendOnly bool) { + if isSendOnly || rpc == main { + return + } + + scheduled++ + go func(rpc *RPCClient) { + nonce, err := call(ctx, rpc) + if err != nil { + err = fmt.Errorf("%s: %w", rpc.Name(), err) + c.logger.Debugw("Fallback nonce call failed", "rpc", rpc.Name(), "err", err) + } else { + c.logger.Debugw("Fallback nonce call succeeded", "rpc", rpc.Name(), "nonce", nonce) + } + + select { + case results <- result{nonce: nonce, err: err}: + case <-ctx.Done(): + } + }(rpc) + } + + if doErr := c.multiNode.DoAll(ctx, doFunc); doErr != nil { + return 0, doErr + } + + errs := []error{mainErr} + for range scheduled { + select { + case <-ctx.Done(): + return 0, errors.Join(append(errs, ctx.Err())...) + case res := <-results: + if res.err == nil { + return res.nonce, nil + } + errs = append(errs, res.err) + } + } + + return 0, errors.Join(errs...) +} + func (c *chainClient) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { r, err := c.multiNode.SelectRPC(ctx) if err != nil { diff --git a/pkg/client/chain_client_test.go b/pkg/client/chain_client_test.go index 250d4e0f22..57c7fc3c6f 100644 --- a/pkg/client/chain_client_test.go +++ b/pkg/client/chain_client_test.go @@ -23,8 +23,11 @@ import ( "github.com/stretchr/testify/require" "github.com/tidwall/gjson" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/smartcontractkit/chainlink-framework/metrics" "github.com/smartcontractkit/chainlink-framework/multinode" + "github.com/smartcontractkit/chainlink-framework/multinode/mocks" "github.com/smartcontractkit/chainlink-evm/pkg/client" "github.com/smartcontractkit/chainlink-evm/pkg/testutils" @@ -960,4 +963,180 @@ func TestEthClient_ErroringClient(t *testing.T) { require.Equal(t, multinode.ErrNodeError, err) } +func TestChainClient_NonceAtWithFallback(t *testing.T) { + t.Parallel() + + t.Run("main succeeds without fallback", func(t *testing.T) { + t.Parallel() + + mainCalls := atomic.Int32{} + fallbackCalls := atomic.Int32{} + + ethClient := mustNewChainClientWithTestNodes(t, + nonceServer(t, nonceResponse{result: `"0x5"`, calls: &mainCalls}), + nonceServer(t, nonceResponse{result: `"0x6"`, calls: &fallbackCalls}), + ) + + nonce, err := ethClient.NonceAtWithFallback(t.Context(), testutils.NewAddress(), nil) + require.NoError(t, err) + require.Equal(t, uint64(5), nonce) + require.Equal(t, int32(1), mainCalls.Load()) + require.Equal(t, int32(0), fallbackCalls.Load()) + }) + + t.Run("fallback returns first success", func(t *testing.T) { + t.Parallel() + + mainCalls := atomic.Int32{} + fastFallbackCalls := atomic.Int32{} + + ethClient := mustNewChainClientWithTestNodes(t, + nonceServer(t, nonceResponse{code: -32000, message: "main failed", calls: &mainCalls}), + nonceServer(t, nonceResponse{delay: 200 * time.Millisecond, result: `"0x9"`}), + nonceServer(t, nonceResponse{delay: 25 * time.Millisecond, result: `"0xa"`, calls: &fastFallbackCalls}), + ) + + startedAt := time.Now() + nonce, err := ethClient.NonceAtWithFallback(t.Context(), testutils.NewAddress(), nil) + duration := time.Since(startedAt) + require.NoError(t, err) + require.Equal(t, uint64(10), nonce) + require.Equal(t, int32(1), mainCalls.Load()) + require.Equal(t, int32(1), fastFallbackCalls.Load()) + require.Less(t, duration, 100*time.Millisecond) + }) + + t.Run("all primaries fail", func(t *testing.T) { + t.Parallel() + + ethClient := mustNewChainClientWithTestNodes(t, + nonceServer(t, nonceResponse{code: -32000, message: "main failed"}), + nonceServer(t, nonceResponse{code: -32001, message: "fallback one failed"}), + nonceServer(t, nonceResponse{code: -32002, message: "fallback two failed"}), + ) + + _, err := ethClient.PendingNonceAtWithFallback(t.Context(), testutils.NewAddress()) + require.Error(t, err) + requireErrorContainsAll(t, err, + "eth-primary-rpc-0", "main failed", + "eth-primary-rpc-1", "fallback one failed", + "eth-primary-rpc-2", "fallback two failed", + ) + }) + + t.Run("context canceled", func(t *testing.T) { + t.Parallel() + + ethClient := mustNewChainClientWithTestNodes(t, + nonceServer(t, nonceResponse{delay: 100 * time.Millisecond, result: `"0x5"`}), + ) + + ctx, cancel := context.WithCancel(tests.Context(t)) + cancel() + + _, err := ethClient.NonceAtWithFallback(ctx, testutils.NewAddress(), nil) + require.Error(t, err) + require.ErrorIs(t, err, context.Canceled) + }) +} + +func mustNewChainClientWithTestNodes(t *testing.T, wsURLs ...string) client.Client { + t.Helper() + + chainID := testutils.FixtureChainID + lggr := logger.Test(t) + metricsRecorder, err := metrics.NewGenericMultiNodeMetrics("EVM Test", chainID.String()) + require.NoError(t, err) + + cfg := client.TestNodePoolConfig{NodeSelectionMode: multinode.NodeSelectionModeRoundRobin} + primaries := make([]multinode.Node[*big.Int, *client.RPCClient], 0, len(wsURLs)) + for i, wsURL := range wsURLs { + parsed, err := url.ParseRequestURI(wsURL) + require.NoError(t, err) + + rpc := client.NewTestRPCClient(t, client.RPCClientOpts{ + Cfg: cfg, + WS: parsed, + Name: "eth-primary-rpc-" + big.NewInt(int64(i)).String(), + ID: i + 1, + ChainID: chainID, + Tier: multinode.Primary, + }) + + node := multinode.NewNode( + cfg, + mocks.ChainConfig{NoNewHeadsThresholdVal: 0}, + lggr, + metricsRecorder, + parsed, + nil, + "eth-primary-node-"+big.NewInt(int64(i)).String(), + i+1, + chainID, + 1, + rpc, + "EVM", + false, + ) + primaries = append(primaries, node) + } + + clientErrors := client.NewTestClientErrors() + ethClient := client.NewChainClient(lggr, metricsRecorder, cfg.SelectionMode(), 0, primaries, nil, chainID, &clientErrors, 0, "") + t.Cleanup(ethClient.Close) + require.NoError(t, ethClient.Dial(t.Context())) + require.Eventually(t, func() bool { + alive := 0 + for _, state := range ethClient.NodeStates() { + if state == "Alive" { + alive++ + } + } + return alive == len(wsURLs) + }, time.Minute, time.Second) + + return ethClient +} + +type nonceResponse struct { + delay time.Duration + result string + code int + message string + calls *atomic.Int32 +} + +func nonceServer(t *testing.T, respCfg nonceResponse) string { + t.Helper() + return testutils.NewWSServer(t, testutils.FixtureChainID, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) { + switch method { + case "eth_subscribe": + resp.Result = `"0x00"` + resp.Notify = headResult + case "eth_unsubscribe": + resp.Result = "true" + case "eth_getBlockByNumber": + resp.Result = client.MakeHeadMsgForNumber(42) + case "eth_getTransactionCount": + if respCfg.calls != nil { + respCfg.calls.Add(1) + } + if respCfg.delay > 0 { + time.Sleep(respCfg.delay) + } + resp.Result = respCfg.result + resp.Error.Code = respCfg.code + resp.Error.Message = respCfg.message + } + return + }).WSURL().String() +} + +func requireErrorContainsAll(t *testing.T, err error, parts ...string) { + t.Helper() + for _, part := range parts { + require.ErrorContains(t, err, part) + } +} + const headResult = client.HeadResult diff --git a/pkg/client/clienttest/client.go b/pkg/client/clienttest/client.go index af2f68981f..8ea52797af 100644 --- a/pkg/client/clienttest/client.go +++ b/pkg/client/clienttest/client.go @@ -1705,6 +1705,64 @@ func (_c *Client_NonceAt_Call) RunAndReturn(run func(context.Context, common.Add return _c } +// NonceAtWithFallback provides a mock function with given fields: ctx, account, blockNumber +func (_m *Client) NonceAtWithFallback(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + ret := _m.Called(ctx, account, blockNumber) + + if len(ret) == 0 { + panic("no return value specified for NonceAtWithFallback") + } + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) (uint64, error)); ok { + return rf(ctx, account, blockNumber) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) uint64); ok { + r0 = rf(ctx, account, blockNumber) + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Address, *big.Int) error); ok { + r1 = rf(ctx, account, blockNumber) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Client_NonceAtWithFallback_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NonceAtWithFallback' +type Client_NonceAtWithFallback_Call struct { + *mock.Call +} + +// NonceAtWithFallback is a helper method to define mock.On call +// - ctx context.Context +// - account common.Address +// - blockNumber *big.Int +func (_e *Client_Expecter) NonceAtWithFallback(ctx interface{}, account interface{}, blockNumber interface{}) *Client_NonceAtWithFallback_Call { + return &Client_NonceAtWithFallback_Call{Call: _e.mock.On("NonceAtWithFallback", ctx, account, blockNumber)} +} + +func (_c *Client_NonceAtWithFallback_Call) Run(run func(ctx context.Context, account common.Address, blockNumber *big.Int)) *Client_NonceAtWithFallback_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Address), args[2].(*big.Int)) + }) + return _c +} + +func (_c *Client_NonceAtWithFallback_Call) Return(_a0 uint64, _a1 error) *Client_NonceAtWithFallback_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Client_NonceAtWithFallback_Call) RunAndReturn(run func(context.Context, common.Address, *big.Int) (uint64, error)) *Client_NonceAtWithFallback_Call { + _c.Call.Return(run) + return _c +} + // PendingCallContract provides a mock function with given fields: ctx, msg func (_m *Client) PendingCallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte, error) { ret := _m.Called(ctx, msg) @@ -1880,6 +1938,63 @@ func (_c *Client_PendingNonceAt_Call) RunAndReturn(run func(context.Context, com return _c } +// PendingNonceAtWithFallback provides a mock function with given fields: ctx, account +func (_m *Client) PendingNonceAtWithFallback(ctx context.Context, account common.Address) (uint64, error) { + ret := _m.Called(ctx, account) + + if len(ret) == 0 { + panic("no return value specified for PendingNonceAtWithFallback") + } + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Address) (uint64, error)); ok { + return rf(ctx, account) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Address) uint64); ok { + r0 = rf(ctx, account) + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Address) error); ok { + r1 = rf(ctx, account) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Client_PendingNonceAtWithFallback_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PendingNonceAtWithFallback' +type Client_PendingNonceAtWithFallback_Call struct { + *mock.Call +} + +// PendingNonceAtWithFallback is a helper method to define mock.On call +// - ctx context.Context +// - account common.Address +func (_e *Client_Expecter) PendingNonceAtWithFallback(ctx interface{}, account interface{}) *Client_PendingNonceAtWithFallback_Call { + return &Client_PendingNonceAtWithFallback_Call{Call: _e.mock.On("PendingNonceAtWithFallback", ctx, account)} +} + +func (_c *Client_PendingNonceAtWithFallback_Call) Run(run func(ctx context.Context, account common.Address)) *Client_PendingNonceAtWithFallback_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Address)) + }) + return _c +} + +func (_c *Client_PendingNonceAtWithFallback_Call) Return(_a0 uint64, _a1 error) *Client_PendingNonceAtWithFallback_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Client_PendingNonceAtWithFallback_Call) RunAndReturn(run func(context.Context, common.Address) (uint64, error)) *Client_PendingNonceAtWithFallback_Call { + _c.Call.Return(run) + return _c +} + // SendTransaction provides a mock function with given fields: ctx, tx func (_m *Client) SendTransaction(ctx context.Context, tx *coretypes.Transaction) error { ret := _m.Called(ctx, tx) diff --git a/pkg/client/null_client.go b/pkg/client/null_client.go index 06cd56db9a..c8f6b17bc7 100644 --- a/pkg/client/null_client.go +++ b/pkg/client/null_client.go @@ -149,11 +149,21 @@ func (nc *NullClient) PendingNonceAt(ctx context.Context, account common.Address return 0, nil } +func (nc *NullClient) PendingNonceAtWithFallback(ctx context.Context, account common.Address) (uint64, error) { + nc.lggr.Debug("PendingNonceAtWithFallback") + return 0, nil +} + func (nc *NullClient) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { nc.lggr.Debug("NonceAt") return 0, nil } +func (nc *NullClient) NonceAtWithFallback(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + nc.lggr.Debug("NonceAtWithFallback") + return 0, nil +} + func (nc *NullClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { nc.lggr.Debug("TransactionReceipt") return nil, nil diff --git a/pkg/client/simulated_backend_client.go b/pkg/client/simulated_backend_client.go index 123b13442c..fb60b2edcb 100644 --- a/pkg/client/simulated_backend_client.go +++ b/pkg/client/simulated_backend_client.go @@ -130,6 +130,16 @@ func (c *SimulatedBackendClient) CallContext(ctx context.Context, result interfa } } +func (c *SimulatedBackendClient) PendingNonceAtWithFallback(ctx context.Context, account common.Address) (uint64, error) { + nonce, err := c.PendingNonceAt(ctx, account) + return nonce, err +} + +func (c *SimulatedBackendClient) NonceAtWithFallback(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + nonce, err := c.NonceAt(ctx, account, blockNumber) + return nonce, err +} + // FilterLogs returns all logs that respect the passed filter query. func (c *SimulatedBackendClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error) { logs, err = c.client.FilterLogs(ctx, q) diff --git a/pkg/config/chain_scoped_transactions.go b/pkg/config/chain_scoped_transactions.go index 2e9d520baa..5206909b12 100644 --- a/pkg/config/chain_scoped_transactions.go +++ b/pkg/config/chain_scoped_transactions.go @@ -64,6 +64,10 @@ func (t *transactionManagerV2Config) DualBroadcast() *bool { return t.c.DualBroadcast } +func (t *transactionManagerV2Config) ReadRequestsToMultipleNodes() *bool { + return t.c.ReadRequestsToMultipleNodes +} + func (t *transactionManagerV2Config) Bundles() *bool { return t.c.Bundles } diff --git a/pkg/config/config.go b/pkg/config/config.go index bdec19f9c4..b3fd2799d3 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -133,6 +133,7 @@ type TransactionManagerV2 interface { BlockTime() *time.Duration CustomURL() *url.URL DualBroadcast() *bool + ReadRequestsToMultipleNodes() *bool Bundles() *bool FastlaneAuctionRequestTimeout() *time.Duration } diff --git a/pkg/config/toml/config.go b/pkg/config/toml/config.go index a43c65a6f6..ca83e4c297 100644 --- a/pkg/config/toml/config.go +++ b/pkg/config/toml/config.go @@ -589,6 +589,7 @@ type TransactionManagerV2Config struct { BlockTime *commonconfig.Duration `toml:",omitempty"` CustomURL *commonconfig.URL `toml:",omitempty"` DualBroadcast *bool `toml:",omitempty"` + ReadRequestsToMultipleNodes *bool `toml:",omitempty"` Bundles *bool `toml:",omitempty"` FastlaneAuctionRequestTimeout *commonconfig.Duration `toml:",omitempty"` } @@ -606,6 +607,9 @@ func (t *TransactionManagerV2Config) setFrom(f *TransactionManagerV2Config) { if v := f.DualBroadcast; v != nil { t.DualBroadcast = f.DualBroadcast } + if v := f.ReadRequestsToMultipleNodes; v != nil { + t.ReadRequestsToMultipleNodes = f.ReadRequestsToMultipleNodes + } if v := f.Bundles; v != nil { t.Bundles = f.Bundles } diff --git a/pkg/config/toml/config_test.go b/pkg/config/toml/config_test.go index f71423f289..c833ff9fb0 100644 --- a/pkg/config/toml/config_test.go +++ b/pkg/config/toml/config_test.go @@ -64,6 +64,7 @@ func TestDefaults_fieldsNotNil(t *testing.T) { unknown.Transactions.TransactionManagerV2.BlockTime = new(config.Duration) unknown.Transactions.TransactionManagerV2.CustomURL = new(config.URL) unknown.Transactions.TransactionManagerV2.DualBroadcast = ptr(false) + unknown.Transactions.TransactionManagerV2.ReadRequestsToMultipleNodes = ptr(false) unknown.Transactions.TransactionManagerV2.Bundles = ptr(false) unknown.Transactions.TransactionManagerV2.FastlaneAuctionRequestTimeout = new(config.Duration) unknown.Transactions.AutoPurge.Threshold = ptr(uint32(0)) @@ -161,6 +162,7 @@ func TestDocs(t *testing.T) { docDefaults.Transactions.TransactionManagerV2.BlockTime = nil docDefaults.Transactions.TransactionManagerV2.CustomURL = nil docDefaults.Transactions.TransactionManagerV2.DualBroadcast = nil + docDefaults.Transactions.TransactionManagerV2.ReadRequestsToMultipleNodes = nil docDefaults.Transactions.TransactionManagerV2.Bundles = nil docDefaults.Transactions.TransactionManagerV2.FastlaneAuctionRequestTimeout = nil @@ -287,6 +289,7 @@ var fullConfig = EVMConfig{ TransactionManagerV2: TransactionManagerV2Config{ Enabled: ptr(false), DualBroadcast: ptr(true), + ReadRequestsToMultipleNodes: ptr(false), Bundles: ptr(false), BlockTime: config.MustNewDuration(42 * time.Second), CustomURL: config.MustParseURL("http://txs.org"), diff --git a/pkg/config/toml/docs.toml b/pkg/config/toml/docs.toml index a01aad9330..67eabcec61 100644 --- a/pkg/config/toml/docs.toml +++ b/pkg/config/toml/docs.toml @@ -171,6 +171,8 @@ BlockTime = '10s' # Example CustomURL = 'https://example.api.io' # Example # DualBroadcast enables DualBroadcast functionality. DualBroadcast = false # Example +# ReadRequestsToMultipleNodes controls whether txm chain client reads use multiplexed calls. +ReadRequestsToMultipleNodes = false # Example # Bundles enables sending bundles for auctioning (not compatible with all OFAs). Bundles = false # Example # FastlaneAuctionRequestTimeout configures the HTTP request timeout for Fastlane Atlas auction requests. Defaults to 5s if not set. diff --git a/pkg/config/toml/testdata/config-full.toml b/pkg/config/toml/testdata/config-full.toml index 564ea7d668..af219ae27e 100644 --- a/pkg/config/toml/testdata/config-full.toml +++ b/pkg/config/toml/testdata/config-full.toml @@ -47,6 +47,7 @@ Enabled = false BlockTime = '42s' CustomURL = 'http://txs.org' DualBroadcast = true +ReadRequestsToMultipleNodes = false Bundles = false FastlaneAuctionRequestTimeout = '15s' diff --git a/pkg/txm/clientwrappers/chain_client.go b/pkg/txm/clientwrappers/chain_client.go index 1f05f38165..c360f51b7d 100644 --- a/pkg/txm/clientwrappers/chain_client.go +++ b/pkg/txm/clientwrappers/chain_client.go @@ -2,27 +2,97 @@ package clientwrappers import ( "context" + "fmt" "math/big" + "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + evmtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-evm/pkg/client" "github.com/smartcontractkit/chainlink-evm/pkg/txm/types" ) +// MultiCallMaxTimeout is the maximum timeout for the multi-call operation. +// Given how frequently reads are made, we're making a tradeoff between latency and read availability. +const MultiCallMaxTimeout = 1500 * time.Millisecond + type ChainClient struct { - c client.Client + lggr logger.SugaredLogger + c client.Client + readRequestsToMultipleNodes bool + metrics *chainClientMetrics +} + +func NewChainClient(lggr logger.Logger, client client.Client, readRequestsToMultipleNodes bool) (*ChainClient, error) { + chainClientLogger := logger.Sugared(logger.Named(lggr, "Txm.ChainClient")) + + metrics, err := newChainClientMetrics(client.ConfiguredChainID()) + if err != nil { + return nil, fmt.Errorf("failed to initialize chain client metrics: %w", err) + } + if metrics == nil { + return nil, fmt.Errorf("failed to initialize chain client metrics: nil metrics recorder") + } + + return &ChainClient{ + lggr: chainClientLogger, + c: client, + readRequestsToMultipleNodes: readRequestsToMultipleNodes, + metrics: metrics, + }, nil } -func NewChainClient(client client.Client) *ChainClient { - return &ChainClient{c: client} +func (c *ChainClient) BlockByNumber(ctx context.Context, number *big.Int) (*evmtypes.Block, error) { + return c.c.BlockByNumber(ctx, number) } func (c *ChainClient) NonceAt(ctx context.Context, address common.Address, blockNumber *big.Int) (uint64, error) { + if c.readRequestsToMultipleNodes { + blockTag := "latest" + if blockNumber != nil { + blockTag = hexutil.EncodeBig(blockNumber) + } + + ctx, cancel := context.WithTimeout(ctx, MultiCallMaxTimeout) + defer cancel() + + startedAt := time.Now() + nonce, err := c.c.NonceAtWithFallback(ctx, address, blockNumber) + callDuration := time.Since(startedAt) + if err != nil { + err = fmt.Errorf("error calling NonceAtWithFallback: %w", err) + c.metrics.recordMultiCallDuration(ctx, "eth_getTransactionCount", blockTag, callDuration, err) + return 0, err + } + + c.metrics.recordMultiCallDuration(ctx, "eth_getTransactionCount", blockTag, callDuration, nil) + c.lggr.Debugw("eth_getTransactionCount", "address", address, "nonce", nonce, "callDuration", callDuration) + return nonce, nil + } return c.c.NonceAt(ctx, address, blockNumber) } func (c *ChainClient) PendingNonceAt(ctx context.Context, address common.Address) (uint64, error) { + if c.readRequestsToMultipleNodes { + ctx, cancel := context.WithTimeout(ctx, MultiCallMaxTimeout) + defer cancel() + + startedAt := time.Now() + nonce, err := c.c.PendingNonceAtWithFallback(ctx, address) + callDuration := time.Since(startedAt) + if err != nil { + err = fmt.Errorf("error calling PendingNonceAtWithFallback: %w", err) + c.metrics.recordMultiCallDuration(ctx, "eth_getTransactionCount", "pending", callDuration, err) + return 0, err + } + + c.metrics.recordMultiCallDuration(ctx, "eth_getTransactionCount", "pending", callDuration, nil) + c.lggr.Debugw("eth_getTransactionCount", "address", address, "blockTag", "pending", "nonce", nonce, "callDuration", callDuration) + return nonce, nil + } return c.c.PendingNonceAt(ctx, address) } diff --git a/pkg/txm/clientwrappers/chain_client_metrics.go b/pkg/txm/clientwrappers/chain_client_metrics.go new file mode 100644 index 0000000000..6561d6f98a --- /dev/null +++ b/pkg/txm/clientwrappers/chain_client_metrics.go @@ -0,0 +1,49 @@ +package clientwrappers + +import ( + "context" + "errors" + "fmt" + "math/big" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" +) + +type chainClientMetrics struct { + chainID string + multiCallDurationHistogram metric.Float64Histogram +} + +func newChainClientMetrics(chainID *big.Int) (*chainClientMetrics, error) { + multiCallDurationHistogram, err := beholder.GetMeter().Float64Histogram("txm_multicall_duration_ms") + if err != nil { + return nil, fmt.Errorf("failed to register txm multicall duration metric: %w", err) + } + + chainIDStr := "unknown" + if chainID != nil { + chainIDStr = chainID.String() + } + + return &chainClientMetrics{ + chainID: chainIDStr, + multiCallDurationHistogram: multiCallDurationHistogram, + }, nil +} + +func (m *chainClientMetrics) recordMultiCallDuration(ctx context.Context, method, blockTag string, duration time.Duration, err error) { + success := err == nil + timedOut := errors.Is(err, context.DeadlineExceeded) + + m.multiCallDurationHistogram.Record(ctx, float64(duration)/float64(time.Millisecond), metric.WithAttributes( + attribute.String("chainID", m.chainID), + attribute.String("method", method), + attribute.String("blockTag", blockTag), + attribute.Bool("success", success), + attribute.Bool("timedOut", timedOut), + )) +} diff --git a/pkg/txm/clientwrappers/chain_client_test.go b/pkg/txm/clientwrappers/chain_client_test.go new file mode 100644 index 0000000000..84e54759d4 --- /dev/null +++ b/pkg/txm/clientwrappers/chain_client_test.go @@ -0,0 +1,73 @@ +package clientwrappers + +import ( + "errors" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-evm/pkg/client/clienttest" +) + +func TestPendingNonceAtWithFallback_ReturnsNonce(t *testing.T) { + t.Parallel() + + m := clienttest.NewClientWithDefaultChainID(t) + address := common.HexToAddress("0x1111111111111111111111111111111111111111") + c, err := NewChainClient(logger.Test(t), m, true) + require.NoError(t, err) + + m.On("PendingNonceAtWithFallback", mock.Anything, address).Return(uint64(10), nil).Once() + + nonce, err := c.PendingNonceAt(t.Context(), address) + require.NoError(t, err) + require.Equal(t, uint64(10), nonce) +} + +func TestPendingNonceAtWithFallback_ErrorsWhenNoSuccessfulResults(t *testing.T) { + t.Parallel() + + m := clienttest.NewClientWithDefaultChainID(t) + address := common.HexToAddress("0x2222222222222222222222222222222222222222") + c, err := NewChainClient(logger.Test(t), m, true) + require.NoError(t, err) + + m.On("PendingNonceAtWithFallback", mock.Anything, address).Return(uint64(0), errors.New("all nodes failed for pending nonce")).Once() + + _, err = c.PendingNonceAt(t.Context(), address) + require.ErrorContains(t, err, "all nodes failed") +} + +func TestNonceAtWithFallback_ReturnsNonce(t *testing.T) { + t.Parallel() + + m := clienttest.NewClientWithDefaultChainID(t) + address := common.HexToAddress("0x3333333333333333333333333333333333333333") + blockNumber := big.NewInt(7) + c, err := NewChainClient(logger.Test(t), m, true) + require.NoError(t, err) + + m.On("NonceAtWithFallback", mock.Anything, address, blockNumber).Return(uint64(11), nil).Once() + + nonce, err := c.NonceAt(t.Context(), address, blockNumber) + require.NoError(t, err) + require.Equal(t, uint64(11), nonce) +} + +func TestNonceAtWithFallback_ErrorsWhenNoSuccessfulResults(t *testing.T) { + t.Parallel() + + m := clienttest.NewClientWithDefaultChainID(t) + address := common.HexToAddress("0x4444444444444444444444444444444444444444") + c, err := NewChainClient(logger.Test(t), m, true) + require.NoError(t, err) + + m.On("NonceAtWithFallback", mock.Anything, address, (*big.Int)(nil)).Return(uint64(0), errors.New("all nodes failed for nonce")).Once() + + _, err = c.NonceAt(t.Context(), address, nil) + require.ErrorContains(t, err, "all nodes failed") +} diff --git a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go index ca7fd8f87b..d6f1de0a27 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go +++ b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client.go @@ -37,7 +37,7 @@ type FlashbotsTxStore interface { type FlashbotsClientRPC interface { BlockByNumber(ctx context.Context, number *big.Int) (*evmtypes.Block, error) NonceAt(context.Context, common.Address, *big.Int) (uint64, error) - SendTransaction(context.Context, *evmtypes.Transaction) error + SendTransaction(context.Context, *types.Transaction, *types.Attempt) error } type FlashbotsClient struct { @@ -116,7 +116,7 @@ func (d *FlashbotsClient) SendTransaction(ctx context.Context, tx *types.Transac return nil } - return d.c.SendTransaction(ctx, attempt.SignedTransaction) + return d.c.SendTransaction(ctx, nil, attempt) } func (d *FlashbotsClient) signAndPostMessage(ctx context.Context, address common.Address, body []byte, urlParams string) (json.RawMessage, error) { diff --git a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client_test.go b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client_test.go index dff8feee4b..2218095cce 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/flashbots_client_test.go +++ b/pkg/txm/clientwrappers/dualbroadcast/flashbots_client_test.go @@ -33,7 +33,7 @@ func (m *testFlashbotsRPC) NonceAt(context.Context, common.Address, *big.Int) (u return 0, nil } -func (m *testFlashbotsRPC) SendTransaction(context.Context, *evmtypes.Transaction) error { +func (m *testFlashbotsRPC) SendTransaction(context.Context, *txmtypes.Transaction, *txmtypes.Attempt) error { return nil } diff --git a/pkg/txm/clientwrappers/dualbroadcast/meta_client.go b/pkg/txm/clientwrappers/dualbroadcast/meta_client.go index 2b7a989f8e..21c0c9d818 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/meta_client.go +++ b/pkg/txm/clientwrappers/dualbroadcast/meta_client.go @@ -132,7 +132,7 @@ type MetaClientKeystore interface { type MetaClientRPC interface { NonceAt(context.Context, common.Address, *big.Int) (uint64, error) PendingNonceAt(context.Context, common.Address) (uint64, error) - SendTransaction(context.Context, *evmtypes.Transaction) error + SendTransaction(context.Context, *types.Transaction, *types.Attempt) error } type MetaClient struct { @@ -215,13 +215,13 @@ func (a *MetaClient) SendTransaction(ctx context.Context, tx *types.Transaction, first := tx.Attempts[0] if first.SignedTransaction != nil { a.lggr.Infow("Intercepted attempt for tx(rebroadcasting first attempt)", "txID", tx.ID, "attempt", first) - return a.c.SendTransaction(ctx, first.SignedTransaction) + return a.c.SendTransaction(ctx, nil, first) } } // #3 a.lggr.Infow("Broadcasting attempt to public mempool", "tx", tx) - return a.c.SendTransaction(ctx, attempt.SignedTransaction) + return a.c.SendTransaction(ctx, nil, attempt) } type Parameters struct { @@ -594,7 +594,8 @@ func (a *MetaClient) SendOperation(ctx context.Context, tx *types.Transaction, a if err := a.txStore.UpdateSignedAttempt(ctx, tx.ID, attempt.ID, signedTx, tx.FromAddress); err != nil { return fmt.Errorf("failed to update signed attempt for txID: %v, err: %w", tx.ID, err) } + attempt.SignedTransaction = signedTx a.lggr.Infow("Intercepted attempt for tx", "txID", tx.ID, "hash", signedTx.Hash(), "toAddress", meta.ToAddress, "gasLimit", meta.GasLimit, "TipCap", tip, "FeeCap", meta.MaxFeePerGas, "transactionLifecycleID", tx.GetTransactionLifecycleID(a.lggr)) - return a.c.SendTransaction(ctx, signedTx) + return a.c.SendTransaction(ctx, nil, attempt) } diff --git a/pkg/txm/clientwrappers/dualbroadcast/selector.go b/pkg/txm/clientwrappers/dualbroadcast/selector.go index 838fea7e10..c7b4bba3f1 100644 --- a/pkg/txm/clientwrappers/dualbroadcast/selector.go +++ b/pkg/txm/clientwrappers/dualbroadcast/selector.go @@ -11,15 +11,21 @@ import ( "github.com/smartcontractkit/chainlink-evm/pkg/client" "github.com/smartcontractkit/chainlink-evm/pkg/keys" "github.com/smartcontractkit/chainlink-evm/pkg/txm" + "github.com/smartcontractkit/chainlink-evm/pkg/txm/clientwrappers" ) -func SelectClient(lggr logger.Logger, client client.Client, keyStore keys.ChainStore, url *url.URL, chainID *big.Int, txStore txm.TxStore, bundles *bool, auctionRequestTimeout *time.Duration) (txm.Client, txm.ErrorHandler, error) { +func SelectClient(lggr logger.Logger, client client.Client, keyStore keys.ChainStore, url *url.URL, chainID *big.Int, txStore txm.TxStore, readRequestsToMultipleNodes bool, bundles *bool, auctionRequestTimeout *time.Duration) (txm.Client, txm.ErrorHandler, error) { + chainClient, err := clientwrappers.NewChainClient(lggr, client, readRequestsToMultipleNodes) + if err != nil { + return nil, nil, err + } + urlString := url.String() switch { case strings.Contains(urlString, "flashbots"): - return NewFlashbotsClient(lggr, client, keyStore, url, txStore, bundles), nil, nil + return NewFlashbotsClient(lggr, chainClient, keyStore, url, txStore, bundles), nil, nil default: - mc, err := NewMetaClient(lggr, client, keyStore, url, chainID, txStore, auctionRequestTimeout) + mc, err := NewMetaClient(lggr, chainClient, keyStore, url, chainID, txStore, auctionRequestTimeout) if err != nil { return nil, nil, err } diff --git a/pkg/txmgr/builder.go b/pkg/txmgr/builder.go index 4974dfcbe1..eb0d1436b4 100644 --- a/pkg/txmgr/builder.go +++ b/pkg/txmgr/builder.go @@ -143,6 +143,10 @@ func NewTxmV2( attemptBuilder := txm.NewAttemptBuilder(fCfg.PriceMaxKey, estimator, keyStore, gasEstimatorConfig.LimitTransfer()) inMemoryStoreManager := storage.NewInMemoryStoreManager(lggr, chainID) + readRequestsToMultipleNodes := false + if txmV2Config.ReadRequestsToMultipleNodes() != nil && *txmV2Config.ReadRequestsToMultipleNodes() { + readRequestsToMultipleNodes = true + } config := txm.Config{ EIP1559: fCfg.EIP1559DynamicFees(), BlockTime: *txmV2Config.BlockTime(), @@ -154,12 +158,16 @@ func NewTxmV2( var c txm.Client if txmV2Config.DualBroadcast() != nil && *txmV2Config.DualBroadcast() && txmV2Config.CustomURL() != nil { var err error - c, eh, err = dualbroadcast.SelectClient(lggr, client, keyStore, txmV2Config.CustomURL(), chainID, inMemoryStoreManager, txmV2Config.Bundles(), txmV2Config.FastlaneAuctionRequestTimeout()) + c, eh, err = dualbroadcast.SelectClient(lggr, client, keyStore, txmV2Config.CustomURL(), chainID, inMemoryStoreManager, readRequestsToMultipleNodes, txmV2Config.Bundles(), txmV2Config.FastlaneAuctionRequestTimeout()) if err != nil { return nil, fmt.Errorf("failed to create dual broadcast client: %w", err) } } else { - c = clientwrappers.NewChainClient(client) + var err error + c, err = clientwrappers.NewChainClient(lggr, client, readRequestsToMultipleNodes) + if err != nil { + return nil, fmt.Errorf("failed to create chain client wrapper: %w", err) + } } t := txm.NewTxm(lggr, chainID, c, attemptBuilder, inMemoryStoreManager, stuckTxDetector, config, keyStore, eh) return txm.NewTxmOrchestrator(lggr, chainID, t, inMemoryStoreManager, fwdMgr, keyStore, attemptBuilder), nil