Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 49 additions & 34 deletions core/services/ocr2/plugins/vault/kvstore.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package vault

import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types"
"google.golang.org/protobuf/proto"
Expand All @@ -20,38 +22,44 @@ const (
)

type KVStore struct {
reader ocr3_1types.KeyValueStateReader
writer ocr3_1types.KeyValueStateReadWriter
reader ocr3_1types.KeyValueStateReader
writer ocr3_1types.KeyValueStateReadWriter
metrics *pluginMetrics
}

func (s *KVStore) trackDuration(ctx context.Context, method string, start time.Time) {
s.metrics.trackKVOperation(ctx, method, time.Since(start).Milliseconds())
}

type ReadKVStore interface {
GetSecret(id *vault.SecretIdentifier) (*vault.StoredSecret, error)
GetMetadata(owner string) (*vault.StoredMetadata, error)
GetSecretIdentifiersCountForOwner(owner string) (int, error)
GetPendingQueue() ([]*vault.StoredPendingQueueItem, error)
GetSecret(ctx context.Context, id *vault.SecretIdentifier) (*vault.StoredSecret, error)
GetMetadata(ctx context.Context, owner string) (*vault.StoredMetadata, error)
GetSecretIdentifiersCountForOwner(ctx context.Context, owner string) (int, error)
GetPendingQueue(ctx context.Context) ([]*vault.StoredPendingQueueItem, error)
}

type WriteKVStore interface {
ReadKVStore
WriteSecret(id *vault.SecretIdentifier, secret *vault.StoredSecret) error
WriteMetadata(owner string, metadata *vault.StoredMetadata) error
DeleteSecret(id *vault.SecretIdentifier) error
WritePendingQueue(pending []*vault.StoredPendingQueueItem) error
WriteSecret(ctx context.Context, id *vault.SecretIdentifier, secret *vault.StoredSecret) error
WriteMetadata(ctx context.Context, owner string, metadata *vault.StoredMetadata) error
DeleteSecret(ctx context.Context, id *vault.SecretIdentifier) error
WritePendingQueue(ctx context.Context, pending []*vault.StoredPendingQueueItem) error
}

