-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathkyc_pj.py
More file actions
50 lines (42 loc) · 1.78 KB
/
kyc_pj.py
File metadata and controls
50 lines (42 loc) · 1.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
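# FonteData - full KYC (Know Your Customer) check for a legal entity (Pessoa Juridica, PJ)
# Estimated cost: ~R$ 15.09 per company (partners not included)
# Requires the third-party "requests" package: pip install requests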
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
# API credential sent with every request. Set the FONTEDATA_API_KEY environment
# variable so a live key never has to be written into this file; when it is
# unset or empty the original placeholder value is used.
API_KEY = os.environ.get("FONTEDATA_API_KEY") or "fd_live_SUA_CHAVE"

# Common URL prefix; _check appends "/<endpoint>/<document>" to it.
BASE_URL = "https://app.fontedata.com/api/v1/consulta"

# Headers attached to every request (carries the API key).
HEADERS = {"X-API-Key": API_KEY}

# Maps the short name used as a key in the kyc_pj() result dict to the
# endpoint slug that is placed in the request URL.
CHECKS = {
    "cnpj_qsa": "receita-federal-pj-qsa",
    "beneficiario": "beneficiario-final",
    "ceis": "ceis-sancoes",
    "cnep": "cnep-sancoes",
    "trabalho_forcado": "trabalho-forcado",
    "pgfn": "pgfn-devedores",
    "cnd": "cnd-debitos",
    "cndt": "tst-cndt",
    "fgts": "fgts-regularidade",
    "processos": "processos-completa",
    "ofac": "ofac-sancoes",
}
def _check(name, endpoint, doc):
    """Query a single endpoint for ``doc`` and package the outcome.

    Performs a GET on ``{BASE_URL}/{endpoint}/{doc}`` with the module-level
    ``HEADERS`` and a 60-second timeout.

    Args:
        name: Label for this check; returned unchanged so the caller can tell
            which check a result belongs to.
        endpoint: Endpoint slug placed in the URL path.
        doc: Document identifier placed at the end of the URL path.

    Returns:
        A ``(name, result)`` tuple. When the request completes, ``result``
        holds ``"status"`` (the HTTP status code), ``"data"`` (the decoded
        JSON body if the response is OK, otherwise ``None``) and ``"cost"``
        (the ``X-Request-Cost`` response header, ``None`` if absent). If
        anything raises, ``result`` is ``{"status": "error", "error": <message>}``
        instead.
    """
    try:
        response = requests.get(
            f"{BASE_URL}/{endpoint}/{doc}",
            headers=HEADERS,
            timeout=60,
        )
        # Only successful responses are decoded; a failed decode is reported
        # through the error branch below.
        if response.ok:
            payload = response.json()
        else:
            payload = None
        outcome = {
            "status": response.status_code,
            "data": payload,
            "cost": response.headers.get("X-Request-Cost"),
        }
    except Exception as exc:
        # Best effort: never raise out of a pool worker, report the failure.
        outcome = {"status": "error", "error": str(exc)}
    return name, outcome
def kyc_pj(cnpj: str) -> dict:
    """Run every check listed in ``CHECKS`` for one CNPJ and collect the results.

    The ``.``, ``/`` and ``-`` characters are removed from ``cnpj``, then each
    endpoint is queried through ``_check`` on a pool of 4 worker threads. One
    progress line is printed per check as it finishes, so the printed order
    follows completion order rather than the order of ``CHECKS``.

    Args:
        cnpj: Company identifier, with or without punctuation.

    Returns:
        A dict mapping each check name from ``CHECKS`` to the result dict
        produced by ``_check`` for that check.
    """
    cnpj = cnpj.replace(".", "").replace("/", "").replace("-", "")
    results = {}
    with ThreadPoolExecutor(max_workers=4) as ex:
        futures = [ex.submit(_check, n, ep, cnpj) for n, ep in CHECKS.items()]
        for f in as_completed(futures):
            name, result = f.result()
            results[name] = result
            # Anything other than HTTP 200 (including the "error" status) is
            # reported as a failure.
            icon = "OK" if result["status"] == 200 else "FALHOU"
            # dict.get's default is only used when the key is missing, but a
            # completed request always stores "cost" (None when the response
            # has no cost header), so the fallback must test for None.
            cost = result.get("cost")
            if cost is None:
                cost = "N/A"
            print(f" [{icon}] {name}: custo=R${cost}")
    return results
if __name__ == "__main__":
    # Script entry point: take the CNPJ from the first command-line argument,
    # falling back to a built-in sample value when none is given.
    cli_args = sys.argv[1:]
    if cli_args:
        cnpj = cli_args[0]
    else:
        cnpj = "00000000000191"
    print(f"KYC PJ para CNPJ {cnpj}:\n")
    kyc_pj(cnpj)