-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbrowser_monitor.py
More file actions
328 lines (289 loc) · 12.1 KB
/
browser_monitor.py
File metadata and controls
328 lines (289 loc) · 12.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
import asyncio
import json
import logging
import os
import glob
import sqlite3
import shutil
import tempfile
import psutil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import websockets
from websockets.http11 import Response
from websockets.datastructures import Headers
from port_utils import kill_port_holder
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("BrowserMonitor")
# 1. BROWSER_PROFILES dictionary mapping browser name to list of possible history db paths
BROWSER_PROFILES = {
"Chrome": [
os.path.expandvars(r"%LOCALAPPDATA%\Google\Chrome\User Data\Default\History"),
os.path.expandvars(r"%LOCALAPPDATA%\Google\Chrome\User Data\Profile*\History"),
os.path.expanduser(r"~/Library/Application Support/Google/Chrome/Default/History"),
os.path.expanduser(r"~/Library/Application Support/Google/Chrome/Profile*/History"),
os.path.expanduser(r"~/.config/google-chrome/Default/History"),
os.path.expanduser(r"~/.config/google-chrome/Profile*/History")
],
"Edge": [
os.path.expandvars(r"%LOCALAPPDATA%\Microsoft\Edge\User Data\Default\History"),
os.path.expandvars(r"%LOCALAPPDATA%\Microsoft\Edge\User Data\Profile*\History"),
os.path.expanduser(r"~/Library/Application Support/Microsoft Edge/Default/History"),
os.path.expanduser(r"~/Library/Application Support/Microsoft Edge/Profile*/History"),
os.path.expanduser(r"~/.config/microsoft-edge/Default/History"),
os.path.expanduser(r"~/.config/microsoft-edge/Profile*/History")
],
"Firefox": [
os.path.expandvars(r"%APPDATA%\Mozilla\Firefox\Profiles\*.default*\places.sqlite"),
os.path.expandvars(r"%APPDATA%\Mozilla\Firefox\Profiles\*.default-release*\places.sqlite"),
os.path.expanduser(r"~/Library/Application Support/Firefox/Profiles/*.default*/places.sqlite"),
os.path.expanduser(r"~/.mozilla/firefox/*.default*/places.sqlite")
],
"Brave": [
os.path.expandvars(r"%LOCALAPPDATA%\BraveSoftware\Brave-Browser\User Data\Default\History"),
os.path.expandvars(r"%LOCALAPPDATA%\BraveSoftware\Brave-Browser\User Data\Profile*\History"),
os.path.expanduser(r"~/Library/Application Support/BraveSoftware/Brave-Browser/Default/History"),
os.path.expanduser(r"~/.config/BraveSoftware/Brave-Browser/Default/History")
],
"Opera": [
os.path.expandvars(r"%APPDATA%\Opera Software\Opera Stable\History"),
os.path.expanduser(r"~/Library/Application Support/com.operasoftware.Opera/History"),
os.path.expanduser(r"~/.config/opera/History")
],
"Vivaldi": [
os.path.expandvars(r"%LOCALAPPDATA%\Vivaldi\User Data\Default\History"),
os.path.expanduser(r"~/Library/Application Support/Vivaldi/Default/History"),
os.path.expanduser(r"~/.config/vivaldi/Default/History")
],
"Arc": [
os.path.expandvars(r"%LOCALAPPDATA%\Packages\TheBrowserCompany*\LocalCache\Local\Arc\User Data\Default\History"),
os.path.expanduser(r"~/Library/Application Support/Arc/User Data\Default\History")
]
}
DETECTED_BROWSERS = {}
def scan_browsers():
detected = {}
for name, patterns in BROWSER_PROFILES.items():
matches = []
for p in patterns:
matches.extend(glob.glob(p))
# If multiple profiles exist, pick the one with the most recent modification time
best_match = None
best_mtime = 0
for m in matches:
if os.path.exists(m):
try:
mtime = os.path.getmtime(m)
if mtime > best_mtime:
best_mtime = mtime
best_match = m
except:
pass
if best_match:
detected[name] = best_match
return detected
def query_history(browser_name, db_path):
if not os.path.exists(db_path):
return []
tmp = None
conn = None
try:
fd, tmp = tempfile.mkstemp(suffix=".db")
os.close(fd)
shutil.copy2(db_path, tmp)
conn = sqlite3.connect(f"file:{tmp}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
if browser_name == "Firefox":
rows = conn.execute("""
SELECT url, title, last_visit_date
FROM moz_places
WHERE hidden=0 AND last_visit_date IS NOT NULL
AND url NOT LIKE 'place:%' AND url NOT LIKE 'about:%'
ORDER BY last_visit_date DESC LIMIT 30
""").fetchall()
formatted = []
for r in rows:
raw_time = r['last_visit_date']
visited_at = None
if raw_time:
try: visited_at = (raw_time / 1000)
except: pass
formatted.append({
"url": r['url'],
"title": r['title'],
"visitedAt": visited_at
})
else:
rows = conn.execute("""
SELECT url, title, last_visit_time
FROM urls
WHERE hidden=0
ORDER BY last_visit_time DESC LIMIT 30
""").fetchall()
CHROME_EPOCH_OFFSET_MS = 11644473600000
formatted = []
for r in rows:
raw_time = r['last_visit_time']
visited_at = None
if raw_time:
try: visited_at = (raw_time / 1000) - CHROME_EPOCH_OFFSET_MS
except: pass
url = r['url']
if url.startswith(('chrome:', 'edge:', 'about:', 'data:', 'blob:', 'chrome-extension:')):
continue
if 'localhost' in url or '127.0.0.1' in url:
continue
formatted.append({
"url": url,
"title": r['title'],
"visitedAt": visited_at
})
return formatted
except Exception as e:
logger.error(f"Error querying {browser_name} DB: {e}")
raise e
finally:
if conn:
try: conn.close()
except: pass
if tmp and os.path.exists(tmp):
for _ in range(3):
try:
os.remove(tmp)
break
except:
import time
time.sleep(0.1)
class HistoryFileHandler(FileSystemEventHandler):
def __init__(self, browser_name, file_path, loop, publish_callback):
self.browser_name = browser_name
self.file_path = os.path.normpath(file_path)
self.loop = loop
self.publish_callback = publish_callback
def on_modified(self, event):
if not event.is_directory and os.path.normpath(event.src_path) == self.file_path:
asyncio.run_coroutine_threadsafe(self.publish_callback(self.browser_name), self.loop)
observer = Observer()
browser_handlers = {}
clients = set()
async def push_detected_browsers_to_client(ws):
data = {"event": "detected_browsers", "browsers": list(DETECTED_BROWSERS.keys())}
try: await ws.send(json.dumps(data))
except: pass
async def push_detected_browsers():
if not clients: return
data = {"event": "detected_browsers", "browsers": list(DETECTED_BROWSERS.keys())}
msg = json.dumps(data)
for ws in list(clients):
try: await ws.send(msg)
except: pass
async def push_browser_data(browser_name, target_clients=None):
if target_clients is None:
target_clients = clients
if not target_clients: return
db_path = DETECTED_BROWSERS.get(browser_name)
if not db_path:
tabs = []
error = f"{browser_name} not detected. Open {browser_name} at least once to enable monitoring."
status = "info"
else:
try:
# Query history is a heavy operation (copying file + sqlite query)
tabs = await asyncio.to_thread(query_history, browser_name, db_path)
if not tabs:
error = f"No recent tabs found in {browser_name}."
status = "info"
else:
error = None
status = "ok"
except Exception as e:
error = f"Cannot read {browser_name} history. Try running SensorGuard as administrator."
status = "error"
tabs = []
logger.error(f"Cannot read db for {browser_name}: {e}")
msg = json.dumps({
"event": "tab_update",
"browser": browser_name,
"tabs": tabs,
"error": error,
"status": status
})
for ws in list(target_clients):
try: await ws.send(msg)
except: pass
async def process_event(browser_name):
await asyncio.sleep(0.1)
await push_browser_data(browser_name)
async def periodic_scan():
loop = asyncio.get_running_loop()
# Initial scan
new_detected = await asyncio.to_thread(scan_browsers)
await update_watchers_async(loop, new_detected)
while True:
await asyncio.sleep(60)
new_detected = await asyncio.to_thread(scan_browsers)
await update_watchers_async(loop, new_detected)
async def update_watchers_async(loop, new_detected):
global DETECTED_BROWSERS, browser_handlers
changed = False
for name, path in new_detected.items():
if name not in DETECTED_BROWSERS:
DETECTED_BROWSERS[name] = path
changed = True
dir_to_watch = os.path.dirname(path)
handler = HistoryFileHandler(name, path, loop, process_event)
watch = observer.schedule(handler, dir_to_watch, recursive=False)
browser_handlers[name] = watch
logger.info(f"Started watching {name} at {path}")
if changed:
await push_detected_browsers()
for name in DETECTED_BROWSERS:
await push_browser_data(name)
async def check_new_processes():
loop = asyncio.get_running_loop()
seen_pids = set()
browsers = ['chrome', 'edge', 'firefox', 'brave', 'opera', 'vivaldi', 'arc']
while True:
try:
# Iterating over processes can be slow
process_info = await asyncio.to_thread(lambda: [(p.pid, (p.info['name'] or '').lower()) for p in psutil.process_iter(['name'])])
for pid, name in process_info:
if pid not in seen_pids:
seen_pids.add(pid)
if any(b in name for b in browsers):
new_detected = await asyncio.to_thread(scan_browsers)
await update_watchers_async(loop, new_detected)
except Exception:
pass
await asyncio.sleep(5)
async def register(websocket):
clients.add(websocket)
try:
await push_detected_browsers_to_client(websocket)
for browser in DETECTED_BROWSERS:
await push_browser_data(browser, {websocket})
await websocket.wait_closed()
finally:
clients.remove(websocket)
async def main():
observer.start()
loop = asyncio.get_running_loop()
asyncio.create_task(periodic_scan())
asyncio.create_task(check_new_processes())
async def process_request(connection, request):
"""Provide a friendly message for non-WebSocket (HTTP) requests."""
if request.headers.get("Upgrade", "").lower() != "websocket":
return Response(
200,
"OK",
Headers([("Content-Type", "text/html"), ("Connection", "close")]),
b"<html><body><h1>SensorGuard WebSocket Server</h1><p>This port is for WebSocket connections only. Please use the SensorGuard UI to view data.</p></body></html>"
)
return None
await asyncio.to_thread(kill_port_holder, 8999)
# Use 127.0.0.1 to match what the frontend is now calling
async with websockets.serve(register, "127.0.0.1", 8999, process_request=process_request):
logger.info("Browser Monitor WebSocket active on ws://127.0.0.1:8999/browser-monitor")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())