-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess_incremental.py
More file actions
225 lines (192 loc) Β· 8.75 KB
/
process_incremental.py
File metadata and controls
225 lines (192 loc) Β· 8.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import os
import sqlite3
import sys
import csv
import argparse
from tqdm import tqdm
from zstd_utils import open_readable
# Allow very large CSV fields (FEC rows can exceed csv's default field limit).
csv.field_size_limit(sys.maxsize)
# Resolve all paths relative to this script so it works from any working directory.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DB_FILE = os.path.join(SCRIPT_DIR, "fec_contributions.db")
def get_connection(db_path=None):
    """Open a SQLite connection tuned for bulk writes and return (conn, cursor).

    Uses the module-level DB_FILE when *db_path* is not supplied. WAL journaling
    and NORMAL synchronous mode trade a little durability for import speed.
    """
    target = db_path if db_path else DB_FILE
    connection = sqlite3.connect(target)
    cur = connection.cursor()
    for pragma in ('PRAGMA journal_mode = WAL;', 'PRAGMA synchronous = NORMAL;'):
        cur.execute(pragma)
    return connection, cur
# Shared module-level connection/cursor used by every helper in this script.
conn, cursor = get_connection()

# Create main contributions table (if it doesn't exist).
cursor.execute('''
CREATE TABLE IF NOT EXISTS contributions (
    first_name TEXT,
    last_name TEXT,
    city TEXT,
    state TEXT,
    zip_code TEXT,
    contribution_date TEXT,
    recipient_name TEXT,
    amount REAL,
    recipient_type TEXT
)
''')

# Session-scoped duplicate-detection index. A TEMPORARY table lives in
# SQLite's temp store (spilling to disk as needed), so the hashes never
# have to be held in Python memory.
print("Building duplicate detection index from existing 2023+ records...")
cursor.execute('''
CREATE TEMPORARY TABLE IF NOT EXISTS temp_contribution_hashes (
    record_hash TEXT PRIMARY KEY
)
''')

# Populate the temp table entirely in SQL -- no Python-side memory needed.
# NOTE: this '|'-joined key must stay in sync with the keys built by
# record_exists() and add_record_to_temp_table().
cursor.execute('''
INSERT OR IGNORE INTO temp_contribution_hashes (record_hash)
SELECT
    COALESCE(first_name, '') || '|' ||
    COALESCE(last_name, '') || '|' ||
    COALESCE(city, '') || '|' ||
    COALESCE(state, '') || '|' ||
    COALESCE(zip_code, '') || '|' ||
    COALESCE(contribution_date, '') || '|' ||
    COALESCE(recipient_name, '') || '|' ||
    COALESCE(CAST(amount AS TEXT), '') || '|' ||
    COALESCE(recipient_type, '')
FROM contributions
WHERE contribution_date >= '2023-01-01'
''')
conn.commit()

cursor.execute("SELECT COUNT(*) FROM temp_contribution_hashes")
hash_count = cursor.fetchone()[0]
# Fixed: the original print literal here was garbled/unterminated (mojibake).
print(f"Indexed {hash_count:,} existing 2023+ records for duplicate detection")
def record_exists(first_name, last_name, city, state, zip_code, contribution_date, recipient_name, amount, recipient_type):
    """Return True if this record's hash is already in the temp lookup table.

    The '|'-joined key must match the one the SQL population step builds
    from COALESCE'd columns at startup.
    """
    # BUG FIX: `str(amount or '')` mapped a 0.0 amount to '' (falsy), while the
    # SQL side (COALESCE(CAST(amount AS TEXT), '')) yields '0.0' -- so
    # zero-amount records could never match. Only None should become ''.
    record_hash = '|'.join([
        str(first_name or ''),
        str(last_name or ''),
        str(city or ''),
        str(state or ''),
        str(zip_code or ''),
        str(contribution_date or ''),
        str(recipient_name or ''),
        '' if amount is None else str(amount),
        str(recipient_type or '')
    ])
    cursor.execute("SELECT 1 FROM temp_contribution_hashes WHERE record_hash = ?", (record_hash,))
    return cursor.fetchone() is not None
def add_record_to_temp_table(first_name, last_name, city, state, zip_code, contribution_date, recipient_name, amount, recipient_type):
    """Register a record's hash in the temp table so later rows dedupe against it."""
    # BUG FIX: `str(amount or '')` turned a 0.0 amount into '' (falsy),
    # producing a different hash than the SQL population step, which renders
    # 0.0 as '0.0'. Only None should map to the empty string.
    record_hash = '|'.join([
        str(first_name or ''),
        str(last_name or ''),
        str(city or ''),
        str(state or ''),
        str(zip_code or ''),
        str(contribution_date or ''),
        str(recipient_name or ''),
        '' if amount is None else str(amount),
        str(recipient_type or '')
    ])
    cursor.execute("INSERT OR IGNORE INTO temp_contribution_hashes (record_hash) VALUES (?)", (record_hash,))
