-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfind_datasource_tables.py
More file actions
143 lines (128 loc) · 5.19 KB
/
find_datasource_tables.py
File metadata and controls
143 lines (128 loc) · 5.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python3
"""Find which dbo tables/views contain each DataSource value from an input CSV."""
import sys, csv
from pathlib import Path
import pyodbc

# ---- CONFIG: edit these for your environment ----
SERVER = "SQL Server"  # "Server=" value of the ODBC connection string (host or DSN)
DATABASE = "NM_Aquifer_Testing_DB"  # target database
ODBC_DRIVER = "{ODBC Driver 17 for SQL Server}" # or "{ODBC Driver 18 for SQL Server}"
TRUSTED_CONNECTION = True # set False and add UID/PWD if needed
UID = ""  # SQL login; only used when TRUSTED_CONNECTION is False
PWD = ""  # SQL password; only used when TRUSTED_CONNECTION is False
SCHEMA = "dbo"  # only objects in this schema are searched
# -------------------------------------------------
def qident(name: str) -> str:
    """Wrap *name* in SQL Server brackets, doubling any ']' it contains."""
    escaped = name.replace("]", "]]")
    return "[" + escaped + "]"
def get_conn():
    """Open a pyodbc connection built from the module-level CONFIG settings."""
    parts = [
        f"Driver={ODBC_DRIVER}",
        f"Server={SERVER}",
        f"Database={DATABASE}",
    ]
    if TRUSTED_CONNECTION:
        # Integrated Windows auth; no credentials in the connection string.
        parts.append("Trusted_Connection=yes")
    else:
        parts.extend([f"UID={UID}", f"PWD={PWD}"])
    conn_str = ";".join(parts)
    return pyodbc.connect(conn_str)
