diff --git a/src/spark_cli/cli.py b/src/spark_cli/cli.py index f245cbf6..360b977c 100644 --- a/src/spark_cli/cli.py +++ b/src/spark_cli/cli.py @@ -2167,6 +2167,17 @@ def telegram_token_repair_command(secret_id: str) -> str: return "spark telegram connect" +def secret_file_path_inside_spark_home(secret_path: Path, spark_home: Path = SPARK_HOME) -> bool: + try: + candidate = secret_path.expanduser().resolve(strict=False) + root = spark_home.expanduser().resolve(strict=False) + candidate_text = os.path.normcase(str(candidate)) + root_text = os.path.normcase(str(root)) + return os.path.commonpath([candidate_text, root_text]) == root_text + except (OSError, ValueError): + return False + + def resolve_secret_input(value: str) -> str: stripped = value.strip() if stripped.lower() == "@clipboard": @@ -2183,8 +2194,11 @@ def resolve_secret_input(value: str) -> str: secret_path = stripped[6:].strip() if not secret_path: raise SystemExit("Invalid secret reference: @file: requires a path.") + path = Path(secret_path) + if not secret_file_path_inside_spark_home(path, SPARK_HOME): + raise SystemExit("Invalid secret reference: @file: paths must stay inside SPARK_HOME.") try: - return Path(secret_path).expanduser().read_text(encoding="utf-8").strip() + return path.expanduser().read_text(encoding="utf-8").strip() except OSError as exc: raise SystemExit(f"Could not read secret file {secret_path}: {exc}") from exc return value diff --git a/tests/test_cli.py b/tests/test_cli.py index 6fbf7c22..d0aeafe3 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1675,13 +1675,23 @@ def test_resolve_secret_input_can_read_environment_reference(self) -> None: resolve_secret_input("@env:SPARK_TEST_SECRET_MISSING") def test_resolve_secret_input_can_read_file_reference(self) -> None: - with tempfile.NamedTemporaryFile("w", encoding="utf-8", delete=False) as handle: - handle.write("secret-from-file\n") - secret_path = Path(handle.name) - try: - self.assertEqual(resolve_secret_input(f"@file:{secret_path}"), "secret-from-file") - finally: - secret_path.unlink(missing_ok=True) + with tempfile.TemporaryDirectory() as tmpdir: + spark_home = Path(tmpdir) / "spark-home" + secret_path = spark_home / "config" / "secret.txt" + secret_path.parent.mkdir(parents=True) + secret_path.write_text("secret-from-file\n", encoding="utf-8") + with patch("spark_cli.cli.SPARK_HOME", spark_home): + self.assertEqual(resolve_secret_input(f"@file:{secret_path}"), "secret-from-file") + + def test_resolve_secret_input_rejects_file_reference_outside_spark_home(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + spark_home = root / "spark-home" + outside_secret = root / "outside-secret.txt" + outside_secret.write_text("secret-from-file\n", encoding="utf-8") + with patch("spark_cli.cli.SPARK_HOME", spark_home), self.assertRaises(SystemExit) as error: + resolve_secret_input(f"@file:{outside_secret}") + self.assertIn("inside SPARK_HOME", str(error.exception)) def test_llm_doctor_redacts_tokens_and_secret_fields(self) -> None: text = "BOT_TOKEN=1234567890:AAabcdefghijklmnopqrstuvwxyz1234567890 and Authorization: Bearer sk-proj-secretvalue1234567890"