diff --git a/apps/workspace-engine/oapi/openapi.json b/apps/workspace-engine/oapi/openapi.json index 7a541edfc..ff5befd41 100644 --- a/apps/workspace-engine/oapi/openapi.json +++ b/apps/workspace-engine/oapi/openapi.json @@ -2145,6 +2145,13 @@ "description": "Reference to the job agent", "type": "string" }, + "resolvedMatrix": { + "items": { + "additionalProperties": true, + "type": "object" + }, + "type": "array" + }, "workflowId": { "type": "string" } diff --git a/apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet b/apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet index c3ae6c018..dc6d16949 100644 --- a/apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet +++ b/apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet @@ -144,7 +144,7 @@ local openapi = import '../lib/openapi.libsonnet'; workflowId: { type: 'string' }, ref: { type: 'string', description: 'Reference to the job agent' }, config: { type: 'object', additionalProperties: true, description: 'Configuration for the job agent' }, - + resolvedMatrix: { type: 'array', items: { type: 'object', additionalProperties: true } }, }, }, } diff --git a/apps/workspace-engine/pkg/oapi/oapi.gen.go b/apps/workspace-engine/pkg/oapi/oapi.gen.go index 9e8b8551c..5baab073a 100644 --- a/apps/workspace-engine/pkg/oapi/oapi.gen.go +++ b/apps/workspace-engine/pkg/oapi/oapi.gen.go @@ -1087,8 +1087,9 @@ type WorkflowJob struct { Index int `json:"index"` // Ref Reference to the job agent - Ref string `json:"ref"` - WorkflowId string `json:"workflowId"` + Ref string `json:"ref"` + ResolvedMatrix *[]map[string]interface{} `json:"resolvedMatrix,omitempty"` + WorkflowId string `json:"workflowId"` } // WorkflowJobAgentConfig defines model for WorkflowJobAgentConfig. diff --git a/apps/workspace-engine/pkg/workspace/jobagents/registry.go b/apps/workspace-engine/pkg/workspace/jobagents/registry.go index 7a1402899..e513e5c02 100644 --- a/apps/workspace-engine/pkg/workspace/jobagents/registry.go +++ b/apps/workspace-engine/pkg/workspace/jobagents/registry.go @@ -84,6 +84,9 @@ func (r *Registry) Dispatch(ctx context.Context, job *oapi.Job) error { if workflow, ok := r.store.Workflows.Get(workflowJob.WorkflowId); ok { renderContext.Workflow = workflow } + if job.JobAgentConfig["matrix"] != nil { + renderContext.Matrix = job.JobAgentConfig["matrix"].(map[string]interface{}) + } } return dispatcher.Dispatch(ctx, renderContext) diff --git a/apps/workspace-engine/pkg/workspace/jobagents/types/types.go b/apps/workspace-engine/pkg/workspace/jobagents/types/types.go index 687455d0a..62fce21e9 100644 --- a/apps/workspace-engine/pkg/workspace/jobagents/types/types.go +++ b/apps/workspace-engine/pkg/workspace/jobagents/types/types.go @@ -29,6 +29,7 @@ type DispatchContext struct { WorkflowJob *oapi.WorkflowJob `json:"workflowJob"` Version *oapi.DeploymentVersion `json:"version"` Inputs map[string]any `json:"inputs"` + Matrix map[string]interface{} `json:"matrix"` } func (r *DispatchContext) Map() map[string]any { diff --git a/apps/workspace-engine/pkg/workspace/store/deployments.go b/apps/workspace-engine/pkg/workspace/store/deployments.go index 457c5231c..8e417d1dd 100644 --- a/apps/workspace-engine/pkg/workspace/store/deployments.go +++ b/apps/workspace-engine/pkg/workspace/store/deployments.go @@ -111,3 +111,17 @@ func (e *Deployments) ForJobAgent(ctx context.Context, jobAgent *oapi.JobAgent) } return deployments, nil } + +func (e *Deployments) ForSelector(ctx context.Context, sel *oapi.Selector) map[string]*oapi.Deployment { + deployments := make(map[string]*oapi.Deployment) + for _, deployment := range e.Items() { + matched, err := selector.Match(ctx, sel, deployment) + if err != nil { + continue + } + if matched { + deployments[deployment.Id] = deployment + } + } + return deployments +} diff --git a/apps/workspace-engine/pkg/workspace/store/environments.go b/apps/workspace-engine/pkg/workspace/store/environments.go index ecdf8075f..3a6558375 100644 --- a/apps/workspace-engine/pkg/workspace/store/environments.go +++ b/apps/workspace-engine/pkg/workspace/store/environments.go @@ -82,3 +82,17 @@ func (e *Environments) ForResource(ctx context.Context, resource *oapi.Resource) } return environments, nil } + +func (e *Environments) ForSelector(ctx context.Context, sel *oapi.Selector) map[string]*oapi.Environment { + environments := make(map[string]*oapi.Environment) + for _, environment := range e.Items() { + matched, err := selector.Match(ctx, sel, environment) + if err != nil { + continue + } + if matched { + environments[environment.Id] = environment + } + } + return environments +} diff --git a/apps/workspace-engine/pkg/workspace/workflowmanager/manager.go b/apps/workspace-engine/pkg/workspace/workflowmanager/manager.go index 46b15c08d..427d45e97 100644 --- a/apps/workspace-engine/pkg/workspace/workflowmanager/manager.go +++ b/apps/workspace-engine/pkg/workspace/workflowmanager/manager.go @@ -30,10 +30,15 @@ func (m *Manager) CreateWorkflow(ctx context.Context, workflowTemplateId string, return nil, fmt.Errorf("workflow template %s not found", workflowTemplateId) } + inputsWithResolvedSelectors, err := m.getInputWithResolvedSelectors(ctx, workflowTemplate, inputs) + if err != nil { + return nil, fmt.Errorf("failed to get input with resolved selectors: %w", err) + } + workflow := &oapi.Workflow{ Id: uuid.New().String(), WorkflowTemplateId: workflowTemplateId, - Inputs: maps.Clone(inputs), + Inputs: inputsWithResolvedSelectors, } workflowJobs := make([]*oapi.WorkflowJob, 0, len(workflowTemplate.Jobs)) @@ -45,6 +50,14 @@ func (m *Manager) CreateWorkflow(ctx context.Context, workflowTemplateId string, Ref: jobTemplate.Ref, Config: maps.Clone(jobTemplate.Config), } + + if jobTemplate.Matrix != nil { + resolvedMatrix, err := NewMatrixResolver(jobTemplate.Matrix, inputsWithResolvedSelectors).Resolve() + if err != nil { + return workflow, fmt.Errorf("failed to resolve matrix: %w", err) + } + wfJob.ResolvedMatrix = &resolvedMatrix + } m.store.WorkflowJobs.Upsert(ctx, wfJob) workflowJobs = append(workflowJobs, wfJob) } @@ -66,36 +79,60 @@ func (m *Manager) dispatchJob(ctx context.Context, wfJob *oapi.WorkflowJob) erro return fmt.Errorf("job agent %s not found", wfJob.Ref) } - mergedConfig, err := mergeJobAgentConfig(jobAgent.Config, wfJob.Config) - if err != nil { - return fmt.Errorf("failed to merge job agent config: %w", err) - } + mergedConfig := mergeJobAgentConfig(jobAgent.Config, wfJob.Config) + + if wfJob.ResolvedMatrix == nil { + job := &oapi.Job{ + Id: uuid.New().String(), + WorkflowJobId: wfJob.Id, + JobAgentId: wfJob.Ref, + JobAgentConfig: mergedConfig, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + Metadata: make(map[string]string), + Status: oapi.JobStatusPending, + } + + m.store.Jobs.Upsert(ctx, job) + if err := m.jobAgentRegistry.Dispatch(ctx, job); err != nil { + return fmt.Errorf("failed to dispatch job: %w", err) + } - job := &oapi.Job{ - Id: uuid.New().String(), - WorkflowJobId: wfJob.Id, - JobAgentId: wfJob.Ref, - JobAgentConfig: mergedConfig, - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - Metadata: make(map[string]string), - Status: oapi.JobStatusPending, + return nil } - m.store.Jobs.Upsert(ctx, job) - if err := m.jobAgentRegistry.Dispatch(ctx, job); err != nil { - return fmt.Errorf("failed to dispatch job: %w", err) + for _, matrixItem := range *wfJob.ResolvedMatrix { + matrixMergedConfig := mergeJobAgentConfig( + mergedConfig, + oapi.JobAgentConfig{ + "matrix": matrixItem, + }, + ) + job := &oapi.Job{ + Id: uuid.New().String(), + WorkflowJobId: wfJob.Id, + JobAgentId: wfJob.Ref, + JobAgentConfig: matrixMergedConfig, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + Metadata: make(map[string]string), + Status: oapi.JobStatusPending, + } + m.store.Jobs.Upsert(ctx, job) + if err := m.jobAgentRegistry.Dispatch(ctx, job); err != nil { + return fmt.Errorf("failed to dispatch job: %w", err) + } } return nil } -func mergeJobAgentConfig(configs ...oapi.JobAgentConfig) (oapi.JobAgentConfig, error) { +func mergeJobAgentConfig(configs ...oapi.JobAgentConfig) oapi.JobAgentConfig { mergedConfig := make(map[string]any) for _, config := range configs { deepMerge(mergedConfig, config) } - return mergedConfig, nil + return mergedConfig } func deepMerge(dst, src map[string]any) { diff --git a/apps/workspace-engine/pkg/workspace/workflowmanager/matrix_resolver.go b/apps/workspace-engine/pkg/workspace/workflowmanager/matrix_resolver.go new file mode 100644 index 000000000..3de146529 --- /dev/null +++ b/apps/workspace-engine/pkg/workspace/workflowmanager/matrix_resolver.go @@ -0,0 +1,89 @@ +package workflowmanager + +import ( + "fmt" + "workspace-engine/pkg/oapi" +) + +type MatrixResolver struct { + matrix *oapi.WorkflowJobMatrix + inputs map[string]any +} + +type matrixRow struct { + Key string + Values []map[string]interface{} +} + +func NewMatrixResolver(matrix *oapi.WorkflowJobMatrix, inputs map[string]any) *MatrixResolver { + return &MatrixResolver{ + matrix: matrix, + inputs: inputs, + } +} + +func (r *MatrixResolver) getMatrixRows() ([]matrixRow, error) { + matrixRows := make([]matrixRow, 0, len(*r.matrix)) + for key, value := range *r.matrix { + asArray, err := value.AsWorkflowJobMatrix0() + if err == nil { + matrixRows = append(matrixRows, matrixRow{ + Key: key, + Values: asArray, + }) + continue + } + asString, err := value.AsWorkflowJobMatrix1() + if err == nil { + arrayFromInput, ok := r.inputs[asString].([]map[string]interface{}) + if !ok { + return nil, fmt.Errorf("input %s is not an array", asString) + } + matrixRows = append(matrixRows, matrixRow{ + Key: key, + Values: arrayFromInput, + }) + continue + } + } + return matrixRows, nil +} + +func (r *MatrixResolver) computeCartesianProduct(matrixRows []matrixRow) []map[string]interface{} { + totalSize := 1 + for _, row := range matrixRows { + totalSize *= len(row.Values) + } + + result := make([]map[string]interface{}, 0, totalSize) + result = append(result, map[string]interface{}{}) + + for _, row := range matrixRows { + newResult := make([]map[string]interface{}, 0, totalSize) + for _, existing := range result { + for _, value := range row.Values { + combined := make(map[string]interface{}, len(existing)+len(value)) + for k, v := range existing { + combined[k] = v + } + for k, v := range value { + combined[k] = v + } + newResult = append(newResult, combined) + } + } + result = newResult + } + + return result +} + +func (r *MatrixResolver) Resolve() ([]map[string]interface{}, error) { + matrixRows, err := r.getMatrixRows() + if err != nil { + return nil, fmt.Errorf("failed to get matrix rows: %w", err) + } + + product := r.computeCartesianProduct(matrixRows) + return product, nil +} diff --git a/apps/workspace-engine/pkg/workspace/workflowmanager/resolve_selectors.go b/apps/workspace-engine/pkg/workspace/workflowmanager/resolve_selectors.go new file mode 100644 index 000000000..91f0e6146 --- /dev/null +++ b/apps/workspace-engine/pkg/workspace/workflowmanager/resolve_selectors.go @@ -0,0 +1,106 @@ +package workflowmanager + +import ( + "context" + "encoding/json" + "fmt" + "maps" + "workspace-engine/pkg/oapi" +) + +func structToMap(v any) (map[string]interface{}, error) { + data, err := json.Marshal(v) + if err != nil { + return nil, err + } + var result map[string]interface{} + if err := json.Unmarshal(data, &result); err != nil { + return nil, err + } + return result, nil +} + +func (m *Manager) getResources(ctx context.Context, selector *oapi.Selector) []map[string]interface{} { + resources := m.store.Resources.ForSelector(ctx, selector) + resourcesSlice := make([]map[string]interface{}, 0, len(resources)) + for _, resource := range resources { + entityMap, err := structToMap(resource) + if err != nil { + continue + } + resourcesSlice = append(resourcesSlice, entityMap) + } + return resourcesSlice +} + +func (m *Manager) getEnvironments(ctx context.Context, selector *oapi.Selector) []map[string]interface{} { + environments := m.store.Environments.ForSelector(ctx, selector) + environmentsSlice := make([]map[string]interface{}, 0, len(environments)) + for _, environment := range environments { + entityMap, err := structToMap(environment) + if err != nil { + continue + } + environmentsSlice = append(environmentsSlice, entityMap) + } + return environmentsSlice +} + +func (m *Manager) getDeployments(ctx context.Context, selector *oapi.Selector) []map[string]interface{} { + deployments := m.store.Deployments.ForSelector(ctx, selector) + deploymentsSlice := make([]map[string]interface{}, 0, len(deployments)) + for _, deployment := range deployments { + entityMap, err := structToMap(deployment) + if err != nil { + continue + } + deploymentsSlice = append(deploymentsSlice, entityMap) + } + return deploymentsSlice +} + +func (m *Manager) getInputWithResolvedSelectors(ctx context.Context, workflowTemplate *oapi.WorkflowTemplate, inputs map[string]any) (map[string]any, error) { + inputsClone := maps.Clone(inputs) + + for _, input := range workflowTemplate.Inputs { + asArray, err := input.AsWorkflowArrayInput() + if err != nil { + continue + } + + asSelectorArray, err := asArray.AsWorkflowSelectorArrayInput() + if err != nil { + continue + } + + sel := asSelectorArray.Selector.Default + + selectorInputEntry, ok := inputsClone[asSelectorArray.Name] + if ok { + selectorInputEntryString := selectorInputEntry.(string) + sel = &oapi.Selector{} + if err := sel.FromCelSelector(oapi.CelSelector{Cel: selectorInputEntryString}); err != nil { + return nil, fmt.Errorf("failed to parse selector: %w", err) + } + } + + if sel == nil { + return nil, fmt.Errorf("selector is nil") + } + + var matchedEntities []map[string]interface{} + + switch asSelectorArray.Selector.EntityType { + case oapi.WorkflowSelectorArrayInputSelectorEntityTypeResource: + matchedEntities = m.getResources(ctx, sel) + case oapi.WorkflowSelectorArrayInputSelectorEntityTypeEnvironment: + matchedEntities = m.getEnvironments(ctx, sel) + case oapi.WorkflowSelectorArrayInputSelectorEntityTypeDeployment: + matchedEntities = m.getDeployments(ctx, sel) + } + + inputsClone[asSelectorArray.Name] = matchedEntities + } + + return inputsClone, nil +}