-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathecmr_workflow.py
More file actions
102 lines (83 loc) · 3.31 KB
/
ecmr_workflow.py
File metadata and controls
102 lines (83 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""DeCA + eCMR Combined Workflow"""
from typing import Dict, Any, List
from urllib.parse import quote

import requests
class DeCAWorkflow:
    """Client for the combined DeCA + eCMR shipment workflow.

    Wraps a ``requests.Session`` that sends the API key on every call and
    exposes one method per workflow step: create the shipment, sign at
    pickup, sign at delivery, and lock the eCMR.

    Attributes:
        api_key: Key sent in the ``Authorization: Apikey ...`` header.
        base_url: API root, stored exactly as passed in.
        timeout: Value handed to every request as ``timeout`` (seconds).
        session: Shared ``requests.Session`` carrying the default headers.
    """

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://ecmr.api.cargoffer.com",
        timeout: float = 30.0,
    ):
        """Store the connection settings and prepare the HTTP session.

        Args:
            api_key: API key placed in the ``Authorization`` header.
            base_url: Root URL of the eCMR API.
            timeout: Seconds to wait on each request. ``requests`` applies
                no timeout by default, so without this a stalled server
                would block the caller indefinitely.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Apikey {api_key}",
            "Content-Type": "application/json",
        })

    def _url(self, *segments: str) -> str:
        """Join *segments* onto ``base_url`` with each one percent-encoded."""
        # safe="" also encodes "/", so a service code cannot add or remove
        # path levels; rstrip avoids "//" when base_url ends with a slash.
        path = "/".join(quote(str(segment), safe="") for segment in segments)
        return f"{self.base_url.rstrip('/')}/{path}"

    @staticmethod
    def _build_payload(
        sender: Dict[str, str],
        receiver: Dict[str, str],
        vehicle: Dict[str, str],
        driver: Dict[str, str],
        goods: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Assemble the JSON body sent when creating a shipment."""
        return {
            # Both parties are sent under two key pairs.
            # NOTE(review): presumably the API accepts either spelling;
            # confirm before dropping one of them.
            "sender": sender,
            "receiver": receiver,
            "from": sender,
            "to": receiver,
            # Only the known vehicle/driver fields are forwarded.
            "vehicle": {
                "plate": vehicle["plate"],
                "type": vehicle.get("type", "truck"),
            },
            "driver": {
                "name": driver["name"],
                "license": driver["license"],
            },
            "goods": goods,
            "options": {
                "generate_deca": True,
                "generate_qr": True,
            },
        }

    @staticmethod
    def _parse_shipment(body: Dict[str, Any]) -> Dict[str, Any]:
        """Pick the shipment fields out of a decoded creation response."""
        data = body["data"]
        return {
            "service_code": data["service_code"],
            # The two codes are optional in the response; absent -> None.
            "deca_code": data.get("deca_code"),
            "qr_code": data.get("qr_code"),
            "status": data["status"],
        }

    def create_shipment(
        self,
        sender: Dict[str, str],
        receiver: Dict[str, str],
        vehicle: Dict[str, str],
        driver: Dict[str, str],
        goods: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Create complete shipment with DeCA and eCMR.

        Args:
            sender: Sender details, forwarded unchanged.
            receiver: Receiver details, forwarded unchanged.
            vehicle: Must contain ``"plate"``; ``"type"`` defaults to
                ``"truck"``.
            driver: Must contain ``"name"`` and ``"license"``.
            goods: Goods description, forwarded unchanged.

        Returns:
            Dict with ``service_code``, ``deca_code``, ``qr_code`` and
            ``status`` taken from the response's ``data`` object.

        Raises:
            RuntimeError: The server answered with a status other than 200.
            KeyError: A required key is missing from ``vehicle``, ``driver``
                or the response body.
        """
        payload = self._build_payload(sender, receiver, vehicle, driver, goods)
        response = self.session.post(
            self._url("ecmr"), json=payload, timeout=self.timeout
        )
        if response.status_code != 200:
            raise RuntimeError(f"Failed: {response.text}")
        return self._parse_shipment(response.json())

    def _sign(self, stage: str, service_code: str, signature: str) -> bool:
        """PUT *signature* for the given stage; True only on HTTP 200."""
        response = self.session.put(
            self._url("ecmr", "sign", stage, service_code),
            json={"signature": signature},
            timeout=self.timeout,
        )
        return response.status_code == 200

    def execute_pickup(self, service_code: str, signature: str) -> Dict[str, Any]:
        """Execute pickup - sign.

        Returns:
            ``{"signed": True}`` when the server answers 200; any other
            status is reported as ``False`` rather than raised.
        """
        return {"signed": self._sign("pickup", service_code, signature)}

    def execute_delivery(self, service_code: str, signature: str) -> Dict[str, Any]:
        """Execute delivery - sign.

        Returns:
            ``{"signed": True}`` when the server answers 200; any other
            status is reported as ``False`` rather than raised.
        """
        return {"signed": self._sign("delivery", service_code, signature)}

    def close_shipment(self, service_code: str) -> Dict[str, Any]:
        """Legally close the eCMR (lock).

        Returns:
            ``{"locked": True}`` when the server answers 200; any other
            status is reported as ``False`` rather than raised.
        """
        response = self.session.put(
            self._url("ecmr", "lock", service_code), timeout=self.timeout
        )
        return {"locked": response.status_code == 200}
def main() -> None:
    """Create one sample shipment and print the codes returned for it.

    Demo entry point: it sends a real request to the default API host using
    the sample sender, receiver, vehicle, driver and goods below.
    """
    import os  # local import: only this demo entry point needs it

    # Prefer a key from the environment so no real credential has to be
    # written into the file; the original placeholder stays as the fallback.
    api_key = os.environ.get("ECMR_API_KEY", "your-api-key")
    workflow = DeCAWorkflow(api_key)

    result = workflow.create_shipment(
        sender={"company_name": "Sender SL", "cif": "B12345678"},
        receiver={"company_name": "Receiver SL", "cif": "B87654321"},
        vehicle={"plate": "1234-ABC", "type": "truck"},
        driver={"name": "John Doe", "license": "CD-12345"},
        goods={"description": "Goods", "packages": 2, "weight": 500},
    )

    print(f"Service Code: {result['service_code']}")
    print(f"DeCA Code: {result['deca_code']}")


if __name__ == "__main__":
    main()