Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions apps/workspace-engine/oapi/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -2145,6 +2145,13 @@
"description": "Reference to the job agent",
"type": "string"
},
"resolvedMatrix": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
},
"workflowId": {
"type": "string"
}
Expand Down
2 changes: 1 addition & 1 deletion apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ local openapi = import '../lib/openapi.libsonnet';
workflowId: { type: 'string' },
ref: { type: 'string', description: 'Reference to the job agent' },
config: { type: 'object', additionalProperties: true, description: 'Configuration for the job agent' },

resolvedMatrix: { type: 'array', items: { type: 'object', additionalProperties: true } },
},
},
}
5 changes: 3 additions & 2 deletions apps/workspace-engine/pkg/oapi/oapi.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions apps/workspace-engine/pkg/workspace/jobagents/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func (r *Registry) Dispatch(ctx context.Context, job *oapi.Job) error {
if workflow, ok := r.store.Workflows.Get(workflowJob.WorkflowId); ok {
renderContext.Workflow = workflow
}
if job.JobAgentConfig["matrix"] != nil {
renderContext.Matrix = job.JobAgentConfig["matrix"].(map[string]interface{})
}
}

return dispatcher.Dispatch(ctx, renderContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type DispatchContext struct {
WorkflowJob *oapi.WorkflowJob `json:"workflowJob"`
Version *oapi.DeploymentVersion `json:"version"`
Inputs map[string]any `json:"inputs"`
Matrix map[string]interface{} `json:"matrix"`
}

func (r *DispatchContext) Map() map[string]any {
Expand Down
14 changes: 14 additions & 0 deletions apps/workspace-engine/pkg/workspace/store/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,17 @@ func (e *Deployments) ForJobAgent(ctx context.Context, jobAgent *oapi.JobAgent)
}
return deployments, nil
}

func (e *Deployments) ForSelector(ctx context.Context, sel *oapi.Selector) map[string]*oapi.Deployment {
deployments := make(map[string]*oapi.Deployment)
for _, deployment := range e.Items() {
matched, err := selector.Match(ctx, sel, deployment)
if err != nil {
continue
}
if matched {
deployments[deployment.Id] = deployment
}
}
return deployments
}
14 changes: 14 additions & 0 deletions apps/workspace-engine/pkg/workspace/store/environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,17 @@ func (e *Environments) ForResource(ctx context.Context, resource *oapi.Resource)
}
return environments, nil
}

func (e *Environments) ForSelector(ctx context.Context, sel *oapi.Selector) map[string]*oapi.Environment {
environments := make(map[string]*oapi.Environment)
for _, environment := range e.Items() {
matched, err := selector.Match(ctx, sel, environment)
if err != nil {
continue
}
if matched {
environments[environment.Id] = environment
}
}
return environments
}
75 changes: 56 additions & 19 deletions apps/workspace-engine/pkg/workspace/workflowmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@ func (m *Manager) CreateWorkflow(ctx context.Context, workflowTemplateId string,
return nil, fmt.Errorf("workflow template %s not found", workflowTemplateId)
}

inputsWithResolvedSelectors, err := m.getInputWithResolvedSelectors(ctx, workflowTemplate, inputs)
if err != nil {
return nil, fmt.Errorf("failed to get input with resolved selectors: %w", err)
}

workflow := &oapi.Workflow{
Id: uuid.New().String(),
WorkflowTemplateId: workflowTemplateId,
Inputs: maps.Clone(inputs),
Inputs: inputsWithResolvedSelectors,
}

workflowJobs := make([]*oapi.WorkflowJob, 0, len(workflowTemplate.Jobs))
Expand All @@ -45,6 +50,14 @@ func (m *Manager) CreateWorkflow(ctx context.Context, workflowTemplateId string,
Ref: jobTemplate.Ref,
Config: maps.Clone(jobTemplate.Config),
}

