Skip to content

Commit e8d8bf3

Browse files
committed
feat: implement TransferResultsBuilder and comparison specs for transfer input validation
1 parent 1195f1a commit e8d8bf3

5 files changed

Lines changed: 770 additions & 332 deletions

File tree

transfers/transfer_results.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from __future__ import annotations
2+
3+
import argparse
4+
from pathlib import Path
5+
6+
from transfers.transfer_results_builder import TransferResultsBuilder
7+
from transfers.transfer_results_specs import (
8+
TRANSFER_COMPARISON_SPECS,
9+
TransferComparisonSpec,
10+
)
11+
from transfers.transfer_results_types import * # noqa: F401,F403
12+
13+
14+
__all__ = [
15+
"TransferResultsBuilder",
16+
"TransferComparisonSpec",
17+
"TRANSFER_COMPARISON_SPECS",
18+
]
19+
20+
21+
def _parse_args() -> argparse.Namespace:
22+
parser = argparse.ArgumentParser(
23+
description="Compare each transfer input CSV against destination Postgres rows."
24+
)
25+
parser.add_argument(
26+
"--summary-path",
27+
type=Path,
28+
default=Path("transfers") / "metrics" / "transfer_results_summary.md",
29+
help="Output path for markdown summary table.",
30+
)
31+
parser.add_argument(
32+
"--sample-limit",
33+
type=int,
34+
default=25,
35+
help="Max missing/extra key samples stored per transfer.",
36+
)
37+
return parser.parse_args()
38+
39+
40+
def main() -> None:
    """Run all configured transfer comparisons and write the markdown summary."""
    args = _parse_args()
    comparison = TransferResultsBuilder(sample_limit=args.sample_limit).build()
    # Ensure the metrics directory exists before writing.
    args.summary_path.parent.mkdir(parents=True, exist_ok=True)
    TransferResultsBuilder.write_summary(args.summary_path, comparison)
    print(f"Wrote comparison summary: {args.summary_path}")
    print(f"Transfer comparisons: {len(comparison.results)}")


if __name__ == "__main__":
    main()
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
from __future__ import annotations
2+
3+
from pathlib import Path
4+
from typing import Any
5+
6+
import pandas as pd
7+
from sqlalchemy import select, func
8+
9+
from db.engine import session_ctx
10+
from transfers.transfer_results_specs import (
11+
TRANSFER_COMPARISON_SPECS,
12+
TransferComparisonSpec,
13+
)
14+
from transfers.transfer_results_types import (
15+
TransferComparisonResults,
16+
TransferResult,
17+
)
18+
from transfers.util import read_csv
19+
20+
21+
def _normalize_key(value: Any) -> str | None:
22+
if value is None:
23+
return None
24+
try:
25+
if pd.isna(value):
26+
return None
27+
except TypeError:
28+
pass
29+
s = str(value).strip()
30+
if not s:
31+
return None
32+
return s.lower()
33+
34+
35+
def _source_keys(df: pd.DataFrame, key_col: str) -> set[str]:
    """Return the set of normalized, non-null keys found in *key_col* of *df*.

    An absent column yields an empty set rather than raising.
    """
    if key_col not in df.columns:
        return set()
    normalized = (_normalize_key(raw) for raw in df[key_col].tolist())
    return {key for key in normalized if key is not None}
43+
44+
45+
def _normalized_series(df: pd.DataFrame, key_col: str) -> pd.Series:
    """Return *key_col* as a Series of normalized string keys, blanks/nulls dropped.

    A missing column or an all-blank column yields an empty object-dtype Series.
    """
    if key_col not in df.columns:
        return pd.Series([], dtype=object)
    normalized = df[key_col].map(_normalize_key).dropna()
    if normalized.empty:
        return pd.Series([], dtype=object)
    return normalized.astype(str)
