Skip to content

Commit 03604c7

Browse files
intel352claude
andcommitted
feat: unify pipeline execution paths — all triggers preserve StepOutputs via PipelineContextHolder
Adds PipelineContextHolder (module.PipelineContextHolder / module.PipelineContextKey) alongside the existing PipelineResultHolder. Pipeline.Run() now populates the holder when it is present in the context, so every caller of TriggerWorkflow — HTTP triggers, EventBus triggers, scheduler triggers, messaging triggers — gets full StepOutputs visibility without a separate code path. ExecutePipeline and ExecutePipelineContext are unified through TriggerWorkflow; both inject a PipelineContextHolder before calling TriggerWorkflow and read the result back, eliminating the bypass that previously discarded step output data. FireEvent in wftest now also injects a PipelineContextHolder so event-driven test assertions get StepResults populated, matching the behaviour of FireSchedule and ExecutePipeline. Three TDD tests assert the new behaviour across all paths. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent c0c709c commit 03604c7

5 files changed

Lines changed: 233 additions & 15 deletions

File tree

engine.go

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -694,15 +694,20 @@ func (e *StdEngine) TriggerWorkflow(ctx context.Context, workflowType string, ac
694694
//
695695
// If the pipeline uses step.pipeline_output, the explicitly marked output
696696
// is returned. Otherwise, the pipeline's merged Current state is returned.
697+
//
698+
// This goes through TriggerWorkflow so all trigger paths share the same
699+
// execution code, including StepOutputs preservation via PipelineContextHolder.
697700
func (e *StdEngine) ExecutePipeline(ctx context.Context, name string, data map[string]any) (map[string]any, error) {
698-
pipeline, ok := e.pipelineRegistry[name]
699-
if !ok {
700-
return nil, fmt.Errorf("pipeline %q not found", name)
701+
holder := &module.PipelineContextHolder{}
702+
ctx = context.WithValue(ctx, module.PipelineContextKey, holder)
703+
704+
if err := e.TriggerWorkflow(ctx, "pipeline:"+name, "", data); err != nil {
705+
return nil, err
701706
}
702707

703-
pc, err := pipeline.Execute(ctx, data)
704-
if err != nil {
705-
return nil, fmt.Errorf("pipeline %q: %w", name, err)
708+
pc := holder.Get()
709+
if pc == nil {
710+
return nil, fmt.Errorf("pipeline %q: no context returned", name)
706711
}
707712

708713
// Prefer explicit pipeline output if step.pipeline_output was used.
@@ -720,15 +725,20 @@ func (e *StdEngine) ExecutePipeline(ctx context.Context, name string, data map[s
720725
// ExecutePipelineContext runs a named pipeline and returns the full PipelineContext,
721726
// including StepOutputs for each step. This is intended for test harnesses that need
722727
// per-step output inspection. For production callers, use ExecutePipeline instead.
728+
//
729+
// This goes through TriggerWorkflow so all trigger paths share the same
730+
// execution code. The PipelineContextHolder captures the full context.
723731
func (e *StdEngine) ExecutePipelineContext(ctx context.Context, name string, data map[string]any) (*interfaces.PipelineContext, error) {
724-
pipeline, ok := e.pipelineRegistry[name]
725-
if !ok {
726-
return nil, fmt.Errorf("pipeline %q not found", name)
732+
holder := &module.PipelineContextHolder{}
733+
ctx = context.WithValue(ctx, module.PipelineContextKey, holder)
734+
735+
if err := e.TriggerWorkflow(ctx, "pipeline:"+name, "", data); err != nil {
736+
return nil, err
727737
}
728738

729-
pc, err := pipeline.Execute(ctx, data)
730-
if err != nil {
731-
return nil, fmt.Errorf("pipeline %q: %w", name, err)
739+
pc := holder.Get()
740+
if pc == nil {
741+
return nil, fmt.Errorf("pipeline %q: no context returned", name)
732742
}
733743

734744
return pc, nil

engine_pipeline_test.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,3 +736,156 @@ func TestExecutePipeline_UnknownPipeline(t *testing.T) {
736736
t.Errorf("expected error to mention pipeline name, got: %v", err)
737737
}
738738
}
739+
740+
// TestTriggerWorkflow_PreservesStepOutputs verifies that injecting a
741+
// PipelineContextHolder into the context before calling TriggerWorkflow causes
742+
// StepOutputs to be populated after the pipeline runs.
743+
func TestTriggerWorkflow_PreservesStepOutputs(t *testing.T) {
744+
engine, _ := setupPipelineEngine(t)
745+
746+
pipelineCfg := map[string]any{
747+
"step_output_pipeline": map[string]any{
748+
"steps": []any{
749+
map[string]any{
750+
"name": "set_value",
751+
"type": "step.set",
752+
"config": map[string]any{
753+
"values": map[string]any{
754+
"color": "blue",
755+
},
756+
},
757+
},
758+
},
759+
},
760+
}
761+
if err := engine.configurePipelines(pipelineCfg); err != nil {
762+
t.Fatalf("configurePipelines failed: %v", err)
763+
}
764+
765+
holder := &module.PipelineContextHolder{}
766+
ctx := context.WithValue(context.Background(), module.PipelineContextKey, holder)
767+
768+
if err := engine.TriggerWorkflow(ctx, "pipeline:step_output_pipeline", "", map[string]any{}); err != nil {
769+
t.Fatalf("TriggerWorkflow failed: %v", err)
770+
}
771+
772+
pc := holder.Get()
773+
if pc == nil {
774+
t.Fatal("expected PipelineContextHolder to be populated after TriggerWorkflow")
775+
}
776+
if pc.StepOutputs == nil {
777+
t.Fatal("expected StepOutputs to be non-nil")
778+
}
779+
stepOut, ok := pc.StepOutputs["set_value"]
780+
if !ok {
781+
t.Fatalf("expected StepOutputs to contain 'set_value', got keys: %v", stepKeys(pc.StepOutputs))
782+
}
783+
if stepOut["color"] != "blue" {
784+
t.Errorf("expected step output color=blue, got %v", stepOut["color"])
785+
}
786+
}
787+
788+
// TestExecutePipeline_UsesUnifiedPath verifies that ExecutePipeline and
789+
// TriggerWorkflow with a PipelineContextHolder produce identical results.
790+
func TestExecutePipeline_UsesUnifiedPath(t *testing.T) {
791+
engine, _ := setupPipelineEngine(t)
792+
793+
pipelineCfg := map[string]any{
794+
"unified_path_pipeline": map[string]any{
795+
"steps": []any{
796+
map[string]any{
797+
"name": "set_msg",
798+
"type": "step.set",
799+
"config": map[string]any{
800+
"values": map[string]any{
801+
"msg": "hello",
802+
},
803+
},
804+
},
805+
},
806+
},
807+
}
808+
if err := engine.configurePipelines(pipelineCfg); err != nil {
809+
t.Fatalf("configurePipelines failed: %v", err)
810+
}
811+
812+
// Path 1: ExecutePipeline
813+
result1, err := engine.ExecutePipeline(context.Background(), "unified_path_pipeline", map[string]any{})
814+
if err != nil {
815+
t.Fatalf("ExecutePipeline failed: %v", err)
816+
}
817+
818+
// Path 2: TriggerWorkflow with PipelineContextHolder
819+
holder := &module.PipelineContextHolder{}
820+
ctx := context.WithValue(context.Background(), module.PipelineContextKey, holder)
821+
if err := engine.TriggerWorkflow(ctx, "pipeline:unified_path_pipeline", "", map[string]any{}); err != nil {
822+
t.Fatalf("TriggerWorkflow failed: %v", err)
823+
}
824+
pc := holder.Get()
825+
if pc == nil {
826+
t.Fatal("holder should be populated")
827+
}
828+
829+
if result1["msg"] != pc.Current["msg"] {
830+
t.Errorf("ExecutePipeline and TriggerWorkflow produced different results: %v vs %v", result1["msg"], pc.Current["msg"])
831+
}
832+
if pc.StepOutputs["set_msg"]["msg"] != "hello" {
833+
t.Errorf("expected StepOutputs[set_msg][msg]=hello, got %v", pc.StepOutputs["set_msg"]["msg"])
834+
}
835+
}
836+
837+
// TestExecutePipelineContext_PreservesStepOutputs verifies that ExecutePipelineContext
838+
// returns a PipelineContext with StepOutputs populated for all executed steps.
839+
func TestExecutePipelineContext_PreservesStepOutputs(t *testing.T) {
840+
engine, _ := setupPipelineEngine(t)
841+
842+
pipelineCfg := map[string]any{
843+
"ctx_step_output_pipeline": map[string]any{
844+
"steps": []any{
845+
map[string]any{
846+
"name": "step_a",
847+
"type": "step.set",
848+
"config": map[string]any{
849+
"values": map[string]any{"a": "1"},
850+
},
851+
},
852+
map[string]any{
853+
"name": "step_b",
854+
"type": "step.set",
855+
"config": map[string]any{
856+
"values": map[string]any{"b": "2"},
857+
},
858+
},
859+
},
860+
},
861+
}
862+
if err := engine.configurePipelines(pipelineCfg); err != nil {
863+
t.Fatalf("configurePipelines failed: %v", err)
864+
}
865+
866+
pc, err := engine.ExecutePipelineContext(context.Background(), "ctx_step_output_pipeline", map[string]any{})
867+
if err != nil {
868+
t.Fatalf("ExecutePipelineContext failed: %v", err)
869+
}
870+
if pc == nil {
871+
t.Fatal("expected non-nil PipelineContext")
872+
}
873+
if len(pc.StepOutputs) != 2 {
874+
t.Errorf("expected 2 step outputs, got %d: %v", len(pc.StepOutputs), stepKeys(pc.StepOutputs))
875+
}
876+
if pc.StepOutputs["step_a"]["a"] != "1" {
877+
t.Errorf("expected step_a output a=1, got %v", pc.StepOutputs["step_a"]["a"])
878+
}
879+
if pc.StepOutputs["step_b"]["b"] != "2" {
880+
t.Errorf("expected step_b output b=2, got %v", pc.StepOutputs["step_b"]["b"])
881+
}
882+
}
883+
884+
// stepKeys returns the keys of a step output map for test failure messages.
885+
func stepKeys(m map[string]map[string]any) []string {
886+
keys := make([]string, 0, len(m))
887+
for k := range m {
888+
keys = append(keys, k)
889+
}
890+
return keys
891+
}

module/http_trigger.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"net/http"
1212
"strconv"
1313
"strings"
14+
"sync"
1415

1516
"github.com/GoCodeAlone/modular"
1617
"github.com/GoCodeAlone/workflow/interfaces"
@@ -61,6 +62,37 @@ func (h *PipelineResultHolder) Get() map[string]any {
6162
return h.result
6263
}
6364

65+
// pipelineContextKey is the unexported type for the PipelineContextHolder context key.
66+
type pipelineContextKey struct{}
67+
68+
// PipelineContextKey is the context key used to capture the full PipelineContext
69+
// (including StepOutputs) from TriggerWorkflow. Callers that need per-step output
70+
// visibility inject a *PipelineContextHolder into the context before calling
71+
// TriggerWorkflow; the pipeline handler populates it after execution.
72+
var PipelineContextKey = pipelineContextKey{}
73+
74+
// PipelineContextHolder is a thread-safe container used to pass the full
75+
// PipelineContext (including StepOutputs) back through the context from the
76+
// pipeline handler to any caller of TriggerWorkflow that needs per-step visibility.
77+
type PipelineContextHolder struct {
78+
mu sync.Mutex
79+
ctx *PipelineContext
80+
}
81+
82+
// Set stores the PipelineContext in the holder.
83+
func (h *PipelineContextHolder) Set(pc *PipelineContext) {
84+
h.mu.Lock()
85+
defer h.mu.Unlock()
86+
h.ctx = pc
87+
}
88+
89+
// Get returns the stored PipelineContext, or nil if not set.
90+
func (h *PipelineContextHolder) Get() *PipelineContext {
91+
h.mu.Lock()
92+
defer h.mu.Unlock()
93+
return h.ctx
94+
}
95+
6496
// coercePipelineStatus coerces common numeric/string types into an HTTP status
6597
// code. Pipeline steps may emit response_status as int, int64, float64 (common
6698
// after generic JSON decoding), json.Number, or a numeric string.

module/pipeline_executor.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,10 +359,18 @@ func (p *Pipeline) SetEventRecorder(recorder interfaces.EventRecorder) {
359359
// Run executes the pipeline and returns the merged result data map.
360360
// It implements interfaces.PipelineRunner by wrapping Execute and
361361
// returning PipelineContext.Current so callers need not import PipelineContext.
362+
// If a PipelineContextHolder is stored in ctx under PipelineContextKey, it is
363+
// populated with the full PipelineContext (including StepOutputs) so that callers
364+
// can inspect per-step results without a separate code path.
362365
func (p *Pipeline) Run(ctx context.Context, data map[string]any) (map[string]any, error) {
363366
pc, err := p.Execute(ctx, data)
364367
if err != nil {
365368
return nil, err
366369
}
370+
// Populate the PipelineContextHolder if present in context, so all callers
371+
// of TriggerWorkflow can receive full step output visibility.
372+
if holder, ok := ctx.Value(PipelineContextKey).(*PipelineContextHolder); ok && holder != nil {
373+
holder.Set(pc)
374+
}
367375
return pc.Current, nil
368376
}

wftest/events.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ func (h *Harness) FireEvent(topic string, data map[string]any) *Result {
2525
data = map[string]any{}
2626
}
2727

28-
holder := &module.PipelineResultHolder{}
29-
ctx := context.WithValue(h.t.Context(), module.PipelineResultContextKey, holder)
28+
resultHolder := &module.PipelineResultHolder{}
29+
ctxHolder := &module.PipelineContextHolder{}
30+
ctx := context.WithValue(h.t.Context(), module.PipelineResultContextKey, resultHolder)
31+
ctx = context.WithValue(ctx, module.PipelineContextKey, ctxHolder)
3032

3133
start := time.Now()
3234
for _, trigger := range h.engine.Triggers() {
@@ -39,8 +41,21 @@ func (h *Harness) FireEvent(topic string, data map[string]any) *Result {
3941
}
4042
}
4143

44+
// Prefer the full PipelineContext (with StepOutputs) when available.
45+
if pc := ctxHolder.Get(); pc != nil {
46+
output := pc.Current
47+
if pipeOut, ok := pc.Metadata["_pipeline_output"].(map[string]any); ok {
48+
output = pipeOut
49+
}
50+
return &Result{
51+
Output: output,
52+
StepResults: pc.StepOutputs,
53+
Duration: time.Since(start),
54+
}
55+
}
56+
4257
return &Result{
43-
Output: holder.Get(),
58+
Output: resultHolder.Get(),
4459
Duration: time.Since(start),
4560
}
4661
}

0 commit comments

Comments
 (0)