-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdocument_level_fraud.py
More file actions
124 lines (103 loc) · 4.97 KB
/
document_level_fraud.py
File metadata and controls
124 lines (103 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""VynFi DataSynth 3.1.1 — document-level fraud injection.
Demonstrates the new `fraud.documentFraudRate` + `propagateToLines` /
`propagateToDocument` knobs. Lets you control whether fraud is expressed as
isolated line flips (useful for binary noise injection) or as full document-
level schemes (PO → GR → invoice → payment all marked fraudulent, matching
real investigative patterns).
The resulting `JournalEntryHeader.is_fraud_propagated` flag distinguishes
scheme-propagated fraud from direct line-level fraud for stratified ML
training. DS 3.1.1 now back-annotates the flag via DocumentRef on every
document-flow JE header, and the new `Jobs.fraud_split()` API aggregates
the counts server-side.
"""
import os
from collections import Counter
import vynfi
# Requires VYNFI_API_KEY in the environment (KeyError if unset).
# 180 s client-level timeout for individual API requests.
client = vynfi.VynFi(api_key=os.environ["VYNFI_API_KEY"], timeout=180.0)
# Generation request: 3 retail companies × 2 monthly periods with O2C + P2P
# process models, combining line-level and document-level fraud injection.
config = {
    "sector": "retail",
    "country": "US",
    "accountingFramework": "us_gaap",
    "rows": 1000,
    "companies": 3,
    "periods": 2,
    "periodLength": "monthly",
    "processModels": ["o2c", "p2p"],
    "exportFormat": "json",
    "fraudPacks": ["revenue_fraud"],
    # Line-level injection: flip ~3% of individual journal lines.
    "fraudRate": 0.03,
    # Document-level injection (DS 3.1.1): mark ~5% of source documents as
    # fraudulent and cascade the flag in both directions (doc → derived JEs,
    # injected line → owning document).
    "fraud": {
        "documentFraudRate": 0.05,
        "propagateToLines": True,
        "propagateToDocument": True,
    },
}
# Submit the generation request, block until the job reaches a terminal state
# (up to 300 s), then download the result archive.
print("Submitting 3.1 config with document-level fraud injection...")
submitted = client.jobs.generate_config(config=config)
done = client.jobs.wait(submitted.id, timeout=300)
print(f"Job: {done.id} status: {done.status}")
if done.status != "completed":
    # Surface the server-side failure detail and exit non-zero.
    raise SystemExit(done.error_detail)
archive = client.jobs.download_archive(done.id)
print(f"{archive}\n")
# ── Server-side aggregation via the 3.1.1 fraud-split endpoint ──────────────
# A single API call returns the scheme-vs-direct breakdown, so we don't have
# to recompute it client-side from the archive.
split = client.jobs.fraud_split(done.id)
print("Fraud origin split (from /v1/jobs/{id}/fraud-split):")
print(f" total entries : {split.total_entries}")
print(f" fraud entries : {split.fraud_entries}")
print(f" scheme-propagated : {split.scheme_propagated}")
print(f" direct-injection : {split.direct_injection}")
print(f" propagation rate : {split.propagation_rate:.1%}")
by_type = split.by_fraud_type
if by_type:
    print(" by fraud_type:")
    # Largest fraud types first; sorted() is stable, so ties keep API order
    # (equivalent to the negated-key descending sort).
    for ftype, detail in sorted(by_type.items(), key=lambda kv: kv[1].total, reverse=True):
        print(
            f" {ftype:28s} total={detail.total:3d} "
            f"scheme={detail.scheme_propagated:3d} direct={detail.direct_injection:3d}"
        )
print()
# ── Stratify fraud into scheme-propagated vs directly-injected ──────────────
def _header(entry):
    """Return the JE header dict; entries may be flat or nested under 'header'."""
    return entry.get("header", entry)

entries = archive.json("journal_entries.json")
fraud_entries = [e for e in entries if _header(e).get("is_fraud")]
scheme_entries = [e for e in fraud_entries if _header(e).get("is_fraud_propagated")]
direct_entries = [e for e in fraud_entries if not _header(e).get("is_fraud_propagated")]
print(f"Total JE documents: {len(entries)}")
# Guard against an empty archive when computing the percentage.
fraud_pct = 100 * len(fraud_entries) / max(len(entries), 1)
print(f" Fraud: {len(fraud_entries)} ({fraud_pct:.1f}%)")
print(f" Scheme-propagated: {len(scheme_entries)}")
print(f" Directly-injected: {len(direct_entries)}")
# Count derived JEs per distinct source document in the scheme population,
# skipping entries with no source-document back-reference.
source_docs = Counter(
    _header(e).get("fraud_source_document_id")
    for e in scheme_entries
    if _header(e).get("fraud_source_document_id")
)
print(f"\nDistinct source documents driving scheme-level fraud: {len(source_docs)}")
for src_doc, n in source_docs.most_common(5):
    print(f" {src_doc:38s} → {n} derived fraudulent JEs")
# ── Check fraud on document flows ───────────────────────────────────────────
print("\nFraud flags on document flow records:")
flow_files = {
    "PO": "document_flows/purchase_orders.json",
    "VI": "document_flows/vendor_invoices.json",
    "Pay": "document_flows/payments.json",
}
for name, path in flow_files.items():
    try:
        docs = archive.json(path)
    except KeyError:
        # Flow file absent from the archive (e.g. process model disabled).
        continue
    fraud_n = sum(1 for d in docs if d.get("header", d).get("is_fraud"))
    print(f" {name}: {fraud_n}/{len(docs)} fraud ({100 * fraud_n / max(len(docs), 1):.1f}%)")
# ── Usage tip: stratified ML train/test split ────────────────────────────────
# The literal below is printed verbatim as guidance for the reader; the code
# fence inside it is sample text, not executed here.
print("""
For ML training you can now cleanly partition:
- Scheme-propagated examples = coherent fraudulent workflows; teach the model
to detect cross-document patterns (ring, kickback, channel stuffing).
- Directly-injected examples = isolated noisy lines; teach robustness to
single-entry anomalies (clerical errors, one-off embezzlement).
```python
train_scheme = [e for e in entries if e["header"].get("is_fraud_propagated")]
train_direct = [e for e in entries if e["header"].get("is_fraud")
and not e["header"].get("is_fraud_propagated")]
```""")
# Release the downloaded archive handle.
archive.close()