def read_csv_values(path: Path):
    """Read the 'DataSource' column of *path* and return its non-empty values.

    Values are whitespace-stripped and de-duplicated case-insensitively
    while preserving first-seen order.

    Args:
        path: CSV file with a 'DataSource' header column.

    Returns:
        list[str]: unique, non-empty DataSource values in input order.

    Raises:
        ValueError: if the CSV is empty or lacks a 'DataSource' column.
    """
    seen = set()
    out = []
    with open(path, "r", newline="", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        # BUG FIX: fieldnames is None for a completely empty file, and
        # `"DataSource" not in None` raised TypeError instead of ValueError.
        if not reader.fieldnames or "DataSource" not in reader.fieldnames:
            raise ValueError("Input CSV must have a 'DataSource' column.")
        for row in reader:
            # Short rows may map the column to None; treat as empty.
            v = (row["DataSource"] or "").strip()
            # De-duplicate case-insensitively in a single pass.
            if v and v.lower() not in seen:
                seen.add(v.lower())
                out.append(v)
    return out
def discover_objects(cur):
    """Return (schema, object, has_ds, has_ma) for each dbo table/view.

    Queries the system catalog for tables ('U') and views ('V') in SCHEMA
    that have a DataSource and/or MeasuringAgency column; the has_* flags
    are 0/1 ints.
    """
    catalog_sql = """
        SELECT
            s.name AS schema_name,
            o.name AS object_name,
            MAX(CASE WHEN c.name = 'DataSource' THEN 1 ELSE 0 END) AS has_ds,
            MAX(CASE WHEN c.name = 'MeasuringAgency' THEN 1 ELSE 0 END) AS has_ma
        FROM sys.objects o
        JOIN sys.schemas s ON s.schema_id = o.schema_id
        JOIN sys.columns c ON c.object_id = o.object_id
        WHERE s.name = ?
          AND o.type IN ('U','V')
          AND c.name IN ('DataSource','MeasuringAgency')
        GROUP BY s.name, o.name
        ORDER BY o.name
    """
    cur.execute(catalog_sql, (SCHEMA,))
    results = []
    for rec in cur.fetchall():
        results.append(
            (rec.schema_name, rec.object_name, int(rec.has_ds), int(rec.has_ma))
        )
    return results
def _write_pairs(out_csv: Path, rows):
    """Write (DataSource, DataTable) *rows* to *out_csv* with a header row."""
    with open(out_csv, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["DataSource", "DataTable"])
        w.writerows(rows)

def _build_obj_checks(objs):
    """Precompute a probe query per object: (schema, object, sql, params_layout).

    params_layout has one marker per '?' placeholder so the caller knows how
    many copies of the probe value to bind.
    """
    obj_checks = []
    for sch, obj, has_ds, has_ma in objs:
        where_clauses = []
        params_layout = []  # one entry per placeholder position
        if has_ds:
            # CONVERT+TRIM+explicit collation so comparisons are
            # whitespace- and case-insensitive regardless of column type.
            where_clauses.append(
                "LTRIM(RTRIM(CONVERT(NVARCHAR(4000), t.[DataSource]))) "
                "COLLATE SQL_Latin1_General_CP1_CI_AS = LTRIM(RTRIM(?)) COLLATE SQL_Latin1_General_CP1_CI_AS"
            )
            params_layout.append('v')
        if has_ma:
            where_clauses.append(
                "LTRIM(RTRIM(CONVERT(NVARCHAR(4000), t.[MeasuringAgency]))) "
                "COLLATE SQL_Latin1_General_CP1_CI_AS = LTRIM(RTRIM(?)) COLLATE SQL_Latin1_General_CP1_CI_AS"
            )
            params_layout.append('v')
        sql = (
            f"SELECT TOP (1) 1 "
            f"FROM {qident(sch)}.{qident(obj)} AS t "
            f"WHERE " + " OR ".join(where_clauses)
        )
        obj_checks.append((sch, obj, sql, params_layout))
    return obj_checks

def main(in_csv: Path, out_csv: Path):
    """Probe every dbo table/view for each DataSource value in *in_csv*.

    Writes the unique (DataSource, DataTable) matches to *out_csv*.
    """
    values = read_csv_values(in_csv)
    if not values:
        _write_pairs(out_csv, [])
        print("No DataSource values in input; wrote empty output with header.")
        return
    conn = get_conn()
    conn.timeout = 0  # no query timeout; probes may be slow on large tables
    pairs = []  # (DataSource, DataTable)
    try:
        with conn.cursor() as cur:
            objs = discover_objects(cur)
            if not objs:
                _write_pairs(out_csv, [])
                print("No dbo tables/views with DataSource/MeasuringAgency found.")
                return
            obj_checks = _build_obj_checks(objs)
            # For each input value, probe each object for at least one match.
            for v in values:
                for sch, obj, sql, params_layout in obj_checks:
                    try:
                        cur.execute(sql, tuple(v for _ in params_layout))
                        if cur.fetchone():
                            pairs.append((v, f"{sch}.{obj}"))
                    except Exception as e:
                        # Best-effort: skip problematic objects but keep going.
                        print(f"Skipped {sch}.{obj}: {e}")
    finally:
        # BUG FIX: the connection was previously never closed (leak).
        conn.close()
    # De-duplicate final pairs case-insensitively, preserving order.
    seen = set()
    uniq_pairs = []
    for ds, dt in pairs:
        key = (ds.lower(), dt.lower())
        if key not in seen:
            seen.add(key)
            uniq_pairs.append((ds, dt))
    _write_pairs(out_csv, uniq_pairs)
    print(f"Wrote {len(uniq_pairs)} rows to {out_csv}")
if __name__ == "__main__":
    # CLI entry point: INPUT.csv must contain a 'DataSource' column;
    # OUTPUT.csv receives the matched (DataSource, DataTable) pairs.
    if len(sys.argv) != 3:
        print("Usage: python find_datasource_tables.py INPUT.csv OUTPUT.csv")
        sys.exit(1)
    main(Path(sys.argv[1]).expanduser(), Path(sys.argv[2]).expanduser())