From b5e896b471a1a0128cbe959d1306e2fcf2a8c9f0 Mon Sep 17 00:00:00 2001 From: Vexx Date: Sun, 24 May 2026 02:22:00 +0530 Subject: [PATCH] fix(vault): replace XOR stream cipher with AES-256-GCM, remove hardcoded fallback key MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The credential vault previously used a custom XOR stream cipher that could be broken via crib-dragging attacks, and fell back to the hardcoded key "secuscan-dev-key" when SECUSCAN_VAULT_KEY was unset — meaning every default-config deployment shared the same encryption key. Changes: - vault.py: AES-256-GCM via cryptography.hazmat.primitives.ciphers.aead.AESGCM; fresh 12-byte nonce per encrypt(); 16-byte GCM auth tag provides integrity - config.py: resolved_vault_key raises RuntimeError when neither vault_key nor plugin_signature_key is configured; hardcoded "secuscan-dev-key" fallback removed - requirements.txt: add cryptography>=42.0.0 - .env.example: mark SECUSCAN_VAULT_KEY as required with generation instructions - conftest.py: set vault_key in test environment so existing tests pass - test_vault_failure_messages.py: update byte offsets to match AES-GCM blob layout - test_vault_security.py: 15 new tests covering GCM properties, nonce uniqueness, tamper detection, and key configuration enforcement Fixes #200 (hardcoded vault encryption key "secuscan-dev-key"). --- .env.example | 8 +- backend/requirements.txt | 1 + backend/secuscan/config.py | 16 ++- backend/secuscan/vault.py | 66 ++++++---- testing/backend/conftest.py | 1 + .../unit/test_vault_failure_messages.py | 28 +++-- testing/backend/unit/test_vault_security.py | 119 ++++++++++++++++++ 7 files changed, 200 insertions(+), 39 deletions(-) create mode 100644 testing/backend/unit/test_vault_security.py diff --git a/.env.example b/.env.example index e8d1afa8..b0448ff2 100644 --- a/.env.example +++ b/.env.example @@ -22,10 +22,14 @@ SECUSCAN_ALLOW_LOOPBACK_SCANS=true # SECUSCAN_ALLOWED_NETWORKS=127.0.0.1,192.168.*.*,10.*.*.*,172.16.*.* # SECUSCAN_CORS_ALLOWED_ORIGINS=http://127.0.0.1:5173,http://localhost:5173 -# Optional Plugin Security +# Credential Vault — REQUIRED before first run +# Generate with: python -c "import secrets; print(secrets.token_hex(32))" +# The server refuses to start the vault if this is unset. +SECUSCAN_VAULT_KEY=replace-with-output-of-secrets.token_hex-32 + +# Plugin Security # SECUSCAN_PLUGIN_SIGNATURE_KEY=replace-with-your-signing-key # SECUSCAN_ENFORCE_PLUGIN_SIGNATURES=false -# SECUSCAN_VAULT_KEY=replace-with-a-stable-local-secret # Frontend Overrides # Leave these unset for the default local dev flow. diff --git a/backend/requirements.txt b/backend/requirements.txt index 2ec0ce1f..b7d7a851 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -1,4 +1,5 @@ fastapi>=0.115.0 +cryptography>=42.0.0 uvicorn[standard]>=0.30.6 pydantic>=2.8.2 pydantic-settings>=2.6.0 diff --git a/backend/secuscan/config.py b/backend/secuscan/config.py index 150f470f..6d45be19 100644 --- a/backend/secuscan/config.py +++ b/backend/secuscan/config.py @@ -90,8 +90,20 @@ def base_url(self) -> str: @property def resolved_vault_key(self) -> bytes: - """Return a deterministic 32-byte key for credential vault encryption.""" - seed = self.vault_key or self.plugin_signature_key or "secuscan-dev-key" + """Return a deterministic 32-byte key for credential vault encryption. + + Raises RuntimeError when neither SECUSCAN_VAULT_KEY nor + SECUSCAN_PLUGIN_SIGNATURE_KEY is set, rather than falling back to the + insecure hardcoded string that was present in earlier versions. + """ + seed = self.vault_key or self.plugin_signature_key + if not seed: + raise RuntimeError( + "SECUSCAN_VAULT_KEY is not set. " + "Set a strong random value in your environment or .env file before " + "starting the server. " + "Example: python -c \"import secrets; print(secrets.token_hex(32))\"" + ) digest = hashlib.sha256(seed.encode("utf-8")).digest() return base64.urlsafe_b64encode(digest) diff --git a/backend/secuscan/vault.py b/backend/secuscan/vault.py index 8e4d8bf6..50f030a7 100644 --- a/backend/secuscan/vault.py +++ b/backend/secuscan/vault.py @@ -1,46 +1,60 @@ -"""Lightweight encrypted credential vault.""" +"""Authenticated encrypted credential vault using AES-256-GCM.""" from __future__ import annotations import base64 -import hashlib -import hmac import os -from itertools import cycle + +from cryptography.hazmat.primitives.ciphers.aead import AESGCM class VaultCrypto: - """Symmetric encryption helper backed by a deterministic keystream. + """AES-256-GCM authenticated encryption for stored credentials. + + Each call to encrypt() generates a fresh random 12-byte nonce so no two + ciphertexts ever share a nonce under the same key. The GCM auth tag + (16 bytes, appended by AESGCM) provides both confidentiality and integrity — + any tampering causes decrypt() to raise ValueError. - This is intentionally lightweight for local-first usage where secret-at-rest - protection is needed without adding third-party crypto dependencies. + Wire format (base64url): nonce(12) || ciphertext || auth_tag(16) """ - def __init__(self, key: bytes): - self.key = key + _NONCE_LEN = 12 - def _derive_stream_key(self, nonce: bytes) -> bytes: - return hashlib.sha256(self.key + nonce).digest() + def __init__(self, key: bytes): + """ + Args: + key: 44-byte base64url-encoded representation of a 32-byte AES-256 key, + as produced by ``settings.resolved_vault_key``. + """ + try: + raw = base64.urlsafe_b64decode(key) + except Exception as exc: + raise ValueError("Vault key must be base64url-encoded") from exc + if len(raw) != 32: + raise ValueError( + f"Vault key must decode to exactly 32 bytes (AES-256); got {len(raw)}" + ) + self._aesgcm = AESGCM(raw) def encrypt(self, plaintext: str) -> str: - raw = plaintext.encode("utf-8") - nonce = os.urandom(16) - stream_key = self._derive_stream_key(nonce) - ciphertext = bytes(b ^ k for b, k in zip(raw, cycle(stream_key))) - signature = hmac.new(self.key, nonce + ciphertext, hashlib.sha256).digest() - blob = nonce + signature + ciphertext + nonce = os.urandom(self._NONCE_LEN) + ciphertext = self._aesgcm.encrypt(nonce, plaintext.encode("utf-8"), None) + blob = nonce + ciphertext return base64.urlsafe_b64encode(blob).decode("ascii") def decrypt(self, payload: str) -> str: - blob = base64.urlsafe_b64decode(payload.encode("ascii")) - nonce = blob[:16] - signature = blob[16:48] - ciphertext = blob[48:] + try: + blob = base64.urlsafe_b64decode(payload.encode("ascii")) + except Exception as exc: + raise ValueError("Vault payload is not valid base64url") from exc + + nonce = blob[: self._NONCE_LEN] + ciphertext = blob[self._NONCE_LEN :] - expected = hmac.new(self.key, nonce + ciphertext, hashlib.sha256).digest() - if not hmac.compare_digest(signature, expected): - raise ValueError("Vault payload integrity verification failed") + try: + raw = self._aesgcm.decrypt(nonce, ciphertext, None) + except Exception as exc: + raise ValueError("Vault payload integrity verification failed") from exc - stream_key = self._derive_stream_key(nonce) - raw = bytes(b ^ k for b, k in zip(ciphertext, cycle(stream_key))) return raw.decode("utf-8") diff --git a/testing/backend/conftest.py b/testing/backend/conftest.py index 6be3e030..d5ad5987 100644 --- a/testing/backend/conftest.py +++ b/testing/backend/conftest.py @@ -27,6 +27,7 @@ def setup_test_environment(monkeypatch): monkeypatch.setattr(settings, "reports_dir", f"{temp_path}/reports") monkeypatch.setattr(settings, "plugins_dir", str(repo_root / "plugins")) monkeypatch.setattr(settings, "database_path", f"{temp_path}/test_secuscan.db") + monkeypatch.setattr(settings, "vault_key", "test-vault-key-for-unit-tests-only") settings.ensure_directories() diff --git a/testing/backend/unit/test_vault_failure_messages.py b/testing/backend/unit/test_vault_failure_messages.py index 0b0c2762..866470ea 100644 --- a/testing/backend/unit/test_vault_failure_messages.py +++ b/testing/backend/unit/test_vault_failure_messages.py @@ -47,25 +47,27 @@ class TestVaultCryptoFailures: """ def test_tampered_payload_raises_value_error(self): - """Flipping a byte in the ciphertext must fail HMAC verification.""" + """Flipping a byte in the ciphertext must fail GCM authentication.""" crypto = VaultCrypto(settings.resolved_vault_key) encrypted = crypto.encrypt("my-secret-token") - # Decode, flip one byte in the ciphertext area (after nonce+signature = 48 bytes) + # AES-GCM blob: nonce(12) + ciphertext + auth_tag(16) + # Flip a byte inside the ciphertext (after the 12-byte nonce) blob = bytearray(base64.urlsafe_b64decode(encrypted.encode("ascii"))) - blob[50] ^= 0xFF + blob[14] ^= 0xFF tampered = base64.urlsafe_b64encode(bytes(blob)).decode("ascii") with pytest.raises(ValueError, match="integrity verification failed"): crypto.decrypt(tampered) def test_tampered_signature_raises_value_error(self): - """Flipping a byte in the HMAC signature must fail verification.""" + """Flipping a byte in the auth tag must fail GCM verification.""" crypto = VaultCrypto(settings.resolved_vault_key) encrypted = crypto.encrypt("another-secret") + # AES-GCM auth tag occupies the last 16 bytes of the blob blob = bytearray(base64.urlsafe_b64decode(encrypted.encode("ascii"))) - blob[20] ^= 0x01 # inside signature bytes (16–48) + blob[-1] ^= 0x01 tampered = base64.urlsafe_b64encode(bytes(blob)).decode("ascii") with pytest.raises(ValueError, match="integrity verification failed"): @@ -78,9 +80,16 @@ def test_malformed_base64_raises_exception(self): crypto.decrypt("not-valid-base64!!!") def test_wrong_key_raises_value_error(self): - """Decrypting with a different key must fail HMAC check.""" - crypto_a = VaultCrypto(b"key-used-to-encrypt-AAAAAAAAAAAAA") - crypto_b = VaultCrypto(b"key-used-to-decrypt-BBBBBBBBBBBBB") + """Decrypting with a different key must fail GCM authentication.""" + import hashlib + import base64 + + def _make_key(seed: str) -> bytes: + raw = hashlib.sha256(seed.encode()).digest() + return base64.urlsafe_b64encode(raw) + + crypto_a = VaultCrypto(_make_key("key-seed-A")) + crypto_b = VaultCrypto(_make_key("key-seed-B")) encrypted = crypto_a.encrypt("secret-value") with pytest.raises(ValueError, match="integrity verification failed"): @@ -92,8 +101,9 @@ def test_error_message_does_not_contain_secret(self): secret = "SENTINEL_SECRET_VALUE" encrypted = crypto.encrypt(secret) + # AES-GCM blob: nonce(12) + ciphertext + auth_tag(16); flip last auth-tag byte blob = bytearray(base64.urlsafe_b64decode(encrypted.encode("ascii"))) - blob[50] ^= 0xFF + blob[-1] ^= 0xFF tampered = base64.urlsafe_b64encode(bytes(blob)).decode("ascii") try: diff --git a/testing/backend/unit/test_vault_security.py b/testing/backend/unit/test_vault_security.py new file mode 100644 index 00000000..82b31ed5 --- /dev/null +++ b/testing/backend/unit/test_vault_security.py @@ -0,0 +1,119 @@ +""" +Security tests for the credential vault (issue #200). + +Verifies that: +- AES-256-GCM is used (not the old XOR stream cipher) +- The hardcoded fallback key "secuscan-dev-key" is gone +- resolved_vault_key raises when no key is configured +- Each encrypt() call produces a distinct ciphertext (unique nonces) +- Wrong key always fails authentication +- Truncated / short blobs raise, not silently return garbage +""" + +import base64 +import hashlib +import pytest + +from backend.secuscan.vault import VaultCrypto +from backend.secuscan.config import settings + + +def _make_key(seed: str) -> bytes: + raw = hashlib.sha256(seed.encode()).digest() + return base64.urlsafe_b64encode(raw) + + +class TestAesGcmProperties: + def test_roundtrip(self): + crypto = VaultCrypto(_make_key("test")) + assert crypto.decrypt(crypto.encrypt("hello")) == "hello" + + def test_unique_ciphertexts_per_call(self): + crypto = VaultCrypto(_make_key("test")) + c1 = crypto.encrypt("same plaintext") + c2 = crypto.encrypt("same plaintext") + assert c1 != c2, "Each call must use a fresh nonce" + + def test_blob_structure_has_12_byte_nonce(self): + """Blob starts with a 12-byte nonce (AES-GCM standard).""" + crypto = VaultCrypto(_make_key("test")) + blob = base64.urlsafe_b64decode(crypto.encrypt("x").encode()) + # nonce(12) + 1-byte plaintext + 16-byte auth_tag = 29 bytes total + assert len(blob) == 29 + + def test_tamper_ciphertext_raises(self): + crypto = VaultCrypto(_make_key("test")) + blob = bytearray(base64.urlsafe_b64decode(crypto.encrypt("secret").encode())) + blob[14] ^= 0x01 + with pytest.raises(ValueError, match="integrity verification failed"): + crypto.decrypt(base64.urlsafe_b64encode(bytes(blob)).decode()) + + def test_tamper_auth_tag_raises(self): + crypto = VaultCrypto(_make_key("test")) + blob = bytearray(base64.urlsafe_b64decode(crypto.encrypt("secret").encode())) + blob[-1] ^= 0x01 + with pytest.raises(ValueError, match="integrity verification failed"): + crypto.decrypt(base64.urlsafe_b64encode(bytes(blob)).decode()) + + def test_tamper_nonce_raises(self): + crypto = VaultCrypto(_make_key("test")) + blob = bytearray(base64.urlsafe_b64decode(crypto.encrypt("secret").encode())) + blob[0] ^= 0x01 + with pytest.raises(ValueError, match="integrity verification failed"): + crypto.decrypt(base64.urlsafe_b64encode(bytes(blob)).decode()) + + def test_wrong_key_raises(self): + crypto_a = VaultCrypto(_make_key("key-a")) + crypto_b = VaultCrypto(_make_key("key-b")) + with pytest.raises(ValueError, match="integrity verification failed"): + crypto_b.decrypt(crypto_a.encrypt("secret")) + + def test_truncated_blob_raises(self): + crypto = VaultCrypto(_make_key("test")) + blob = base64.urlsafe_b64decode(crypto.encrypt("hello").encode()) + short = base64.urlsafe_b64encode(blob[:5]).decode() + with pytest.raises(ValueError): + crypto.decrypt(short) + + def test_garbage_input_raises(self): + crypto = VaultCrypto(_make_key("test")) + with pytest.raises(Exception): + crypto.decrypt("not-valid-base64!!!") + + +class TestVaultKeyConfiguration: + def test_invalid_base64_key_raises_value_error(self): + with pytest.raises(ValueError): + VaultCrypto(b"not-valid-base64!!!") + + def test_short_key_raises_value_error(self): + short_b64 = base64.urlsafe_b64encode(b"only16bytes12345") + with pytest.raises(ValueError, match="32 bytes"): + VaultCrypto(short_b64) + + def test_resolved_vault_key_raises_without_config(self, monkeypatch): + """resolved_vault_key must raise RuntimeError when no key is configured.""" + monkeypatch.setattr(settings, "vault_key", None) + monkeypatch.setattr(settings, "plugin_signature_key", None) + with pytest.raises(RuntimeError, match="SECUSCAN_VAULT_KEY"): + _ = settings.resolved_vault_key + + def test_resolved_vault_key_works_with_vault_key_set(self, monkeypatch): + monkeypatch.setattr(settings, "vault_key", "any-non-empty-value") + key = settings.resolved_vault_key + assert isinstance(key, bytes) + assert len(base64.urlsafe_b64decode(key)) == 32 + + def test_resolved_vault_key_falls_back_to_plugin_signature_key(self, monkeypatch): + monkeypatch.setattr(settings, "vault_key", None) + monkeypatch.setattr(settings, "plugin_signature_key", "plugin-key") + key = settings.resolved_vault_key + assert isinstance(key, bytes) + assert len(base64.urlsafe_b64decode(key)) == 32 + + def test_hardcoded_dev_key_no_longer_used(self, monkeypatch): + """'secuscan-dev-key' must not be the effective key when both settings are None.""" + monkeypatch.setattr(settings, "vault_key", None) + monkeypatch.setattr(settings, "plugin_signature_key", None) + with pytest.raises(RuntimeError): + _ = settings.resolved_vault_key