Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func (w *launcher) addRemoteCapability(ctx context.Context, cid string, capabili

methodConfig := capabilityConfig.CapabilityMethodConfig
if methodConfig != nil { // v2 capability - handle via CombinedClient
errAdd := w.addRemoteCapabilityV2(ctx, capability.ID, methodConfig, myDON, remoteDON)
errAdd := w.addRemoteCapabilityV2(ctx, capability.ID, methodConfig, myDON, remoteDON, localRegistry)
if errAdd != nil {
return fmt.Errorf("failed to add remote v2 capability %s: %w", capability.ID, errAdd)
}
Expand Down Expand Up @@ -581,7 +581,7 @@ func (w *launcher) addRemoteCapability(ctx context.Context, cid string, capabili
w.cachedShims.executableClients[shimKey] = execCap
}
// V1 capabilities read transmission schedule from every request
if errCfg := execCap.SetConfig(info, myDON.DON, defaultTargetRequestTimeout, nil); errCfg != nil {
if errCfg := execCap.SetConfig(info, myDON.DON, defaultTargetRequestTimeout, nil, nil); errCfg != nil {
return nil, fmt.Errorf("failed to set trigger config: %w", errCfg)
}
return execCap.(capabilityService), nil
Expand All @@ -607,7 +607,7 @@ func (w *launcher) addRemoteCapability(ctx context.Context, cid string, capabili
w.cachedShims.executableClients[shimKey] = execCap
}
// V1 capabilities read transmission schedule from every request
if errCfg := execCap.SetConfig(info, myDON.DON, defaultTargetRequestTimeout, nil); errCfg != nil {
if errCfg := execCap.SetConfig(info, myDON.DON, defaultTargetRequestTimeout, nil, nil); errCfg != nil {
return nil, fmt.Errorf("failed to set trigger config: %w", errCfg)
}
return execCap.(capabilityService), nil
Expand Down Expand Up @@ -914,7 +914,7 @@ func signersFor(don registrysyncer.DON, localRegistry *registrysyncer.LocalRegis
}

// Add a V2 capability with multiple methods, using CombinedClient.
func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, methodConfig map[string]capabilities.CapabilityMethodConfig, myDON registrysyncer.DON, remoteDON registrysyncer.DON) error {
func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, methodConfig map[string]capabilities.CapabilityMethodConfig, myDON registrysyncer.DON, remoteDON registrysyncer.DON, localRegistry *registrysyncer.LocalRegistry) error {
info, err := capabilities.NewRemoteCapabilityInfo(
capID,
capabilities.CapabilityTypeCombined,
Expand Down Expand Up @@ -969,7 +969,12 @@ func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, meth
Schedule: transmission.EnumToString(config.RemoteExecutableConfig.TransmissionSchedule),
DeltaStage: config.RemoteExecutableConfig.DeltaStage,
}
err := client.SetConfig(info, myDON.DON, config.RemoteExecutableConfig.RequestTimeout, transmissionConfig)

signers, err := signersFor(remoteDON, localRegistry)
if err != nil {
return fmt.Errorf("failed to get signers for executable client: %w", err)
}
err = client.SetConfig(info, myDON.DON, config.RemoteExecutableConfig.RequestTimeout, transmissionConfig, signers)
if err != nil {
w.lggr.Errorw("failed to update client config", "capID", capID, "method", method, "error", err)
continue
Expand Down
9 changes: 6 additions & 3 deletions core/capabilities/remote/executable/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ type dynamicConfig struct {
requestTimeout time.Duration
// Has to be set only for V2 capabilities. V1 capabilities read transmission schedule from every request.
transmissionConfig *transmission.TransmissionConfig
// Has to be set only for V2 capabilities using OCR.
signers [][]byte
}

type Client interface {
commoncap.ExecutableCapability
Receive(ctx context.Context, msg *types.MessageBody)
SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig) error
SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig, signers [][]byte) error
}

var _ Client = &client{}
Expand All @@ -78,7 +80,7 @@ func NewClient(capabilityID string, capMethodName string, dispatcher types.Dispa

// SetConfig sets the remote capability configuration dynamically
// TransmissionConfig has to be set only for V2 capabilities. V1 capabilities read transmission schedule from every request.
func (c *client) SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig) error {
func (c *client) SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig, signers [][]byte) error {
if remoteCapabilityInfo.ID == "" || remoteCapabilityInfo.ID != c.capabilityID {
return fmt.Errorf("capability info provided does not match the client's capabilityID: %s != %s", remoteCapabilityInfo.ID, c.capabilityID)
}
Expand All @@ -98,6 +100,7 @@ func (c *client) SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localD
localDONInfo: localDonInfo,
requestTimeout: requestTimeout,
transmissionConfig: transmissionConfig,
signers: signers,
})
c.lggr.Infow("SetConfig", "remoteDONName", remoteCapabilityInfo.DON.Name, "remoteDONID", remoteCapabilityInfo.DON.ID, "requestTimeout", requestTimeout, "transmissionConfig", transmissionConfig)
return nil
Expand Down Expand Up @@ -234,7 +237,7 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest
}

req, err := request.NewClientExecuteRequest(ctx, c.lggr, capReq, cfg.remoteCapabilityInfo, cfg.localDONInfo, c.dispatcher,
cfg.requestTimeout, cfg.transmissionConfig, c.capMethodName)
cfg.requestTimeout, cfg.transmissionConfig, c.capMethodName, cfg.signers)
if err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to create client request: %w", err)
}
Expand Down
18 changes: 9 additions & 9 deletions core/capabilities/remote/executable/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func testClient(t *testing.T, numWorkflowPeers int, workflowNodeResponseTimeout
for i := range numWorkflowPeers {
workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i])
caller := executable.NewClient(capInfo.ID, "", workflowPeerDispatcher, lggr)
err := caller.SetConfig(capInfo, workflowDonInfo, workflowNodeResponseTimeout, nil)
err := caller.SetConfig(capInfo, workflowDonInfo, workflowNodeResponseTimeout, nil, nil)
require.NoError(t, err)
servicetest.Run(t, caller)
broker.RegisterReceiverNode(workflowPeers[i], caller)
Expand Down Expand Up @@ -403,7 +403,7 @@ func TestClient_SetConfig(t *testing.T) {
DeltaStage: 10 * time.Millisecond,
}

