Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions pkg/ocr2transmit/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package ocr2transmit

import (
"context"
"math/big"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)

var (
promTransmitConfirmed = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "ocr2_transmit_tx_confirmed_total",
Help: "OCR2 aggregator transmit transactions that received a successful on-chain receipt.",
}, []string{"chain_id", "contract_address", "from_address"})
promTransmitReverted = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "ocr2_transmit_tx_reverted_total",
Help: "OCR2 aggregator transmit transactions that were included but reverted.",
}, []string{"chain_id", "contract_address", "from_address"})
promTransmitFatal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "ocr2_transmit_tx_fatal_total",
Help: "OCR2 aggregator transmit transactions marked fatally errored by TXM (e.g. could not get receipt).",
}, []string{"chain_id", "contract_address", "from_address"})
promTransmitUnconfirmed = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "ocr2_transmit_unconfirmed_tx_count",
Help: "Count of unconfirmed in-memory transactions whose calldata is an OCR2 transmit call (TXM v2 per from_address).",
}, []string{"chain_id", "from_address"})
Comment on lines +18 to +33
Copy link
Copy Markdown
Contributor

@cll-gg cll-gg Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use otel metrics instead of (or next to) prom metrics, then they will be ingested by Beholder and NOP metrics will be visible in Grafana

)

// OCR2TransmitMetrics records OCR2 transmit outcomes to Prometheus and OpenTelemetry (Beholder), following the same pattern as txmMetrics (package-level Prom vecs, OTel instruments on the metrics type).
type OCR2TransmitMetrics struct {
promConfirmed *prometheus.CounterVec
promReverted *prometheus.CounterVec
promFatal *prometheus.CounterVec
promUnconfirmed *prometheus.GaugeVec

otelConfirmed metric.Int64Counter
otelReverted metric.Int64Counter
otelFatal metric.Int64Counter
otelUnconfirmed metric.Int64Gauge
}

var (
ocr2TransmitMetrics *OCR2TransmitMetrics
ocr2TransmitMetricsOnce sync.Once
)

func newOCR2TransmitMetrics() *OCR2TransmitMetrics {
m := &OCR2TransmitMetrics{
promConfirmed: promTransmitConfirmed,
promReverted: promTransmitReverted,
promFatal: promTransmitFatal,
promUnconfirmed: promTransmitUnconfirmed,
}
meter := beholder.GetMeter()
if c, err := meter.Int64Counter("ocr2_transmit_tx_confirmed_total"); err == nil {
m.otelConfirmed = c
}
if c, err := meter.Int64Counter("ocr2_transmit_tx_reverted_total"); err == nil {
m.otelReverted = c
}
if c, err := meter.Int64Counter("ocr2_transmit_tx_fatal_total"); err == nil {
m.otelFatal = c
}
if g, err := meter.Int64Gauge("ocr2_transmit_unconfirmed_tx_count"); err == nil {
m.otelUnconfirmed = g
}
return m
}

func ocr2TransmitMetricsInstance() *OCR2TransmitMetrics {
ocr2TransmitMetricsOnce.Do(func() {
ocr2TransmitMetrics = newOCR2TransmitMetrics()
})
return ocr2TransmitMetrics
}

func transmitAttrs(chainID *big.Int, contract, from string) metric.MeasurementOption {
return metric.WithAttributes(
attribute.String("chain_id", chainID.String()),
attribute.String("contract_address", contract),
attribute.String("from_address", from),
)
}

func unconfirmedAttrs(chainID *big.Int, from string) metric.MeasurementOption {
return metric.WithAttributes(
attribute.String("chain_id", chainID.String()),
attribute.String("from_address", from),
)
}

