From 1f33adb548a70c1aedbcd8959f07aa3cd2cd2106 Mon Sep 17 00:00:00 2001 From: Augustus Chang Date: Thu, 26 Mar 2026 17:14:54 -0400 Subject: [PATCH 1/3] add local/rpc nonce metric --- pkg/txm/metrics.go | 32 ++++++++++++++++++++++++++++++++ pkg/txm/metrics_test.go | 38 ++++++++++++++++++++++++++++++++++++++ pkg/txm/txm.go | 9 +++++++++ 3 files changed, 79 insertions(+) diff --git a/pkg/txm/metrics.go b/pkg/txm/metrics.go index d47de2cd5e..d81517bbb1 100644 --- a/pkg/txm/metrics.go +++ b/pkg/txm/metrics.go @@ -40,6 +40,14 @@ var ( Name: "txm_time_until_tx_confirmed", Help: "The amount of time elapsed from a transaction being broadcast to being included in a block.", }, []string{"chainID"}) + promLocalNonce = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "txm_local_nonce", + Help: "The next nonce to be assigned by the TXM for a given address.", + }, []string{"chainID", "address"}) + promRPCNonce = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "txm_rpc_nonce", + Help: "The latest nonce reported by the RPC node for a given address.", + }, []string{"chainID", "address", "source"}) ) type txmMetrics struct { @@ -50,6 +58,8 @@ type txmMetrics struct { numNonceGaps metric.Int64Counter reachedMaxAttempts metric.Int64Gauge timeUntilTxConfirmed metric.Float64Histogram + localNonce metric.Int64Gauge + rpcNonce metric.Int64Gauge } func NewTxmMetrics(chainID *big.Int) (*txmMetrics, error) { @@ -78,6 +88,16 @@ func NewTxmMetrics(chainID *big.Int) (*txmMetrics, error) { return nil, fmt.Errorf("failed to register max attempts indicator: %w", err) } + localNonce, err := beholder.GetMeter().Int64Gauge("txm_local_nonce") + if err != nil { + return nil, fmt.Errorf("failed to register local nonce gauge: %w", err) + } + + rpcNonce, err := beholder.GetMeter().Int64Gauge("txm_rpc_nonce") + if err != nil { + return nil, fmt.Errorf("failed to register rpc nonce gauge: %w", err) + } + return &txmMetrics{ chainID: chainID, Labeler: metrics.NewLabeler().With("chainID", chainID.String()), @@ -86,6 +106,8 @@ func NewTxmMetrics(chainID *big.Int) (*txmMetrics, error) { numNonceGaps: numNonceGaps, reachedMaxAttempts: reachedMaxAttempts, timeUntilTxConfirmed: timeUntilTxConfirmed, + localNonce: localNonce, + rpcNonce: rpcNonce, }, nil } @@ -118,6 +140,16 @@ func (m *txmMetrics) RecordTimeUntilTxConfirmed(ctx context.Context, duration fl m.timeUntilTxConfirmed.Record(ctx, duration) } +func (m *txmMetrics) SetLocalNonce(ctx context.Context, address common.Address, nonce uint64) { + promLocalNonce.WithLabelValues(m.chainID.String(), address.String()).Set(float64(nonce)) + m.localNonce.Record(ctx, int64(nonce)) +} + +func (m *txmMetrics) SetRPCNonce(ctx context.Context, address common.Address, nonce uint64, source string) { + promRPCNonce.WithLabelValues(m.chainID.String(), address.String(), source).Set(float64(nonce)) + m.rpcNonce.Record(ctx, int64(nonce)) +} + func (m *txmMetrics) EmitTxMessage(ctx context.Context, txHash common.Hash, fromAddress common.Address, tx *types.Transaction) error { meta, err := tx.GetMeta() if err != nil { diff --git a/pkg/txm/metrics_test.go b/pkg/txm/metrics_test.go index 15faa1d895..1d55073136 100644 --- a/pkg/txm/metrics_test.go +++ b/pkg/txm/metrics_test.go @@ -132,3 +132,41 @@ func TestReachedMaxAttempts(t *testing.T) { value = testutil.ToFloat64(promReachedMaxAttempts.WithLabelValues(testutils.FixtureChainID.String())) require.InDelta(t, float64(0), value, 0.00001) } + +func TestSetLocalNonce(t *testing.T) { + ctx := t.Context() + chainID := testutils.FixtureChainID + address := testutils.NewAddress() + + m, err := NewTxmMetrics(chainID) + require.NoError(t, err) + + m.SetLocalNonce(ctx, address, 42) + value := testutil.ToFloat64(promLocalNonce.WithLabelValues(chainID.String(), address.String())) + assert.InDelta(t, float64(42), value, 0.00001) + + m.SetLocalNonce(ctx, address, 100) + value = testutil.ToFloat64(promLocalNonce.WithLabelValues(chainID.String(), address.String())) + assert.InDelta(t, float64(100), value, 0.00001) +} + +func TestSetRPCNonce(t *testing.T) { + ctx := t.Context() + chainID := testutils.FixtureChainID + address := testutils.NewAddress() + + m, err := NewTxmMetrics(chainID) + require.NoError(t, err) + + m.SetRPCNonce(ctx, address, 10, "pending") + value := testutil.ToFloat64(promRPCNonce.WithLabelValues(chainID.String(), address.String(), "pending")) + assert.InDelta(t, float64(10), value, 0.00001) + + m.SetRPCNonce(ctx, address, 5, "latest") + value = testutil.ToFloat64(promRPCNonce.WithLabelValues(chainID.String(), address.String(), "latest")) + assert.InDelta(t, float64(5), value, 0.00001) + + // Pending and latest are independent gauges + value = testutil.ToFloat64(promRPCNonce.WithLabelValues(chainID.String(), address.String(), "pending")) + assert.InDelta(t, float64(10), value, 0.00001) +} diff --git a/pkg/txm/txm.go b/pkg/txm/txm.go index 8ce3da1087..cf47d29c99 100644 --- a/pkg/txm/txm.go +++ b/pkg/txm/txm.go @@ -157,6 +157,9 @@ func (t *Txm) initializeNonce(ctx context.Context, address common.Address) { continue } t.SetNonce(address, pendingNonce) + if t.Metrics != nil { + t.Metrics.SetRPCNonce(ctxWithTimeout, address, pendingNonce, "pending") + } t.lggr.Debugf("Set initial nonce for address: %v to %d", address, pendingNonce) return } @@ -210,6 +213,9 @@ func (t *Txm) SetNonce(address common.Address, nonce uint64) { t.nonceMapMu.Lock() defer t.nonceMapMu.Unlock() t.nonceMap[address] = nonce + if t.Metrics != nil { + t.Metrics.SetLocalNonce(context.Background(), address, nonce) + } } func newBackoff(minDuration time.Duration) backoff.Backoff { @@ -284,6 +290,7 @@ func (t *Txm) BroadcastTransaction(ctx context.Context, address common.Address) if e != nil { return false, e } + t.Metrics.SetRPCNonce(ctx, address, pendingNonce, "pending") nonce := t.GetNonce(address) if nonce > pendingNonce { t.lggr.Warnf("Reached transaction limit. LocalNonce: %d, PendingNonce %d, unconfirmedCount: %d", @@ -343,6 +350,7 @@ func (t *Txm) sendTransactionWithError(ctx context.Context, tx *types.Transactio if pErr != nil { return pErr } + t.Metrics.SetRPCNonce(ctx, fromAddress, pendingNonce, "pending") if pendingNonce <= *tx.Nonce { return fmt.Errorf("pending nonce for txID: %v didn't increase. PendingNonce: %d, TxNonce: %d. TxErr: %w", tx.ID, pendingNonce, *tx.Nonce, txErr) } @@ -361,6 +369,7 @@ func (t *Txm) BackfillTransactions(ctx context.Context, address common.Address) if err != nil { return err } + t.Metrics.SetRPCNonce(ctx, address, latestNonce, "latest") confirmedTransactions, unconfirmedTransactionIDs, err := t.txStore.MarkConfirmedAndReorgedTransactions(ctx, latestNonce, address) if err != nil { From 10326f43b9bf31188b18c34ee1c8f25b27b097ad Mon Sep 17 00:00:00 2001 From: Augustus Chang Date: Fri, 27 Mar 2026 11:06:24 -0400 Subject: [PATCH 2/3] remove local nonce metrics and pending rpc nonce metrics --- pkg/txm/metrics.go | 24 ++++-------------------- pkg/txm/metrics_test.go | 31 +++++-------------------------- pkg/txm/txm.go | 10 +--------- 3 files changed, 10 insertions(+), 55 deletions(-) diff --git a/pkg/txm/metrics.go b/pkg/txm/metrics.go index d81517bbb1..800250ca34 100644 --- a/pkg/txm/metrics.go +++ b/pkg/txm/metrics.go @@ -40,14 +40,10 @@ var ( Name: "txm_time_until_tx_confirmed", Help: "The amount of time elapsed from a transaction being broadcast to being included in a block.", }, []string{"chainID"}) - promLocalNonce = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "txm_local_nonce", - Help: "The next nonce to be assigned by the TXM for a given address.", - }, []string{"chainID", "address"}) promRPCNonce = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "txm_rpc_nonce", - Help: "The latest nonce reported by the RPC node for a given address.", - }, []string{"chainID", "address", "source"}) + Help: "The latest confirmed nonce reported by the RPC node for a given address.", + }, []string{"chainID", "address"}) ) type txmMetrics struct { @@ -58,7 +54,6 @@ type txmMetrics struct { numNonceGaps metric.Int64Counter reachedMaxAttempts metric.Int64Gauge timeUntilTxConfirmed metric.Float64Histogram - localNonce metric.Int64Gauge rpcNonce metric.Int64Gauge } @@ -88,11 +83,6 @@ func NewTxmMetrics(chainID *big.Int) (*txmMetrics, error) { return nil, fmt.Errorf("failed to register max attempts indicator: %w", err) } - localNonce, err := beholder.GetMeter().Int64Gauge("txm_local_nonce") - if err != nil { - return nil, fmt.Errorf("failed to register local nonce gauge: %w", err) - } - rpcNonce, err := beholder.GetMeter().Int64Gauge("txm_rpc_nonce") if err != nil { return nil, fmt.Errorf("failed to register rpc nonce gauge: %w", err) @@ -106,7 +96,6 @@ func NewTxmMetrics(chainID *big.Int) (*txmMetrics, error) { numNonceGaps: numNonceGaps, reachedMaxAttempts: reachedMaxAttempts, timeUntilTxConfirmed: timeUntilTxConfirmed, - localNonce: localNonce, rpcNonce: rpcNonce, }, nil } @@ -140,13 +129,8 @@ func (m *txmMetrics) RecordTimeUntilTxConfirmed(ctx context.Context, duration fl m.timeUntilTxConfirmed.Record(ctx, duration) } -func (m *txmMetrics) SetLocalNonce(ctx context.Context, address common.Address, nonce uint64) { - promLocalNonce.WithLabelValues(m.chainID.String(), address.String()).Set(float64(nonce)) - m.localNonce.Record(ctx, int64(nonce)) -} - -func (m *txmMetrics) SetRPCNonce(ctx context.Context, address common.Address, nonce uint64, source string) { - promRPCNonce.WithLabelValues(m.chainID.String(), address.String(), source).Set(float64(nonce)) +func (m *txmMetrics) SetRPCNonce(ctx context.Context, address common.Address, nonce uint64) { + promRPCNonce.WithLabelValues(m.chainID.String(), address.String()).Set(float64(nonce)) m.rpcNonce.Record(ctx, int64(nonce)) } diff --git a/pkg/txm/metrics_test.go b/pkg/txm/metrics_test.go index 1d55073136..338c2247cf 100644 --- a/pkg/txm/metrics_test.go +++ b/pkg/txm/metrics_test.go @@ -133,23 +133,6 @@ func TestReachedMaxAttempts(t *testing.T) { require.InDelta(t, float64(0), value, 0.00001) } -func TestSetLocalNonce(t *testing.T) { - ctx := t.Context() - chainID := testutils.FixtureChainID - address := testutils.NewAddress() - - m, err := NewTxmMetrics(chainID) - require.NoError(t, err) - - m.SetLocalNonce(ctx, address, 42) - value := testutil.ToFloat64(promLocalNonce.WithLabelValues(chainID.String(), address.String())) - assert.InDelta(t, float64(42), value, 0.00001) - - m.SetLocalNonce(ctx, address, 100) - value = testutil.ToFloat64(promLocalNonce.WithLabelValues(chainID.String(), address.String())) - assert.InDelta(t, float64(100), value, 0.00001) -} - func TestSetRPCNonce(t *testing.T) { ctx := t.Context() chainID := testutils.FixtureChainID @@ -158,15 +141,11 @@ func TestSetRPCNonce(t *testing.T) { m, err := NewTxmMetrics(chainID) require.NoError(t, err) - m.SetRPCNonce(ctx, address, 10, "pending") - value := testutil.ToFloat64(promRPCNonce.WithLabelValues(chainID.String(), address.String(), "pending")) + m.SetRPCNonce(ctx, address, 10) + value := testutil.ToFloat64(promRPCNonce.WithLabelValues(chainID.String(), address.String())) assert.InDelta(t, float64(10), value, 0.00001) - m.SetRPCNonce(ctx, address, 5, "latest") - value = testutil.ToFloat64(promRPCNonce.WithLabelValues(chainID.String(), address.String(), "latest")) - assert.InDelta(t, float64(5), value, 0.00001) - - // Pending and latest are independent gauges - value = testutil.ToFloat64(promRPCNonce.WithLabelValues(chainID.String(), address.String(), "pending")) - assert.InDelta(t, float64(10), value, 0.00001) + m.SetRPCNonce(ctx, address, 25) + value = testutil.ToFloat64(promRPCNonce.WithLabelValues(chainID.String(), address.String())) + assert.InDelta(t, float64(25), value, 0.00001) } diff --git a/pkg/txm/txm.go b/pkg/txm/txm.go index cf47d29c99..f3620c58cf 100644 --- a/pkg/txm/txm.go +++ b/pkg/txm/txm.go @@ -157,9 +157,6 @@ func (t *Txm) initializeNonce(ctx context.Context, address common.Address) { continue } t.SetNonce(address, pendingNonce) - if t.Metrics != nil { - t.Metrics.SetRPCNonce(ctxWithTimeout, address, pendingNonce, "pending") - } t.lggr.Debugf("Set initial nonce for address: %v to %d", address, pendingNonce) return } @@ -213,9 +210,6 @@ func (t *Txm) SetNonce(address common.Address, nonce uint64) { t.nonceMapMu.Lock() defer t.nonceMapMu.Unlock() t.nonceMap[address] = nonce - if t.Metrics != nil { - t.Metrics.SetLocalNonce(context.Background(), address, nonce) - } } func newBackoff(minDuration time.Duration) backoff.Backoff { @@ -290,7 +284,6 @@ func (t *Txm) BroadcastTransaction(ctx context.Context, address common.Address) if e != nil { return false, e } - t.Metrics.SetRPCNonce(ctx, address, pendingNonce, "pending") nonce := t.GetNonce(address) if nonce > pendingNonce { t.lggr.Warnf("Reached transaction limit. LocalNonce: %d, PendingNonce %d, unconfirmedCount: %d", @@ -350,7 +343,6 @@ func (t *Txm) sendTransactionWithError(ctx context.Context, tx *types.Transactio if pErr != nil { return pErr } - t.Metrics.SetRPCNonce(ctx, fromAddress, pendingNonce, "pending") if pendingNonce <= *tx.Nonce { return fmt.Errorf("pending nonce for txID: %v didn't increase. PendingNonce: %d, TxNonce: %d. TxErr: %w", tx.ID, pendingNonce, *tx.Nonce, txErr) } @@ -369,7 +361,7 @@ func (t *Txm) BackfillTransactions(ctx context.Context, address common.Address) if err != nil { return err } - t.Metrics.SetRPCNonce(ctx, address, latestNonce, "latest") + t.Metrics.SetRPCNonce(ctx, address, latestNonce) confirmedTransactions, unconfirmedTransactionIDs, err := t.txStore.MarkConfirmedAndReorgedTransactions(ctx, latestNonce, address) if err != nil { From 2a764a2be09bd2ca8a44cf80a4a4cc8deebd5c04 Mon Sep 17 00:00:00 2001 From: Augustus Chang Date: Fri, 27 Mar 2026 11:50:11 -0400 Subject: [PATCH 3/3] nil check --- pkg/txm/txm.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/txm/txm.go b/pkg/txm/txm.go index f3620c58cf..65dfab8019 100644 --- a/pkg/txm/txm.go +++ b/pkg/txm/txm.go @@ -361,7 +361,9 @@ func (t *Txm) BackfillTransactions(ctx context.Context, address common.Address) if err != nil { return err } - t.Metrics.SetRPCNonce(ctx, address, latestNonce) + if t.Metrics != nil { + t.Metrics.SetRPCNonce(ctx, address, latestNonce) + } confirmedTransactions, unconfirmedTransactionIDs, err := t.txStore.MarkConfirmedAndReorgedTransactions(ctx, latestNonce, address) if err != nil {