# Insert statement shared by the batched flush and the final flush below.
_INSERT_CONTRIBUTION_SQL = '''
INSERT INTO contributions (
    first_name, last_name, city, state, zip_code,
    contribution_date, recipient_name, amount, recipient_type
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
'''


def process_file_incrementally(file_path, description="Processing"):
    """Process one pipe-delimited FEC itemized-contribution file, adding only new records.

    Duplicate detection uses the module-level temp_contribution_hashes table.
    Returns a (new_records, duplicate_records, error_records) tuple.

    FIX: the original returned bare None when the file was missing, which
    crashed the caller's three-way unpack; it now returns (0, 0, 0).
    """
    print(f"\nProcessing {file_path}")
    if not os.path.exists(file_path) and not os.path.exists(file_path + ".zst"):
        print(f"File not found: {file_path}")
        return 0, 0, 0

    with open_readable(file_path, encoding='latin-1') as f:
        reader = csv.reader(f, delimiter='|')
        batch = []
        new_records = 0
        duplicate_records = 0
        error_records = 0

        for row in tqdm(reader, desc=description):
            try:
                # Parse the record (same column layout as the original import script).
                # Column 7 holds "LAST, FIRST ..." -- split on the comma-space once.
                name = row[7].strip().split(', ')
                last_name = name[0] if len(name) > 0 else ''
                first_name = name[1] if len(name) > 1 else ''
                city = row[8].strip()
                state = row[9].strip()
                zip_code = row[10].strip()
                raw_date = row[13].strip()
                # Raw date is MMDDYYYY; convert to ISO YYYY-MM-DD (None if malformed).
                contribution_date = (
                    f"{raw_date[4:8]}-{raw_date[0:2]}-{raw_date[2:4]}"
                    if len(raw_date) == 8 else None
                )
                amount = float(row[14].strip())
                # Recipient: prefer the OTHER_ID column, fall back to the filer committee id.
                other_id = row[15].strip()
                cmte_id = row[0].strip()
                recipient_name = other_id if other_id else cmte_id
                recipient_type = row[16].strip()

                # Skip rows already present in the database or seen earlier this run.
                if record_exists(first_name, last_name, city, state, zip_code,
                                 contribution_date, recipient_name, amount, recipient_type):
                    duplicate_records += 1
                    continue

                batch.append((
                    first_name, last_name, city, state, zip_code,
                    contribution_date, recipient_name, amount, recipient_type
                ))
                # Track the new hash so later rows in this same run dedupe against it.
                add_record_to_temp_table(first_name, last_name, city, state, zip_code,
                                         contribution_date, recipient_name, amount, recipient_type)
                new_records += 1

                # Flush to the database in batches of 1000 rows.
                if len(batch) >= 1000:
                    cursor.executemany(_INSERT_CONTRIBUTION_SQL, batch)
                    conn.commit()
                    batch = []
            except Exception:
                # Best-effort import: malformed rows are counted and skipped.
                error_records += 1
                continue

        # Final flush for any remaining records.
        if batch:
            cursor.executemany(_INSERT_CONTRIBUTION_SQL, batch)
            conn.commit()

    # Fixed: the original "Completed" print literal was garbled/unterminated.
    print(f"Completed processing {file_path}")
    print(f"  New records added: {new_records:,}")
    print(f"  Duplicates skipped: {duplicate_records:,}")
    print(f"  Errors skipped: {error_records:,}")
    return new_records, duplicate_records, error_records
# Main execution
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Incrementally process FEC contribution data")
    default_path = os.path.join(SCRIPT_DIR, "fec_data", "2025-2026", "itcont.txt")
    parser.add_argument("file_path", nargs="?",
                        default=default_path,
                        help="Path to the itcont.txt file to process (will auto-detect .zst)")
    args = parser.parse_args()

    print("Starting incremental FEC data processing with duplicate detection...")
    file_path = args.file_path
    total_new, total_duplicates, total_errors = process_file_incrementally(
        file_path,
        f"Adding records from {os.path.basename(file_path)}"
    )

    # Fixed: two of the summary print literals below were garbled/unterminated
    # in the original (mojibake emoji split the f-strings across lines).
    print("\nProcessing complete!")
    print("Summary:")
    print(f"  New records added: {total_new:,}")
    print(f"  Duplicates skipped: {total_duplicates:,}")
    print(f"  Errors skipped: {total_errors:,}")

    # Recreate query indexes (no-ops when they already exist) so lookups on
    # name/location/date/recipient stay fast after the bulk insert.
    print("\nUpdating indexes...")
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_name ON contributions (first_name, last_name)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_location ON contributions (city, state, zip_code)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_contrib_date ON contributions (contribution_date)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_contrib_recipient ON contributions (recipient_name)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_contrib_flz_plus_date ON contributions (first_name, last_name, zip_code, contribution_date)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_contrib_flz_plus_amount ON contributions (first_name, last_name, zip_code, amount)')
    conn.commit()
    conn.close()
    print(f"Database updated: {DB_FILE}")