// RecordOutcome increments confirmed / reverted / fatal counters when calldata matches OCR2 transmit.
func (m *OCR2TransmitMetrics) RecordOutcome(ctx context.Context, chainID *big.Int, from, to common.Address, encodedPayload []byte, fwdrDest *common.Address, outcome string) {
if chainID == nil || !IsTransmitCalldata(encodedPayload) {
return
}
contract := ContractLabel(to, fwdrDest)
labels := []string{chainID.String(), contract, from.Hex()}
opts := transmitAttrs(chainID, contract, from.Hex())
switch outcome {
case "confirmed":
m.promConfirmed.WithLabelValues(labels...).Inc()
if m.otelConfirmed != nil {
m.otelConfirmed.Add(ctx, 1, opts)
}
case "reverted":
m.promReverted.WithLabelValues(labels...).Inc()
if m.otelReverted != nil {
m.otelReverted.Add(ctx, 1, opts)
}
case "fatal":
m.promFatal.WithLabelValues(labels...).Inc()
if m.otelFatal != nil {
m.otelFatal.Add(ctx, 1, opts)
}
default:
return
}
}

// SetUnconfirmedGauge sets the gauge for OCR2-shaped unconfirmed txs for TXM v2 (optional / phase 3).
func (m *OCR2TransmitMetrics) SetUnconfirmedGauge(ctx context.Context, chainID *big.Int, from common.Address, n int) {
if chainID == nil {
return
}
m.promUnconfirmed.WithLabelValues(chainID.String(), from.Hex()).Set(float64(n))
if m.otelUnconfirmed != nil {
m.otelUnconfirmed.Record(ctx, int64(n), unconfirmedAttrs(chainID, from.Hex()))
}
}

// RecordOutcome increments confirmed / reverted / fatal counters when calldata matches OCR2 transmit.
func RecordOutcome(ctx context.Context, chainID *big.Int, from, to common.Address, encodedPayload []byte, fwdrDest *common.Address, outcome string) {
ocr2TransmitMetricsInstance().RecordOutcome(ctx, chainID, from, to, encodedPayload, fwdrDest, outcome)
}

// SetUnconfirmedGauge sets the gauge for OCR2-shaped unconfirmed txs for TXM v2 (optional / phase 3).
func SetUnconfirmedGauge(ctx context.Context, chainID *big.Int, from common.Address, n int) {
ocr2TransmitMetricsInstance().SetUnconfirmedGauge(ctx, chainID, from, n)
}
47 changes: 47 additions & 0 deletions pkg/ocr2transmit/ocr2transmit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Package ocr2transmit detects OCR2 aggregator transmit calldata for metrics (DF-22761 / DF-22643).
package ocr2transmit

import (
"sync"

"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator"
)

var (
transmitMethodID []byte
transmitMethodIDOnce sync.Once
)

func transmitSig() []byte {
transmitMethodIDOnce.Do(func() {
parsed, err := ocr2aggregator.OCR2AggregatorMetaData.GetAbi()
if err != nil {
return
}
m, ok := parsed.Methods["transmit"]
if !ok {
return
}
transmitMethodID = m.ID
})
return transmitMethodID
}

// IsTransmitCalldata returns true if data begins with the OCR2Aggregator transmit function selector.
func IsTransmitCalldata(data []byte) bool {
sig := transmitSig()
if len(sig) != 4 || len(data) < 4 {
return false
}
return data[0] == sig[0] && data[1] == sig[1] && data[2] == sig[2] && data[3] == sig[3]
}

// ContractLabel returns the logical aggregator address for metrics: meta forwarder dest if set, else to address.
func ContractLabel(to common.Address, fwdrDest *common.Address) string {
if fwdrDest != nil && *fwdrDest != (common.Address{}) {
return fwdrDest.Hex()
}
return to.Hex()
}
31 changes: 31 additions & 0 deletions pkg/ocr2transmit/ocr2transmit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package ocr2transmit

import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator"
)

func TestIsTransmitCalldata(t *testing.T) {
parsed, err := ocr2aggregator.OCR2AggregatorMetaData.GetAbi()
require.NoError(t, err)
m := parsed.Methods["transmit"]
require.NotNil(t, m)

// Minimal non-empty args for selector match only (Pack may fail on full args); we only need first 4 bytes.
data := append([]byte{}, m.ID...)
require.True(t, IsTransmitCalldata(data))

require.False(t, IsTransmitCalldata([]byte{1, 2, 3}))
require.False(t, IsTransmitCalldata(nil))
}