52+
53+
54+
class TransferResultsBuilder:
55+
"""Compare transfer input CSV keys to destination database keys per transfer."""
56+
57+
def __init__(self, sample_limit: int = 25):
58+
self.sample_limit = sample_limit
59+
60+
def build(self) -> TransferComparisonResults:
61+
results: dict[str, TransferResult] = {}
62+
for spec in TRANSFER_COMPARISON_SPECS:
63+
results[spec.transfer_name] = self._build_one(spec)
64+
return TransferComparisonResults(
65+
generated_at=pd.Timestamp.utcnow().isoformat(),
66+
results=results,
67+
)
68+
69+
def _build_one(self, spec: TransferComparisonSpec) -> TransferResult:
70+
source_df = read_csv(spec.source_csv)
71+
if spec.source_filter:
72+
source_df = spec.source_filter(source_df)
73+
source_series = _normalized_series(source_df, spec.source_key_column)
74+
source_keys = set(source_series.unique().tolist())
75+
source_keyed_row_count = int(source_series.shape[0])
76+
source_duplicate_key_row_count = source_keyed_row_count - len(source_keys)
77+
agreed_transfer_row_count = int(len(source_df))
78+
if spec.agreed_row_counter is not None:
79+
try:
80+
agreed_transfer_row_count = int(spec.agreed_row_counter())
81+
except Exception:
82+
agreed_transfer_row_count = int(len(source_df))
83+
84+
model = spec.destination_model
85+
key_col = getattr(model, spec.destination_key_column)
86+
with session_ctx() as session:
87+
key_sql = select(key_col).where(key_col.is_not(None))
88+
count_sql = select(func.count()).select_from(model)
89+
90+
if spec.destination_where:
91+
where_clause = spec.destination_where(model)
92+
key_sql = key_sql.where(where_clause)
93+
count_sql = count_sql.where(where_clause)
94+
95+
raw_dest_keys = session.execute(key_sql).scalars().all()
96+
destination_row_count = int(session.execute(count_sql).scalar_one())
97+
98+
destination_series = pd.Series(
99+
[_normalize_key(v) for v in raw_dest_keys], dtype=object
100+
).dropna()
101+
if destination_series.empty:
102+
destination_series = pd.Series([], dtype=object)
103+
else:
104+
destination_series = destination_series.astype(str)
105+
106+
destination_keys = set(destination_series.unique().tolist())
107+
destination_keyed_row_count = int(destination_series.shape[0])
108+
destination_duplicate_key_row_count = destination_keyed_row_count - len(
109+
destination_keys
110+
)
111+
112+
missing = sorted(source_keys - destination_keys)
113+
extra = sorted(destination_keys - source_keys)
114+
115+
return spec.result_cls(
116+
transfer_name=spec.transfer_name,
117+
source_csv=spec.source_csv,
118+
source_key_column=spec.source_key_column,
119+
destination_model=model.__name__,
120+
destination_key_column=spec.destination_key_column,
121+
source_row_count=len(source_df),
122+
agreed_transfer_row_count=agreed_transfer_row_count,
123+
source_keyed_row_count=source_keyed_row_count,
124+
source_key_count=len(source_keys),
125+
source_duplicate_key_row_count=source_duplicate_key_row_count,
126+
destination_row_count=destination_row_count,
127+
destination_keyed_row_count=destination_keyed_row_count,
128+
destination_key_count=len(destination_keys),
129+
destination_duplicate_key_row_count=destination_duplicate_key_row_count,
130+
matched_key_count=len(source_keys & destination_keys),
131+
missing_in_destination_count=len(missing),
132+
extra_in_destination_count=len(extra),
133+
missing_in_destination_sample=missing[: self.sample_limit],
134+
extra_in_destination_sample=extra[: self.sample_limit],
135+
)
136+
137+
@staticmethod
138+
def write_summary(path: Path, comparison: TransferComparisonResults) -> None:
139+
lines = [
140+
f"generated_at={comparison.generated_at}",
141+
"",
142+
"| Transfer | Source CSV | Source Rows | Agreed Rows | Dest Model | Dest Rows | Missing Agreed | Matched | Missing | Extra |",
143+
"|---|---|---:|---:|---|---:|---:|---:|---:|---:|",
144+
]
145+
for name in sorted(comparison.results.keys()):
146+
r = comparison.results[name]
147+
missing_agreed = r.agreed_transfer_row_count - r.destination_row_count
148+
lines.append(
149+
f"| {name} | {r.source_csv} | {r.source_row_count} | {r.agreed_transfer_row_count} | "
150+
f"{r.destination_model} | {r.destination_row_count} | {missing_agreed} | "
151+
f"{r.matched_key_count} | {r.missing_in_destination_count} | {r.extra_in_destination_count} |"
152+
)
153+
path.write_text("\n".join(lines) + "\n")

0 commit comments

Comments
 (0)