if jobTemplate.Matrix != nil {
resolvedMatrix, err := NewMatrixResolver(jobTemplate.Matrix, inputsWithResolvedSelectors).Resolve()
if err != nil {
return workflow, fmt.Errorf("failed to resolve matrix: %w", err)
}
wfJob.ResolvedMatrix = &resolvedMatrix
}
Comment on lines +54 to +60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid partial WorkflowJob persistence on matrix resolution failure.
If matrix resolution fails (Line 55–57), earlier jobs have already been Upserted (Line 61), but the workflow isn’t stored yet—this leaves orphaned jobs. Consider deferring WorkflowJobs.Upsert until all templates resolve, or rolling back on error.

💡 Suggested change to defer job persistence
	for idx, jobTemplate := range workflowTemplate.Jobs {
		wfJob := &oapi.WorkflowJob{
			Id:         uuid.New().String(),
			WorkflowId: workflow.Id,
			Index:      idx,
			Ref:        jobTemplate.Ref,
			Config:     maps.Clone(jobTemplate.Config),
		}

		if jobTemplate.Matrix != nil {
			resolvedMatrix, err := NewMatrixResolver(jobTemplate.Matrix, inputsWithResolvedSelectors).Resolve()
			if err != nil {
				return workflow, fmt.Errorf("failed to resolve matrix: %w", err)
			}
			wfJob.ResolvedMatrix = &resolvedMatrix
		}
-		m.store.WorkflowJobs.Upsert(ctx, wfJob)
		workflowJobs = append(workflowJobs, wfJob)
	}

+	for _, wfJob := range workflowJobs {
+		m.store.WorkflowJobs.Upsert(ctx, wfJob)
+	}
🤖 Prompt for AI Agents
In `@apps/workspace-engine/pkg/workspace/workflowmanager/manager.go` around lines
54 - 60, The current loop upserts WorkflowJobs early and then calls
NewMatrixResolver(...).Resolve(), which can fail and leave orphaned jobs; change
the logic in the manager that iterates job templates (the code that constructs
wfJob and calls WorkflowJobs.Upsert) to first resolve all job templates
including calling NewMatrixResolver(jobTemplate.Matrix,
inputsWithResolvedSelectors).Resolve() and setting WorkflowJob.ResolvedMatrix
(or collect resolved wfJob objects in-memory), and only after all templates
resolve successfully perform WorkflowJobs.Upsert for each wfJob; alternatively,
implement a rollback path that calls WorkflowJobs.Delete (or the repository's
delete/cleanup method) for any jobs already Upserted if Resolve() returns an
error—apply this change around the loop that creates wfJob and where
WorkflowJobs.Upsert is invoked.

m.store.WorkflowJobs.Upsert(ctx, wfJob)
workflowJobs = append(workflowJobs, wfJob)
}
Expand All @@ -66,36 +79,60 @@ func (m *Manager) dispatchJob(ctx context.Context, wfJob *oapi.WorkflowJob) erro
return fmt.Errorf("job agent %s not found", wfJob.Ref)
}

mergedConfig, err := mergeJobAgentConfig(jobAgent.Config, wfJob.Config)
if err != nil {
return fmt.Errorf("failed to merge job agent config: %w", err)
}
mergedConfig := mergeJobAgentConfig(jobAgent.Config, wfJob.Config)

