Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/beholder/beholdertest/beholder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
oteltracenoop "go.opentelemetry.io/otel/trace/noop"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"

Check failure on line 16 in pkg/beholder/beholdertest/beholder.go

View workflow job for this annotation

GitHub Actions / lint-module (.)

could not import github.com/smartcontractkit/chainlink-common/pkg/beholder (-: # github.com/smartcontractkit/chainlink-common/pkg/beholder
"github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
)

const (
Expand Down Expand Up @@ -192,3 +193,15 @@

return nil
}

func (e *assertMessageEmitter) BatchEmit(_ context.Context, messages []beholder.Message, _ ...beholder.BatchEmitOption) ([]*chipingress.PublishResult, error) {
e.t.Helper()

e.mu.Lock()
defer e.mu.Unlock()

e.msgs = append(e.msgs, messages...)

return nil, nil
}

54 changes: 39 additions & 15 deletions pkg/beholder/chip_ingress_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
"maps"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
"google.golang.org/protobuf/proto"
)

type ChipIngressEmitter struct {
client chipingress.Client
}

var _ Emitter = (*ChipIngressEmitter)(nil)

func NewChipIngressEmitter(client chipingress.Client) (Emitter, error) {

if client == nil {
Expand All @@ -26,35 +29,56 @@
}

func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
_, err := c.BatchEmit(ctx, []Message{
NewMessage(body, attrKVs...),
})
return err
}

