-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcortex_pretool_enrich.sh
More file actions
executable file
·101 lines (87 loc) · 3.27 KB
/
cortex_pretool_enrich.sh
File metadata and controls
executable file
·101 lines (87 loc) · 3.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/bin/bash
# PreToolUse hook: enrich cortex operations with project context + audit trail
# For memory_store/update: auto-detect project from cwd, inject via updatedInput
# For ALL cortex tools: log operation to audit trail
INPUT=$(cat)
/usr/bin/python3 -W ignore - "$INPUT" 2>/dev/null <<'PYEOF'
import sys, json, os, time
OPS_LOG = os.path.expanduser("~/.claude/.cortex_ops_log.jsonl")
raw = sys.argv[1] if len(sys.argv) > 1 else ""
try:
d = json.loads(raw)
except Exception:
sys.exit(0)
tool_name = d.get("tool_name", "")
tool_input = d.get("tool_input", {})
cwd = d.get("cwd", "") or os.getcwd()
if not tool_name.startswith("mcp__cortex__"):
sys.exit(0)
# Extract the specific tool (memory_store, memory_search, etc.)
tool_short = tool_name.replace("mcp__cortex__", "")
# ================================================================
# Audit: log every cortex operation
# ================================================================
try:
entry = {
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
"tool": tool_short,
"cwd": cwd,
}
# Include memory_id if available
if "memory_id" in tool_input:
entry["memory_id"] = tool_input["memory_id"]
if "query" in tool_input:
entry["query"] = tool_input["query"][:100]
with open(OPS_LOG, "a") as f:
f.write(json.dumps(entry) + "\n")
# Rotate if >500KB (keep last 7 days) — atomic write
if os.path.getsize(OPS_LOG) > 500_000:
import tempfile
cutoff = time.strftime("%Y-%m-%d", time.localtime(time.time() - 7 * 86400))
with open(OPS_LOG) as f:
lines = [l for l in f if l[16:26] >= cutoff]
tmp_fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(OPS_LOG))
try:
with os.fdopen(tmp_fd, "w") as f:
f.writelines(lines)
os.replace(tmp_path, OPS_LOG)
except Exception:
try: os.unlink(tmp_path)
except OSError: pass
except Exception:
pass
# ================================================================
# Enrich: auto-detect project for store/update if not set
# ================================================================
if tool_short in ("memory_store", "memory_update"):
current_project = tool_input.get("project", "")
if not current_project:
# Detect project dynamically from cwd path components
parts = cwd.replace("\\", "/").split("/")
detected = ""
skip = {"home", "Users", "projects", "src", "work", "dev", "repos", "code", ".claude", ""}
for p in reversed(parts):
if p not in skip and len(p) > 2 and not p.startswith("."):
detected = p
break
if detected:
updated_input = dict(tool_input)
updated_input["project"] = detected
output = json.dumps({
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "allow",
"updatedInput": updated_input
}
})
print(output)
sys.exit(0)
# For non-enriched calls, just allow
output = json.dumps({
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "allow"
}
})
print(output)
PYEOF