if wfJob.ResolvedMatrix == nil {
job := &oapi.Job{
Id: uuid.New().String(),
WorkflowJobId: wfJob.Id,
JobAgentId: wfJob.Ref,
JobAgentConfig: mergedConfig,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
Metadata: make(map[string]string),
Status: oapi.JobStatusPending,
}

m.store.Jobs.Upsert(ctx, job)
if err := m.jobAgentRegistry.Dispatch(ctx, job); err != nil {
return fmt.Errorf("failed to dispatch job: %w", err)
}

job := &oapi.Job{
Id: uuid.New().String(),
WorkflowJobId: wfJob.Id,
JobAgentId: wfJob.Ref,
JobAgentConfig: mergedConfig,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
Metadata: make(map[string]string),
Status: oapi.JobStatusPending,
return nil
}

m.store.Jobs.Upsert(ctx, job)
if err := m.jobAgentRegistry.Dispatch(ctx, job); err != nil {
return fmt.Errorf("failed to dispatch job: %w", err)
for _, matrixItem := range *wfJob.ResolvedMatrix {
matrixMergedConfig := mergeJobAgentConfig(
mergedConfig,
oapi.JobAgentConfig{
"matrix": matrixItem,
},
)
job := &oapi.Job{
Id: uuid.New().String(),
WorkflowJobId: wfJob.Id,
JobAgentId: wfJob.Ref,
JobAgentConfig: matrixMergedConfig,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
Metadata: make(map[string]string),
Status: oapi.JobStatusPending,
}
m.store.Jobs.Upsert(ctx, job)
if err := m.jobAgentRegistry.Dispatch(ctx, job); err != nil {
return fmt.Errorf("failed to dispatch job: %w", err)
}
}

return nil
}

func mergeJobAgentConfig(configs ...oapi.JobAgentConfig) (oapi.JobAgentConfig, error) {
func mergeJobAgentConfig(configs ...oapi.JobAgentConfig) oapi.JobAgentConfig {
mergedConfig := make(map[string]any)
for _, config := range configs {
deepMerge(mergedConfig, config)
}
return mergedConfig, nil
return mergedConfig
}

func deepMerge(dst, src map[string]any) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package workflowmanager

import (
"fmt"
"workspace-engine/pkg/oapi"
)

type MatrixResolver struct {
matrix *oapi.WorkflowJobMatrix
inputs map[string]any
}

type matrixRow struct {
Key string
Values []map[string]interface{}
}

func NewMatrixResolver(matrix *oapi.WorkflowJobMatrix, inputs map[string]any) *MatrixResolver {
return &MatrixResolver{
matrix: matrix,
inputs: inputs,
}
}

func (r *MatrixResolver) getMatrixRows() ([]matrixRow, error) {
matrixRows := make([]matrixRow, 0, len(*r.matrix))
for key, value := range *r.matrix {
asArray, err := value.AsWorkflowJobMatrix0()
if err == nil {
matrixRows = append(matrixRows, matrixRow{
Key: key,
Values: asArray,
})
continue
}
asString, err := value.AsWorkflowJobMatrix1()
if err == nil {
arrayFromInput, ok := r.inputs[asString].([]map[string]interface{})
if !ok {
return nil, fmt.Errorf("input %s is not an array", asString)
}
matrixRows = append(matrixRows, matrixRow{
Key: key,
Values: arrayFromInput,
})
continue
}
}
return matrixRows, nil
Comment on lines +25 to +49
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handle JSON-decoded input arrays when matrix references inputs.
Line 38 assumes []map[string]interface{}. Inputs coming from JSON are typically []interface{} (with map[string]interface{} elements), so this path will error even for valid arrays. Consider accepting []interface{} and converting items.

🔧 Suggested conversion to support JSON-decoded arrays
		asString, err := value.AsWorkflowJobMatrix1()
		if err == nil {
-			arrayFromInput, ok := r.inputs[asString].([]map[string]interface{})
-			if !ok {
-				return nil, fmt.Errorf("input %s is not an array", asString)
-			}
+			raw, ok := r.inputs[asString]
+			if !ok {
+				return nil, fmt.Errorf("input %s not found", asString)
+			}
+			var arrayFromInput []map[string]interface{}
+			switch v := raw.(type) {
+			case []map[string]interface{}:
+				arrayFromInput = v
+			case []interface{}:
+				arrayFromInput = make([]map[string]interface{}, 0, len(v))
+				for _, item := range v {
+					m, ok := item.(map[string]interface{})
+					if !ok {
+						return nil, fmt.Errorf("input %s contains non-object item", asString)
+					}
+					arrayFromInput = append(arrayFromInput, m)
+				}
+			default:
+				return nil, fmt.Errorf("input %s is not an array", asString)
+			}
			matrixRows = append(matrixRows, matrixRow{
				Key:    key,
				Values: arrayFromInput,
			})
			continue
		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (r *MatrixResolver) getMatrixRows() ([]matrixRow, error) {
matrixRows := make([]matrixRow, 0, len(*r.matrix))
for key, value := range *r.matrix {
asArray, err := value.AsWorkflowJobMatrix0()
if err == nil {
matrixRows = append(matrixRows, matrixRow{
Key: key,
Values: asArray,
})
continue
}
asString, err := value.AsWorkflowJobMatrix1()
if err == nil {
arrayFromInput, ok := r.inputs[asString].([]map[string]interface{})
if !ok {
return nil, fmt.Errorf("input %s is not an array", asString)
}
matrixRows = append(matrixRows, matrixRow{
Key: key,
Values: arrayFromInput,
})
continue
}
}
return matrixRows, nil
func (r *MatrixResolver) getMatrixRows() ([]matrixRow, error) {
matrixRows := make([]matrixRow, 0, len(*r.matrix))
for key, value := range *r.matrix {
asArray, err := value.AsWorkflowJobMatrix0()
if err == nil {
matrixRows = append(matrixRows, matrixRow{
Key: key,
Values: asArray,
})
continue
}
asString, err := value.AsWorkflowJobMatrix1()
if err == nil {
raw, ok := r.inputs[asString]
if !ok {
return nil, fmt.Errorf("input %s not found", asString)
}
var arrayFromInput []map[string]interface{}
switch v := raw.(type) {
case []map[string]interface{}:
arrayFromInput = v
case []interface{}:
arrayFromInput = make([]map[string]interface{}, 0, len(v))
for _, item := range v {
m, ok := item.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("input %s contains non-object item", asString)
}
arrayFromInput = append(arrayFromInput, m)
}
default:
return nil, fmt.Errorf("input %s is not an array", asString)
}
matrixRows = append(matrixRows, matrixRow{
Key: key,
Values: arrayFromInput,
})
continue
}
}
return matrixRows, nil
}
🤖 Prompt for AI Agents
In `@apps/workspace-engine/pkg/workspace/workflowmanager/matrix_resolver.go`
around lines 25 - 49, In getMatrixRows, the branch handling AsWorkflowJobMatrix1
assumes r.inputs[asString] is []map[string]interface{} which fails for
JSON-decoded arrays (they are []interface{}); update the logic in getMatrixRows
(and the matrixRow creation) to first attempt the existing
[]map[string]interface{} assertion, and if that fails check for []interface{}
and iterate over its elements converting each element to map[string]interface{}
(returning an error if any element is not a map); keep the same error message
("input %s is not an array") or refine to indicate element type issues and
ensure the converted slice is used as Values when appending the matrixRow.

}

func (r *MatrixResolver) computeCartesianProduct(matrixRows []matrixRow) []map[string]interface{} {
totalSize := 1
for _, row := range matrixRows {
totalSize *= len(row.Values)
}

result := make([]map[string]interface{}, 0, totalSize)
result = append(result, map[string]interface{}{})

for _, row := range matrixRows {
newResult := make([]map[string]interface{}, 0, totalSize)
for _, existing := range result {
for _, value := range row.Values {
combined := make(map[string]interface{}, len(existing)+len(value))
for k, v := range existing {
combined[k] = v
}
for k, v := range value {
combined[k] = v
}
newResult = append(newResult, combined)
}
}
result = newResult
}

return result
}

func (r *MatrixResolver) Resolve() ([]map[string]interface{}, error) {
matrixRows, err := r.getMatrixRows()
if err != nil {
return nil, fmt.Errorf("failed to get matrix rows: %w", err)
}

product := r.computeCartesianProduct(matrixRows)
return product, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package workflowmanager

import (
"context"
"encoding/json"
"fmt"
"maps"
"workspace-engine/pkg/oapi"
)

func structToMap(v any) (map[string]interface{}, error) {
data, err := json.Marshal(v)
if err != nil {
return nil, err
}
var result map[string]interface{}
if err := json.Unmarshal(data, &result); err != nil {
return nil, err
}
return result, nil
}

func (m *Manager) getResources(ctx context.Context, selector *oapi.Selector) []map[string]interface{} {
resources := m.store.Resources.ForSelector(ctx, selector)
resourcesSlice := make([]map[string]interface{}, 0, len(resources))
for _, resource := range resources {
entityMap, err := structToMap(resource)
if err != nil {
continue
}
resourcesSlice = append(resourcesSlice, entityMap)
}
return resourcesSlice
}

func (m *Manager) getEnvironments(ctx context.Context, selector *oapi.Selector) []map[string]interface{} {
environments := m.store.Environments.ForSelector(ctx, selector)
environmentsSlice := make([]map[string]interface{}, 0, len(environments))
for _, environment := range environments {
entityMap, err := structToMap(environment)
if err != nil {
continue
}
environmentsSlice = append(environmentsSlice, entityMap)
}
return environmentsSlice
}

func (m *Manager) getDeployments(ctx context.Context, selector *oapi.Selector) []map[string]interface{} {
deployments := m.store.Deployments.ForSelector(ctx, selector)
deploymentsSlice := make([]map[string]interface{}, 0, len(deployments))
for _, deployment := range deployments {
entityMap, err := structToMap(deployment)
if err != nil {
continue
}
deploymentsSlice = append(deploymentsSlice, entityMap)
}
return deploymentsSlice
}

func (m *Manager) getInputWithResolvedSelectors(ctx context.Context, workflowTemplate *oapi.WorkflowTemplate, inputs map[string]any) (map[string]any, error) {
inputsClone := maps.Clone(inputs)

for _, input := range workflowTemplate.Inputs {
asArray, err := input.AsWorkflowArrayInput()
if err != nil {
continue
}

asSelectorArray, err := asArray.AsWorkflowSelectorArrayInput()
if err != nil {
continue
}

sel := asSelectorArray.Selector.Default

selectorInputEntry, ok := inputsClone[asSelectorArray.Name]
if ok {
selectorInputEntryString := selectorInputEntry.(string)
sel = &oapi.Selector{}
if err := sel.FromCelSelector(oapi.CelSelector{Cel: selectorInputEntryString}); err != nil {
return nil, fmt.Errorf("failed to parse selector: %w", err)
}
}

if sel == nil {
return nil, fmt.Errorf("selector is nil")
}

var matchedEntities []map[string]interface{}

switch asSelectorArray.Selector.EntityType {
case oapi.WorkflowSelectorArrayInputSelectorEntityTypeResource:
matchedEntities = m.getResources(ctx, sel)
case oapi.WorkflowSelectorArrayInputSelectorEntityTypeEnvironment:
matchedEntities = m.getEnvironments(ctx, sel)
case oapi.WorkflowSelectorArrayInputSelectorEntityTypeDeployment:
matchedEntities = m.getDeployments(ctx, sel)
}

inputsClone[asSelectorArray.Name] = matchedEntities
}

return inputsClone, nil
Comment on lines +62 to +105
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Guard selector input type to avoid runtime panic.
Pipeline failure shows a panic at Line 80: interface {} is float64, not string. The direct selectorInputEntry.(string) assertion will panic for non-string inputs (e.g., JSON numbers). Use a safe type assertion and return a validation error instead.

🛠️ Suggested fix for safe selector parsing
	selectorInputEntry, ok := inputsClone[asSelectorArray.Name]
	if ok {
-		selectorInputEntryString := selectorInputEntry.(string)
+		selectorInputEntryString, ok := selectorInputEntry.(string)
+		if !ok {
+			return nil, fmt.Errorf("selector input %s must be a string", asSelectorArray.Name)
+		}
		sel = &oapi.Selector{}
		if err := sel.FromCelSelector(oapi.CelSelector{Cel: selectorInputEntryString}); err != nil {
			return nil, fmt.Errorf("failed to parse selector: %w", err)
		}
	}
🧰 Tools
🪛 GitHub Actions: Apps / Workspace Engine

[error] 80-80: Runtime panic: interface conversion: interface {} is float64, not string at resolve_selectors.go:80

🤖 Prompt for AI Agents
In `@apps/workspace-engine/pkg/workspace/workflowmanager/resolve_selectors.go`
around lines 62 - 105, The code in getInputWithResolvedSelectors currently
unsafely asserts selectorInputEntry.(string) which panics for non-string types;
change the handling of selectorInputEntry (from asSelectorArray.Name) to perform
a safe type check (e.g., type switch or ok, str := selectorInputEntry.(string))
and if the value is not a string return a clear validation error rather than
panicking, then pass the validated string into sel.FromCelSelector (or handle
the absence/invalid type by falling back to the default selector) so sel is only
constructed when a valid string is provided.

}
Loading