diff --git a/pkg/ocr2transmit/metrics.go b/pkg/ocr2transmit/metrics.go new file mode 100644 index 0000000000..b8889d8e7f --- /dev/null +++ b/pkg/ocr2transmit/metrics.go @@ -0,0 +1,147 @@ +package ocr2transmit + +import ( + "context" + "math/big" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" +) + +var ( + promTransmitConfirmed = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "ocr2_transmit_tx_confirmed_total", + Help: "OCR2 aggregator transmit transactions that received a successful on-chain receipt.", + }, []string{"chain_id", "contract_address", "from_address"}) + promTransmitReverted = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "ocr2_transmit_tx_reverted_total", + Help: "OCR2 aggregator transmit transactions that were included but reverted.", + }, []string{"chain_id", "contract_address", "from_address"}) + promTransmitFatal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "ocr2_transmit_tx_fatal_total", + Help: "OCR2 aggregator transmit transactions marked fatally errored by TXM (e.g. could not get receipt).", + }, []string{"chain_id", "contract_address", "from_address"}) + promTransmitUnconfirmed = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "ocr2_transmit_unconfirmed_tx_count", + Help: "Count of unconfirmed in-memory transactions whose calldata is an OCR2 transmit call (TXM v2 per from_address).", + }, []string{"chain_id", "from_address"}) +) + +// OCR2TransmitMetrics records OCR2 transmit outcomes to Prometheus and OpenTelemetry (Beholder), following the same pattern as txmMetrics (package-level Prom vecs, OTel instruments on the metrics type). +type OCR2TransmitMetrics struct { + promConfirmed *prometheus.CounterVec + promReverted *prometheus.CounterVec + promFatal *prometheus.CounterVec + promUnconfirmed *prometheus.GaugeVec + + otelConfirmed metric.Int64Counter + otelReverted metric.Int64Counter + otelFatal metric.Int64Counter + otelUnconfirmed metric.Int64Gauge +} + +var ( + ocr2TransmitMetrics *OCR2TransmitMetrics + ocr2TransmitMetricsOnce sync.Once +) + +func newOCR2TransmitMetrics() *OCR2TransmitMetrics { + m := &OCR2TransmitMetrics{ + promConfirmed: promTransmitConfirmed, + promReverted: promTransmitReverted, + promFatal: promTransmitFatal, + promUnconfirmed: promTransmitUnconfirmed, + } + meter := beholder.GetMeter() + if c, err := meter.Int64Counter("ocr2_transmit_tx_confirmed_total"); err == nil { + m.otelConfirmed = c + } + if c, err := meter.Int64Counter("ocr2_transmit_tx_reverted_total"); err == nil { + m.otelReverted = c + } + if c, err := meter.Int64Counter("ocr2_transmit_tx_fatal_total"); err == nil { + m.otelFatal = c + } + if g, err := meter.Int64Gauge("ocr2_transmit_unconfirmed_tx_count"); err == nil { + m.otelUnconfirmed = g + } + return m +} + +func ocr2TransmitMetricsInstance() *OCR2TransmitMetrics { + ocr2TransmitMetricsOnce.Do(func() { + ocr2TransmitMetrics = newOCR2TransmitMetrics() + }) + return ocr2TransmitMetrics +} + +func transmitAttrs(chainID *big.Int, contract, from string) metric.MeasurementOption { + return metric.WithAttributes( + attribute.String("chain_id", chainID.String()), + attribute.String("contract_address", contract), + attribute.String("from_address", from), + ) +} + +func unconfirmedAttrs(chainID *big.Int, from string) metric.MeasurementOption { + return metric.WithAttributes( + attribute.String("chain_id", chainID.String()), + attribute.String("from_address", from), + ) +} + +// RecordOutcome increments confirmed / reverted / fatal counters when calldata matches OCR2 transmit. +func (m *OCR2TransmitMetrics) RecordOutcome(ctx context.Context, chainID *big.Int, from, to common.Address, encodedPayload []byte, fwdrDest *common.Address, outcome string) { + if chainID == nil || !IsTransmitCalldata(encodedPayload) { + return + } + contract := ContractLabel(to, fwdrDest) + labels := []string{chainID.String(), contract, from.Hex()} + opts := transmitAttrs(chainID, contract, from.Hex()) + switch outcome { + case "confirmed": + m.promConfirmed.WithLabelValues(labels...).Inc() + if m.otelConfirmed != nil { + m.otelConfirmed.Add(ctx, 1, opts) + } + case "reverted": + m.promReverted.WithLabelValues(labels...).Inc() + if m.otelReverted != nil { + m.otelReverted.Add(ctx, 1, opts) + } + case "fatal": + m.promFatal.WithLabelValues(labels...).Inc() + if m.otelFatal != nil { + m.otelFatal.Add(ctx, 1, opts) + } + default: + return + } +} + +// SetUnconfirmedGauge sets the gauge for OCR2-shaped unconfirmed txs for TXM v2 (optional / phase 3). +func (m *OCR2TransmitMetrics) SetUnconfirmedGauge(ctx context.Context, chainID *big.Int, from common.Address, n int) { + if chainID == nil { + return + } + m.promUnconfirmed.WithLabelValues(chainID.String(), from.Hex()).Set(float64(n)) + if m.otelUnconfirmed != nil { + m.otelUnconfirmed.Record(ctx, int64(n), unconfirmedAttrs(chainID, from.Hex())) + } +} + +// RecordOutcome increments confirmed / reverted / fatal counters when calldata matches OCR2 transmit. +func RecordOutcome(ctx context.Context, chainID *big.Int, from, to common.Address, encodedPayload []byte, fwdrDest *common.Address, outcome string) { + ocr2TransmitMetricsInstance().RecordOutcome(ctx, chainID, from, to, encodedPayload, fwdrDest, outcome) +} + +// SetUnconfirmedGauge sets the gauge for OCR2-shaped unconfirmed txs for TXM v2 (optional / phase 3). +func SetUnconfirmedGauge(ctx context.Context, chainID *big.Int, from common.Address, n int) { + ocr2TransmitMetricsInstance().SetUnconfirmedGauge(ctx, chainID, from, n) +} diff --git a/pkg/ocr2transmit/ocr2transmit.go b/pkg/ocr2transmit/ocr2transmit.go new file mode 100644 index 0000000000..b4abf90941 --- /dev/null +++ b/pkg/ocr2transmit/ocr2transmit.go @@ -0,0 +1,47 @@ +// Package ocr2transmit detects OCR2 aggregator transmit calldata for metrics (DF-22761 / DF-22643). +package ocr2transmit + +import ( + "sync" + + "github.com/ethereum/go-ethereum/common" + + "github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator" +) + +var ( + transmitMethodID []byte + transmitMethodIDOnce sync.Once +) + +func transmitSig() []byte { + transmitMethodIDOnce.Do(func() { + parsed, err := ocr2aggregator.OCR2AggregatorMetaData.GetAbi() + if err != nil { + return + } + m, ok := parsed.Methods["transmit"] + if !ok { + return + } + transmitMethodID = m.ID + }) + return transmitMethodID +} + +// IsTransmitCalldata returns true if data begins with the OCR2Aggregator transmit function selector. +func IsTransmitCalldata(data []byte) bool { + sig := transmitSig() + if len(sig) != 4 || len(data) < 4 { + return false + } + return data[0] == sig[0] && data[1] == sig[1] && data[2] == sig[2] && data[3] == sig[3] +} + +// ContractLabel returns the logical aggregator address for metrics: meta forwarder dest if set, else to address. +func ContractLabel(to common.Address, fwdrDest *common.Address) string { + if fwdrDest != nil && *fwdrDest != (common.Address{}) { + return fwdrDest.Hex() + } + return to.Hex() +} diff --git a/pkg/ocr2transmit/ocr2transmit_test.go b/pkg/ocr2transmit/ocr2transmit_test.go new file mode 100644 index 0000000000..e3809b9a4b --- /dev/null +++ b/pkg/ocr2transmit/ocr2transmit_test.go @@ -0,0 +1,31 @@ +package ocr2transmit + +import ( + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator" +) + +func TestIsTransmitCalldata(t *testing.T) { + parsed, err := ocr2aggregator.OCR2AggregatorMetaData.GetAbi() + require.NoError(t, err) + m := parsed.Methods["transmit"] + require.NotNil(t, m) + + // Minimal non-empty args for selector match only (Pack may fail on full args); we only need first 4 bytes. + data := append([]byte{}, m.ID...) + require.True(t, IsTransmitCalldata(data)) + + require.False(t, IsTransmitCalldata([]byte{1, 2, 3})) + require.False(t, IsTransmitCalldata(nil)) +} + +func TestContractLabel(t *testing.T) { + to := common.HexToAddress("0x0000000000000000000000000000000000000001") + dest := common.HexToAddress("0x0000000000000000000000000000000000000002") + require.Equal(t, dest.Hex(), ContractLabel(to, &dest)) + require.Equal(t, to.Hex(), ContractLabel(to, nil)) +} diff --git a/pkg/txm/storage/inmemory_store.go b/pkg/txm/storage/inmemory_store.go index 83db34a854..c6cb9afc64 100644 --- a/pkg/txm/storage/inmemory_store.go +++ b/pkg/txm/storage/inmemory_store.go @@ -1,6 +1,7 @@ package storage import ( + "context" "fmt" "math/big" "sort" @@ -11,6 +12,7 @@ import ( evmtypes "github.com/ethereum/go-ethereum/core/types" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-evm/pkg/ocr2transmit" "github.com/smartcontractkit/chainlink-evm/pkg/txm/types" "github.com/smartcontractkit/chainlink-framework/chains/txmgr" ) @@ -395,6 +397,12 @@ func (m *InMemoryStore) MarkTxFatal(txToMark *types.Transaction) error { m.Lock() defer m.Unlock() + var fwdr *common.Address + if meta, err := txToMark.GetMeta(); err == nil && meta != nil && meta.FwdrDestAddress != nil { + fwdr = meta.FwdrDestAddress + } + ocr2transmit.RecordOutcome(context.Background(), m.chainID, txToMark.FromAddress, txToMark.ToAddress, txToMark.Data, fwdr, "fatal") + // TODO: for now do the simple thing and drop the transaction instead of adding it to the fatal queue. delete(m.UnconfirmedTransactions, *txToMark.Nonce) delete(m.Transactions, txToMark.ID) @@ -420,6 +428,19 @@ func (m *InMemoryStore) UpdateSignedAttempt(txID uint64, attemptID uint64, signe return fmt.Errorf("attempt was not found for attemptID: %v", attemptID) } +// CountOCR2UnconfirmedTransmit returns how many unconfirmed txs look like OCR2 transmit calls. +func (m *InMemoryStore) CountOCR2UnconfirmedTransmit() int { + m.RLock() + defer m.RUnlock() + n := 0 + for _, tx := range m.UnconfirmedTransactions { + if ocr2transmit.IsTransmitCalldata(tx.Data) { + n++ + } + } + return n +} + // Orchestrator func (m *InMemoryStore) FindTxWithIdempotencyKey(idempotencyKey string) *types.Transaction { m.RLock() diff --git a/pkg/txm/storage/inmemory_store_manager.go b/pkg/txm/storage/inmemory_store_manager.go index 4d6d421e18..ab73a621cf 100644 --- a/pkg/txm/storage/inmemory_store_manager.go +++ b/pkg/txm/storage/inmemory_store_manager.go @@ -10,6 +10,7 @@ import ( evmtypes "github.com/ethereum/go-ethereum/core/types" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-evm/pkg/ocr2transmit" "github.com/smartcontractkit/chainlink-evm/pkg/txm/types" ) @@ -127,6 +128,7 @@ func (m *InMemoryStoreManager) DeleteAttemptForUnconfirmedTx(_ context.Context, } func (m *InMemoryStoreManager) MarkTxFatal(_ context.Context, tx *types.Transaction, fromAddress common.Address) error { + // Add metric here to emit tx has failed if store, exists := m.InMemoryStoreMap[fromAddress]; exists { return store.MarkTxFatal(tx) } @@ -140,6 +142,15 @@ func (m *InMemoryStoreManager) UpdateSignedAttempt(_ context.Context, txID uint6 return fmt.Errorf(StoreNotFoundForAddress, fromAddress) } +// RefreshOCR2UnconfirmedGauges updates ocr2_transmit_unconfirmed_tx_count for every managed from_address (TXM v2). +func (m *InMemoryStoreManager) RefreshOCR2UnconfirmedGauges() { + ctx := context.Background() + for addr, store := range m.InMemoryStoreMap { + n := store.CountOCR2UnconfirmedTransmit() + ocr2transmit.SetUnconfirmedGauge(ctx, m.chainID, addr, n) + } +} + func (m *InMemoryStoreManager) FindTxWithIdempotencyKey(_ context.Context, idempotencyKey string) (*types.Transaction, error) { for _, store := range m.InMemoryStoreMap { tx := store.FindTxWithIdempotencyKey(idempotencyKey) diff --git a/pkg/txm/txm.go b/pkg/txm/txm.go index 8ce3da1087..9c9384aa8a 100644 --- a/pkg/txm/txm.go +++ b/pkg/txm/txm.go @@ -15,6 +15,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink-evm/pkg/keys" + "github.com/smartcontractkit/chainlink-evm/pkg/ocr2transmit" + "github.com/smartcontractkit/chainlink-evm/pkg/txm/storage" "github.com/smartcontractkit/chainlink-evm/pkg/txm/types" ) @@ -357,6 +359,8 @@ func (t *Txm) sendTransactionWithError(ctx context.Context, tx *types.Transactio } func (t *Txm) BackfillTransactions(ctx context.Context, address common.Address) error { + defer t.refreshOCR2UnconfirmedGauges() + latestNonce, err := t.client.NonceAt(ctx, address, nil) if err != nil { return err @@ -440,6 +444,17 @@ func (t *Txm) extractMetrics(ctx context.Context, txs []*types.Transaction) []ui if tx.InitialBroadcastAt != nil { t.Metrics.RecordTimeUntilTxConfirmed(ctx, float64(time.Since(*tx.InitialBroadcastAt))) } + var fwdr *common.Address + if meta, err := tx.GetMeta(); err == nil && meta != nil && meta.FwdrDestAddress != nil { + fwdr = meta.FwdrDestAddress + } + ocr2transmit.RecordOutcome(ctx, t.chainID, tx.FromAddress, tx.ToAddress, tx.Data, fwdr, "confirmed") } return confirmedTxIDs } + +func (t *Txm) refreshOCR2UnconfirmedGauges() { + if mgr, ok := t.txStore.(*storage.InMemoryStoreManager); ok { + mgr.RefreshOCR2UnconfirmedGauges() + } +} diff --git a/pkg/txmgr/finalizer.go b/pkg/txmgr/finalizer.go index 64ef4b7b4e..9ff0d9a7c4 100644 --- a/pkg/txmgr/finalizer.go +++ b/pkg/txmgr/finalizer.go @@ -504,8 +504,10 @@ func (f *evmFinalizer) validateReceipt(ctx context.Context, receipt *types.Recei } // This might increment more than once e.g. in case of re-orgs going back and forth we might re-fetch the same receipt f.metrics.IncrementNumRevertedTxs(ctx) + recordOCR2TransmitOutcomeV1(ctx, f.chainID, &attempt.Tx, "reverted") } else { f.metrics.IncrementNumSuccessfulTxs(ctx) + recordOCR2TransmitOutcomeV1(ctx, f.chainID, &attempt.Tx, "confirmed") } // This is only recording forwarded tx that were mined and have a status. @@ -600,6 +602,8 @@ func (f *evmFinalizer) ProcessOldTxsWithoutReceipts(ctx context.Context, oldTxID oldTx.Error = null.StringFrom(ErrCouldNotGetReceipt) if err = f.txStore.UpdateTxFatalErrorAndDeleteAttempts(ctx, oldTx); err != nil { errorList = append(errorList, fmt.Errorf("failed to mark tx with ID %d as fatal: %w", oldTx.ID, err)) + } else { + recordOCR2TransmitOutcomeV1(ctx, f.chainID, oldTx, "fatal") } } if len(errorList) > 0 { diff --git a/pkg/txmgr/ocr2_transmit_metrics.go b/pkg/txmgr/ocr2_transmit_metrics.go new file mode 100644 index 0000000000..d88fa5de47 --- /dev/null +++ b/pkg/txmgr/ocr2_transmit_metrics.go @@ -0,0 +1,25 @@ +package txmgr + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/common" + + "github.com/smartcontractkit/chainlink-evm/pkg/ocr2transmit" +) + +func recordOCR2TransmitOutcomeV1(ctx context.Context, chainID *big.Int, tx *Tx, outcome string) { + if tx == nil { + return + } + var fwdr *common.Address + if meta, err := tx.GetMeta(); err == nil && meta != nil && meta.FwdrDestAddress != nil { + fwdr = meta.FwdrDestAddress + } + cid := chainID + if cid == nil { + cid = tx.ChainID + } + ocr2transmit.RecordOutcome(ctx, cid, tx.FromAddress, tx.ToAddress, tx.EncodedPayload, fwdr, outcome) +}