diff --git a/core/services/ocr2/plugins/vault/kvstore.go b/core/services/ocr2/plugins/vault/kvstore.go index 466101e90a5..1e27d263d10 100644 --- a/core/services/ocr2/plugins/vault/kvstore.go +++ b/core/services/ocr2/plugins/vault/kvstore.go @@ -1,9 +1,11 @@ package vault import ( + "context" "errors" "fmt" "strconv" + "time" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" "google.golang.org/protobuf/proto" @@ -20,38 +22,44 @@ const ( ) type KVStore struct { - reader ocr3_1types.KeyValueStateReader - writer ocr3_1types.KeyValueStateReadWriter + reader ocr3_1types.KeyValueStateReader + writer ocr3_1types.KeyValueStateReadWriter + metrics *pluginMetrics +} + +func (s *KVStore) trackDuration(ctx context.Context, method string, start time.Time) { + s.metrics.trackKVOperation(ctx, method, time.Since(start).Milliseconds()) } type ReadKVStore interface { - GetSecret(id *vault.SecretIdentifier) (*vault.StoredSecret, error) - GetMetadata(owner string) (*vault.StoredMetadata, error) - GetSecretIdentifiersCountForOwner(owner string) (int, error) - GetPendingQueue() ([]*vault.StoredPendingQueueItem, error) + GetSecret(ctx context.Context, id *vault.SecretIdentifier) (*vault.StoredSecret, error) + GetMetadata(ctx context.Context, owner string) (*vault.StoredMetadata, error) + GetSecretIdentifiersCountForOwner(ctx context.Context, owner string) (int, error) + GetPendingQueue(ctx context.Context) ([]*vault.StoredPendingQueueItem, error) } type WriteKVStore interface { ReadKVStore - WriteSecret(id *vault.SecretIdentifier, secret *vault.StoredSecret) error - WriteMetadata(owner string, metadata *vault.StoredMetadata) error - DeleteSecret(id *vault.SecretIdentifier) error - WritePendingQueue(pending []*vault.StoredPendingQueueItem) error + WriteSecret(ctx context.Context, id *vault.SecretIdentifier, secret *vault.StoredSecret) error + WriteMetadata(ctx context.Context, owner string, metadata *vault.StoredMetadata) error + DeleteSecret(ctx context.Context, id *vault.SecretIdentifier) error + WritePendingQueue(ctx context.Context, pending []*vault.StoredPendingQueueItem) error } -func NewReadStore(reader ocr3_1types.KeyValueStateReader) *KVStore { - return &KVStore{reader: reader} +func NewReadStore(reader ocr3_1types.KeyValueStateReader, metrics *pluginMetrics) *KVStore { + return &KVStore{reader: reader, metrics: metrics} } -func NewWriteStore(writer ocr3_1types.KeyValueStateReadWriter) *KVStore { - return &KVStore{reader: writer, writer: writer} +func NewWriteStore(writer ocr3_1types.KeyValueStateReadWriter, metrics *pluginMetrics) *KVStore { + return &KVStore{reader: writer, writer: writer, metrics: metrics} } -func (s *KVStore) GetSecret(id *vault.SecretIdentifier) (*vault.StoredSecret, error) { +func (s *KVStore) GetSecret(ctx context.Context, id *vault.SecretIdentifier) (*vault.StoredSecret, error) { + defer s.trackDuration(ctx, "GetSecret", time.Now()) if id == nil { return nil, errors.New("id cannot be nil") } - found, err := s.metadataContainsID(id) + found, err := s.metadataContainsID(ctx, id) if err != nil { return nil, fmt.Errorf("failed to check if metadata contains id: %w", err) } @@ -77,7 +85,8 @@ func (s *KVStore) GetSecret(id *vault.SecretIdentifier) (*vault.StoredSecret, er return secret, nil } -func (s *KVStore) GetMetadata(owner string) (*vault.StoredMetadata, error) { +func (s *KVStore) GetMetadata(ctx context.Context, owner string) (*vault.StoredMetadata, error) { + defer s.trackDuration(ctx, "GetMetadata", time.Now()) b, err := s.reader.Read([]byte(metadataPrefix + owner)) if err != nil { return nil, fmt.Errorf("failed to read metadata: %w", err) @@ -95,8 +104,9 @@ func (s *KVStore) GetMetadata(owner string) (*vault.StoredMetadata, error) { return md, nil } -func (s *KVStore) GetSecretIdentifiersCountForOwner(owner string) (int, error) { - md, err := s.GetMetadata(owner) +func (s *KVStore) GetSecretIdentifiersCountForOwner(ctx context.Context, owner string) (int, error) { + defer s.trackDuration(ctx, "GetSecretIdentifiersCountForOwner", time.Now()) + md, err := s.GetMetadata(ctx, owner) if err != nil { return 0, fmt.Errorf("failed to get metadata for owner %s: %w", owner, err) } @@ -108,7 +118,8 @@ func (s *KVStore) GetSecretIdentifiersCountForOwner(owner string) (int, error) { return count, nil } -func (s *KVStore) WriteMetadata(owner string, metadata *vault.StoredMetadata) error { +func (s *KVStore) WriteMetadata(ctx context.Context, owner string, metadata *vault.StoredMetadata) error { + defer s.trackDuration(ctx, "WriteMetadata", time.Now()) if metadata == nil { return errors.New("metadata cannot be nil") } @@ -125,11 +136,11 @@ func (s *KVStore) WriteMetadata(owner string, metadata *vault.StoredMetadata) er return nil } -func (s *KVStore) metadataContainsID(id *vault.SecretIdentifier) (bool, error) { +func (s *KVStore) metadataContainsID(ctx context.Context, id *vault.SecretIdentifier) (bool, error) { if id == nil { return false, errors.New("id cannot be nil") } - md, err := s.GetMetadata(id.Owner) + md, err := s.GetMetadata(ctx, id.Owner) if err != nil { return false, fmt.Errorf("failed to get metadata for owner %s: %w", id.Owner, err) } @@ -147,11 +158,11 @@ func (s *KVStore) metadataContainsID(id *vault.SecretIdentifier) (bool, error) { return false, nil } -func (s *KVStore) addIDToMetadata(id *vault.SecretIdentifier) error { +func (s *KVStore) addIDToMetadata(ctx context.Context, id *vault.SecretIdentifier) error { if id == nil { return errors.New("id cannot be nil") } - md, err := s.GetMetadata(id.Owner) + md, err := s.GetMetadata(ctx, id.Owner) if err != nil { return fmt.Errorf("failed to get metadata for owner %s: %w", id.Owner, err) } @@ -171,7 +182,7 @@ func (s *KVStore) addIDToMetadata(id *vault.SecretIdentifier) error { md.SecretIdentifiers = append(md.SecretIdentifiers, id) } - err = s.WriteMetadata(id.Owner, md) + err = s.WriteMetadata(ctx, id.Owner, md) if err != nil { return fmt.Errorf("failed to write metadata for owner %s: %w", id.Owner, err) } @@ -179,11 +190,11 @@ func (s *KVStore) addIDToMetadata(id *vault.SecretIdentifier) error { return nil } -func (s *KVStore) removeIDFromMetadata(id *vault.SecretIdentifier) error { +func (s *KVStore) removeIDFromMetadata(ctx context.Context, id *vault.SecretIdentifier) error { if id == nil { return errors.New("id cannot be nil") } - md, err := s.GetMetadata(id.Owner) + md, err := s.GetMetadata(ctx, id.Owner) if err != nil { return fmt.Errorf("failed to get metadata for owner %s: %w", id.Owner, err) } @@ -209,7 +220,7 @@ func (s *KVStore) removeIDFromMetadata(id *vault.SecretIdentifier) error { newMd := &vault.StoredMetadata{ SecretIdentifiers: si, } - err = s.WriteMetadata(id.Owner, newMd) + err = s.WriteMetadata(ctx, id.Owner, newMd) if err != nil { return fmt.Errorf("failed to write metadata for owner %s: %w", id.Owner, err) } @@ -217,7 +228,8 @@ func (s *KVStore) removeIDFromMetadata(id *vault.SecretIdentifier) error { return nil } -func (s *KVStore) WriteSecret(id *vault.SecretIdentifier, secret *vault.StoredSecret) error { +func (s *KVStore) WriteSecret(ctx context.Context, id *vault.SecretIdentifier, secret *vault.StoredSecret) error { + defer s.trackDuration(ctx, "WriteSecret", time.Now()) if id == nil { return errors.New("id cannot be nil") } @@ -231,18 +243,19 @@ func (s *KVStore) WriteSecret(id *vault.SecretIdentifier, secret *vault.StoredSe return fmt.Errorf("failed to write secret: %w", err) } - if err := s.addIDToMetadata(id); err != nil { + if err := s.addIDToMetadata(ctx, id); err != nil { return fmt.Errorf("failed to add id to metadata: %w", err) } return nil } -func (s *KVStore) DeleteSecret(id *vault.SecretIdentifier) error { +func (s *KVStore) DeleteSecret(ctx context.Context, id *vault.SecretIdentifier) error { + defer s.trackDuration(ctx, "DeleteSecret", time.Now()) if id == nil { return errors.New("id cannot be nil") } - err := s.removeIDFromMetadata(id) + err := s.removeIDFromMetadata(ctx, id) if err != nil { return fmt.Errorf("failed to remove id from metadata: %w", err) } @@ -255,7 +268,8 @@ func (s *KVStore) DeleteSecret(id *vault.SecretIdentifier) error { return nil } -func (s *KVStore) GetPendingQueue() ([]*vault.StoredPendingQueueItem, error) { +func (s *KVStore) GetPendingQueue(ctx context.Context) ([]*vault.StoredPendingQueueItem, error) { + defer s.trackDuration(ctx, "GetPendingQueue", time.Now()) indexBytes, err := s.reader.Read([]byte(pendingQueueIndex)) if err != nil { return nil, fmt.Errorf("failed to read pending queue index: %w", err) @@ -320,7 +334,8 @@ func (s *KVStore) deletePendingQueue() error { return nil } -func (s *KVStore) WritePendingQueue(pending []*vault.StoredPendingQueueItem) error { +func (s *KVStore) WritePendingQueue(ctx context.Context, pending []*vault.StoredPendingQueueItem) error { + defer s.trackDuration(ctx, "WritePendingQueue", time.Now()) err := s.deletePendingQueue() if err != nil { return fmt.Errorf("failed to delete pending requests: %w", err) diff --git a/core/services/ocr2/plugins/vault/kvstore_test.go b/core/services/ocr2/plugins/vault/kvstore_test.go index a98742f7122..391f4554e40 100644 --- a/core/services/ocr2/plugins/vault/kvstore_test.go +++ b/core/services/ocr2/plugins/vault/kvstore_test.go @@ -3,6 +3,7 @@ package vault import ( "context" "errors" + "sync" "testing" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" @@ -15,6 +16,27 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities/actions/vault" ) +func newTestMetrics(t *testing.T) *pluginMetrics { + t.Helper() + m, err := newPluginMetrics("test") + require.NoError(t, err) + return m +} + +func newTestWriteStore(t *testing.T, writer ocr3_1types.KeyValueStateReadWriter) *KVStore { + t.Helper() + m, err := newPluginMetrics("test") + require.NoError(t, err) + return NewWriteStore(writer, m) +} + +func newTestReadStore(t *testing.T, reader ocr3_1types.KeyValueStateReader) *KVStore { + t.Helper() + m, err := newPluginMetrics("test") + require.NoError(t, err) + return NewReadStore(reader, m) +} + type response struct { data []byte err error @@ -42,17 +64,22 @@ func (k *kv) Write(key []byte, data []byte) error { } type blobber struct { + mu sync.Mutex blobs [][]byte cnt int pendingIdx *int } func (b *blobber) BroadcastBlob(_ context.Context, data []byte, _ ocr3_1types.BlobExpirationHint) (ocr3_1types.BlobHandle, error) { + b.mu.Lock() + defer b.mu.Unlock() b.blobs = append(b.blobs, data) return ocr3_1types.BlobHandle{}, nil } func (b *blobber) FetchBlob(_ context.Context, _ ocr3_1types.BlobHandle) ([]byte, error) { + b.mu.Lock() + defer b.mu.Unlock() if b.pendingIdx != nil { return b.blobs[*b.pendingIdx], nil } @@ -80,7 +107,7 @@ func TestKVStore_Secrets(t *testing.T) { kv.m["Metadata::owner"] = response{ err: errors.New("not found"), } - store := NewWriteStore(kv) + store := newTestWriteStore(t, kv) id := &vault.SecretIdentifier{ Owner: "owner", @@ -88,7 +115,7 @@ func TestKVStore_Secrets(t *testing.T) { Key: "secret1", } - _, err := store.GetSecret(id) + _, err := store.GetSecret(t.Context(), id) require.ErrorContains(t, err, "not found") d, err := proto.Marshal(&vault.StoredSecret{ @@ -105,12 +132,12 @@ func TestKVStore_Secrets(t *testing.T) { kv.m["Metadata::owner"] = response{ data: d, } - s, err := store.GetSecret(id) + s, err := store.GetSecret(t.Context(), id) require.NoError(t, err) assert.Equal(t, s.EncryptedSecret, []byte("encrypted data")) delete(kv.m, "Metadata::owner") - s, err = store.GetSecret(id) + s, err = store.GetSecret(t.Context(), id) assert.Nil(t, s) require.NoError(t, err) @@ -118,10 +145,10 @@ func TestKVStore_Secrets(t *testing.T) { ss := &vault.StoredSecret{ EncryptedSecret: newData, } - err = store.WriteSecret(id, ss) + err = store.WriteSecret(t.Context(), id, ss) require.NoError(t, err) - s, err = store.GetSecret(id) + s, err = store.GetSecret(t.Context(), id) require.NoError(t, err) assert.Equal(t, newData, s.EncryptedSecret) } @@ -130,22 +157,22 @@ func TestKVStore_DeleteSecrets(t *testing.T) { kv := &kv{ m: make(map[string]response), } - store := NewWriteStore(kv) + store := newTestWriteStore(t, kv) id := &vault.SecretIdentifier{ Owner: "owner", Namespace: "main", Key: "secret1", } - err := store.WriteSecret(id, &vault.StoredSecret{ + err := store.WriteSecret(t.Context(), id, &vault.StoredSecret{ EncryptedSecret: []byte("encrypted data"), }) require.NoError(t, err) - err = store.DeleteSecret(id) + err = store.DeleteSecret(t.Context(), id) require.NoError(t, err) - md, err := store.GetMetadata("owner") + md, err := store.GetMetadata(t.Context(), "owner") require.NoError(t, err) assert.Empty(t, md.SecretIdentifiers) @@ -159,9 +186,9 @@ func TestKVStore_Metadata(t *testing.T) { kv.m["Metadata::"+owner] = response{ err: errors.New("not found"), } - store := NewWriteStore(kv) + store := newTestWriteStore(t, kv) - _, err := store.GetMetadata(owner) + _, err := store.GetMetadata(t.Context(), owner) require.ErrorContains(t, err, "not found") id := &vault.SecretIdentifier{ @@ -176,13 +203,13 @@ func TestKVStore_Metadata(t *testing.T) { kv.m["Metadata::owner"] = response{ data: d, } - m, err := store.GetMetadata(owner) + m, err := store.GetMetadata(t.Context(), owner) require.NoError(t, err) assert.Len(t, m.SecretIdentifiers, 1) assert.True(t, proto.Equal(m.SecretIdentifiers[0], id)) delete(kv.m, "Metadata::"+owner) - m, err = store.GetMetadata(owner) + m, err = store.GetMetadata(t.Context(), owner) assert.Nil(t, m) require.NoError(t, err) @@ -200,10 +227,10 @@ func TestKVStore_Metadata(t *testing.T) { }, }, } - err = store.WriteMetadata(owner, m) + err = store.WriteMetadata(t.Context(), owner, m) require.NoError(t, err) - gotM, err := store.GetMetadata(owner) + gotM, err := store.GetMetadata(t.Context(), owner) require.NoError(t, err) assert.True(t, proto.Equal(m, gotM)) @@ -212,10 +239,10 @@ func TestKVStore_Metadata(t *testing.T) { Namespace: "main", Key: "secret3", } - err = store.addIDToMetadata(newKey) + err = store.addIDToMetadata(t.Context(), newKey) require.NoError(t, err) - gotM, err = store.GetMetadata(owner) + gotM, err = store.GetMetadata(t.Context(), owner) require.NoError(t, err) assert.Len(t, gotM.SecretIdentifiers, 2) } @@ -225,7 +252,7 @@ func TestKVStore_Metadata_Delete(t *testing.T) { kv := &kv{ m: make(map[string]response), } - store := NewWriteStore(kv) + store := newTestWriteStore(t, kv) id := &vault.SecretIdentifier{ Owner: "owner", @@ -240,20 +267,20 @@ func TestKVStore_Metadata_Delete(t *testing.T) { data: d, } - err = store.removeIDFromMetadata(id) + err = store.removeIDFromMetadata(t.Context(), id) require.NoError(t, err) - m, err := store.GetMetadata(owner) + m, err := store.GetMetadata(t.Context(), owner) require.NoError(t, err) assert.Empty(t, m.SecretIdentifiers) - err = store.removeIDFromMetadata(id) + err = store.removeIDFromMetadata(t.Context(), id) require.ErrorContains(t, err, "not found in metadata for owner owner") delete(kv.m, "Metadata::owner") - err = store.removeIDFromMetadata(id) + err = store.removeIDFromMetadata(t.Context(), id) require.ErrorContains(t, err, "no metadata found for owner owner") } @@ -261,7 +288,7 @@ func TestKVStore_InconsistentWrites(t *testing.T) { kv := &kv{ m: make(map[string]response), } - store := NewWriteStore(kv) + store := newTestWriteStore(t, kv) id := &vault.SecretIdentifier{ Owner: "owner", @@ -283,7 +310,7 @@ func TestKVStore_InconsistentWrites(t *testing.T) { kv.m["Metadata::owner"] = response{ data: d, } - s, err := store.GetSecret(id) + s, err := store.GetSecret(t.Context(), id) require.NoError(t, err) assert.Equal(t, s.EncryptedSecret, []byte("encrypted data")) @@ -292,21 +319,21 @@ func TestKVStore_InconsistentWrites(t *testing.T) { delete(kv.m, "Metadata::owner") // Now fetching the secret should fail - s, err = store.GetSecret(id) + s, err = store.GetSecret(t.Context(), id) assert.Nil(t, s) require.NoError(t, err) // We can recreate it without an already exists error. - err = store.WriteSecret(id, &vault.StoredSecret{ + err = store.WriteSecret(t.Context(), id, &vault.StoredSecret{ EncryptedSecret: []byte("encrypted data 2"), }) require.NoError(t, err) - md, err := store.GetMetadata("owner") + md, err := store.GetMetadata(t.Context(), "owner") require.NoError(t, err) assert.Len(t, md.SecretIdentifiers, 1) - s, err = store.GetSecret(id) + s, err = store.GetSecret(t.Context(), id) assert.NotNil(t, s) require.NoError(t, err) @@ -318,10 +345,10 @@ func TestKVStore_GetPendingRequests(t *testing.T) { kv := &kv{ m: make(map[string]response), } - store := NewWriteStore(kv) + store := newTestWriteStore(t, kv) // Expect no pending requests on empty store. - requests, err := store.GetPendingQueue() + requests, err := store.GetPendingQueue(t.Context()) require.NoError(t, err) assert.Empty(t, requests) @@ -350,7 +377,7 @@ func TestKVStore_GetPendingRequests(t *testing.T) { kv.m[pendingQueueIndex] = response{data: indexBytes} // Validate retrieval of one pending request. - requests, err = store.GetPendingQueue() + requests, err = store.GetPendingQueue(t.Context()) require.NoError(t, err) assert.Len(t, requests, 2) assert.Equal(t, "test-request-id-123", requests[0].Id) @@ -358,7 +385,7 @@ func TestKVStore_GetPendingRequests(t *testing.T) { // Validate behaviour when the index item is missing. delete(kv.m, pendingQueueIndex) - requests, err = store.GetPendingQueue() + requests, err = store.GetPendingQueue(t.Context()) require.NoError(t, err) assert.Empty(t, requests) @@ -368,7 +395,7 @@ func TestKVStore_GetPendingRequests(t *testing.T) { require.NoError(t, err) kv.m[pendingQueueIndex] = response{data: indexBytes} - requests, err = store.GetPendingQueue() + requests, err = store.GetPendingQueue(t.Context()) require.ErrorContains(t, err, "pending queue item at index 2 not found") assert.Empty(t, requests) } @@ -378,7 +405,7 @@ func TestKVStore_WritePendingRequests(t *testing.T) { kv := &kv{ m: make(map[string]response), } - store := NewWriteStore(kv) + store := newTestWriteStore(t, kv) // Writing mock pending requests. empty, err := anypb.New(&emptypb.Empty{}) @@ -395,7 +422,7 @@ func TestKVStore_WritePendingRequests(t *testing.T) { Id: "test-request-id-3", Item: empty, } - err = store.WritePendingQueue([]*vault.StoredPendingQueueItem{item, item2, item3}) + err = store.WritePendingQueue(t.Context(), []*vault.StoredPendingQueueItem{item, item2, item3}) require.NoError(t, err) // Ensure index is correctly written. @@ -427,7 +454,7 @@ func TestKVStore_WritePendingRequests(t *testing.T) { assert.Equal(t, "test-request-id-3", item2.Id) // Writing a shorter list deletes the old one. - err = store.WritePendingQueue([]*vault.StoredPendingQueueItem{item, item2}) + err = store.WritePendingQueue(t.Context(), []*vault.StoredPendingQueueItem{item, item2}) require.NoError(t, err) _, exists = kv.m[pendingQueueItemPrefix+"3"] diff --git a/core/services/ocr2/plugins/vault/metrics.go b/core/services/ocr2/plugins/vault/metrics.go index 5e4f68e73b6..f7a083499dc 100644 --- a/core/services/ocr2/plugins/vault/metrics.go +++ b/core/services/ocr2/plugins/vault/metrics.go @@ -13,7 +13,8 @@ import ( type pluginMetrics struct { configDigest string - queueOverflow metric.Int64Counter + queueOverflow metric.Int64Counter + kvOperationDuration metric.Int64Histogram } func newPluginMetrics(configDigest string) (*pluginMetrics, error) { @@ -22,13 +23,35 @@ func newPluginMetrics(configDigest string) (*pluginMetrics, error) { return nil, fmt.Errorf("failed to create queue overflow counter: %w", err) } + kvOperationDuration, err := beholder.GetMeter().Int64Histogram( + "platform_vault_plugin_kv_operation_duration_ms", + metric.WithUnit("ms"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create kv operation duration histogram: %w", err) + } + return &pluginMetrics{ - configDigest: configDigest, - queueOverflow: queueOverflow, + configDigest: configDigest, + queueOverflow: queueOverflow, + kvOperationDuration: kvOperationDuration, }, nil } +func (m *pluginMetrics) trackKVOperation(ctx context.Context, method string, durationMs int64) { + if m == nil { + return + } + m.kvOperationDuration.Record(ctx, durationMs, metric.WithAttributes( + attribute.String("configDigest", m.configDigest), + attribute.String("method", method), + )) +} + func (m *pluginMetrics) trackQueueOverflow(ctx context.Context, queueSize int, batchSize int) { + if m == nil { + return + } m.queueOverflow.Add(ctx, 1, metric.WithAttributes( attribute.String("configDigest", m.configDigest), attribute.Int("queueSize", queueSize), diff --git a/core/services/ocr2/plugins/vault/plugin.go b/core/services/ocr2/plugins/vault/plugin.go index 0bff2ef57ea..f6360eadeaa 100644 --- a/core/services/ocr2/plugins/vault/plugin.go +++ b/core/services/ocr2/plugins/vault/plugin.go @@ -12,9 +12,11 @@ import ( "regexp" "slices" "sort" + "time" "golang.org/x/crypto/curve25519" "golang.org/x/crypto/nacl/box" + "golang.org/x/sync/errgroup" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" @@ -263,11 +265,7 @@ func (r *ReportingPluginFactory) NewReportingPlugin(ctx context.Context, config return nil, ocr3_1types.ReportingPluginInfo1{}, fmt.Errorf("could not create max secrets per owner limiter: %w", err) } - batchSize := cresettings.Default.VaultPluginBatchSizeLimit - if configProto.BatchSize != 0 { - batchSize.DefaultValue = int(configProto.BatchSize) - } - cfg.MaxBatchSize, err = limits.MakeUpperBoundLimiter(r.limitsFactory, batchSize) + cfg.MaxBatchSize, err = limits.MakeUpperBoundLimiter(r.limitsFactory, cresettings.Default.VaultPluginBatchSizeLimit) if err != nil { return nil, ocr3_1types.ReportingPluginInfo1{}, fmt.Errorf("could not create max batch size limiter: %w", err) } @@ -357,9 +355,14 @@ func generateRandomNonce() ([]byte, error) { } func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq types.AttributedQuery, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (types.Observation, error) { - readStore := NewReadStore(keyValueReader) + start := time.Now() + defer func() { + r.lggr.Debugw("observation finished", "seqNr", seqNr, "elapsed", time.Since(start)) + }() + + readStore := NewReadStore(keyValueReader, r.metrics) - batch, err := readStore.GetPendingQueue() + batch, err := readStore.GetPendingQueue(ctx) if err != nil { return nil, fmt.Errorf("could not fetch batch of requests: %w", err) } @@ -431,7 +434,7 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type // Next, get the current pending queue. We'll use this to dedupe // requests when generating an observation for the next state of the // pending queue. - pendingQueue, ierr := readStore.GetPendingQueue() + pendingQueue, ierr := readStore.GetPendingQueue(ctx) if ierr != nil { return nil, ierr } @@ -441,7 +444,8 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type pendingQueueHasID[item.Id] = true } - observedLocalQueue := make([][]byte, 0, len(localQueueItems)) + blobPayloads := make([][]byte, 0, len(localQueueItems)) + maxObservedLocalQueueItems := 0 for _, item := range localQueueItems { // The item is already in the pending queue. We'll be processing it // this round. Let's skip it for now so we don't process duplicates. @@ -464,32 +468,53 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type return nil, fmt.Errorf("could not marshal pending queue item: %w", ierr2) } - blobHandle, ierr2 := blobBroadcastFetcher.BroadcastBlob(ctx, itemb, ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: seqNr + 2}) - if ierr2 != nil { - return nil, fmt.Errorf("could not broadcast pending queue item as blob: %w", ierr2) - } - - blobHandleBytes, ierr2 := r.marshalBlob(blobHandle) - if ierr2 != nil { - return nil, fmt.Errorf("could not marshal blob handle to bytes: %w", ierr2) + if maxObservedLocalQueueItems == 0 { + l, ierr2 := r.cfg.MaxBatchSize.Limit(ctx) + if ierr2 != nil { + return nil, fmt.Errorf("could not fetch max batch size limit: %w", ierr2) + } + maxObservedLocalQueueItems = 2 * l } - observedLocalQueue = append(observedLocalQueue, blobHandleBytes) - - l, ierr2 := r.cfg.MaxBatchSize.Limit(ctx) - if ierr2 != nil { - return nil, fmt.Errorf("could not fetch max batch size limit: %w", ierr2) - } + blobPayloads = append(blobPayloads, itemb) - if len(observedLocalQueue) > 2*l { + if len(blobPayloads) >= maxObservedLocalQueueItems { r.lggr.Warnw("Observed local queue exceeds batch size limit, truncating", - "queueSize", len(observedLocalQueue), - "batchSizeLimit", 2*l) - r.metrics.trackQueueOverflow(ctx, len(observedLocalQueue), 2*l) + "queueSize", len(blobPayloads), + "batchSizeLimit", maxObservedLocalQueueItems) + r.metrics.trackQueueOverflow(ctx, len(blobPayloads), maxObservedLocalQueueItems) break } } + observedLocalQueue := make([][]byte, len(blobPayloads)) + // Broadcast pending-queue blobs in parallel to reduce Observation() latency. + // Shortening this phase helps the OCR round finish within DeltaProgress. + blobBroadcastStart := time.Now() + defer func() { + r.lggr.Debugw("observation blob broadcast finished", "seqNr", seqNr, "blobCount", len(blobPayloads), "elapsed", time.Since(blobBroadcastStart)) + }() + g, broadcastCtx := errgroup.WithContext(ctx) + for i, payload := range blobPayloads { + g.Go(func() error { + blobHandle, ierr2 := blobBroadcastFetcher.BroadcastBlob(broadcastCtx, payload, ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: seqNr + 2}) + if ierr2 != nil { + return fmt.Errorf("could not broadcast pending queue item as blob: %w", ierr2) + } + + blobHandleBytes, ierr2 := r.marshalBlob(blobHandle) + if ierr2 != nil { + return fmt.Errorf("could not marshal blob handle to bytes: %w", ierr2) + } + + observedLocalQueue[i] = blobHandleBytes + return nil + }) + } + if err = g.Wait(); err != nil { + return nil, err + } + obspb.PendingQueueItems = observedLocalQueue // Second, generate a random nonce that we'll use to sort the observations. @@ -602,7 +627,7 @@ func (r *ReportingPlugin) observeGetSecretsRequest(ctx context.Context, reader R return nil, err } - secret, err := reader.GetSecret(id) + secret, err := reader.GetSecret(ctx, id) if err != nil { return nil, fmt.Errorf("failed to read secret from key-value store: %w", err) } @@ -816,7 +841,7 @@ func (r *ReportingPlugin) processListSecretIdentifiersRequest(ctx context.Contex return nil, errors.New("invalid request: owner cannot be empty") } - md, err := reader.GetMetadata(req.Owner) + md, err := reader.GetMetadata(ctx, req.Owner) if err != nil { return nil, fmt.Errorf("failed to get metadata for owner: %w", err) } @@ -917,7 +942,7 @@ func (r *ReportingPlugin) observeDeleteSecretRequest(ctx context.Context, reader return id, newUserError("duplicate request for secret identifier " + vaulttypes.KeyFor(id)) } - ss, err := reader.GetSecret(id) + ss, err := reader.GetSecret(ctx, id) if err != nil { return id, fmt.Errorf("failed to read secret from key-value store: %w", err) } @@ -1027,8 +1052,8 @@ func (r *ReportingPlugin) ValidateObservation(ctx context.Context, seqNr uint64, // This is because honest nodes will all be reading from // the same deterministic key-value store-based queue. // - that all pending queue items can be fetched as blobs. - store := NewReadStore(keyValueReader) - pendingQueueItems, err := store.GetPendingQueue() + store := NewReadStore(keyValueReader, r.metrics) + pendingQueueItems, err := store.GetPendingQueue(ctx) if err != nil { return fmt.Errorf("could not fetch pending queue from store: %w", err) } @@ -1380,7 +1405,7 @@ func (r *ReportingPlugin) validateListSecretIdentifiersObservation(ctx context.C } func (r *ReportingPlugin) StateTransition(ctx context.Context, seqNr uint64, aq types.AttributedQuery, aos []types.AttributedObservation, keyValueReadWriter ocr3_1types.KeyValueStateReadWriter, blobFetcher ocr3_1types.BlobFetcher) (ocr3_1types.ReportsPlusPrecursor, error) { - store := NewWriteStore(keyValueReadWriter) + store := NewWriteStore(keyValueReadWriter, r.metrics) marshalledObs := map[uint8]*vaultcommon.Observations{} for _, ao := range aos { @@ -1624,7 +1649,7 @@ func (r *ReportingPlugin) stateTransitionPendingQueue(ctx context.Context, store keptItems = keptItems[:errBoundLimited.Limit] } - return store.WritePendingQueue(keptItems) + return store.WritePendingQueue(ctx, keptItems) } func sortKey(id string, nonce []byte) []byte { @@ -1816,7 +1841,7 @@ func (r *ReportingPlugin) stateTransitionCreateSecretsRequest(ctx context.Contex return nil, newUserError("could not decode secret value: invalid hex" + err.Error()) } - secret, err := store.GetSecret(req.Id) + secret, err := store.GetSecret(ctx, req.Id) if err != nil { return nil, fmt.Errorf("failed to read secret from key-value store: %w", err) } @@ -1825,7 +1850,7 @@ func (r *ReportingPlugin) stateTransitionCreateSecretsRequest(ctx context.Contex return nil, newUserError("could not write to key value store: key already exists") } - count, err := store.GetSecretIdentifiersCountForOwner(req.Id.Owner) + count, err := store.GetSecretIdentifiersCountForOwner(ctx, req.Id.Owner) if err != nil { return nil, fmt.Errorf("failed to read secret identifiers count for owner: %w", err) } @@ -1839,7 +1864,7 @@ func (r *ReportingPlugin) stateTransitionCreateSecretsRequest(ctx context.Contex return nil, fmt.Errorf("failed to check max secrets per owner limit: %w", ierr) } - err = store.WriteSecret(req.Id, &vaultcommon.StoredSecret{ + err = store.WriteSecret(ctx, req.Id, &vaultcommon.StoredSecret{ EncryptedSecret: encryptedSecret, }) if err != nil { @@ -1935,7 +1960,7 @@ func (r *ReportingPlugin) stateTransitionUpdateSecretsRequest(ctx context.Contex return nil, newUserError("could not decode secret value: invalid hex" + err.Error()) } - secret, err := store.GetSecret(req.Id) + secret, err := store.GetSecret(ctx, req.Id) if err != nil { return nil, fmt.Errorf("failed to read secret from key-value store: %w", err) } @@ -1944,7 +1969,7 @@ func (r *ReportingPlugin) stateTransitionUpdateSecretsRequest(ctx context.Contex return nil, newUserError("could not write update to key value store: key does not exist") } - err = store.WriteSecret(req.Id, &vaultcommon.StoredSecret{ + err = store.WriteSecret(ctx, req.Id, &vaultcommon.StoredSecret{ EncryptedSecret: encryptedSecret, }) if err != nil { @@ -2035,7 +2060,7 @@ func (r *ReportingPlugin) stateTransitionDeleteSecretsRequest(ctx context.Contex return resp, newUserError(resp.GetError()) } - err := store.DeleteSecret(id) + err := store.DeleteSecret(ctx, id) if err != nil { return nil, fmt.Errorf("failed to delete secret from key value store: %w", err) } diff --git a/core/services/ocr2/plugins/vault/plugin_test.go b/core/services/ocr2/plugins/vault/plugin_test.go index 390462fb0f9..33db905829d 100644 --- a/core/services/ocr2/plugins/vault/plugin_test.go +++ b/core/services/ocr2/plugins/vault/plugin_test.go @@ -1,12 +1,17 @@ package vault import ( + "context" "crypto/rand" "encoding/base64" "encoding/hex" + "errors" "fmt" "strings" + "sync" + "sync/atomic" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/smartcontractkit/libocr/commontypes" @@ -119,8 +124,8 @@ func TestPlugin_ReportingPluginFactory_UsesDefaultsIfNotProvidedInOffchainConfig assert.Equal(t, int(cresettings.Default.VaultMaxPerOracleUnexpiredBlobCumulativePayloadSizeLimit.DefaultValue), infoObject.Limits.MaxPerOracleUnexpiredBlobCumulativePayloadBytes) assert.Equal(t, cresettings.Default.VaultMaxPerOracleUnexpiredBlobCount.DefaultValue, infoObject.Limits.MaxPerOracleUnexpiredBlobCount) - // Verify that configProto overrides apply to BatchSize and MaxSecretsPerOwner, - // while other fields remain at cresettings defaults. + // Verify that configProto overrides apply to MaxSecretsPerOwner, + // while MaxBatchSize and other fields remain at cresettings defaults. cfg = vaultcommon.ReportingPluginConfig{ BatchSize: 2, MaxSecretsPerOwner: 2, @@ -144,7 +149,7 @@ func TestPlugin_ReportingPluginFactory_UsesDefaultsIfNotProvidedInOffchainConfig require.NoError(t, err) typedRP = rp.(*ReportingPlugin) - assertLimit(t, 2, typedRP.cfg.MaxBatchSize) + assertLimit(t, cresettings.Default.VaultPluginBatchSizeLimit.DefaultValue, typedRP.cfg.MaxBatchSize) assertLimit(t, 2, typedRP.cfg.MaxSecretsPerOwner) assertLimit(t, 2000, typedRP.cfg.MaxCiphertextLengthBytes) assertLimit(t, 64, typedRP.cfg.MaxIdentifierOwnerLengthBytes) @@ -290,8 +295,9 @@ func TestPlugin_Observation_NothingInBatch(t *testing.T) { lggr := logger.TestLogger(t) store := requests.NewStore[*vaulttypes.Request]() r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -324,8 +330,9 @@ func TestPlugin_Observation_PendingQueueEnabled_EmptyPendingQueue(t *testing.T) lggr := logger.TestLogger(t) store := requests.NewStore[*vaulttypes.Request]() r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -389,8 +396,10 @@ func TestPlugin_Observation_PendingQueueEnabled_EmptyPendingQueue(t *testing.T) // We expect the pending queue observation to contain the request in the local queue. assert.Len(t, obs.PendingQueueItems, 2) - assertPendingQueueItemsEqual(t, expectedID, bf.blobs[0], p) - assertPendingQueueItemsEqual(t, expectedID2, bf.blobs[1], p) + assertPendingQueueItemsContain(t, bf.blobs, map[string]proto.Message{ + expectedID: p, + expectedID2: p, + }) assert.NotEmpty(t, obs.SortNonce) } @@ -399,8 +408,9 @@ func TestPlugin_Observation_PendingQueueEnabled_WithPendingQueueProvided(t *test lggr := logger.TestLogger(t) store := requests.NewStore[*vaulttypes.Request]() r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -455,7 +465,7 @@ func TestPlugin_Observation_PendingQueueEnabled_WithPendingQueueProvided(t *test } anyd, err := anypb.New(d) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-3", Item: anyd}, }, @@ -484,8 +494,10 @@ func TestPlugin_Observation_PendingQueueEnabled_WithPendingQueueProvided(t *test // We expect the pending queue observation to contain the request in the local queue. assert.Len(t, obs.PendingQueueItems, 2) - assertPendingQueueItemsEqual(t, expectedID, bf.blobs[0], p) - assertPendingQueueItemsEqual(t, expectedID2, bf.blobs[1], p) + assertPendingQueueItemsContain(t, bf.blobs, map[string]proto.Message{ + expectedID: p, + expectedID2: p, + }) assert.NotEmpty(t, obs.SortNonce) } @@ -494,8 +506,9 @@ func TestPlugin_Observation_PendingQueueEnabled_ItemBothInPendingQueueAndLocalQu lggr := logger.TestLogger(t) store := requests.NewStore[*vaulttypes.Request]() r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -547,7 +560,7 @@ func TestPlugin_Observation_PendingQueueEnabled_ItemBothInPendingQueueAndLocalQu anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-2", Item: anyp}, }, @@ -582,6 +595,7 @@ func TestPlugin_Observation_PendingQueueEnabled_ItemBothInPendingQueueAndLocalQu } func assertPendingQueueItemsEqual(t *testing.T, expectedID string, got []byte, expectedPayload proto.Message) { + t.Helper() gotMsg := &vaultcommon.StoredPendingQueueItem{} err := proto.Unmarshal(got, gotMsg) require.NoError(t, err) @@ -593,6 +607,206 @@ func assertPendingQueueItemsEqual(t *testing.T, expectedID string, got []byte, e assert.True(t, proto.Equal(expectedPayload, gotm)) } +func assertPendingQueueItemsContain(t *testing.T, gotItems [][]byte, expected map[string]proto.Message) { + t.Helper() + + require.Len(t, gotItems, len(expected)) + + remaining := make(map[string]proto.Message, len(expected)) + for id, payload := range expected { + remaining[id] = payload + } + + for _, got := range gotItems { + gotMsg := &vaultcommon.StoredPendingQueueItem{} + err := proto.Unmarshal(got, gotMsg) + require.NoError(t, err) + + expectedPayload, ok := remaining[gotMsg.Id] + require.True(t, ok, "unexpected pending queue item id %q", gotMsg.Id) + + gotPayload, err := gotMsg.Item.UnmarshalNew() + require.NoError(t, err) + assert.True(t, proto.Equal(expectedPayload, gotPayload)) + + delete(remaining, gotMsg.Id) + } + + assert.Empty(t, remaining) +} + +type blockingBlobBroadcastFetcher struct { + targetStarts int32 + started atomic.Int32 + maxInFlight atomic.Int32 + inFlight atomic.Int32 + allStarted chan struct{} + release chan struct{} + once sync.Once +} + +func (b *blockingBlobBroadcastFetcher) BroadcastBlob(ctx context.Context, _ []byte, _ ocr3_1types.BlobExpirationHint) (ocr3_1types.BlobHandle, error) { + currentInFlight := b.inFlight.Add(1) + defer b.inFlight.Add(-1) + + for { + maxInFlight := b.maxInFlight.Load() + if currentInFlight <= maxInFlight || b.maxInFlight.CompareAndSwap(maxInFlight, currentInFlight) { + break + } + } + + if b.started.Add(1) == b.targetStarts { + b.once.Do(func() { close(b.allStarted) }) + } + + select { + case <-b.release: + return ocr3_1types.BlobHandle{}, nil + case <-ctx.Done(): + return ocr3_1types.BlobHandle{}, ctx.Err() + } +} + +func (b *blockingBlobBroadcastFetcher) FetchBlob(context.Context, ocr3_1types.BlobHandle) ([]byte, error) { + panic("FetchBlob should not be called in Observation tests") +} + +type errorBlobBroadcastFetcher struct { + err error +} + +func (e *errorBlobBroadcastFetcher) BroadcastBlob(context.Context, []byte, ocr3_1types.BlobExpirationHint) (ocr3_1types.BlobHandle, error) { + return ocr3_1types.BlobHandle{}, e.err +} + +func (e *errorBlobBroadcastFetcher) FetchBlob(context.Context, ocr3_1types.BlobHandle) ([]byte, error) { + panic("FetchBlob should not be called in Observation tests") +} + +func TestPlugin_Observation_PendingQueueEnabled_BroadcastsPendingQueueBlobsInParallel(t *testing.T) { + lggr := logger.TestLogger(t) + store := requests.NewStore[*vaulttypes.Request]() + r := &ReportingPlugin{ + lggr: lggr, + store: store, + cfg: makeReportingPluginConfig( + t, + 10, + nil, + nil, + 1, + 1024, + 100, + 100, + 100, + 10, + ), + marshalBlob: mockMarshalBlob, + unmarshalBlob: mockUnmarshalBlob, + } + + id := &vaultcommon.SecretIdentifier{ + Owner: "owner", + Namespace: "", + Key: "my_secret", + } + + pubK, _, err := box.GenerateKey(rand.Reader) + require.NoError(t, err) + pks := hex.EncodeToString(pubK[:]) + + p := &vaultcommon.GetSecretsRequest{ + Requests: []*vaultcommon.SecretRequest{ + { + Id: id, + EncryptionKeys: []string{pks}, + }, + }, + } + + require.NoError(t, store.Add(&vaulttypes.Request{Payload: p, IDVal: "request-1"})) + require.NoError(t, store.Add(&vaulttypes.Request{Payload: p, IDVal: "request-2"})) + + rdr := &kv{m: make(map[string]response)} + bf := &blockingBlobBroadcastFetcher{ + targetStarts: 2, + allStarted: make(chan struct{}), + release: make(chan struct{}), + } + + errCh := make(chan error, 1) + go func() { + _, err := r.Observation(t.Context(), 1, types.AttributedQuery{}, rdr, bf) + errCh <- err + }() + + select { + case <-bf.allStarted: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for concurrent blob broadcasts") + } + + close(bf.release) + + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for Observation to finish") + } + + assert.Equal(t, int32(2), bf.maxInFlight.Load()) +} + +func TestPlugin_Observation_PendingQueueEnabled_BroadcastBlobError(t *testing.T) { + lggr := logger.TestLogger(t) + store := requests.NewStore[*vaulttypes.Request]() + r := &ReportingPlugin{ + lggr: lggr, + store: store, + cfg: makeReportingPluginConfig( + t, + 10, + nil, + nil, + 1, + 1024, + 100, + 100, + 100, + 10, + ), + marshalBlob: mockMarshalBlob, + unmarshalBlob: mockUnmarshalBlob, + } + + id := &vaultcommon.SecretIdentifier{ + Owner: "owner", + Namespace: "", + Key: "my_secret", + } + + pubK, _, err := box.GenerateKey(rand.Reader) + require.NoError(t, err) + pks := hex.EncodeToString(pubK[:]) + + p := &vaultcommon.GetSecretsRequest{ + Requests: []*vaultcommon.SecretRequest{ + { + Id: id, + EncryptionKeys: []string{pks}, + }, + }, + } + + require.NoError(t, store.Add(&vaulttypes.Request{Payload: p, IDVal: "request-1"})) + rdr := &kv{m: make(map[string]response)} + + _, err = r.Observation(t.Context(), 1, types.AttributedQuery{}, rdr, &errorBlobBroadcastFetcher{err: errors.New("boom")}) + require.ErrorContains(t, err, "could not broadcast pending queue item as blob: boom") +} + func TestPlugin_Observation_GetSecretsRequest_SecretIdentifierInvalid(t *testing.T) { tcs := []struct { name string @@ -638,8 +852,9 @@ func TestPlugin_Observation_GetSecretsRequest_SecretIdentifierInvalid(t *testing maxIDLen = tc.maxIDLen } r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -669,7 +884,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretIdentifierInvalid(t *testing } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -705,8 +920,9 @@ func TestPlugin_Observation_GetSecretsRequest_FillsInNamespace(t *testing.T) { _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -742,7 +958,7 @@ func TestPlugin_Observation_GetSecretsRequest_FillsInNamespace(t *testing.T) { Namespace: "main", Key: "my_secret", } - err = NewWriteStore(rdr).WriteSecret(createdID, &vaultcommon.StoredSecret{ + err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), createdID, &vaultcommon.StoredSecret{ EncryptedSecret: ciphertextBytes, }) require.NoError(t, err) @@ -762,7 +978,7 @@ func TestPlugin_Observation_GetSecretsRequest_FillsInNamespace(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -794,8 +1010,9 @@ func TestPlugin_Observation_GetSecretsRequest_SecretDoesNotExist(t *testing.T) { lggr := logger.TestLogger(t) store := requests.NewStore[*vaulttypes.Request]() r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -831,7 +1048,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretDoesNotExist(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -868,6 +1085,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretExistsButIsIncorrect(t *test r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -893,7 +1111,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretExistsButIsIncorrect(t *test m: make(map[string]response), } - err = NewWriteStore(rdr).WriteSecret(id, &vaultcommon.StoredSecret{ + err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id, &vaultcommon.StoredSecret{ EncryptedSecret: []byte("invalid-ciphertext"), }) require.NoError(t, err) @@ -908,7 +1126,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretExistsButIsIncorrect(t *test } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -954,6 +1172,7 @@ func TestPlugin_Observation_GetSecretsRequest_PublicKeyIsInvalid(t *testing.T) { r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -985,7 +1204,7 @@ func TestPlugin_Observation_GetSecretsRequest_PublicKeyIsInvalid(t *testing.T) { ciphertextBytes, err := ciphertext.Marshal() require.NoError(t, err) - err = NewWriteStore(rdr).WriteSecret(id, &vaultcommon.StoredSecret{ + err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id, &vaultcommon.StoredSecret{ EncryptedSecret: ciphertextBytes, }) require.NoError(t, err) @@ -1000,7 +1219,7 @@ func TestPlugin_Observation_GetSecretsRequest_PublicKeyIsInvalid(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1038,6 +1257,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretLabelIsInvalid(t *testing.T) r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1072,7 +1292,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretLabelIsInvalid(t *testing.T) ciphertextBytes, err := ciphertext.Marshal() require.NoError(t, err) - err = NewWriteStore(rdr).WriteSecret(id, &vaultcommon.StoredSecret{ + err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id, &vaultcommon.StoredSecret{ EncryptedSecret: ciphertextBytes, }) require.NoError(t, err) @@ -1092,7 +1312,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretLabelIsInvalid(t *testing.T) } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1130,6 +1350,7 @@ func TestPlugin_Observation_GetSecretsRequest_Success(t *testing.T) { r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1165,7 +1386,7 @@ func TestPlugin_Observation_GetSecretsRequest_Success(t *testing.T) { ciphertextBytes, err := ciphertext.Marshal() require.NoError(t, err) - err = NewWriteStore(rdr).WriteSecret(id, &vaultcommon.StoredSecret{ + err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id, &vaultcommon.StoredSecret{ EncryptedSecret: ciphertextBytes, }) require.NoError(t, err) @@ -1185,7 +1406,7 @@ func TestPlugin_Observation_GetSecretsRequest_Success(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1287,6 +1508,7 @@ func TestPlugin_Observation_CreateSecretsRequest_SecretIdentifierInvalid(t *test r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1317,7 +1539,7 @@ func TestPlugin_Observation_CreateSecretsRequest_SecretIdentifierInvalid(t *test } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1353,6 +1575,7 @@ func TestPlugin_Observation_CreateSecretsRequest_DisallowsDuplicateRequests(t *t r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1392,7 +1615,7 @@ func TestPlugin_Observation_CreateSecretsRequest_DisallowsDuplicateRequests(t *t } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1430,8 +1653,9 @@ func TestPlugin_StateTransition_CreateSecretsRequest_CorrectlyTracksLimits(t *te _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -1545,6 +1769,7 @@ func TestPlugin_Observation_CreateSecretsRequest_InvalidCiphertext(t *testing.T) r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1581,7 +1806,7 @@ func TestPlugin_Observation_CreateSecretsRequest_InvalidCiphertext(t *testing.T) } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1615,6 +1840,7 @@ func TestPlugin_Observation_CreateSecretsRequest_InvalidCiphertext_TooLong(t *te r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1652,7 +1878,7 @@ func TestPlugin_Observation_CreateSecretsRequest_InvalidCiphertext_TooLong(t *te } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1692,6 +1918,7 @@ func TestPlugin_Observation_CreateSecretsRequest_InvalidCiphertext_EncryptedWith r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1734,7 +1961,7 @@ func TestPlugin_Observation_CreateSecretsRequest_InvalidCiphertext_EncryptedWith } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1770,6 +1997,7 @@ func TestPlugin_Observation_CreateSecretsRequest_SecretLabelIsInvalid(t *testing r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1819,7 +2047,7 @@ func TestPlugin_Observation_CreateSecretsRequest_SecretLabelIsInvalid(t *testing } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1851,6 +2079,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_SecretLabelIsInvalid(t *testing r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1900,7 +2129,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_SecretLabelIsInvalid(t *testing } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1930,8 +2159,9 @@ func TestPlugin_StateTransition_CreateSecretsRequest_TooManySecretsForOwner(t *t _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -1955,8 +2185,8 @@ func TestPlugin_StateTransition_CreateSecretsRequest_TooManySecretsForOwner(t *t Namespace: "main", Key: "secret", } - kvstore := NewWriteStore(rdr) - err = kvstore.WriteMetadata(id.Owner, &vaultcommon.StoredMetadata{ + kvstore := newTestWriteStore(t, rdr) + err = kvstore.WriteMetadata(t.Context(), id.Owner, &vaultcommon.StoredMetadata{ SecretIdentifiers: []*vaultcommon.SecretIdentifier{ { Owner: "owner", @@ -2021,8 +2251,9 @@ func TestPlugin_StateTransition_CreateSecretsRequest_SecretExistsForKey(t *testi _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2046,8 +2277,8 @@ func TestPlugin_StateTransition_CreateSecretsRequest_SecretExistsForKey(t *testi Namespace: "main", Key: "secret", } - kvstore := NewWriteStore(rdr) - err = kvstore.WriteSecret(id, &vaultcommon.StoredSecret{ + kvstore := newTestWriteStore(t, rdr) + err = kvstore.WriteSecret(t.Context(), id, &vaultcommon.StoredSecret{ EncryptedSecret: []byte("some-ciphertext"), }) require.NoError(t, err) @@ -2108,6 +2339,7 @@ func TestPlugin_Observation_CreateSecretsRequest_Success(t *testing.T) { r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -2149,7 +2381,7 @@ func TestPlugin_Observation_CreateSecretsRequest_Success(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -2331,7 +2563,8 @@ func TestPlugin_StateTransition_InsufficientObservations(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2407,7 +2640,8 @@ func TestPlugin_StateTransition_GetSecretsRequest_ResponseSizeWithinLimit(t *tes N: 10, F: 3, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2479,7 +2713,8 @@ func TestPlugin_ValidateObservations_InvalidObservations(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2568,7 +2803,8 @@ func TestPlugin_ValidateObservations_IncludesAllItemsInPendingQueue(t *testing.T N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2613,7 +2849,7 @@ func TestPlugin_ValidateObservations_IncludesAllItemsInPendingQueue(t *testing.T } anyg, err := anypb.New(g) require.NoError(t, err) - err = NewWriteStore(kv).WritePendingQueue( + err = newTestWriteStore(t, kv).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: vaulttypes.KeyFor(id), Item: anyd}, {Id: vaulttypes.KeyFor(id2), Item: anyg}, @@ -2671,7 +2907,8 @@ func TestPlugin_ValidateObservations_DisallowsDuplicateBlobHandles(t *testing.T) N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2730,7 +2967,8 @@ func TestPlugin_StateTransition_ShasDontMatch(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2813,7 +3051,8 @@ func TestPlugin_StateTransition_AggregatesValidationErrors(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2892,7 +3131,8 @@ func TestPlugin_StateTransition_GetSecretsRequest_CombinesShares(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -3036,7 +3276,8 @@ func TestPlugin_StateTransition_CreateSecretsRequest_WritesSecrets(t *testing.T) N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -3055,7 +3296,7 @@ func TestPlugin_StateTransition_CreateSecretsRequest_WritesSecrets(t *testing.T) kv := &kv{ m: make(map[string]response), } - rs := NewReadStore(kv) + rs := newTestReadStore(t, kv) id := &vaultcommon.SecretIdentifier{ Owner: "owner", @@ -3114,7 +3355,7 @@ func TestPlugin_StateTransition_CreateSecretsRequest_WritesSecrets(t *testing.T) } assert.True(t, proto.Equal(expectedResp, o.GetCreateSecretsResponse()), o.GetCreateSecretsResponse()) - ss, err := rs.GetSecret(id) + ss, err := rs.GetSecret(t.Context(), id) require.NoError(t, err) assert.Equal(t, ss.EncryptedSecret, []byte("encrypted-value")) @@ -3207,7 +3448,8 @@ func TestPlugin_Reports(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -3303,6 +3545,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_SecretIdentifierInvalid(t *test r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -3333,7 +3576,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_SecretIdentifierInvalid(t *test } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -3369,6 +3612,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_DisallowsDuplicateRequests(t *t r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -3408,7 +3652,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_DisallowsDuplicateRequests(t *t } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -3446,6 +3690,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_InvalidCiphertext(t *testing.T) r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -3482,7 +3727,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_InvalidCiphertext(t *testing.T) } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -3516,6 +3761,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_InvalidCiphertext_TooLong(t *te r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -3553,7 +3799,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_InvalidCiphertext_TooLong(t *te } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -3593,6 +3839,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_InvalidCiphertext_EncryptedWith r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -3635,7 +3882,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_InvalidCiphertext_EncryptedWith } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -3674,7 +3921,8 @@ func TestPlugin_StateTransition_UpdateSecretsRequest_SecretDoesntExist(t *testin N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -3693,7 +3941,7 @@ func TestPlugin_StateTransition_UpdateSecretsRequest_SecretDoesntExist(t *testin kv := &kv{ m: make(map[string]response), } - rs := NewReadStore(kv) + rs := newTestReadStore(t, kv) id := &vaultcommon.SecretIdentifier{ Owner: "owner", @@ -3752,7 +4000,7 @@ func TestPlugin_StateTransition_UpdateSecretsRequest_SecretDoesntExist(t *testin } assert.True(t, proto.Equal(expectedResp, o.GetUpdateSecretsResponse()), o.GetUpdateSecretsResponse()) - ss, err := rs.GetSecret(id) + ss, err := rs.GetSecret(t.Context(), id) require.NoError(t, err) require.Nil(t, ss) @@ -3770,7 +4018,8 @@ func TestPlugin_StateTransition_UpdateSecretsRequest_WritesSecrets(t *testing.T) N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -3808,7 +4057,7 @@ func TestPlugin_StateTransition_UpdateSecretsRequest_WritesSecrets(t *testing.T) }, }, } - rs := NewReadStore(kv) + rs := newTestReadStore(t, kv) value := []byte("encrypted-value") enc := hex.EncodeToString(value) @@ -3863,7 +4112,7 @@ func TestPlugin_StateTransition_UpdateSecretsRequest_WritesSecrets(t *testing.T) } assert.True(t, proto.Equal(expectedResp, o.GetUpdateSecretsResponse()), o.GetUpdateSecretsResponse()) - ss, err := rs.GetSecret(id) + ss, err := rs.GetSecret(t.Context(), id) require.NoError(t, err) require.NotNil(t, ss) @@ -3926,7 +4175,8 @@ func TestPlugin_Reports_UpdateSecretsRequest(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -3967,6 +4217,7 @@ func TestPlugin_Observation_DeleteSecrets(t *testing.T) { r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -4021,7 +4272,7 @@ func TestPlugin_Observation_DeleteSecrets(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -4053,6 +4304,7 @@ func TestPlugin_Observation_DeleteSecrets_IdDoesntExist(t *testing.T) { r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -4086,7 +4338,7 @@ func TestPlugin_Observation_DeleteSecrets_IdDoesntExist(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -4118,6 +4370,7 @@ func TestPlugin_Observation_DeleteSecrets_InvalidRequestDuplicateIds(t *testing. r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -4152,7 +4405,7 @@ func TestPlugin_Observation_DeleteSecrets_InvalidRequestDuplicateIds(t *testing. } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -4193,7 +4446,8 @@ func TestPlugin_StateTransition_DeleteSecretsRequest(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -4238,7 +4492,7 @@ func TestPlugin_StateTransition_DeleteSecretsRequest(t *testing.T) { }, }, } - rs := NewReadStore(rdr) + rs := newTestReadStore(t, rdr) req := &vaultcommon.DeleteSecretsRequest{ RequestId: "request-id", @@ -4285,7 +4539,7 @@ func TestPlugin_StateTransition_DeleteSecretsRequest(t *testing.T) { } assert.True(t, proto.Equal(expectedResp, o.GetDeleteSecretsResponse())) - ss, err = rs.GetSecret(id) + ss, err = rs.GetSecret(t.Context(), id) require.NoError(t, err) require.Nil(t, ss) @@ -4303,7 +4557,8 @@ func TestPlugin_StateTransition_DeleteSecretsRequest_SecretDoesNotExist(t *testi N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -4337,7 +4592,7 @@ func TestPlugin_StateTransition_DeleteSecretsRequest_SecretDoesNotExist(t *testi }, }, } - rs := NewReadStore(rdr) + rs := newTestReadStore(t, rdr) req := &vaultcommon.DeleteSecretsRequest{ RequestId: "request-id", @@ -4384,7 +4639,7 @@ func TestPlugin_StateTransition_DeleteSecretsRequest_SecretDoesNotExist(t *testi } assert.True(t, proto.Equal(expectedResp, o.GetDeleteSecretsResponse()), o.GetDeleteSecretsResponse()) - ss, err := rs.GetSecret(id) + ss, err := rs.GetSecret(t.Context(), id) require.NoError(t, err) require.Nil(t, ss) @@ -4440,7 +4695,8 @@ func TestPlugin_Reports_DeleteSecretsRequest(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -4481,6 +4737,7 @@ func TestPlugin_Observation_ListSecretIdentifiers_OwnerRequired(t *testing.T) { r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -4507,7 +4764,7 @@ func TestPlugin_Observation_ListSecretIdentifiers_OwnerRequired(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -4538,6 +4795,7 @@ func TestPlugin_Observation_ListSecretIdentifiers_NoNamespaceProvided(t *testing r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -4590,7 +4848,7 @@ func TestPlugin_Observation_ListSecretIdentifiers_NoNamespaceProvided(t *testing } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -4641,6 +4899,7 @@ func TestPlugin_Observation_ListSecretIdentifiers_FilterByNamespace(t *testing.T r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -4694,7 +4953,7 @@ func TestPlugin_Observation_ListSecretIdentifiers_FilterByNamespace(t *testing.T } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -4779,7 +5038,8 @@ func TestPlugin_Reports_ListSecretIdentifiersRequest(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -4825,7 +5085,8 @@ func TestPlugin_StateTransition_ListSecretIdentifiers(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -4844,7 +5105,7 @@ func TestPlugin_StateTransition_ListSecretIdentifiers(t *testing.T) { kv := &kv{ m: make(map[string]response), } - rs := NewReadStore(kv) + rs := newTestReadStore(t, kv) id := &vaultcommon.SecretIdentifier{ Owner: "owner", @@ -4883,7 +5144,7 @@ func TestPlugin_StateTransition_ListSecretIdentifiers(t *testing.T) { assert.True(t, proto.Equal(resp, o.GetListSecretIdentifiersResponse())) - ss, err := rs.GetSecret(id) + ss, err := rs.GetSecret(t.Context(), id) require.NoError(t, err) require.Nil(t, ss) @@ -4911,8 +5172,9 @@ func TestPlugin_StateTransition_StoresPendingQueue(t *testing.T) { _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -5052,7 +5314,7 @@ func TestPlugin_StateTransition_StoresPendingQueue(t *testing.T) { assert.Empty(t, os.Outcomes) - pq, err := NewReadStore(rdr).GetPendingQueue() + pq, err := newTestReadStore(t, rdr).GetPendingQueue(t.Context()) require.NoError(t, err) assert.Len(t, pq, 3) @@ -5070,8 +5332,9 @@ func TestPlugin_StateTransition_StoresPendingQueue_LimitedToBatchSize(t *testing _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -5203,7 +5466,7 @@ func TestPlugin_StateTransition_StoresPendingQueue_LimitedToBatchSize(t *testing assert.Empty(t, os.Outcomes) - pq, err := NewReadStore(rdr).GetPendingQueue() + pq, err := newTestReadStore(t, rdr).GetPendingQueue(t.Context()) require.NoError(t, err) assert.Len(t, pq, 1) @@ -5222,8 +5485,9 @@ func TestPlugin_StateTransition_StoresPendingQueue_DoesntDoubleCountObservations _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -5294,7 +5558,7 @@ func TestPlugin_StateTransition_StoresPendingQueue_DoesntDoubleCountObservations assert.Empty(t, os.Outcomes) - pq, err := NewReadStore(rdr).GetPendingQueue() + pq, err := newTestReadStore(t, rdr).GetPendingQueue(t.Context()) require.NoError(t, err) assert.Empty(t, pq, 0) @@ -5313,8 +5577,9 @@ func TestPlugin_ValidateObservation_RejectsIfMoreThan2xBatchSize(t *testing.T) { _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -5393,14 +5658,96 @@ func TestPlugin_ValidateObservation_RejectsIfMoreThan2xBatchSize(t *testing.T) { require.ErrorContains(t, err, "invalid observation: too many pending queue items provided, have 4, want max 2") } +// TestPlugin_ValidateObservation_AcceptsFullPendingQueueObservation verifies that an observation +// with exactly 2*batchSize pending queue items (the maximum Observation can produce) is accepted. +func TestPlugin_ValidateObservation_AcceptsFullPendingQueueObservation(t *testing.T) { + lggr := logger.TestLogger(t) + store := requests.NewStore[*vaulttypes.Request]() + _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) + require.NoError(t, err) + + batchSize := 1 // MaxBatchSize=1, so 2*batchSize=2 is the intended max pending queue items + r := &ReportingPlugin{ + lggr: lggr, + store: store, + metrics: newTestMetrics(t), + onchainCfg: ocr3types.ReportingPluginConfig{ + N: 4, + F: 1, + }, + cfg: makeReportingPluginConfig( + t, + batchSize, + pk, + shares[0], + 1, + 1024, + 30, + 30, + 30, + 10, + ), + unmarshalBlob: mockUnmarshalBlob, + } + + seqNr := uint64(1) + rdr := &kv{ + m: make(map[string]response), + } + + req1 := &vaultcommon.ListSecretIdentifiersRequest{ + Owner: "owner", + Namespace: "main", + RequestId: "request-id", + } + areq1, err := anypb.New(req1) + require.NoError(t, err) + + // Build an observation with exactly 2*batchSize = 2 pending queue items. + // This is the maximum that Observation() can produce. + numItems := 2 * batchSize + pendingQueueItems := make([][]byte, numItems) + blobs := make([][]byte, numItems) + for i := range numItems { + pendingQueueItems[i] = []byte{} + blobs[i] = protoMarshal(t, &vaultcommon.StoredPendingQueueItem{ + Id: fmt.Sprintf("request-id-%d", i), + Item: areq1, + }) + } + + o1 := &vaultcommon.Observations{ + PendingQueueItems: pendingQueueItems, + } + + o1b, err := proto.Marshal(o1) + require.NoError(t, err) + + bf := &blobber{ + blobs: blobs, + } + + err = r.ValidateObservation( + t.Context(), + seqNr, + types.AttributedQuery{}, + types.AttributedObservation{ + Observer: 0, Observation: o1b, + }, + rdr, + bf, + ) + require.NoError(t, err) +} func TestPlugin_ValidateObservation_GetSecretsRequest(t *testing.T) { lggr := logger.TestLogger(t) store := requests.NewStore[*vaulttypes.Request]() _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -5462,7 +5809,7 @@ func TestPlugin_ValidateObservation_GetSecretsRequest(t *testing.T) { anyp, err := anypb.New(req) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -5678,8 +6025,9 @@ func TestPlugin_ValidateObservation_PanicsOnEmptyShares(t *testing.T) { _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -5743,7 +6091,7 @@ func TestPlugin_ValidateObservation_PanicsOnEmptyShares(t *testing.T) { anyp, err := anypb.New(req) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -5790,8 +6138,9 @@ func TestPlugin_ValidateObservation_NilSecretIdentifier(t *testing.T) { _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -5991,7 +6340,7 @@ func TestPlugin_ValidateObservation_NilSecretIdentifier(t *testing.T) { } require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -6028,8 +6377,9 @@ func TestPlugin_ValidateObservation_CiphertextSize(t *testing.T) { // maxCipherTextLengthBytes = 10 bytes, so any ciphertext > 10 decoded bytes should be rejected r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -6214,7 +6564,7 @@ func TestPlugin_ValidateObservation_CiphertextSize(t *testing.T) { } require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -6258,7 +6608,8 @@ func TestPlugin_StateTransition_PendingQueueEnabled_NewQuora_NotGetRequest(t *te N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -6277,7 +6628,7 @@ func TestPlugin_StateTransition_PendingQueueEnabled_NewQuora_NotGetRequest(t *te kv := &kv{ m: make(map[string]response), } - rs := NewReadStore(kv) + rs := newTestReadStore(t, kv) id := &vaultcommon.SecretIdentifier{ Owner: "owner", @@ -6314,7 +6665,7 @@ func TestPlugin_StateTransition_PendingQueueEnabled_NewQuora_NotGetRequest(t *te assert.True(t, proto.Equal(req, o.GetListSecretIdentifiersRequest())) assert.True(t, proto.Equal(resp, o.GetListSecretIdentifiersResponse())) - ss, err := rs.GetSecret(id) + ss, err := rs.GetSecret(t.Context(), id) require.NoError(t, err) require.Nil(t, ss) @@ -6332,7 +6683,8 @@ func TestPlugin_StateTransition_PendingQueueEnabled_GetRequest(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -6351,7 +6703,7 @@ func TestPlugin_StateTransition_PendingQueueEnabled_GetRequest(t *testing.T) { kv := &kv{ m: make(map[string]response), } - rs := NewReadStore(kv) + rs := newTestReadStore(t, kv) id := &vaultcommon.SecretIdentifier{ Owner: "owner", @@ -6402,7 +6754,7 @@ func TestPlugin_StateTransition_PendingQueueEnabled_GetRequest(t *testing.T) { assert.True(t, proto.Equal(req, o.GetGetSecretsRequest())) assert.True(t, proto.Equal(resp, o.GetGetSecretsResponse())) - ss, err := rs.GetSecret(id) + ss, err := rs.GetSecret(t.Context(), id) require.NoError(t, err) require.Nil(t, ss) @@ -6546,8 +6898,9 @@ func TestPlugin_ValidateObservation_RequestBatchLimit(t *testing.T) { _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -6601,8 +6954,9 @@ func TestPlugin_ValidateObservation_ListSecretIdentifiersExceedsMaxSecretsPerOwn _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -6650,7 +7004,7 @@ func TestPlugin_ValidateObservation_ListSecretIdentifiersExceedsMaxSecretsPerOwn rdr := &kv{m: make(map[string]response)} anyReq, err := anypb.New(listReq) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyReq}, }, diff --git a/core/services/ocr3_1/beholderwrapper/instrumented_blob.go b/core/services/ocr3_1/beholderwrapper/instrumented_blob.go new file mode 100644 index 00000000000..c59b17f1bcc --- /dev/null +++ b/core/services/ocr3_1/beholderwrapper/instrumented_blob.go @@ -0,0 +1,33 @@ +package beholderwrapper + +import ( + "context" + "time" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" +) + +type instrumentedBlobBroadcastFetcher struct { + inner ocr3_1types.BlobBroadcastFetcher + metrics *pluginMetrics + instrumentedBlobFetcher +} + +func (i *instrumentedBlobBroadcastFetcher) BroadcastBlob(ctx context.Context, payload []byte, expirationHint ocr3_1types.BlobExpirationHint) (ocr3_1types.BlobHandle, error) { + start := time.Now() + handle, err := i.inner.BroadcastBlob(ctx, payload, expirationHint) + i.metrics.recordBlobDuration(ctx, "BroadcastBlob", time.Since(start), err == nil) + return handle, err +} + +type instrumentedBlobFetcher struct { + inner ocr3_1types.BlobFetcher + metrics *pluginMetrics +} + +func (i *instrumentedBlobFetcher) FetchBlob(ctx context.Context, handle ocr3_1types.BlobHandle) ([]byte, error) { + start := time.Now() + data, err := i.inner.FetchBlob(ctx, handle) + i.metrics.recordBlobDuration(ctx, "FetchBlob", time.Since(start), err == nil) + return data, err +} diff --git a/core/services/ocr3_1/beholderwrapper/instrumented_kv.go b/core/services/ocr3_1/beholderwrapper/instrumented_kv.go new file mode 100644 index 00000000000..e215fa9d0b7 --- /dev/null +++ b/core/services/ocr3_1/beholderwrapper/instrumented_kv.go @@ -0,0 +1,40 @@ +package beholderwrapper + +import ( + "context" + "time" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" +) + +type instrumentedKVStateReader struct { + inner ocr3_1types.KeyValueStateReader + ctx context.Context //nolint:containedctx // libocr 3.1's API doesn't support passing in ctx via the Read/Write method. + metrics *pluginMetrics +} + +func (i *instrumentedKVStateReader) Read(key []byte) ([]byte, error) { + start := time.Now() + data, err := i.inner.Read(key) + i.metrics.recordKVDuration(i.ctx, "Read", time.Since(start), err == nil) + return data, err +} + +type instrumentedKVStateReadWriter struct { + instrumentedKVStateReader + writer ocr3_1types.KeyValueStateReadWriter +} + +func (i *instrumentedKVStateReadWriter) Write(key []byte, value []byte) error { + start := time.Now() + err := i.writer.Write(key, value) + i.metrics.recordKVDuration(i.ctx, "Write", time.Since(start), err == nil) + return err +} + +func (i *instrumentedKVStateReadWriter) Delete(key []byte) error { + start := time.Now() + err := i.writer.Delete(key) + i.metrics.recordKVDuration(i.ctx, "Delete", time.Since(start), err == nil) + return err +} diff --git a/core/services/ocr3_1/beholderwrapper/plugin.go b/core/services/ocr3_1/beholderwrapper/plugin.go index 356ac62be57..f55095ef739 100644 --- a/core/services/ocr3_1/beholderwrapper/plugin.go +++ b/core/services/ocr3_1/beholderwrapper/plugin.go @@ -26,15 +26,50 @@ func newReportingPlugin[RI any]( } } +func (p *reportingPlugin[RI]) wrapReader(ctx context.Context, r ocr3_1types.KeyValueStateReader) ocr3_1types.KeyValueStateReader { + if r == nil { + return nil + } + return &instrumentedKVStateReader{inner: r, ctx: ctx, metrics: p.metrics} +} + +func (p *reportingPlugin[RI]) wrapReadWriter(ctx context.Context, rw ocr3_1types.KeyValueStateReadWriter) ocr3_1types.KeyValueStateReadWriter { + if rw == nil { + return nil + } + return &instrumentedKVStateReadWriter{ + instrumentedKVStateReader: instrumentedKVStateReader{inner: rw, ctx: ctx, metrics: p.metrics}, + writer: rw, + } +} + +func (p *reportingPlugin[RI]) wrapBroadcastFetcher(bbf ocr3_1types.BlobBroadcastFetcher) ocr3_1types.BlobBroadcastFetcher { + if bbf == nil { + return nil + } + return &instrumentedBlobBroadcastFetcher{ + inner: bbf, + metrics: p.metrics, + instrumentedBlobFetcher: instrumentedBlobFetcher{inner: bbf, metrics: p.metrics}, + } +} + +func (p *reportingPlugin[RI]) wrapFetcher(bf ocr3_1types.BlobFetcher) ocr3_1types.BlobFetcher { + if bf == nil { + return nil + } + return &instrumentedBlobFetcher{inner: bf, metrics: p.metrics} +} + func (p *reportingPlugin[RI]) Query(ctx context.Context, seqNr uint64, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (ocrtypes.Query, error) { return withObservedExecution(ctx, p.metrics, query, func() (ocrtypes.Query, error) { - return p.ReportingPlugin.Query(ctx, seqNr, keyValueReader, blobBroadcastFetcher) + return p.ReportingPlugin.Query(ctx, seqNr, p.wrapReader(ctx, keyValueReader), p.wrapBroadcastFetcher(blobBroadcastFetcher)) }) } func (p *reportingPlugin[RI]) Observation(ctx context.Context, seqNr uint64, aq ocrtypes.AttributedQuery, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (ocrtypes.Observation, error) { result, err := withObservedExecution(ctx, p.metrics, observation, func() (ocrtypes.Observation, error) { - return p.ReportingPlugin.Observation(ctx, seqNr, aq, keyValueReader, blobBroadcastFetcher) + return p.ReportingPlugin.Observation(ctx, seqNr, aq, p.wrapReader(ctx, keyValueReader), p.wrapBroadcastFetcher(blobBroadcastFetcher)) }) if err == nil { p.metrics.trackSize(ctx, observation, len(result)) @@ -44,7 +79,7 @@ func (p *reportingPlugin[RI]) Observation(ctx context.Context, seqNr uint64, aq func (p *reportingPlugin[RI]) ValidateObservation(ctx context.Context, seqNr uint64, aq ocrtypes.AttributedQuery, ao ocrtypes.AttributedObservation, keyValueReader ocr3_1types.KeyValueStateReader, blobFetcher ocr3_1types.BlobFetcher) error { _, err := withObservedExecution(ctx, p.metrics, validateObservation, func() (any, error) { - err := p.ReportingPlugin.ValidateObservation(ctx, seqNr, aq, ao, keyValueReader, blobFetcher) + err := p.ReportingPlugin.ValidateObservation(ctx, seqNr, aq, ao, p.wrapReader(ctx, keyValueReader), p.wrapFetcher(blobFetcher)) return nil, err }) return err @@ -52,13 +87,13 @@ func (p *reportingPlugin[RI]) ValidateObservation(ctx context.Context, seqNr uin func (p *reportingPlugin[RI]) ObservationQuorum(ctx context.Context, seqNr uint64, aq ocrtypes.AttributedQuery, aos []ocrtypes.AttributedObservation, keyValueReader ocr3_1types.KeyValueStateReader, blobFetcher ocr3_1types.BlobFetcher) (bool, error) { return withObservedExecution(ctx, p.metrics, observationQuorum, func() (bool, error) { - return p.ReportingPlugin.ObservationQuorum(ctx, seqNr, aq, aos, keyValueReader, blobFetcher) + return p.ReportingPlugin.ObservationQuorum(ctx, seqNr, aq, aos, p.wrapReader(ctx, keyValueReader), p.wrapFetcher(blobFetcher)) }) } func (p *reportingPlugin[RI]) StateTransition(ctx context.Context, seqNr uint64, aq ocrtypes.AttributedQuery, aos []ocrtypes.AttributedObservation, keyValueReadWriter ocr3_1types.KeyValueStateReadWriter, blobFetcher ocr3_1types.BlobFetcher) (ocr3_1types.ReportsPlusPrecursor, error) { result, err := withObservedExecution(ctx, p.metrics, stateTransition, func() (ocr3_1types.ReportsPlusPrecursor, error) { - return p.ReportingPlugin.StateTransition(ctx, seqNr, aq, aos, keyValueReadWriter, blobFetcher) + return p.ReportingPlugin.StateTransition(ctx, seqNr, aq, aos, p.wrapReadWriter(ctx, keyValueReadWriter), p.wrapFetcher(blobFetcher)) }) if err == nil { p.metrics.trackSize(ctx, stateTransition, len(result)) @@ -68,7 +103,7 @@ func (p *reportingPlugin[RI]) StateTransition(ctx context.Context, seqNr uint64, func (p *reportingPlugin[RI]) Committed(ctx context.Context, seqNr uint64, keyValueReader ocr3_1types.KeyValueStateReader) error { _, err := withObservedExecution(ctx, p.metrics, committed, func() (any, error) { - err := p.ReportingPlugin.Committed(ctx, seqNr, keyValueReader) + err := p.ReportingPlugin.Committed(ctx, seqNr, p.wrapReader(ctx, keyValueReader)) return nil, err }) return err diff --git a/core/services/ocr3_1/beholderwrapper/plugin_test.go b/core/services/ocr3_1/beholderwrapper/plugin_test.go index d0b5b520780..845a8edc3fd 100644 --- a/core/services/ocr3_1/beholderwrapper/plugin_test.go +++ b/core/services/ocr3_1/beholderwrapper/plugin_test.go @@ -109,6 +109,425 @@ func Test_ReportingPlugin_PropagatesErrors(t *testing.T) { require.ErrorIs(t, err, expectedErr) } +func Test_InstrumentedBlobBroadcastFetcher(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + inner := &fakeBlobBroadcastFetcher{ + broadcastPayload: []byte("broadcast-handle"), + fetchPayload: []byte("fetched-data"), + } + + wrapped := &instrumentedBlobBroadcastFetcher{ + inner: inner, + metrics: metrics, + instrumentedBlobFetcher: instrumentedBlobFetcher{ + inner: inner, + metrics: metrics, + }, + } + + // BroadcastBlob delegates and records metrics + handle, err := wrapped.BroadcastBlob(t.Context(), []byte("payload"), ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: 1}) + require.NoError(t, err) + require.Equal(t, 1, inner.broadcastCalls) + + // FetchBlob delegates and records metrics + data, err := wrapped.FetchBlob(t.Context(), handle) + require.NoError(t, err) + require.Equal(t, []byte("fetched-data"), data) + require.Equal(t, 1, inner.fetchCalls) +} + +func Test_InstrumentedBlobBroadcastFetcher_PropagatesErrors(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + expectedErr := errors.New("blob error") + inner := &fakeBlobBroadcastFetcher{err: expectedErr} + wrapped := &instrumentedBlobBroadcastFetcher{ + inner: inner, + metrics: metrics, + instrumentedBlobFetcher: instrumentedBlobFetcher{ + inner: inner, + metrics: metrics, + }, + } + + _, err = wrapped.BroadcastBlob(t.Context(), []byte("payload"), ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: 1}) + require.ErrorIs(t, err, expectedErr) + + _, err = wrapped.FetchBlob(t.Context(), ocr3_1types.BlobHandle{}) + require.ErrorIs(t, err, expectedErr) +} + +func Test_InstrumentedBlobFetcher(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + inner := &fakeBlobFetcher{fetchPayload: []byte("fetched-data")} + wrapped := &instrumentedBlobFetcher{inner: inner, metrics: metrics} + + data, err := wrapped.FetchBlob(t.Context(), ocr3_1types.BlobHandle{}) + require.NoError(t, err) + require.Equal(t, []byte("fetched-data"), data) + require.Equal(t, 1, inner.fetchCalls) +} + +func Test_InstrumentedBlobFetcher_PropagatesErrors(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + expectedErr := errors.New("fetch error") + inner := &fakeBlobFetcher{err: expectedErr} + wrapped := &instrumentedBlobFetcher{inner: inner, metrics: metrics} + + _, err = wrapped.FetchBlob(t.Context(), ocr3_1types.BlobHandle{}) + require.ErrorIs(t, err, expectedErr) +} + +func Test_ReportingPlugin_WrapsBlobs(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + innerBlob := &fakeBlobBroadcastFetcher{ + broadcastPayload: []byte("handle"), + fetchPayload: []byte("data"), + } + innerFetcher := &fakeBlobFetcher{fetchPayload: []byte("data")} + + capturingPlugin := &blobCapturingPlugin[uint]{} + plugin := newReportingPlugin[uint](capturingPlugin, metrics) + + // Query wraps BlobBroadcastFetcher + _, _ = plugin.Query(t.Context(), 1, nil, innerBlob) + require.IsType(t, &instrumentedBlobBroadcastFetcher{}, capturingPlugin.lastBroadcastFetcher) + + // Observation wraps BlobBroadcastFetcher + _, _ = plugin.Observation(t.Context(), 1, ocrtypes.AttributedQuery{}, nil, innerBlob) + require.IsType(t, &instrumentedBlobBroadcastFetcher{}, capturingPlugin.lastBroadcastFetcher) + + // ValidateObservation wraps BlobFetcher + _ = plugin.ValidateObservation(t.Context(), 1, ocrtypes.AttributedQuery{}, ocrtypes.AttributedObservation{}, nil, innerFetcher) + require.IsType(t, &instrumentedBlobFetcher{}, capturingPlugin.lastFetcher) + + // ObservationQuorum wraps BlobFetcher + _, _ = plugin.ObservationQuorum(t.Context(), 1, ocrtypes.AttributedQuery{}, nil, nil, innerFetcher) + require.IsType(t, &instrumentedBlobFetcher{}, capturingPlugin.lastFetcher) + + // StateTransition wraps BlobFetcher + _, _ = plugin.StateTransition(t.Context(), 1, ocrtypes.AttributedQuery{}, nil, nil, innerFetcher) + require.IsType(t, &instrumentedBlobFetcher{}, capturingPlugin.lastFetcher) + + // nil is preserved + _, _ = plugin.Query(t.Context(), 1, nil, nil) + require.Nil(t, capturingPlugin.lastBroadcastFetcher) + + _ = plugin.ValidateObservation(t.Context(), 1, ocrtypes.AttributedQuery{}, ocrtypes.AttributedObservation{}, nil, nil) + require.Nil(t, capturingPlugin.lastFetcher) +} + +func Test_InstrumentedKVStateReader(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + inner := &fakeKVStateReader{data: map[string][]byte{"key1": []byte("value1")}} + wrapped := &instrumentedKVStateReader{inner: inner, ctx: t.Context(), metrics: metrics} + + data, err := wrapped.Read([]byte("key1")) + require.NoError(t, err) + require.Equal(t, []byte("value1"), data) + require.Equal(t, 1, inner.readCalls) + + // Missing key returns nil + data, err = wrapped.Read([]byte("missing")) + require.NoError(t, err) + require.Nil(t, data) + require.Equal(t, 2, inner.readCalls) +} + +func Test_InstrumentedKVStateReader_PropagatesErrors(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + expectedErr := errors.New("read error") + inner := &fakeKVStateReader{err: expectedErr} + wrapped := &instrumentedKVStateReader{inner: inner, ctx: t.Context(), metrics: metrics} + + _, err = wrapped.Read([]byte("key")) + require.ErrorIs(t, err, expectedErr) +} + +func Test_InstrumentedKVStateReadWriter(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + inner := &fakeKVStateReadWriter{fakeKVStateReader: fakeKVStateReader{data: map[string][]byte{}}} + wrapped := &instrumentedKVStateReadWriter{ + instrumentedKVStateReader: instrumentedKVStateReader{inner: inner, ctx: t.Context(), metrics: metrics}, + writer: inner, + } + + // Write + err = wrapped.Write([]byte("key1"), []byte("value1")) + require.NoError(t, err) + require.Equal(t, 1, inner.writeCalls) + + // Read back through the wrapper + data, err := wrapped.Read([]byte("key1")) + require.NoError(t, err) + require.Equal(t, []byte("value1"), data) + + // Delete + err = wrapped.Delete([]byte("key1")) + require.NoError(t, err) + require.Equal(t, 1, inner.deleteCalls) + + // Read returns nil after delete + data, err = wrapped.Read([]byte("key1")) + require.NoError(t, err) + require.Nil(t, data) +} + +func Test_InstrumentedKVStateReadWriter_PropagatesErrors(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + expectedErr := errors.New("write error") + inner := &fakeKVStateReadWriter{fakeKVStateReader: fakeKVStateReader{err: expectedErr}} + wrapped := &instrumentedKVStateReadWriter{ + instrumentedKVStateReader: instrumentedKVStateReader{inner: inner, ctx: t.Context(), metrics: metrics}, + writer: inner, + } + + _, err = wrapped.Read([]byte("key")) + require.ErrorIs(t, err, expectedErr) + + err = wrapped.Write([]byte("key"), []byte("value")) + require.ErrorIs(t, err, expectedErr) + + err = wrapped.Delete([]byte("key")) + require.ErrorIs(t, err, expectedErr) +} + +func Test_ReportingPlugin_WrapsKV(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + innerReader := &fakeKVStateReader{data: map[string][]byte{}} + innerReadWriter := &fakeKVStateReadWriter{fakeKVStateReader: fakeKVStateReader{data: map[string][]byte{}}} + + capturingPlugin := &kvCapturingPlugin[uint]{} + plugin := newReportingPlugin[uint](capturingPlugin, metrics) + + // Query wraps KeyValueStateReader + _, _ = plugin.Query(t.Context(), 1, innerReader, nil) + require.IsType(t, &instrumentedKVStateReader{}, capturingPlugin.lastReader) + + // Observation wraps KeyValueStateReader + _, _ = plugin.Observation(t.Context(), 1, ocrtypes.AttributedQuery{}, innerReader, nil) + require.IsType(t, &instrumentedKVStateReader{}, capturingPlugin.lastReader) + + // ValidateObservation wraps KeyValueStateReader + _ = plugin.ValidateObservation(t.Context(), 1, ocrtypes.AttributedQuery{}, ocrtypes.AttributedObservation{}, innerReader, nil) + require.IsType(t, &instrumentedKVStateReader{}, capturingPlugin.lastReader) + + // ObservationQuorum wraps KeyValueStateReader + _, _ = plugin.ObservationQuorum(t.Context(), 1, ocrtypes.AttributedQuery{}, nil, innerReader, nil) + require.IsType(t, &instrumentedKVStateReader{}, capturingPlugin.lastReader) + + // StateTransition wraps KeyValueStateReadWriter + _, _ = plugin.StateTransition(t.Context(), 1, ocrtypes.AttributedQuery{}, nil, innerReadWriter, nil) + require.IsType(t, &instrumentedKVStateReadWriter{}, capturingPlugin.lastReadWriter) + + // Committed wraps KeyValueStateReader + _ = plugin.Committed(t.Context(), 1, innerReader) + require.IsType(t, &instrumentedKVStateReader{}, capturingPlugin.lastReader) + + // nil is preserved + _, _ = plugin.Query(t.Context(), 1, nil, nil) + require.Nil(t, capturingPlugin.lastReader) + + _, _ = plugin.StateTransition(t.Context(), 1, ocrtypes.AttributedQuery{}, nil, nil, nil) + require.Nil(t, capturingPlugin.lastReadWriter) +} + +type fakeKVStateReader struct { + data map[string][]byte + err error + readCalls int +} + +func (f *fakeKVStateReader) Read(key []byte) ([]byte, error) { + f.readCalls++ + if f.err != nil { + return nil, f.err + } + return f.data[string(key)], nil +} + +type fakeKVStateReadWriter struct { + fakeKVStateReader + writeCalls int + deleteCalls int +} + +func (f *fakeKVStateReadWriter) Write(key []byte, value []byte) error { + f.writeCalls++ + if f.err != nil { + return f.err + } + f.data[string(key)] = value + return nil +} + +func (f *fakeKVStateReadWriter) Delete(key []byte) error { + f.deleteCalls++ + if f.err != nil { + return f.err + } + delete(f.data, string(key)) + return nil +} + +// kvCapturingPlugin captures the KV reader/writer it receives so tests can assert on wrapping. +type kvCapturingPlugin[RI any] struct { + lastReader ocr3_1types.KeyValueStateReader + lastReadWriter ocr3_1types.KeyValueStateReadWriter +} + +func (p *kvCapturingPlugin[RI]) Query(_ context.Context, _ uint64, r ocr3_1types.KeyValueStateReader, _ ocr3_1types.BlobBroadcastFetcher) (ocrtypes.Query, error) { + p.lastReader = r + return ocrtypes.Query{}, nil +} + +func (p *kvCapturingPlugin[RI]) Observation(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, r ocr3_1types.KeyValueStateReader, _ ocr3_1types.BlobBroadcastFetcher) (ocrtypes.Observation, error) { + p.lastReader = r + return ocrtypes.Observation{}, nil +} + +func (p *kvCapturingPlugin[RI]) ValidateObservation(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, _ ocrtypes.AttributedObservation, r ocr3_1types.KeyValueStateReader, _ ocr3_1types.BlobFetcher) error { + p.lastReader = r + return nil +} + +func (p *kvCapturingPlugin[RI]) ObservationQuorum(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, _ []ocrtypes.AttributedObservation, r ocr3_1types.KeyValueStateReader, _ ocr3_1types.BlobFetcher) (bool, error) { + p.lastReader = r + return true, nil +} + +func (p *kvCapturingPlugin[RI]) StateTransition(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, _ []ocrtypes.AttributedObservation, rw ocr3_1types.KeyValueStateReadWriter, _ ocr3_1types.BlobFetcher) (ocr3_1types.ReportsPlusPrecursor, error) { + p.lastReadWriter = rw + return nil, nil +} + +func (p *kvCapturingPlugin[RI]) Committed(_ context.Context, _ uint64, r ocr3_1types.KeyValueStateReader) error { + p.lastReader = r + return nil +} + +func (p *kvCapturingPlugin[RI]) Reports(context.Context, uint64, ocr3_1types.ReportsPlusPrecursor) ([]ocr3types.ReportPlus[RI], error) { + return nil, nil +} + +func (p *kvCapturingPlugin[RI]) ShouldAcceptAttestedReport(context.Context, uint64, ocr3types.ReportWithInfo[RI]) (bool, error) { + return true, nil +} + +func (p *kvCapturingPlugin[RI]) ShouldTransmitAcceptedReport(context.Context, uint64, ocr3types.ReportWithInfo[RI]) (bool, error) { + return true, nil +} + +func (p *kvCapturingPlugin[RI]) Close() error { + return nil +} + +type fakeBlobBroadcastFetcher struct { + broadcastPayload []byte + fetchPayload []byte + err error + broadcastCalls int + fetchCalls int +} + +func (f *fakeBlobBroadcastFetcher) BroadcastBlob(_ context.Context, _ []byte, _ ocr3_1types.BlobExpirationHint) (ocr3_1types.BlobHandle, error) { + f.broadcastCalls++ + return ocr3_1types.BlobHandle{}, f.err +} + +func (f *fakeBlobBroadcastFetcher) FetchBlob(_ context.Context, _ ocr3_1types.BlobHandle) ([]byte, error) { + f.fetchCalls++ + if f.err != nil { + return nil, f.err + } + return f.fetchPayload, nil +} + +type fakeBlobFetcher struct { + fetchPayload []byte + err error + fetchCalls int +} + +func (f *fakeBlobFetcher) FetchBlob(_ context.Context, _ ocr3_1types.BlobHandle) ([]byte, error) { + f.fetchCalls++ + if f.err != nil { + return nil, f.err + } + return f.fetchPayload, nil +} + +// blobCapturingPlugin captures the blob fetcher/broadcaster it receives so tests can assert on wrapping. +type blobCapturingPlugin[RI any] struct { + lastBroadcastFetcher ocr3_1types.BlobBroadcastFetcher + lastFetcher ocr3_1types.BlobFetcher +} + +func (p *blobCapturingPlugin[RI]) Query(_ context.Context, _ uint64, _ ocr3_1types.KeyValueStateReader, bbf ocr3_1types.BlobBroadcastFetcher) (ocrtypes.Query, error) { + p.lastBroadcastFetcher = bbf + return ocrtypes.Query{}, nil +} + +func (p *blobCapturingPlugin[RI]) Observation(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, _ ocr3_1types.KeyValueStateReader, bbf ocr3_1types.BlobBroadcastFetcher) (ocrtypes.Observation, error) { + p.lastBroadcastFetcher = bbf + return ocrtypes.Observation{}, nil +} + +func (p *blobCapturingPlugin[RI]) ValidateObservation(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, _ ocrtypes.AttributedObservation, _ ocr3_1types.KeyValueStateReader, bf ocr3_1types.BlobFetcher) error { + p.lastFetcher = bf + return nil +} + +func (p *blobCapturingPlugin[RI]) ObservationQuorum(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, _ []ocrtypes.AttributedObservation, _ ocr3_1types.KeyValueStateReader, bf ocr3_1types.BlobFetcher) (bool, error) { + p.lastFetcher = bf + return true, nil +} + +func (p *blobCapturingPlugin[RI]) StateTransition(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, _ []ocrtypes.AttributedObservation, _ ocr3_1types.KeyValueStateReadWriter, bf ocr3_1types.BlobFetcher) (ocr3_1types.ReportsPlusPrecursor, error) { + p.lastFetcher = bf + return nil, nil +} + +func (p *blobCapturingPlugin[RI]) Committed(context.Context, uint64, ocr3_1types.KeyValueStateReader) error { + return nil +} + +func (p *blobCapturingPlugin[RI]) Reports(context.Context, uint64, ocr3_1types.ReportsPlusPrecursor) ([]ocr3types.ReportPlus[RI], error) { + return nil, nil +} + +func (p *blobCapturingPlugin[RI]) ShouldAcceptAttestedReport(context.Context, uint64, ocr3types.ReportWithInfo[RI]) (bool, error) { + return true, nil +} + +func (p *blobCapturingPlugin[RI]) ShouldTransmitAcceptedReport(context.Context, uint64, ocr3types.ReportWithInfo[RI]) (bool, error) { + return true, nil +} + +func (p *blobCapturingPlugin[RI]) Close() error { + return nil +} + type fakePlugin[RI any] struct { reports []ocr3types.ReportPlus[RI] observationSize int diff --git a/core/services/ocr3_1/beholderwrapper/types.go b/core/services/ocr3_1/beholderwrapper/types.go index 0c8c16d93a3..cbde5670fe3 100644 --- a/core/services/ocr3_1/beholderwrapper/types.go +++ b/core/services/ocr3_1/beholderwrapper/types.go @@ -36,6 +36,8 @@ type pluginMetrics struct { reportsGenerated metric.Int64Counter sizes metric.Int64Histogram status metric.Int64Gauge + blobDurations metric.Int64Histogram + kvDurations metric.Int64Histogram } func newPluginMetrics(plugin, configDigest string) (*pluginMetrics, error) { @@ -59,6 +61,16 @@ func newPluginMetrics(plugin, configDigest string) (*pluginMetrics, error) { return nil, fmt.Errorf("failed to create status gauge: %w", err) } + blobDurations, err := beholder.GetMeter().Int64Histogram("platform_ocr3_1_reporting_plugin_blob_duration_ms", metric.WithUnit("ms")) + if err != nil { + return nil, fmt.Errorf("failed to create blob duration histogram: %w", err) + } + + kvDurations, err := beholder.GetMeter().Int64Histogram("platform_ocr3_1_reporting_plugin_kv_duration_ms", metric.WithUnit("ms")) + if err != nil { + return nil, fmt.Errorf("failed to create kv duration histogram: %w", err) + } + return &pluginMetrics{ plugin: plugin, configDigest: configDigest, @@ -66,6 +78,8 @@ func newPluginMetrics(plugin, configDigest string) (*pluginMetrics, error) { reportsGenerated: reportsGenerated, sizes: sizes, status: status, + blobDurations: blobDurations, + kvDurations: kvDurations, }, nil } @@ -95,6 +109,24 @@ func (m *pluginMetrics) trackSize(ctx context.Context, function functionType, si )) } +func (m *pluginMetrics) recordKVDuration(ctx context.Context, method string, d time.Duration, success bool) { + m.kvDurations.Record(ctx, d.Milliseconds(), metric.WithAttributes( + attribute.String("plugin", m.plugin), + attribute.String("method", method), + attribute.String("success", strconv.FormatBool(success)), + attribute.String("configDigest", m.configDigest), + )) +} + +func (m *pluginMetrics) recordBlobDuration(ctx context.Context, method string, d time.Duration, success bool) { + m.blobDurations.Record(ctx, d.Milliseconds(), metric.WithAttributes( + attribute.String("plugin", m.plugin), + attribute.String("method", method), + attribute.String("success", strconv.FormatBool(success)), + attribute.String("configDigest", m.configDigest), + )) +} + func (m *pluginMetrics) updateStatus(ctx context.Context, up bool) { val := int64(0) if up { @@ -113,16 +145,29 @@ func MetricViews() []sdkmetric.View { sdkmetric.NewView( sdkmetric.Instrument{Name: "platform_ocr3_1_reporting_plugin_duration_ms"}, sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ - // 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560 - Boundaries: prometheus.ExponentialBuckets(5, 2, 10), + // 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480, 40960 + Boundaries: prometheus.ExponentialBuckets(5, 2, 14), + }}, + ), + sdkmetric.NewView( + sdkmetric.Instrument{Name: "platform_ocr3_1_reporting_plugin_kv_duration_ms"}, + sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ + // 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480, 40960 + Boundaries: prometheus.ExponentialBuckets(5, 2, 14), + }}, + ), + sdkmetric.NewView( + sdkmetric.Instrument{Name: "platform_ocr3_1_reporting_plugin_blob_duration_ms"}, + sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ + // 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480, 40960 + Boundaries: prometheus.ExponentialBuckets(5, 2, 14), }}, ), sdkmetric.NewView( sdkmetric.Instrument{Name: "platform_ocr3_1_reporting_plugin_data_sizes"}, sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ - // 512KB is the max value possible - // 1KB, 2KB, 4KB, 8KB, 16KB, 32KB, 64KB, 128KB, 256KB, 512KB - Boundaries: prometheus.ExponentialBuckets(1024, 2, 10), + // 1KB, 2KB, 4KB, 8KB, 16KB, 32KB, 64KB, 128KB, 256KB, 512KB, 1024KB, 2048KB, 4096KB, 8192KB + Boundaries: prometheus.ExponentialBuckets(1024, 2, 14), }}, ), }