func NewReadStore(reader ocr3_1types.KeyValueStateReader) *KVStore {
return &KVStore{reader: reader}
func NewReadStore(reader ocr3_1types.KeyValueStateReader, metrics *pluginMetrics) *KVStore {
return &KVStore{reader: reader, metrics: metrics}
}

func NewWriteStore(writer ocr3_1types.KeyValueStateReadWriter) *KVStore {
return &KVStore{reader: writer, writer: writer}
func NewWriteStore(writer ocr3_1types.KeyValueStateReadWriter, metrics *pluginMetrics) *KVStore {
return &KVStore{reader: writer, writer: writer, metrics: metrics}
}

func (s *KVStore) GetSecret(id *vault.SecretIdentifier) (*vault.StoredSecret, error) {
func (s *KVStore) GetSecret(ctx context.Context, id *vault.SecretIdentifier) (*vault.StoredSecret, error) {
defer s.trackDuration(ctx, "GetSecret", time.Now())
if id == nil {
return nil, errors.New("id cannot be nil")
}
found, err := s.metadataContainsID(id)
found, err := s.metadataContainsID(ctx, id)
if err != nil {
return nil, fmt.Errorf("failed to check if metadata contains id: %w", err)
}
Expand All @@ -77,7 +85,8 @@ func (s *KVStore) GetSecret(id *vault.SecretIdentifier) (*vault.StoredSecret, er
return secret, nil
}

func (s *KVStore) GetMetadata(owner string) (*vault.StoredMetadata, error) {
func (s *KVStore) GetMetadata(ctx context.Context, owner string) (*vault.StoredMetadata, error) {
defer s.trackDuration(ctx, "GetMetadata", time.Now())
b, err := s.reader.Read([]byte(metadataPrefix + owner))
if err != nil {
return nil, fmt.Errorf("failed to read metadata: %w", err)
Expand All @@ -95,8 +104,9 @@ func (s *KVStore) GetMetadata(owner string) (*vault.StoredMetadata, error) {
return md, nil
}

func (s *KVStore) GetSecretIdentifiersCountForOwner(owner string) (int, error) {
md, err := s.GetMetadata(owner)
func (s *KVStore) GetSecretIdentifiersCountForOwner(ctx context.Context, owner string) (int, error) {
defer s.trackDuration(ctx, "GetSecretIdentifiersCountForOwner", time.Now())
md, err := s.GetMetadata(ctx, owner)
if err != nil {
return 0, fmt.Errorf("failed to get metadata for owner %s: %w", owner, err)
}
Expand All @@ -108,7 +118,8 @@ func (s *KVStore) GetSecretIdentifiersCountForOwner(owner string) (int, error) {
return count, nil
}

func (s *KVStore) WriteMetadata(owner string, metadata *vault.StoredMetadata) error {
func (s *KVStore) WriteMetadata(ctx context.Context, owner string, metadata *vault.StoredMetadata) error {
defer s.trackDuration(ctx, "WriteMetadata", time.Now())
if metadata == nil {
return errors.New("metadata cannot be nil")
}
Expand All @@ -125,11 +136,11 @@ func (s *KVStore) WriteMetadata(owner string, metadata *vault.StoredMetadata) er
return nil
}

func (s *KVStore) metadataContainsID(id *vault.SecretIdentifier) (bool, error) {
func (s *KVStore) metadataContainsID(ctx context.Context, id *vault.SecretIdentifier) (bool, error) {
if id == nil {
return false, errors.New("id cannot be nil")
}
md, err := s.GetMetadata(id.Owner)
md, err := s.GetMetadata(ctx, id.Owner)
if err != nil {
return false, fmt.Errorf("failed to get metadata for owner %s: %w", id.Owner, err)
}
Expand All @@ -147,11 +158,11 @@ func (s *KVStore) metadataContainsID(id *vault.SecretIdentifier) (bool, error) {
return false, nil
}

func (s *KVStore) addIDToMetadata(id *vault.SecretIdentifier) error {
func (s *KVStore) addIDToMetadata(ctx context.Context, id *vault.SecretIdentifier) error {
if id == nil {
return errors.New("id cannot be nil")
}
md, err := s.GetMetadata(id.Owner)
md, err := s.GetMetadata(ctx, id.Owner)
if err != nil {
return fmt.Errorf("failed to get metadata for owner %s: %w", id.Owner, err)
}
Expand All @@ -171,19 +182,19 @@ func (s *KVStore) addIDToMetadata(id *vault.SecretIdentifier) error {
md.SecretIdentifiers = append(md.SecretIdentifiers, id)
}

err = s.WriteMetadata(id.Owner, md)
err = s.WriteMetadata(ctx, id.Owner, md)
if err != nil {
return fmt.Errorf("failed to write metadata for owner %s: %w", id.Owner, err)
}

return nil
}

func (s *KVStore) removeIDFromMetadata(id *vault.SecretIdentifier) error {
func (s *KVStore) removeIDFromMetadata(ctx context.Context, id *vault.SecretIdentifier) error {
if id == nil {
return errors.New("id cannot be nil")
}
md, err := s.GetMetadata(id.Owner)
md, err := s.GetMetadata(ctx, id.Owner)
if err != nil {
return fmt.Errorf("failed to get metadata for owner %s: %w", id.Owner, err)
}
Expand All @@ -209,15 +220,16 @@ func (s *KVStore) removeIDFromMetadata(id *vault.SecretIdentifier) error {
newMd := &vault.StoredMetadata{
SecretIdentifiers: si,
}
err = s.WriteMetadata(id.Owner, newMd)
err = s.WriteMetadata(ctx, id.Owner, newMd)
if err != nil {
return fmt.Errorf("failed to write metadata for owner %s: %w", id.Owner, err)
}

return nil
}

func (s *KVStore) WriteSecret(id *vault.SecretIdentifier, secret *vault.StoredSecret) error {
func (s *KVStore) WriteSecret(ctx context.Context, id *vault.SecretIdentifier, secret *vault.StoredSecret) error {
defer s.trackDuration(ctx, "WriteSecret", time.Now())
if id == nil {
return errors.New("id cannot be nil")
}
Expand All @@ -231,18 +243,19 @@ func (s *KVStore) WriteSecret(id *vault.SecretIdentifier, secret *vault.StoredSe
return fmt.Errorf("failed to write secret: %w", err)
}

if err := s.addIDToMetadata(id); err != nil {
if err := s.addIDToMetadata(ctx, id); err != nil {
return fmt.Errorf("failed to add id to metadata: %w", err)
}

return nil
}

func (s *KVStore) DeleteSecret(id *vault.SecretIdentifier) error {
func (s *KVStore) DeleteSecret(ctx context.Context, id *vault.SecretIdentifier) error {
defer s.trackDuration(ctx, "DeleteSecret", time.Now())
if id == nil {
return errors.New("id cannot be nil")
}
err := s.removeIDFromMetadata(id)
err := s.removeIDFromMetadata(ctx, id)
if err != nil {
return fmt.Errorf("failed to remove id from metadata: %w", err)
}
Expand All @@ -255,7 +268,8 @@ func (s *KVStore) DeleteSecret(id *vault.SecretIdentifier) error {
return nil
}

func (s *KVStore) GetPendingQueue() ([]*vault.StoredPendingQueueItem, error) {
func (s *KVStore) GetPendingQueue(ctx context.Context) ([]*vault.StoredPendingQueueItem, error) {
defer s.trackDuration(ctx, "GetPendingQueue", time.Now())
indexBytes, err := s.reader.Read([]byte(pendingQueueIndex))
if err != nil {
return nil, fmt.Errorf("failed to read pending queue index: %w", err)
Expand Down Expand Up @@ -320,7 +334,8 @@ func (s *KVStore) deletePendingQueue() error {
return nil
}

func (s *KVStore) WritePendingQueue(pending []*vault.StoredPendingQueueItem) error {
func (s *KVStore) WritePendingQueue(ctx context.Context, pending []*vault.StoredPendingQueueItem) error {
defer s.trackDuration(ctx, "WritePendingQueue", time.Now())
err := s.deletePendingQueue()
if err != nil {
return fmt.Errorf("failed to delete pending requests: %w", err)
Expand Down
Loading
Loading