-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun.py
More file actions
139 lines (118 loc) · 4.64 KB
/
run.py
File metadata and controls
139 lines (118 loc) · 4.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import sys
import subprocess
from analysis.preprocessing import load_dataset, preprocess_data, save_processed_data
from analysis.eda import generate_eda
from graph.arango_setup import ArangoDatabaseManager
from ingestion.streaming_ingest import StreamingIngestor
def main():
print("\n" + "═" * 70)
print("🚀 ElliptiGraph - Bitcoin Transaction Analysis Dashboard")
print("═" * 70)
# Load dataset
print("\n[1/5] 📂 Loading dataset...")
try:
features_df, edges_df, classes_df, merged_df = load_dataset("./dataset")
print(f" ✅ {len(features_df):,} transactions, {len(edges_df):,} edges")
except Exception as e:
print(f" ❌ {e}")
return
# Preprocess data
print("\n[2/5] 🔧 Preprocessing...")
try:
processed_df = preprocess_data(merged_df, features_df)
save_processed_data(processed_df, "./output")
print(" ✅ Data preprocessed")
except Exception as e:
print(f" ❌ {e}")
return
# Generate EDA
print("\n[3/5] 📊 Generating plots...")
try:
generate_eda(processed_df, edges_df, "./visualization/plots")
print(" ✅ Plots saved")
except Exception as e:
print(f" ⚠️ {e}")
# Connect to ArangoDB
print("\n[4/5] 🔌 Connecting to ArangoDB...")
try:
db = ArangoDatabaseManager("http://localhost:8529", "root", "root")
if not db.connect():
print(" ❌ Connection failed")
print(" 💡 Run: docker run -p 8529:8529 -e ARANGO_ROOT_PASSWORD=root arangodb/arangodb")
return
db.create_database("elliptic_graph")
db.use_database("elliptic_graph")
db.create_graph_structure()
print(" ✅ ArangoDB ready")
except Exception as e:
print(f" ❌ {e}")
return
# Ingest data
print("\n[5/5] 📥 Ingesting to ArangoDB...")
try:
ingestor = StreamingIngestor(db, edges_df, processed_df)
ingestor.stream_by_time_step(sleep_seconds=0.01, sample_size=None)
print(" ✅ Ingestion complete")
except Exception as e:
print(f" ⚠️ {e}")
# Execute queries and save results
print("\n[6/7] 🔍 Executing queries...")
try:
from graph.queries_simple import SimpleQueries
from graph.queries_complex import ComplexQueries
import json
simple_q = SimpleQueries(db)
complex_q = ComplexQueries(db)
# Execute simple queries
q1_results = simple_q.query_1_count_by_class()
# Execute complex queries
cq1_results = complex_q.query_1_two_hop_neighbors()
cq2_results = complex_q.query_2_illicit_clusters()
# Save results
output_path = "./output"
import pandas as pd
if q1_results:
pd.DataFrame(q1_results).to_csv(f"{output_path}/query_results_simple.csv", index=False)
combined_complex = []
if cq1_results:
combined_complex.append({"Query": "Two-Hop Neighbors", "Result": str(cq1_results)})
if cq2_results:
for i, result in enumerate(cq2_results[:10]):
combined_complex.append({"Query": "Illicit Clusters", "Index": i, "Result": str(result)})
if combined_complex:
pd.DataFrame(combined_complex).to_csv(f"{output_path}/query_results_complex.csv", index=False)
print(" ✅ Query results saved")
except Exception as e:
print(f" ⚠️ Query execution: {e}")
# CRITICAL: Close DB connection to free resources before dashboard starts
try:
del db
del ingestor
print("\n[7/7] 🔌 Closing connections...")
print(" ✅ Connections closed")
except:
pass
# Force garbage collection to free memory
import gc
gc.collect()
# Launch Dash Dashboard
print("\n" + "═" * 70)
print("🎨 Launching ElliptiGraph Dashboard")
print("═" * 70)
print("📍 URL: http://localhost:8050")
print("✨ Modern Dash UI with live ArangoDB integration")
print("🔍 Interactive query execution & network visualization")
print("🔄 Press Ctrl+C to stop")
print("═" * 70 + "\n")
try:
subprocess.run([
sys.executable,
"visualization/dash_app.py"
])
except KeyboardInterrupt:
print("\n👋 Dashboard stopped")
except Exception as e:
print(f"\n⚠️ Dashboard launch error: {e}")
print("💡 Try: python visualization/dash_app.py")
if __name__ == "__main__":
main()