Skip to content
Merged
68 changes: 61 additions & 7 deletions pkg/capabilities/base_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type BaseTriggerMetrics interface {
IncInboxFull(triggerID string)
EmitUndeliveredWarning(triggerID, eventID string)
EmitUndeliveredCritical(triggerID, eventID string)
// IncAckError counts ACK paths that return an error (e.g. store delete failure). reason is a stable identifier for dashboards.
IncAckError(reason string)
// IncAckMemoryOutcome records how an ACK related to the in-memory pending map: hit, miss_no_trigger_bucket, miss_no_event, miss_nil_record.
IncAckMemoryOutcome(outcome string)
}

type undeliveredState struct {
Expand Down Expand Up @@ -192,8 +196,12 @@ func (b *BaseTriggerCapability[T]) DeliverEvent(
}

if err := b.store.Insert(ctx, rec); err != nil {
b.lggr.Errorw("base trigger failed to persist pending event",
"capabilityID", b.capabilityId, "triggerID", triggerID, "eventID", te.ID, "err", err)
return err
}
b.lggr.Infow("base trigger persisted pending event for ACK tracking",
"capabilityID", b.capabilityId, "triggerID", triggerID, "eventID", te.ID)

b.mu.Lock()
if b.pending[triggerID] == nil {
Expand Down Expand Up @@ -236,27 +244,45 @@ func (b *BaseTriggerCapability[T]) sendToInbox(triggerID, eventID string, payloa
func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId string, eventId string) error {
b.lggr.Infow("Event ACK", "triggerID", triggerId, "eventID", eventId)
if !b.retransmitEnabled() {
b.lggr.Debugw("base trigger ACK skipped (retransmit disabled, no persistence/ACK tracking)",
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId)
b.metrics.IncAckMemoryOutcome("skipped_retransmit_disabled")
return nil
}

var (
attempts int
firstAt time.Time
found bool
attempts int
firstAt time.Time
found bool
hadTriggerBucket bool
hadEventKey bool
hadNilPendingRecord bool
)

b.mu.Lock()
if eventsForTrigger, ok := b.pending[triggerId]; ok && eventsForTrigger != nil {
if rec, recOk := eventsForTrigger[eventId]; recOk && rec != nil {
eventsForTrigger, ok := b.pending[triggerId]
hadTriggerBucket = ok && eventsForTrigger != nil
if hadTriggerBucket {
rec, recOk := eventsForTrigger[eventId]
hadEventKey = recOk
switch {
case recOk && rec != nil:
attempts = rec.Attempts
firstAt = rec.FirstAt
found = true
case recOk && rec == nil:
hadNilPendingRecord = true
b.metrics.IncAckMemoryOutcome("miss_nil_record")
default:
b.metrics.IncAckMemoryOutcome("miss_no_event")
}

delete(eventsForTrigger, eventId)
if len(eventsForTrigger) == 0 {
delete(b.pending, triggerId)
}
} else {
b.metrics.IncAckMemoryOutcome("miss_no_trigger_bucket")
}

if m, ok := b.undeliveredAlertStates[triggerId]; ok {
Expand All @@ -267,12 +293,40 @@ func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId strin
}
b.mu.Unlock()

if found {
switch {
case found:
b.lggr.Infow("base trigger ACK matched in-memory pending event",
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId,
"attempts", attempts, "firstAt", firstAt)
b.metrics.IncAckMemoryOutcome("hit")
b.metrics.IncAck(triggerId, eventId)
b.metrics.ObserveTimeToAck(triggerId, eventId, time.Since(firstAt), attempts)
case hadNilPendingRecord:
b.lggr.Warnw("base trigger ACK: pending map had nil record for event (treating as miss; reconciling store)",
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId)
case hadTriggerBucket && !hadEventKey:
b.lggr.Infow("base trigger ACK: event id not in in-memory pending map for trigger (may exist only in store; reconciling)",
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId)
case !hadTriggerBucket:
Copy link
Copy Markdown
Contributor

@pavel-raykov pavel-raykov Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similarly, here - can't you just output "hadTriggerBucket", hadTriggerBucket (and other params) in the Infow params?

b.lggr.Infow("base trigger ACK: no in-memory pending bucket for trigger (not pending here; still deleting from store if row exists)",
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId)
}

return b.store.DeleteEvent(ctx, triggerId, eventId)
if err := b.store.DeleteEvent(ctx, triggerId, eventId); err != nil {
b.lggr.Errorw("base trigger ACK failed to delete event from store",
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId,
"foundInMemory", found, "err", err)
b.metrics.IncAckError("store_delete_failed")
return err
}
if found {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, why not just add found to Infow? I.e., Infow(,..., "found", found).

b.lggr.Debugw("base trigger ACK store delete succeeded",
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId)
} else {
b.lggr.Infow("base trigger ACK store delete succeeded (memory miss path; store row removed if present)",
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId)
}
return nil
}

func (b *BaseTriggerCapability[T]) retransmitLoop() {
Expand Down
61 changes: 61 additions & 0 deletions pkg/capabilities/base_trigger_cre.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package capabilities

import (
"context"
"fmt"
"time"

"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/settings"
"github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings"
)

// ResolveBaseTriggerRetryInterval returns the retransmit ticker interval for [BaseTriggerCapability].
// When [cresettings.Default.BaseTriggerRetransmitEnabled] is false, it returns (0, nil) so the base
// trigger delivers fire-and-forget without persistence or ACK tracking.
// When enabled, [cresettings.Default.BaseTriggerRetryInterval] must be positive.
func ResolveBaseTriggerRetryInterval(ctx context.Context, g settings.Getter, lggr logger.Logger) (retryInterval time.Duration, err error) {
	// Settings reads are best-effort: on read failure we log and fall back to
	// the compiled-in default rather than failing startup.
	enabled, readErr := cresettings.Default.BaseTriggerRetransmitEnabled.GetOrDefault(ctx, g)
	if readErr != nil {
		lggr.Errorw("CRE settings read failed for base trigger retransmit flag; using default", "err", readErr)
	}
	if !enabled {
		// Retransmit disabled: a zero interval tells the caller to skip
		// persistence and ACK tracking entirely.
		return 0, nil
	}

	interval, readErr := cresettings.Default.BaseTriggerRetryInterval.GetOrDefault(ctx, g)
	if readErr != nil {
		lggr.Errorw("CRE settings read failed for base trigger retry interval; using default", "err", readErr)
	}
	if interval <= 0 {
		// Enabled with a non-positive interval is a misconfiguration; surface
		// it instead of silently running without retransmits.
		return 0, fmt.Errorf(
			"BaseTriggerRetransmitEnabled is true but BaseTriggerRetryInterval must be positive (got %s)",
			interval,
		)
	}
	return interval, nil
}

// NewBaseTriggerCapabilityWithCRESettings builds a [BaseTriggerCapability] using global CRE settings
// for retransmit enablement and interval. Undelivered warning/critical thresholds are derived from
// the resolved interval when retransmit is enabled.
func NewBaseTriggerCapabilityWithCRESettings[T proto.Message](
	ctx context.Context,
	store EventStore,
	newMsg func() T,
	lggr logger.Logger,
	capabilityID string,
	getter settings.Getter,
) (*BaseTriggerCapability[T], error) {
	// Alert thresholds are fixed multiples of the retry interval: warn after
	// 5 missed retransmit windows, escalate to critical after 20.
	const (
		warningMultiplier  = 5
		criticalMultiplier = 20
	)

	interval, err := ResolveBaseTriggerRetryInterval(ctx, getter, lggr)
	if err != nil {
		return nil, err
	}

	// A zero interval means retransmit is disabled; leave both thresholds at
	// zero so no undelivered alerts are emitted.
	var warnAfter, critAfter time.Duration
	if interval > 0 {
		warnAfter = warningMultiplier * interval
		critAfter = criticalMultiplier * interval
	}
	return NewBaseTriggerCapability(store, newMsg, lggr, capabilityID, interval, warnAfter, critAfter), nil
}
32 changes: 32 additions & 0 deletions pkg/capabilities/base_trigger_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type BaseTriggerBeholderMetrics struct {
capabilityID string
retryCount metric.Int64Counter
ackCount metric.Int64Counter
ackErrorCount metric.Int64Counter
ackMemoryOutcomeCount metric.Int64Counter
inboxMissingCount metric.Int64Counter
inboxFullCount metric.Int64Counter
undeliveredWarningCount metric.Int64Counter
Expand All @@ -34,6 +36,14 @@ func NewBaseTriggerBeholderMetrics(capabilityID string) (BaseTriggerMetrics, err
if err != nil {
return nil, err
}
ackErrorCount, err := beholder.GetMeter().Int64Counter("capabilities_base_trigger_ack_error_total")
if err != nil {
return nil, err
}
ackMemoryOutcomeCount, err := beholder.GetMeter().Int64Counter("capabilities_base_trigger_ack_memory_outcome_total")
if err != nil {
return nil, err
}
inboxMissingCount, err := beholder.GetMeter().Int64Counter("capabilities_base_trigger_inbox_missing_total")
if err != nil {
return nil, err
Expand Down Expand Up @@ -69,6 +79,8 @@ func NewBaseTriggerBeholderMetrics(capabilityID string) (BaseTriggerMetrics, err
capabilityID: capabilityID,
retryCount: retryCount,
ackCount: ackCount,
ackErrorCount: ackErrorCount,
ackMemoryOutcomeCount: ackMemoryOutcomeCount,
inboxMissingCount: inboxMissingCount,
inboxFullCount: inboxFullCount,
undeliveredWarningCount: undeliveredWarningCount,
Expand Down Expand Up @@ -111,6 +123,24 @@ func (m *BaseTriggerBeholderMetrics) IncAck(triggerID, eventID string) {
)
}

// IncAckError increments the ACK-error counter. reason is a stable identifier
// (e.g. "store_delete_failed") used as a metric attribute for dashboards.
func (m *BaseTriggerBeholderMetrics) IncAckError(reason string) {
	attrs := metric.WithAttributes(
		attribute.String("capability_id", m.capabilityID),
		attribute.String("reason", reason),
	)
	m.ackErrorCount.Add(context.Background(), 1, attrs)
}

// IncAckMemoryOutcome increments the ACK in-memory-lookup outcome counter.
// outcome is a stable identifier (e.g. "hit", "miss_no_event") recorded as a
// metric attribute alongside the capability ID.
func (m *BaseTriggerBeholderMetrics) IncAckMemoryOutcome(outcome string) {
	attrs := metric.WithAttributes(
		attribute.String("capability_id", m.capabilityID),
		attribute.String("outcome", outcome),
	)
	m.ackMemoryOutcomeCount.Add(context.Background(), 1, attrs)
}

func (m *BaseTriggerBeholderMetrics) ObserveTimeToAck(triggerID, eventID string, d time.Duration, attempts int) {
m.timeToAckMs.Record(context.Background(), d.Milliseconds(),
metric.WithAttributes(m.attrs(triggerID, eventID)...),
Expand Down Expand Up @@ -163,3 +193,5 @@ func (noopBaseTriggerMetrics) IncInboxMissing(string)
func (noopBaseTriggerMetrics) IncInboxFull(string) {}
func (noopBaseTriggerMetrics) EmitUndeliveredWarning(string, string) {}
func (noopBaseTriggerMetrics) EmitUndeliveredCritical(string, string) {}
// IncAckError is a no-op on the noop metrics implementation.
func (noopBaseTriggerMetrics) IncAckError(string) {}

// IncAckMemoryOutcome is a no-op on the noop metrics implementation.
func (noopBaseTriggerMetrics) IncAckMemoryOutcome(string) {}
52 changes: 52 additions & 0 deletions pkg/capabilities/base_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,60 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/settings"
)

// TestResolveBaseTriggerRetryInterval covers the four resolution paths:
// default (disabled), explicitly enabled, explicitly disabled, and the
// misconfiguration of enabled with a non-positive interval.
func TestResolveBaseTriggerRetryInterval(t *testing.T) {
	lggr, newErr := logger.New()
	require.NoError(t, newErr)
	ctx := context.Background()

	t.Run("nil getter uses defaults", func(t *testing.T) {
		got, resolveErr := ResolveBaseTriggerRetryInterval(ctx, nil, lggr)
		require.NoError(t, resolveErr)
		require.Zero(t, got, "default BaseTriggerRetransmitEnabled is false, so retry interval is disabled")
	})

	t.Run("global JSON enables interval", func(t *testing.T) {
		jsonGetter, getterErr := settings.NewJSONGetter([]byte(`{
			"global": {
				"BaseTriggerRetransmitEnabled": "true",
				"BaseTriggerRetryInterval": "7s"
			}
		}`))
		require.NoError(t, getterErr)
		got, resolveErr := ResolveBaseTriggerRetryInterval(ctx, jsonGetter, lggr)
		require.NoError(t, resolveErr)
		require.Equal(t, 7*time.Second, got)
	})

	t.Run("disabled returns zero", func(t *testing.T) {
		jsonGetter, getterErr := settings.NewJSONGetter([]byte(`{
			"global": {
				"BaseTriggerRetransmitEnabled": "false",
				"BaseTriggerRetryInterval": "7s"
			}
		}`))
		require.NoError(t, getterErr)
		got, resolveErr := ResolveBaseTriggerRetryInterval(ctx, jsonGetter, lggr)
		require.NoError(t, resolveErr)
		require.Zero(t, got)
	})

	t.Run("enabled with zero interval errors", func(t *testing.T) {
		jsonGetter, getterErr := settings.NewJSONGetter([]byte(`{
			"global": {
				"BaseTriggerRetransmitEnabled": "true",
				"BaseTriggerRetryInterval": "0s"
			}
		}`))
		require.NoError(t, getterErr)
		_, resolveErr := ResolveBaseTriggerRetryInterval(ctx, jsonGetter, lggr)
		require.Error(t, resolveErr)
		require.Contains(t, resolveErr.Error(), "BaseTriggerRetryInterval must be positive")
	})
}

// newBase builds a test capability with retransmit enabled at a short
// (100ms) interval so retry behavior is exercised quickly in tests.
func newBase(t *testing.T, store EventStore) *BaseTriggerCapability[*wrapperspb.BytesValue] {
	return newBaseWithRetransmit(t, store, 100*time.Millisecond)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/settings/cresettings/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ flowchart
PerWorkflow.LogTrigger.FilterAddressLimit{{FilterAddressLimit}}:::bound
PerWorkflow.LogTrigger.FilterTopicsPerSlotLimit{{FilterTopicsPerSlotLimit}}:::bound
end
subgraph EVMLogTriggerCapability[EVM log trigger capability startup]
BaseTriggerRetransmitEnabled[/BaseTriggerRetransmitEnabled\]:::gate
BaseTriggerRetryInterval>BaseTriggerRetryInterval]:::time
end
end

subgraph Engine.handleAllTriggerEvents
Expand Down
2 changes: 2 additions & 0 deletions pkg/settings/cresettings/defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
"GatewayHTTPGlobalRate": "500rps:500",
"GatewayHTTPPerNodeRate": "100rps:100",
"TriggerRegistrationStatusUpdateTimeout": "0s",
"BaseTriggerRetransmitEnabled": "false",
"BaseTriggerRetryInterval": "30s",
"VaultCiphertextSizeLimit": "2kb",
"VaultShareSizeLimit": "600b",
"VaultIdentifierKeySizeLimit": "64b",
Expand Down
2 changes: 2 additions & 0 deletions pkg/settings/cresettings/defaults.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ VaultOrgIdAsSecretOwnerEnabled = 'false'
GatewayHTTPGlobalRate = '500rps:500'
GatewayHTTPPerNodeRate = '100rps:100'
TriggerRegistrationStatusUpdateTimeout = '0s'
BaseTriggerRetransmitEnabled = 'false'
BaseTriggerRetryInterval = '30s'
VaultCiphertextSizeLimit = '2kb'
VaultShareSizeLimit = '600b'
VaultIdentifierKeySizeLimit = '64b'
Expand Down
6 changes: 6 additions & 0 deletions pkg/settings/cresettings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ var Default = Schema{
GatewayHTTPGlobalRate: Rate(rate.Limit(500), 500),
GatewayHTTPPerNodeRate: Rate(rate.Limit(100), 100),
TriggerRegistrationStatusUpdateTimeout: Duration(0 * time.Second),
BaseTriggerRetransmitEnabled: Bool(false),
BaseTriggerRetryInterval: Duration(30 * time.Second),

// DANGER(cedric): Be extremely careful changing these vault limits as they act as a default value
// used by the Vault OCR plugin -- changing these values could cause issues with the plugin during an image
// upgrade as nodes apply the old and new values inconsistently. A safe upgrade path
Expand Down Expand Up @@ -233,6 +236,9 @@ type Schema struct {
GatewayHTTPPerNodeRate Setting[config.Rate]
TriggerRegistrationStatusUpdateTimeout Setting[time.Duration]

BaseTriggerRetransmitEnabled Setting[bool]
BaseTriggerRetryInterval Setting[time.Duration]

VaultCiphertextSizeLimit Setting[config.Size]
VaultShareSizeLimit Setting[config.Size]
VaultIdentifierKeySizeLimit Setting[config.Size]
Expand Down
Loading