Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CONFIG.md
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ Enabled = false # Default
BlockTime = '10s' # Example
CustomURL = 'https://example.api.io' # Example
DualBroadcast = false # Example
ReadRequestsToMultipleNodes = false # Example
Bundles = false # Example
FastlaneAuctionRequestTimeout = '5s' # Example
```
Expand Down Expand Up @@ -419,6 +420,12 @@ DualBroadcast = false # Example
```
DualBroadcast enables DualBroadcast functionality.

### ReadRequestsToMultipleNodes
```toml
ReadRequestsToMultipleNodes = false # Example
```
ReadRequestsToMultipleNodes controls whether txm chain client reads use multiplexed calls.

### Bundles
```toml
Bundles = false # Example
Expand Down
84 changes: 84 additions & 0 deletions pkg/client/chain_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package client

import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"time"
Expand Down Expand Up @@ -74,7 +76,9 @@ type Client interface {
CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error)
PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error)
PendingNonceAt(ctx context.Context, account common.Address) (uint64, error)
PendingNonceAtWithFallback(ctx context.Context, account common.Address) (uint64, error)
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
NonceAtWithFallback(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
TransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, error)
TransactionByHashWithOpts(ctx context.Context, txHash common.Hash, opts evmtypes.TransactionByHashOpts) (*types.Transaction, error)
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
Expand Down Expand Up @@ -260,6 +264,86 @@ func (c *chainClient) CallContext(ctx context.Context, result interface{}, metho
return r.CallContext(ctx, result, method, args...)
}

func (c *chainClient) PendingNonceAtWithFallback(parentCtx context.Context, account common.Address) (uint64, error) {
return c.nonceAtWithFallback(parentCtx, func(ctx context.Context, rpc *RPCClient) (uint64, error) {
n, err := rpc.PendingSequenceAt(ctx, account)
return uint64(n), err
})
}

func (c *chainClient) NonceAtWithFallback(parentCtx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
return c.nonceAtWithFallback(parentCtx, func(ctx context.Context, rpc *RPCClient) (uint64, error) {
return rpc.NonceAt(ctx, account, blockNumber)
})
}

// nonceAtWithFallback is a helper function that makes a call to the main node and if it fails, it makes a call to all other alive nodes in parallel.
func (c *chainClient) nonceAtWithFallback(parentCtx context.Context, call func(context.Context, *RPCClient) (uint64, error)) (uint64, error) {
// try main node
main, err := c.multiNode.SelectRPC(parentCtx)
if err != nil {
return 0, err
}

nonce, err := call(parentCtx, main)
if err == nil {
return nonce, nil
}
// if main node fails, collect the error and continue with the fallback calls
mainErr := fmt.Errorf("%s: %w", main.Name(), err)

ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

type result struct {
nonce uint64
err error
}
results := make(chan result)
scheduled := 0

doFunc := func(ctx context.Context, rpc *RPCClient, isSendOnly bool) {
if isSendOnly || rpc == main {
return
}

scheduled++
go func(rpc *RPCClient) {
nonce, err := call(ctx, rpc)
if err != nil {
err = fmt.Errorf("%s: %w", rpc.Name(), err)
c.logger.Debugw("Fallback nonce call failed", "rpc", rpc.Name(), "err", err)
} else {
c.logger.Debugw("Fallback nonce call succeeded", "rpc", rpc.Name(), "nonce", nonce)
}

select {
case results <- result{nonce: nonce, err: err}:
case <-ctx.Done():
}
}(rpc)
}

if doErr := c.multiNode.DoAll(ctx, doFunc); doErr != nil {
return 0, doErr
}

errs := []error{mainErr}
for range scheduled {
select {
case <-ctx.Done():
return 0, errors.Join(append(errs, ctx.Err())...)
case res := <-results:
if res.err == nil {
return res.nonce, nil
}
errs = append(errs, res.err)
}
}

return 0, errors.Join(errs...)
}

func (c *chainClient) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) {
r, err := c.multiNode.SelectRPC(ctx)
if err != nil {
Expand Down
179 changes: 179 additions & 0 deletions pkg/client/chain_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ import (
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-framework/metrics"
"github.com/smartcontractkit/chainlink-framework/multinode"
"github.com/smartcontractkit/chainlink-framework/multinode/mocks"

"github.com/smartcontractkit/chainlink-evm/pkg/client"
"github.com/smartcontractkit/chainlink-evm/pkg/testutils"
Expand Down Expand Up @@ -960,4 +963,180 @@ func TestEthClient_ErroringClient(t *testing.T) {
require.Equal(t, multinode.ErrNodeError, err)
}

func TestChainClient_NonceAtWithFallback(t *testing.T) {
t.Parallel()

t.Run("main succeeds without fallback", func(t *testing.T) {
t.Parallel()

mainCalls := atomic.Int32{}
fallbackCalls := atomic.Int32{}

ethClient := mustNewChainClientWithTestNodes(t,
nonceServer(t, nonceResponse{result: `"0x5"`, calls: &mainCalls}),
nonceServer(t, nonceResponse{result: `"0x6"`, calls: &fallbackCalls}),
)

nonce, err := ethClient.NonceAtWithFallback(t.Context(), testutils.NewAddress(), nil)
require.NoError(t, err)
require.Equal(t, uint64(5), nonce)
require.Equal(t, int32(1), mainCalls.Load())
require.Equal(t, int32(0), fallbackCalls.Load())
})

t.Run("fallback returns first success", func(t *testing.T) {
t.Parallel()

mainCalls := atomic.Int32{}
fastFallbackCalls := atomic.Int32{}

ethClient := mustNewChainClientWithTestNodes(t,
nonceServer(t, nonceResponse{code: -32000, message: "main failed", calls: &mainCalls}),
nonceServer(t, nonceResponse{delay: 200 * time.Millisecond, result: `"0x9"`}),
nonceServer(t, nonceResponse{delay: 25 * time.Millisecond, result: `"0xa"`, calls: &fastFallbackCalls}),
)

startedAt := time.Now()
nonce, err := ethClient.NonceAtWithFallback(t.Context(), testutils.NewAddress(), nil)
duration := time.Since(startedAt)
require.NoError(t, err)
require.Equal(t, uint64(10), nonce)
require.Equal(t, int32(1), mainCalls.Load())
require.Equal(t, int32(1), fastFallbackCalls.Load())
require.Less(t, duration, 100*time.Millisecond)
})

t.Run("all primaries fail", func(t *testing.T) {
t.Parallel()

ethClient := mustNewChainClientWithTestNodes(t,
nonceServer(t, nonceResponse{code: -32000, message: "main failed"}),
nonceServer(t, nonceResponse{code: -32001, message: "fallback one failed"}),
nonceServer(t, nonceResponse{code: -32002, message: "fallback two failed"}),
)

_, err := ethClient.PendingNonceAtWithFallback(t.Context(), testutils.NewAddress())
require.Error(t, err)
requireErrorContainsAll(t, err,
"eth-primary-rpc-0", "main failed",
"eth-primary-rpc-1", "fallback one failed",
"eth-primary-rpc-2", "fallback two failed",
)
})

t.Run("context canceled", func(t *testing.T) {
t.Parallel()

ethClient := mustNewChainClientWithTestNodes(t,
nonceServer(t, nonceResponse{delay: 100 * time.Millisecond, result: `"0x5"`}),
)

ctx, cancel := context.WithCancel(tests.Context(t))
cancel()

_, err := ethClient.NonceAtWithFallback(ctx, testutils.NewAddress(), nil)
require.Error(t, err)
require.ErrorIs(t, err, context.Canceled)
})
}

func mustNewChainClientWithTestNodes(t *testing.T, wsURLs ...string) client.Client {
t.Helper()

chainID := testutils.FixtureChainID
lggr := logger.Test(t)
metricsRecorder, err := metrics.NewGenericMultiNodeMetrics("EVM Test", chainID.String())
require.NoError(t, err)

cfg := client.TestNodePoolConfig{NodeSelectionMode: multinode.NodeSelectionModeRoundRobin}
primaries := make([]multinode.Node[*big.Int, *client.RPCClient], 0, len(wsURLs))
for i, wsURL := range wsURLs {
parsed, err := url.ParseRequestURI(wsURL)
require.NoError(t, err)

rpc := client.NewTestRPCClient(t, client.RPCClientOpts{
Cfg: cfg,
WS: parsed,
Name: "eth-primary-rpc-" + big.NewInt(int64(i)).String(),
ID: i + 1,
ChainID: chainID,
Tier: multinode.Primary,
})

node := multinode.NewNode(
cfg,
mocks.ChainConfig{NoNewHeadsThresholdVal: 0},
lggr,
metricsRecorder,
parsed,
nil,
"eth-primary-node-"+big.NewInt(int64(i)).String(),
i+1,
chainID,
1,
rpc,
"EVM",
false,
)
primaries = append(primaries, node)
}

clientErrors := client.NewTestClientErrors()
ethClient := client.NewChainClient(lggr, metricsRecorder, cfg.SelectionMode(), 0, primaries, nil, chainID, &clientErrors, 0, "")
t.Cleanup(ethClient.Close)
require.NoError(t, ethClient.Dial(t.Context()))
require.Eventually(t, func() bool {
alive := 0
for _, state := range ethClient.NodeStates() {
if state == "Alive" {
alive++
}
}
return alive == len(wsURLs)
}, time.Minute, time.Second)

return ethClient
}

type nonceResponse struct {
delay time.Duration
result string
code int
message string
calls *atomic.Int32
}

func nonceServer(t *testing.T, respCfg nonceResponse) string {
t.Helper()
return testutils.NewWSServer(t, testutils.FixtureChainID, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) {
switch method {
case "eth_subscribe":
resp.Result = `"0x00"`
resp.Notify = headResult
case "eth_unsubscribe":
resp.Result = "true"
case "eth_getBlockByNumber":
resp.Result = client.MakeHeadMsgForNumber(42)
case "eth_getTransactionCount":
if respCfg.calls != nil {
respCfg.calls.Add(1)
}
if respCfg.delay > 0 {
time.Sleep(respCfg.delay)
}
resp.Result = respCfg.result
resp.Error.Code = respCfg.code
resp.Error.Message = respCfg.message
}
return
}).WSURL().String()
}

func requireErrorContainsAll(t *testing.T, err error, parts ...string) {
t.Helper()
for _, part := range parts {
require.ErrorContains(t, err, part)
}
}

const headResult = client.HeadResult
Loading
Loading