Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions scripts/install.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,12 @@ function Require-Command {

function Test-PythonCompatible {
param([string]$PythonExe)
& $PythonExe -c 'import sys; raise SystemExit(0 if (3, 11) <= sys.version_info < (3, 14) else 1)' 2>$null | Out-Null
return $LASTEXITCODE -eq 0
try {
& $PythonExe -c 'import sys; raise SystemExit(0 if (3, 11) <= sys.version_info < (3, 14) else 1)' 2>$null | Out-Null
return $LASTEXITCODE -eq 0
} catch {
return $false
}
}

function Find-SystemPython {
Expand Down
209 changes: 162 additions & 47 deletions src/spark_cli/cli.py

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion src/spark_cli/sandbox/access.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ def read_env_file(path: Path) -> dict[str, str]:

def write_env_file(path: Path, values: dict[str, str]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("\n".join(f"{key}={value}" for key, value in values.items()) + "\n", encoding="utf-8")
# Strip newlines from values to prevent env var injection
sanitized = {k: v.replace("\n", "").replace("\r", "") for k, v in values.items()}
path.write_text("\n".join(f"{key}={value}" for key, value in sanitized.items()) + "\n", encoding="utf-8")


def level5_env_paths(*, home: Path | None = None, env: dict[str, str] | None = None) -> dict[str, Path]:
Expand Down
2 changes: 1 addition & 1 deletion src/spark_cli/sandbox/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ def load_ssh_targets(*, home: Path | None = None) -> dict[str, SshTarget]:
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError as error:
raise ValueError("SSH target store is not valid JSON.") from error
raise ValueError(f"SSH target store is corrupt or not valid JSON: {error}") from error
if not isinstance(payload, dict) or payload.get("schema_version") != SSH_TARGETS_SCHEMA_VERSION:
raise ValueError("Unsupported SSH target store schema.")
targets = payload.get("targets")
Expand Down
50 changes: 50 additions & 0 deletions src/spark_cli/security/approval.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,56 @@ def approval_required_for_command(argv: list[str], context: CommandContext | Non
confirmation_phrase="approve container privilege",
)

if (first == "docker" and second == "exec") or (
first == "docker" and lowered[1:3] == ["container", "exec"]
):
return _decision(
parts,
ctx,
"container_privilege_escalation",
"high",
"docker exec runs a command inside a running container, which may carry elevated privileges or host-mounted paths.",
target_display=" ".join(parts[:4]),
confirmation_phrase="approve container exec",
)

if first == "nsenter":
return _decision(
parts,
ctx,
"container_privilege_escalation",
"critical",
"nsenter enters one or more Linux namespaces of a target process and can escape container isolation on the host.",
target_display=" ".join(parts[:4]),
confirmation_phrase="approve namespace entry",
)

if first == "chroot":
return _decision(
parts,
ctx,
"container_privilege_escalation",
"high",
"chroot changes the root directory for a process, which can escape filesystem containment or grant access to an alternative OS tree.",
target_display=" ".join(parts[:3]),
confirmation_phrase="approve chroot",
)

if first in {
"adduser", "useradd", "usermod", "userdel", "deluser",
"groupadd", "groupmod", "groupdel",
"passwd", "chpasswd",
}:
return _decision(
parts,
ctx,
"identity_access_mutation",
"high",
"Command modifies local user accounts, groups, or credentials.",
target_display=" ".join(parts[:3]),
confirmation_phrase="approve user account change",
)

if first in {"railway", "vercel", "flyctl", "serverless"} and _contains_any(lowered, {"up", "deploy", "redeploy"}):
return _decision(
parts,
Expand Down
105 changes: 104 additions & 1 deletion src/spark_cli/security/prompt_injection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import re
import unicodedata
from dataclasses import dataclass
from pathlib import Path

Expand Down Expand Up @@ -43,6 +44,80 @@
)


# Map of common Unicode homoglyphs to their ASCII equivalents.
# This covers Cyrillic, Greek, and other scripts that have visually
# similar characters to Latin letters used in English.
HOMOGLYPH_MAP = {
# Cyrillic lowercase → Latin
'\u0430': 'a', # а (Cyrillic)
'\u0431': 'b', # б (Cyrillic)
'\u0432': 'v', # в (Cyrillic)
'\u0433': 'r', # г (Cyrillic)
'\u0435': 'e', # е (Cyrillic)
'\u0438': 'u', # и (Cyrillic)
'\u043a': 'k', # к (Cyrillic)
'\u043c': 'm', # м (Cyrillic)
'\u043e': 'o', # о (Cyrillic)
'\u043f': 'n', # п (Cyrillic)
'\u0440': 'p', # р (Cyrillic)
'\u0441': 'c', # с (Cyrillic)
'\u0442': 't', # т (Cyrillic)
'\u0443': 'y', # у (Cyrillic)
'\u0445': 'x', # х (Cyrillic)
'\u0446': 'c', # ц (Cyrillic)
'\u0448': 'w', # ш (Cyrillic)
'\u044b': 'b', # ы (Cyrillic)
'\u044d': 'e', # э (Cyrillic)
'\u044e': 'u', # ю (Cyrillic)
'\u044f': 'a', # я (Cyrillic)
# Cyrillic uppercase → Latin
'\u0410': 'A', # А
'\u0412': 'B', # В
'\u0413': 'R', # Г
'\u0415': 'E', # Е
'\u0418': 'U', # И
'\u041a': 'K', # К
'\u041c': 'M', # М
'\u041e': 'O', # О
'\u041f': 'N', # П
'\u0420': 'P', # Р
'\u0421': 'C', # С
'\u0422': 'T', # Т
'\u0423': 'Y', # У
'\u0425': 'X', # Х
'\u0426': 'C', # Ц
'\u0428': 'W', # Ш
'\u042b': 'B', # Ы
'\u042d': 'E', # Э
'\u042e': 'U', # Ю
'\u042f': 'A', # Я
# Greek lowercase → Latin
'\u03b1': 'a', # α (alpha)
'\u03b2': 'b', # β (beta)
'\u03b5': 'e', # ε (epsilon)
'\u03b7': 'n', # η (eta)
'\u03b9': 'i', # ι (iota)
'\u03ba': 'k', # κ (kappa)
'\u03bf': 'o', # ο (omicron)
'\u03c1': 'p', # ρ (rho)
'\u03c4': 't', # τ (tau)
'\u03c5': 'y', # υ (upsilon)
'\u03c7': 'x', # χ (chi)
# Greek uppercase → Latin
'\u0391': 'A', # Α
'\u0392': 'B', # Β
'\u0395': 'E', # Ε
'\u0397': 'H', # Η
'\u0399': 'I', # Ι
'\u039a': 'K', # Κ
'\u039f': 'O', # Ο
'\u03a1': 'P', # Ρ
'\u03a4': 'T', # Τ
'\u03a5': 'Y', # Υ
'\u03a7': 'X', # Χ
}


@dataclass(frozen=True)
class PromptInjectionFinding:
category: str
Expand All @@ -51,6 +126,30 @@ class PromptInjectionFinding:
detail: str


def normalize_unicode(text: str) -> str:
"""
Normalize Unicode text using NFKD decomposition and homoglyph mapping.

Unicode homoglyphs (e.g., Cyrillic 'о' U+043E vs Latin 'o' U+006F)
are visually similar but have different byte representations. This
function:
1. Applies NFKD normalization to decompose compatibility characters
2. Maps known homoglyphs to their ASCII equivalents

This prevents bypass attacks using visually similar characters from
other scripts (Cyrillic, Greek, etc.) to evade pattern matching.
"""
# First apply NFKD decomposition
normalized = unicodedata.normalize("NFKD", text)

# Then map homoglyphs to ASCII equivalents
result = []
for char in normalized:
result.append(HOMOGLYPH_MAP.get(char, char))

return "".join(result)


def is_agent_context_path(path_label: str) -> bool:
path = Path(path_label)
name = path.name.lower()
Expand All @@ -65,7 +164,11 @@ def scan_prompt_injection_text(path_label: str, text: str) -> list[PromptInjecti
if not is_agent_context_path(path_label):
return []
findings: list[PromptInjectionFinding] = []

# Normalize Unicode to collapse homoglyphs before pattern matching
normalized_text = normalize_unicode(text)

for category, severity, pattern, detail in PROMPT_INJECTION_PATTERNS:
if pattern.search(text):
if pattern.search(normalized_text):
findings.append(PromptInjectionFinding(category, severity, path_label, detail))
return findings
Loading
Loading