-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwebhook-pipeline.yaml
More file actions
138 lines (119 loc) · 2.98 KB
/
webhook-pipeline.yaml
File metadata and controls
138 lines (119 loc) · 2.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
name: webhook-pipeline-demo
version: "1.0"
description: >
Demonstrates the composable pipeline architecture.
A webhook endpoint receives JSON, validates it, transforms it, logs it,
and publishes an event. No monolithic handler needed.
requires:
plugins:
- name: messaging
- name: pipeline-steps
- name: workflow-plugin-http
modules:
- name: webhook-server
type: http.server
config:
address: ":8085"
- name: webhook-router
type: http.router
dependsOn:
- webhook-server
- name: event-broker
type: messaging.broker
workflows:
http:
server: webhook-server
router: webhook-router
routes: []
pipelines:
# A simple webhook-to-event pipeline
stripe-payment:
trigger:
type: http
config:
path: /webhooks/stripe
method: POST
steps:
- name: validate-payload
type: step.validate
config:
strategy: required_fields
required_fields:
- type
- data
- name: extract-event
type: step.set
config:
values:
event_type: "{{ .type }}"
payload_id: "{{ .data.id }}"
- name: log-received
type: step.log
config:
level: info
message: "Received webhook: {{ .event_type }} for {{ .payload_id }}"
- name: route-by-type
type: step.conditional
config:
field: event_type
routes:
"payment_intent.succeeded": process-success
"payment_intent.failed": process-failure
default: log-unknown
- name: process-success
type: step.set
config:
values:
status: succeeded
processed: "true"
- name: process-failure
type: step.set
config:
values:
status: failed
processed: "true"
- name: log-unknown
type: step.log
config:
level: warn
message: "Unknown event type: {{ .event_type }}"
on_error: stop
timeout: 30s
# A data enrichment pipeline
order-enrichment:
trigger:
type: http
config:
path: /api/orders/enrich
method: POST
steps:
- name: validate-order
type: step.validate
config:
strategy: json_schema
schema:
type: object
required:
- customer_id
- items
- total
properties:
customer_id:
type: string
items:
type: array
total:
type: number
- name: normalize-fields
type: step.set
config:
values:
order_status: pending
created_at: "{{ .meta.started_at }}"
- name: log-order
type: step.log
config:
level: info
message: "Processing order for customer {{ .customer_id }}: ${{ .total }}"
on_error: stop
timeout: 60s