err := client.SetConfig(validCapInfo, validDonInfo, validTimeout, transmissionConfig)
err := client.SetConfig(validCapInfo, validDonInfo, validTimeout, transmissionConfig, nil)
require.NoError(t, err)

// Verify config was set
Expand All @@ -418,7 +418,7 @@ func TestClient_SetConfig(t *testing.T) {
CapabilityType: commoncap.CapabilityTypeAction,
}

err := client.SetConfig(invalidCapInfo, validDonInfo, validTimeout, nil)
err := client.SetConfig(invalidCapInfo, validDonInfo, validTimeout, nil, nil)
require.Error(t, err)
assert.Contains(t, err.Error(), "capability info provided does not match the client's capabilityID")
assert.Contains(t, err.Error(), "different_capability@1.0.0 != test_capability@1.0.0")
Expand All @@ -431,15 +431,15 @@ func TestClient_SetConfig(t *testing.T) {
F: 0,
}

err := client.SetConfig(validCapInfo, invalidDonInfo, validTimeout, nil)
err := client.SetConfig(validCapInfo, invalidDonInfo, validTimeout, nil, nil)
require.Error(t, err)
assert.Contains(t, err.Error(), "empty localDonInfo provided")
})

t.Run("successful config update", func(t *testing.T) {
// Set initial config
initialTimeout := 10 * time.Second
err := client.SetConfig(validCapInfo, validDonInfo, initialTimeout, nil)
err := client.SetConfig(validCapInfo, validDonInfo, initialTimeout, nil, nil)
require.NoError(t, err)

// Replace with new config
Expand All @@ -450,7 +450,7 @@ func TestClient_SetConfig(t *testing.T) {
F: 1,
}

err = client.SetConfig(validCapInfo, newDonInfo, newTimeout, nil)
err = client.SetConfig(validCapInfo, newDonInfo, newTimeout, nil, nil)
require.NoError(t, err)

// Verify the config was completely replaced
Expand Down Expand Up @@ -494,7 +494,7 @@ func TestClient_SetConfig_StartClose(t *testing.T) {
})

t.Run("start succeeds after config set", func(t *testing.T) {
require.NoError(t, client.SetConfig(validCapInfo, validDonInfo, validTimeout, nil))
require.NoError(t, client.SetConfig(validCapInfo, validDonInfo, validTimeout, nil, nil))
require.NoError(t, client.Start(ctx))
require.NoError(t, client.Close())
})
Expand All @@ -504,12 +504,12 @@ func TestClient_SetConfig_StartClose(t *testing.T) {
freshClient := executable.NewClient(capabilityID, "execute", dispatcher, lggr)

// Set initial config and start
require.NoError(t, freshClient.SetConfig(validCapInfo, validDonInfo, validTimeout, nil))
require.NoError(t, freshClient.SetConfig(validCapInfo, validDonInfo, validTimeout, nil, nil))
require.NoError(t, freshClient.Start(ctx))

// Update config while running
validCapInfo.Description = "new description"
require.NoError(t, freshClient.SetConfig(validCapInfo, validDonInfo, validTimeout, nil))
require.NoError(t, freshClient.SetConfig(validCapInfo, validDonInfo, validTimeout, nil, nil))

// Verify config was updated
info, err := freshClient.Info(ctx)
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/executable/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func testRemoteExecutableCapability(ctx context.Context, t *testing.T, underlyin
for i := range numWorkflowPeers {
workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i])
workflowNode := executable.NewClient(capInfo.ID, "", workflowPeerDispatcher, lggr)
err := workflowNode.SetConfig(capInfo, workflowDonInfo, workflowNodeTimeout, nil)
err := workflowNode.SetConfig(capInfo, workflowDonInfo, workflowNodeTimeout, nil, nil)
require.NoError(t, err)
servicetest.Run(t, workflowNode)
broker.RegisterReceiverNode(workflowPeers[i], workflowNode)
Expand Down
Loading
Loading