func TestContractLabel(t *testing.T) {
to := common.HexToAddress("0x0000000000000000000000000000000000000001")
dest := common.HexToAddress("0x0000000000000000000000000000000000000002")
require.Equal(t, dest.Hex(), ContractLabel(to, &dest))
require.Equal(t, to.Hex(), ContractLabel(to, nil))
}
21 changes: 21 additions & 0 deletions pkg/txm/storage/inmemory_store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storage

import (
"context"
"fmt"
"math/big"
"sort"
Expand All @@ -11,6 +12,7 @@ import (
evmtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-evm/pkg/ocr2transmit"
"github.com/smartcontractkit/chainlink-evm/pkg/txm/types"
"github.com/smartcontractkit/chainlink-framework/chains/txmgr"
)
Expand Down Expand Up @@ -395,6 +397,12 @@ func (m *InMemoryStore) MarkTxFatal(txToMark *types.Transaction) error {
m.Lock()
defer m.Unlock()

var fwdr *common.Address
if meta, err := txToMark.GetMeta(); err == nil && meta != nil && meta.FwdrDestAddress != nil {
fwdr = meta.FwdrDestAddress
}
ocr2transmit.RecordOutcome(context.Background(), m.chainID, txToMark.FromAddress, txToMark.ToAddress, txToMark.Data, fwdr, "fatal")

// TODO: for now do the simple thing and drop the transaction instead of adding it to the fatal queue.
delete(m.UnconfirmedTransactions, *txToMark.Nonce)
delete(m.Transactions, txToMark.ID)
Expand All @@ -420,6 +428,19 @@ func (m *InMemoryStore) UpdateSignedAttempt(txID uint64, attemptID uint64, signe
return fmt.Errorf("attempt was not found for attemptID: %v", attemptID)
}

// CountOCR2UnconfirmedTransmit returns how many unconfirmed txs look like OCR2 transmit calls.
func (m *InMemoryStore) CountOCR2UnconfirmedTransmit() int {
m.RLock()
defer m.RUnlock()
n := 0
for _, tx := range m.UnconfirmedTransactions {
if ocr2transmit.IsTransmitCalldata(tx.Data) {
n++
}
}
return n
}

// Orchestrator
func (m *InMemoryStore) FindTxWithIdempotencyKey(idempotencyKey string) *types.Transaction {
m.RLock()
Expand Down
11 changes: 11 additions & 0 deletions pkg/txm/storage/inmemory_store_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
evmtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-evm/pkg/ocr2transmit"
"github.com/smartcontractkit/chainlink-evm/pkg/txm/types"
)

Expand Down Expand Up @@ -127,6 +128,7 @@ func (m *InMemoryStoreManager) DeleteAttemptForUnconfirmedTx(_ context.Context,
}

func (m *InMemoryStoreManager) MarkTxFatal(_ context.Context, tx *types.Transaction, fromAddress common.Address) error {
// Add metric here to emit tx has failed
if store, exists := m.InMemoryStoreMap[fromAddress]; exists {
return store.MarkTxFatal(tx)
}
Expand All @@ -140,6 +142,15 @@ func (m *InMemoryStoreManager) UpdateSignedAttempt(_ context.Context, txID uint6
return fmt.Errorf(StoreNotFoundForAddress, fromAddress)
}

// RefreshOCR2UnconfirmedGauges updates ocr2_transmit_unconfirmed_tx_count for every managed from_address (TXM v2).
func (m *InMemoryStoreManager) RefreshOCR2UnconfirmedGauges() {
ctx := context.Background()
for addr, store := range m.InMemoryStoreMap {
n := store.CountOCR2UnconfirmedTransmit()
ocr2transmit.SetUnconfirmedGauge(ctx, m.chainID, addr, n)
}
}

func (m *InMemoryStoreManager) FindTxWithIdempotencyKey(_ context.Context, idempotencyKey string) (*types.Transaction, error) {
for _, store := range m.InMemoryStoreMap {
tx := store.FindTxWithIdempotencyKey(idempotencyKey)
Expand Down
15 changes: 15 additions & 0 deletions pkg/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink-evm/pkg/keys"
"github.com/smartcontractkit/chainlink-evm/pkg/ocr2transmit"
"github.com/smartcontractkit/chainlink-evm/pkg/txm/storage"
"github.com/smartcontractkit/chainlink-evm/pkg/txm/types"
)

Expand Down Expand Up @@ -357,6 +359,8 @@ func (t *Txm) sendTransactionWithError(ctx context.Context, tx *types.Transactio
}

func (t *Txm) BackfillTransactions(ctx context.Context, address common.Address) error {
defer t.refreshOCR2UnconfirmedGauges()

latestNonce, err := t.client.NonceAt(ctx, address, nil)
if err != nil {
return err
Expand Down Expand Up @@ -440,6 +444,17 @@ func (t *Txm) extractMetrics(ctx context.Context, txs []*types.Transaction) []ui
if tx.InitialBroadcastAt != nil {
t.Metrics.RecordTimeUntilTxConfirmed(ctx, float64(time.Since(*tx.InitialBroadcastAt)))
}
var fwdr *common.Address
if meta, err := tx.GetMeta(); err == nil && meta != nil && meta.FwdrDestAddress != nil {
fwdr = meta.FwdrDestAddress
}
ocr2transmit.RecordOutcome(ctx, t.chainID, tx.FromAddress, tx.ToAddress, tx.Data, fwdr, "confirmed")
}
return confirmedTxIDs
}

func (t *Txm) refreshOCR2UnconfirmedGauges() {
if mgr, ok := t.txStore.(*storage.InMemoryStoreManager); ok {
mgr.RefreshOCR2UnconfirmedGauges()
}
}
4 changes: 4 additions & 0 deletions pkg/txmgr/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,10 @@ func (f *evmFinalizer) validateReceipt(ctx context.Context, receipt *types.Recei
}
// This might increment more than once e.g. in case of re-orgs going back and forth we might re-fetch the same receipt
f.metrics.IncrementNumRevertedTxs(ctx)
recordOCR2TransmitOutcomeV1(ctx, f.chainID, &attempt.Tx, "reverted")
} else {
f.metrics.IncrementNumSuccessfulTxs(ctx)
recordOCR2TransmitOutcomeV1(ctx, f.chainID, &attempt.Tx, "confirmed")
}

// This is only recording forwarded tx that were mined and have a status.
Expand Down Expand Up @@ -600,6 +602,8 @@ func (f *evmFinalizer) ProcessOldTxsWithoutReceipts(ctx context.Context, oldTxID
oldTx.Error = null.StringFrom(ErrCouldNotGetReceipt)
if err = f.txStore.UpdateTxFatalErrorAndDeleteAttempts(ctx, oldTx); err != nil {
errorList = append(errorList, fmt.Errorf("failed to mark tx with ID %d as fatal: %w", oldTx.ID, err))
} else {
recordOCR2TransmitOutcomeV1(ctx, f.chainID, oldTx, "fatal")
}
}
if len(errorList) > 0 {
Expand Down
25 changes: 25 additions & 0 deletions pkg/txmgr/ocr2_transmit_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package txmgr

import (
"context"
"math/big"

"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-evm/pkg/ocr2transmit"
)

func recordOCR2TransmitOutcomeV1(ctx context.Context, chainID *big.Int, tx *Tx, outcome string) {
if tx == nil {
return
}
var fwdr *common.Address
if meta, err := tx.GetMeta(); err == nil && meta != nil && meta.FwdrDestAddress != nil {
fwdr = meta.FwdrDestAddress
}
cid := chainID
if cid == nil {
cid = tx.ChainID
}
ocr2transmit.RecordOutcome(ctx, cid, tx.FromAddress, tx.ToAddress, tx.EncodedPayload, fwdr, outcome)
}
Loading