-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexception_engine.py
More file actions
22 lines (22 loc) · 1.65 KB
/
exception_engine.py
File metadata and controls
22 lines (22 loc) · 1.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import numpy as np
RULES=[{"type":"late_shipment","threshold_days":2,"severity":"medium","auto_action":"notify_customer"},
{"type":"late_shipment","threshold_days":5,"severity":"critical","auto_action":"expedite_and_notify"},
{"type":"quantity_mismatch","threshold_pct":5,"severity":"medium","auto_action":"log_discrepancy"},
{"type":"quantity_mismatch","threshold_pct":15,"severity":"critical","auto_action":"reject_and_reorder"},
{"type":"quality_fail","threshold_ppm":500,"severity":"high","auto_action":"quarantine"},
{"type":"cost_overrun","threshold_pct":10,"severity":"medium","auto_action":"flag_for_review"}]
def process_exceptions(events):
actions=[]
for event in events:
matched_rules=[r for r in RULES if r["type"]==event["type"]]
for rule in sorted(matched_rules,key=lambda r:r.get("threshold_days",r.get("threshold_pct",r.get("threshold_ppm",0)))):
threshold_key=[k for k in rule if "threshold" in k][0]
value_key=threshold_key.replace("threshold_","")
if event.get(value_key,0)>=rule[threshold_key]:
actions.append({"event_id":event["id"],"type":event["type"],"severity":rule["severity"],
"action":rule["auto_action"],"value":event.get(value_key,0)})
return sorted(actions,key=lambda a:{"critical":0,"high":1,"medium":2}.get(a["severity"],3))
if __name__=="__main__":
events=[{"id":"E001","type":"late_shipment","days":6},{"id":"E002","type":"quantity_mismatch","pct":12},
{"id":"E003","type":"quality_fail","ppm":800},{"id":"E004","type":"cost_overrun","pct":15}]
for a in process_exceptions(events): print(a)