sourceDomain, entityType, err := ExtractSourceAndType(attrKVs...)
if err != nil {
return err
func (c *ChipIngressEmitter) BatchEmit(ctx context.Context, messages []Message, options ...BatchEmitOption) ([]*chipingress.PublishResult, error) {
emitOpts := DefaultBatchEmitOptions
for _, opt := range options {
opt(&emitOpts)
}

event, err := chipingress.NewEvent(sourceDomain, entityType, body, newAttributes(attrKVs...))
if err != nil {
return err
events := make([]chipingress.CloudEvent, len(messages))
for i, msg := range messages {
sourceDomain, entityType, err := ExtractSourceAndType(msg.Attrs)
if err != nil {
return nil, err
}

event, err := chipingress.NewEvent(sourceDomain, entityType, msg.Body, msg.Attrs)
if err != nil {
return nil, err
}

events[i] = event
}

eventPb, err := chipingress.EventToProto(event)
eventPb, err := chipingress.EventsToBatch(events)
if err != nil {
return fmt.Errorf("failed to convert event to proto: %w", err)
return nil, fmt.Errorf("failed to convert event to proto: %w", err)
}

_, err = c.client.Publish(ctx, eventPb)
eventPb.Options = &chipingress.PublishOptions{

Check failure on line 64 in pkg/beholder/chip_ingress_emitter.go

View workflow job for this annotation

GitHub Actions / build-race-tests

undefined: chipingress.PublishOptions

Check failure on line 64 in pkg/beholder/chip_ingress_emitter.go

View workflow job for this annotation

GitHub Actions / build-race-tests

eventPb.Options undefined (type *chipingress.CloudEventBatch has no field or method Options)

Check failure on line 64 in pkg/beholder/chip_ingress_emitter.go

View workflow job for this annotation

GitHub Actions / build-test

undefined: chipingress.PublishOptions

Check failure on line 64 in pkg/beholder/chip_ingress_emitter.go

View workflow job for this annotation

GitHub Actions / build-test

eventPb.Options undefined (type *chipingress.CloudEventBatch has no field or method Options)

Check failure on line 64 in pkg/beholder/chip_ingress_emitter.go

View workflow job for this annotation

GitHub Actions / benchmark

undefined: chipingress.PublishOptions

Check failure on line 64 in pkg/beholder/chip_ingress_emitter.go

View workflow job for this annotation

GitHub Actions / benchmark

eventPb.Options undefined (type *chipingress.CloudEventBatch has no field or method Options)

Check failure on line 64 in pkg/beholder/chip_ingress_emitter.go

View workflow job for this annotation

GitHub Actions / lint-module (.)

undefined: chipingress.PublishOptions) (typecheck)

Check failure on line 64 in pkg/beholder/chip_ingress_emitter.go

View workflow job for this annotation

GitHub Actions / lint-module (.)

eventPb.Options undefined (type *chipingress.CloudEventBatch has no field or method Options)

Check failure on line 64 in pkg/beholder/chip_ingress_emitter.go

View workflow job for this annotation

GitHub Actions / lint-module (.)

undefined: chipingress.PublishOptions (typecheck)

Check failure on line 64 in pkg/beholder/chip_ingress_emitter.go

View workflow job for this annotation

GitHub Actions / lint-module (.)

eventPb.Options undefined (type *chipingress.CloudEventBatch has no field or method Options)
AllOrNothing: proto.Bool(emitOpts.AllOrNothing),
}

response, err := c.client.PublishBatch(ctx, eventPb)
if err != nil {
return err
return nil, err
}

if response == nil {
return nil, nil
}

return nil
return response.Results, nil
}

// ExtractSourceAndType extracts source domain and entity from the attributes
func ExtractSourceAndType(attrKVs ...any) (string, string, error) {

attributes := newAttributes(attrKVs...)

func ExtractSourceAndType(attributes Attributes) (string, string, error) {
var sourceDomain string
var entityType string

Expand Down
6 changes: 3 additions & 3 deletions pkg/beholder/chip_ingress_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestChipIngressEmit(t *testing.T) {
clientMock := mocks.NewClient(t)

clientMock.
On("Publish", mock.Anything, mock.Anything).
On("PublishBatch", mock.Anything, mock.Anything).
Return(nil, nil)

emitter, err := beholder.NewChipIngressEmitter(clientMock)
Expand All @@ -69,7 +69,7 @@ func TestChipIngressEmit(t *testing.T) {
clientMock := mocks.NewClient(t)

clientMock.
On("Publish", mock.Anything, mock.Anything).
On("PublishBatch", mock.Anything, mock.Anything).
Return(nil, assert.AnError)

emitter, err := beholder.NewChipIngressEmitter(clientMock)
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestExtractSourceAndType(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
domain, entity, err := beholder.ExtractSourceAndType(tt.attrs...)
domain, entity, err := beholder.ExtractSourceAndType(beholder.ExtractAttributes(tt.attrs))

if tt.wantErr {
if err == nil {
Expand Down
19 changes: 18 additions & 1 deletion pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,26 @@ import (

const defaultGRPCCompressor = "gzip"

type BatchEmitOptions struct {
AllOrNothing bool
}

var DefaultBatchEmitOptions = BatchEmitOptions{
AllOrNothing: true,
}

type BatchEmitOption = func(*BatchEmitOptions)

func WithAllOrNothing(v bool) BatchEmitOption {
return func(o *BatchEmitOptions) {
o.AllOrNothing = v
}
}

type Emitter interface {
// Sends message with bytes and attributes to OTel Collector
// Emit Sends message with bytes and attributes to OTel Collector
Emit(ctx context.Context, body []byte, attrKVs ...any) error
BatchEmit(ctx context.Context, messages []Message, options ...BatchEmitOption) ([]*chipingress.PublishResult, error)
io.Closer
}

Expand Down
17 changes: 12 additions & 5 deletions pkg/beholder/dual_source_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sync/atomic"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)
Expand Down Expand Up @@ -56,28 +57,34 @@ func (d *DualSourceEmitter) Close() error {
}

func (d *DualSourceEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
_, err := d.BatchEmit(ctx, []Message{
NewMessage(body, attrKVs...),
})
return err
}

func (d *DualSourceEmitter) BatchEmit(ctx context.Context, messages []Message, options ...BatchEmitOption) ([]*chipingress.PublishResult, error) {
// Emit via OTLP first
if err := d.otelCollectorEmitter.Emit(ctx, body, attrKVs...); err != nil {
return err
if _, err := d.otelCollectorEmitter.BatchEmit(ctx, messages, options...); err != nil {
return nil, err
}

// Emit via chip ingress async
if err := d.wg.TryAdd(1); err != nil {
return err
return nil, err
}
go func(ctx context.Context) {
defer d.wg.Done()
var cancel context.CancelFunc
ctx, cancel = d.stopCh.Ctx(ctx)
defer cancel()

if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil {
if _, err := d.chipIngressEmitter.BatchEmit(ctx, messages, options...); err != nil {
// If the chip ingress emitter fails, we ONLY log the error
// because we still want to send the data to the OTLP collector and not cause disruption
d.log.Infof("failed to emit to chip ingress: %v", err)
}
}(context.WithoutCancel(ctx))

return nil
return nil, nil
}
13 changes: 13 additions & 0 deletions pkg/beholder/dual_source_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
)

func TestNewDualSourceEmitter(t *testing.T) {
Expand Down Expand Up @@ -89,3 +90,15 @@ func (m *mockEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) err
}
return nil
}

func (m *mockEmitter) BatchEmit(ctx context.Context, messages []beholder.Message, _ ...beholder.BatchEmitOption) ([]*chipingress.PublishResult, error) {
if m.emitFunc != nil {
for _, msg := range messages {
if err := m.emitFunc(ctx, msg.Body); err != nil {
return nil, err
}
}
}
return nil, nil
}

4 changes: 4 additions & 0 deletions pkg/beholder/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ func newAttributes(attrKVs ...any) Attributes {
case Attributes:
maps.Copy(a, t)
i++
case []any:
// Treat a []any element as if its contents were passed directly.
maps.Copy(a, newAttributes(t...))
i++
case string:
if i+1 >= l {
break
Expand Down
19 changes: 14 additions & 5 deletions pkg/beholder/message_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package beholder
import (
"context"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
otellog "go.opentelemetry.io/otel/log"
)

Expand All @@ -22,10 +23,18 @@ func (e messageEmitter) Close() error { return nil }
// Emits logs the message, but does not wait for the message to be processed.
// Open question: what are pros/cons for using use map[]any vs use otellog.KeyValue
func (e messageEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
message := NewMessage(body, attrKVs...)
if err := message.Validate(); err != nil {
return err
_, err := e.BatchEmit(ctx, []Message{
NewMessage(body, attrKVs...),
})
return err
}

func (e messageEmitter) BatchEmit(ctx context.Context, messages []Message, options ...BatchEmitOption) ([]*chipingress.PublishResult, error) {
for _, message := range messages {
if err := message.Validate(); err != nil {
return nil, err
}
e.messageLogger.Emit(ctx, message.OtelRecord())
}
e.messageLogger.Emit(ctx, message.OtelRecord())
return nil
return nil, nil
}
5 changes: 5 additions & 0 deletions pkg/beholder/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func (e noopMessageEmitter) Close() error { return nil }
func (noopMessageEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
return nil
}

func (noopMessageEmitter) BatchEmit(ctx context.Context, messages []Message, options ...BatchEmitOption) ([]*chipingress.PublishResult, error) {
return nil, nil
}

func (noopMessageEmitter) EmitMessage(ctx context.Context, message Message) error {
return nil
}
Expand Down
Loading
Loading