Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .containerignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
.pytest_cache/
.buildkite/
.claude/
.codex
dist/
build/
tests/
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ build/
.env
*.token*.json
notes/
.codex
.claude
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,8 @@ psi serve Run the secret lookup service
psi setup Discover secrets, register with Podman, generate drop-ins
psi setup --provider infisical Setup only Infisical-backed workloads (with retry)
psi setup --provider nitrokeyhsm Setup only Nitrokey HSM-backed workloads
psi setup --dry-run Inspect every shell-driver Podman secret without mutating;
classifies each as managed / stale-opts / orphaned
psi install Generate containers.conf.d/psi.conf
psi systemd install Generate systemd units (--mode native or container)
```
Expand Down
16 changes: 15 additions & 1 deletion psi/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,25 @@ def setup(
str | None,
typer.Option("--provider", help="Only setup workloads for this provider."),
] = None,
dry_run: Annotated[
bool,
typer.Option(
"--dry-run",
help=(
"Inspect Podman secret state without mutating anything. "
"Classifies every shell-driver secret as managed, stale-opts, "
"or orphaned."
),
),
] = False,
) -> None:
"""Discover secrets, register with Podman, generate systemd drop-ins."""
from psi.setup import run_setup
from psi.setup import dry_run_setup, run_setup

settings = load_settings(config, scope=detect_scope())
if dry_run:
dry_run_setup(settings)
return
run_setup(settings, provider=provider)


Expand Down
130 changes: 130 additions & 0 deletions psi/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from psi.systemd import daemon_reload

if TYPE_CHECKING:
from pathlib import Path

from psi.cache import Cache
from psi.settings import PsiSettings

Expand Down Expand Up @@ -261,3 +263,131 @@ def _generate_drop_in(

drop_in_path.write_text("\n".join(lines) + "\n")
logger.info("Wrote drop-in: {}", drop_in_path)


def dry_run_setup(settings: PsiSettings) -> None:
"""Inspect Podman secret state without mutating anything.

For each shell-driver secret registered with Podman, classify it as one
of:

- ``managed`` — a mapping file exists in ``state_dir`` and the stored
``Spec.Driver.Options`` match the current ``containers.conf.d/psi.conf``.
- ``stale-opts`` — a mapping file exists but the stored driver opts
differ from the current conf (e.g. the socket token was rotated but
``psi setup`` has not been re-run). Re-run ``psi setup`` to refresh.
- ``orphaned`` — no mapping file in ``state_dir``. A lookup would return
404. Candidate for a future ``psi orphans --prune``.

Does not fetch from Infisical/HSM or contact anything other than the
local Podman API and the on-disk ``state_dir``. Safe to run at any time.
"""
from psi.token import resolve_socket_token
from psi.unitgen import generate_driver_conf

current_opts = _parse_driver_opts(
generate_driver_conf(settings.scope, token=resolve_socket_token(settings))
)

try:
secrets = _list_podman_shell_secrets()
except httpx.HTTPError as e:
msg = f"Cannot reach Podman API at {_podman_socket_url()}: {e}"
raise ProviderError(msg, provider_name="podman") from e

managed, stale, orphaned = _classify_secrets(secrets, settings.state_dir, current_opts)
_print_dry_run_report(managed, stale, orphaned)


_SHELL_OPT_KEYS = ("lookup", "store", "delete", "list")


def _parse_driver_opts(conf_text: str) -> dict[str, str]:
"""Extract the shell-driver opts from generated ``psi.conf`` TOML text.

Avoids a full TOML parser — the generator produces a fixed shape, and
the comparison only needs ``lookup``/``store``/``delete``/``list``.
"""
opts: dict[str, str] = {}
for line in conf_text.splitlines():
for key in _SHELL_OPT_KEYS:
prefix = f'{key} = "'
if line.startswith(prefix) and line.endswith('"'):
opts[key] = line[len(prefix) : -1]
return opts


def _list_podman_shell_secrets() -> list[dict]:
"""Return every Podman secret whose driver is ``shell``."""
transport = httpx.HTTPTransport(uds=_podman_socket_url())
base = f"http://localhost/{_PODMAN_API_VERSION}"
with httpx.Client(transport=transport, timeout=30.0) as client:
resp = client.get(f"{base}/libpod/secrets/json")
resp.raise_for_status()
secrets = resp.json()
return [s for s in secrets if s.get("Spec", {}).get("Driver", {}).get("Name") == "shell"]


def _classify_secrets(
secrets: list[dict],
state_dir: Path,
current_opts: dict[str, str],
) -> tuple[list[str], list[str], list[str]]:
"""Bucket secrets into (managed, stale-opts, orphaned) by name."""
managed: list[str] = []
stale: list[str] = []
orphaned: list[str] = []

for secret in secrets:
spec = secret.get("Spec", {})
name = spec.get("Name", "")
if not name:
continue
raw_opts = spec.get("Driver", {}).get("Options", {})
stored_opts = {k: raw_opts.get(k, "") for k in _SHELL_OPT_KEYS}
mapping_exists = (state_dir / name).exists()
if not mapping_exists:
orphaned.append(name)
elif stored_opts != current_opts:
stale.append(name)
else:
managed.append(name)

managed.sort()
stale.sort()
orphaned.sort()
return managed, stale, orphaned


def _print_dry_run_report(managed: list[str], stale: list[str], orphaned: list[str]) -> None:
from rich.console import Console
from rich.table import Table

console = Console()
total = len(managed) + len(stale) + len(orphaned)

summary = Table(title=f"psi setup --dry-run ({total} shell-driver secrets)")
summary.add_column("Status")
summary.add_column("Count", justify="right")
summary.add_row("[green]managed[/green]", str(len(managed)))
summary.add_row("[yellow]stale-opts[/yellow]", str(len(stale)))
summary.add_row("[red]orphaned[/red]", str(len(orphaned)))
console.print(summary)

if stale:
console.print(
"\n[yellow]Stale-opts[/yellow] — driver opts differ from current "
"psi.conf; re-run `psi setup` to refresh:"
)
for name in stale:
console.print(f" {name}")

if orphaned:
console.print(
"\n[red]Orphaned[/red] — no mapping file in state_dir; lookups would return 404:"
)
for name in orphaned:
console.print(f" {name}")

if not stale and not orphaned:
console.print("\n[green]All secrets are managed — nothing to do.[/green]")
203 changes: 203 additions & 0 deletions tests/test_dry_run_setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
"""Tests for psi.setup.dry_run_setup — read-only Podman secret inspection."""

from __future__ import annotations

from typing import TYPE_CHECKING
from unittest.mock import patch

import httpx
import pytest

from psi.errors import ProviderError
from psi.models import SystemdScope
from psi.setup import (
_classify_secrets,
_parse_driver_opts,
dry_run_setup,
)

if TYPE_CHECKING:
from pathlib import Path


def _fake_settings(tmp_path: Path):
from unittest.mock import MagicMock

settings = MagicMock()
settings.state_dir = tmp_path / "state"
settings.state_dir.mkdir()
settings.scope = SystemdScope.SYSTEM
settings.socket_token = None
return settings


def _shell_secret(
name: str,
*,
lookup: str = "curl-lookup",
store: str = "curl-store",
delete: str = "curl-delete",
list_: str = "curl-list",
) -> dict:
return {
"Spec": {
"Name": name,
"Driver": {
"Name": "shell",
"Options": {
"lookup": lookup,
"store": store,
"delete": delete,
"list": list_,
},
},
},
}


class TestParseDriverOpts:
def test_extracts_all_four_opts(self) -> None:
conf = (
"[secrets]\n"
'driver = "shell"\n'
"\n"
"[secrets.opts]\n"
'store = "curl -X POST http://s"\n'
'lookup = "curl http://l"\n'
'delete = "curl -X DELETE http://d"\n'
'list = "curl http://ls"\n'
)
opts = _parse_driver_opts(conf)
assert opts == {
"store": "curl -X POST http://s",
"lookup": "curl http://l",
"delete": "curl -X DELETE http://d",
"list": "curl http://ls",
}


class TestClassifySecrets:
CURRENT = {
"lookup": "curl-lookup",
"store": "curl-store",
"delete": "curl-delete",
"list": "curl-list",
}

def test_managed_when_mapping_and_opts_match(self, tmp_path: Path) -> None:
(tmp_path / "app--DB_URL").write_text("{}")
managed, stale, orphaned = _classify_secrets(
[_shell_secret("app--DB_URL")],
tmp_path,
self.CURRENT,
)
assert managed == ["app--DB_URL"]
assert stale == []
assert orphaned == []

def test_stale_when_opts_drift(self, tmp_path: Path) -> None:
(tmp_path / "app--DB_URL").write_text("{}")
managed, stale, orphaned = _classify_secrets(
[_shell_secret("app--DB_URL", lookup="curl-old-lookup")],
tmp_path,
self.CURRENT,
)
assert stale == ["app--DB_URL"]
assert managed == []
assert orphaned == []

def test_orphaned_when_no_mapping_file(self, tmp_path: Path) -> None:
managed, stale, orphaned = _classify_secrets(
[_shell_secret("buildkite-agent--GHCR_PAT")],
tmp_path,
self.CURRENT,
)
assert orphaned == ["buildkite-agent--GHCR_PAT"]

def test_mixed_state_sorted_per_bucket(self, tmp_path: Path) -> None:
(tmp_path / "b-managed").write_text("{}")
(tmp_path / "a-managed").write_text("{}")
(tmp_path / "stale").write_text("{}")
managed, stale, orphaned = _classify_secrets(
[
_shell_secret("b-managed"),
_shell_secret("a-managed"),
_shell_secret("stale", lookup="drifted"),
_shell_secret("orph-b"),
_shell_secret("orph-a"),
],
tmp_path,
self.CURRENT,
)
assert managed == ["a-managed", "b-managed"]
assert stale == ["stale"]
assert orphaned == ["orph-a", "orph-b"]

def test_skips_entries_with_no_name(self, tmp_path: Path) -> None:
managed, stale, orphaned = _classify_secrets(
[{"Spec": {"Driver": {"Name": "shell", "Options": {}}}}],
tmp_path,
self.CURRENT,
)
assert managed == stale == orphaned == []


class TestDryRunSetup:
def test_surfaces_podman_api_failure_as_provider_error(self, tmp_path: Path) -> None:
settings = _fake_settings(tmp_path)
with (
patch(
"psi.setup._list_podman_shell_secrets",
side_effect=httpx.ConnectError("refused"),
),
pytest.raises(ProviderError, match="Cannot reach Podman API"),
):
dry_run_setup(settings)

def test_prints_report_and_does_not_mutate(
self, tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
settings = _fake_settings(tmp_path)
(settings.state_dir / "app--OK").write_text("{}")
(settings.state_dir / "app--STALE").write_text("{}")

from psi.unitgen import generate_driver_conf

current_conf = generate_driver_conf(settings.scope, token=None)
opts = _parse_driver_opts(current_conf)
stale_opts = {**opts, "lookup": "old-lookup-command"}

secrets = [
{
"Spec": {
"Name": "app--OK",
"Driver": {"Name": "shell", "Options": opts},
},
},
{
"Spec": {
"Name": "app--STALE",
"Driver": {"Name": "shell", "Options": stale_opts},
},
},
{
"Spec": {
"Name": "app--ORPHAN",
"Driver": {"Name": "shell", "Options": opts},
},
},
]

with patch("psi.setup._list_podman_shell_secrets", return_value=secrets):
dry_run_setup(settings)

out = capsys.readouterr().out
assert "app--OK" not in out or "managed" in out # not in the stale/orphan lists
assert "app--STALE" in out
assert "app--ORPHAN" in out
assert "dry-run" in out
# state_dir must not have gained any new files
assert sorted(p.name for p in settings.state_dir.iterdir()) == [
"app--OK",
"app--STALE",
]
Loading