-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparallel-fan-out-config.yaml
More file actions
132 lines (119 loc) · 3.6 KB
/
parallel-fan-out-config.yaml
File metadata and controls
132 lines (119 loc) · 3.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# Application identity. `version` stays quoted so it is read as the string
# "1.0" rather than a float.
app:
  name: parallel-demo
  version: "1.0"
# Module declarations: an HTTP server listening on ":8080", a router whose
# config references the `server` module by name, and a handler configured
# with a JSON content type. The routes below refer to `api-handler` by name.
modules:
  - name: server
    type: http.server
    config:
      address: ":8080"
  - name: router
    type: http.router
    config:
      module: server
  - name: api-handler
    type: http.handler
    config:
      contentType: application/json
# HTTP workflow: three demo routes, each handled by `api-handler` and driven
# by an inline pipeline of named steps.
# NOTE(review): block nesting below was restored from a copy whose
# indentation had been lost; confirm key placement (in particular `steps:` /
# `step:` under `config:`) against the engine's schema.
# NOTE(review): several templates dereference hyphenated step names
# (`.steps.fetch-all`, `.steps.process-items`). If these are plain Go
# text/template expressions, hyphens are not valid in field chains, so this
# presumably relies on the engine rewriting such paths - TODO confirm.
workflows:
  http:
    routes:
      # Fan-out: parallel API aggregation
      - path: /aggregate/{id}
        method: GET
        handler: api-handler
        pipeline:
          steps:
            # Extract the `id` path parameter from the request.
            - type: step.request_parse
              name: parse
              config:
                path_params: [id]
            # Three independent `step.set` branches under one parallel step.
            - type: step.parallel
              name: fetch-all
              config:
                error_strategy: collect_errors
                steps:
                  - name: profile
                    type: step.set
                    values:
                      name: "User {{ .path_params.id }}"
                      email: "user@example.com"
                  - name: orders
                    type: step.set
                    values:
                      # Quoted on purpose: kept as strings, not numbers.
                      items: "3"
                      total: "150.00"
                  - name: preferences
                    type: step.set
                    values:
                      theme: dark
                      language: en
            # Respond with the parallel step's `results` rendered as JSON.
            - type: step.json_response
              name: respond
              config:
                status_code: 200
                body: '{{ json .steps.fetch-all.results }}'
      # Fan-out: concurrent foreach
      - path: /batch
        method: POST
        handler: api-handler
        pipeline:
          steps:
            - type: step.request_parse
              name: parse
              config:
                parse_body: true
            # Run the nested `process` step for each element of `body.items`,
            # exposed to templates as `.item`.
            - type: step.foreach
              name: process-items
              config:
                collection: body.items
                item_var: item
                concurrency: 5
                error_strategy: collect_errors
                step:
                  name: process
                  type: step.set
                  values:
                    processed: "true"
                    id: "{{ .item.id }}"
            - type: step.set
              name: summary
              config:
                values:
                  total: '{{ .steps.process-items.count }}'
                  processed: '{{ json (pluck .steps.process-items.results "id") }}'
            # NOTE(review): no `body` is configured here, unlike the
            # /aggregate route - confirm what the engine sends by default.
            - type: step.json_response
              name: respond
              config:
                status_code: 200
      # Map/reduce with template functions
      - path: /stats
        method: GET
        handler: api-handler
        pipeline:
          steps:
            # Static sample data: four sales records with region and amount.
            - type: step.set
              name: data
              config:
                values:
                  sales:
                    - region: east
                      amount: 100
                    - region: west
                      amount: 200
                    - region: east
                      amount: 150
                    - region: west
                      amount: 50
            # Derive aggregates from `.steps.data.sales` via the template
            # helpers sum / max / min / pluck / unique / groupBy.
            - type: step.set
              name: stats
              config:
                values:
                  total: '{{ sum .steps.data.sales "amount" }}'
                  max_sale: '{{ max .steps.data.sales "amount" }}'
                  min_sale: '{{ min .steps.data.sales "amount" }}'
                  regions: '{{ json (unique (pluck .steps.data.sales "region")) }}'
                  by_region: '{{ json (groupBy .steps.data.sales "region") }}'
            # NOTE(review): no `body` is configured here either - confirm
            # what the engine sends by default.
            - type: step.json_response
              name: respond
              config:
                status_code: 200