diff --git a/etc/_devbase b/etc/_devbase index c5d0f14..b7944dd 100644 --- a/etc/_devbase +++ b/etc/_devbase @@ -74,6 +74,7 @@ _devbase() { 'edit:Open .env in editor' 'project:Setup project-specific variables' 'export:Export .env files as an encrypted bundle (age)' + 'import:Import .env bundle (age decrypt + merge)' ) plugin_subcommands=( @@ -163,6 +164,24 @@ _devbase() { '--passphrase-stdin[Read passphrase from stdin]' \ '--force-unencrypted[Write as plaintext tar.gz]' ;; + import) + _arguments \ + '1:source:_files' \ + '--merge[Merge strategy]:mode:(keep-existing prefer-incoming)' \ + '--replace-keys[Replace only these keys (comma-separated)]:keys:' \ + '--replace[Replace existing files entirely]' \ + '--dry-run[Preview changes without writing]' \ + '*--identity[age / OpenSSH private key file (repeatable)]:file:_files' \ + '--passphrase-env[Read passphrase from env var]:var:' \ + '--passphrase-stdin[Read passphrase from stdin]' \ + '*--include-project[Limit to specified project (repeatable)]:name:' \ + '*--exclude-project[Exclude project (repeatable)]:name:' \ + '--no-global[Do not import $DEVBASE_ROOT/.env]' \ + '--no-metadata[Do not import $DEVBASE_ROOT/.env.sources.yml]' \ + '--merge-metadata[Add only new sources entries from bundle]' \ + '--backup-dir[Override backup directory]:dir:_files -/' \ + '--keep-last[Keep only the last N backup directories]:n:' + ;; *) _describe -t env-commands 'env command' env_subcommands ;; diff --git a/etc/devbase-completion.bash b/etc/devbase-completion.bash index e7ef68c..81f6431 100644 --- a/etc/devbase-completion.bash +++ b/etc/devbase-completion.bash @@ -12,7 +12,7 @@ _devbase_completions() { local commands="init status shell-rc container ct env plugin pl snapshot ss up down login build ps help" local container_subcommands="up down ps login logs scale build" - local env_subcommands="init sync list set get delete edit project export" + local env_subcommands="init sync list set get delete edit project export import" local plugin_subcommands="list install uninstall update info sync repo" local repo_subcommands="add remove list refresh" local snapshot_subcommands="create list restore copy delete rotate" @@ -86,6 +86,11 @@ _devbase_completions() { COMPREPLY=($(compgen -W "--include-project --exclude-project --no-global --no-metadata --recipient --passphrase-env --passphrase-stdin --force-unencrypted" -- "$cur")) fi ;; + import) + if [[ "$cur" == -* ]]; then + COMPREPLY=($(compgen -W "--merge --replace-keys --replace --dry-run --identity --passphrase-env --passphrase-stdin --include-project --exclude-project --no-global --no-metadata --merge-metadata --backup-dir --keep-last" -- "$cur")) + fi + ;; esac fi # plugin subcommand arguments diff --git a/lib/devbase/cli.py b/lib/devbase/cli.py index 1da2160..d519db4 100644 --- a/lib/devbase/cli.py +++ b/lib/devbase/cli.py @@ -36,7 +36,7 @@ # Subcommand map for prefix resolution: {(aliases...): [subcmds]} SUBCMD_MAP = { ('container', 'ct'): ['up', 'down', 'ps', 'login', 'logs', 'scale', 'build'], - ('env',): ['init', 'sync', 'list', 'set', 'get', 'delete', 'edit', 'project', 'export'], + ('env',): ['init', 'sync', 'list', 'set', 'get', 'delete', 'edit', 'project', 'export', 'import'], ('plugin', 'pl'): ['list', 'install', 'uninstall', 'update', 'info', 'sync', 'repo'], ('snapshot', 'ss'): ['create', 'list', 'restore', 'copy', 'delete', 'rotate'], } @@ -47,6 +47,9 @@ SUBCMD_PREFIX_PREFERENCES = { ('env',): { 'e': 'edit', + # `import` 追加で `i` が `init` / `import` の両方にマッチして ambiguous に + # なるため、既存ショートカット (`devbase env i` → `init`) を維持する。 + 'i': 'init', }, } @@ -149,6 +152,55 @@ def _add_env_parser(subparsers): help='Write as plaintext tar.gz (rejected by default; ' 'warns when sensitive keys are detected)') + env_import = env_sub.add_parser( + 'import', + help='Import .env files from a bundle (age-encrypted or plaintext tar.gz)', + ) + env_import.add_argument('source', + help="Bundle path or '-' for stdin") + env_import.add_argument('--merge', choices=['keep-existing', 'prefer-incoming'], + default='keep-existing', + help=("Key-level merge mode. keep-existing (default) keeps " + "existing keys and adds new ones; prefer-incoming " + "overwrites with bundle values")) + env_import.add_argument('--replace-keys', metavar='KEYS', default='', + help=("Comma-separated keys to force-overwrite from bundle " + "(other keys behave like keep-existing). " + "Cannot be combined with --replace")) + env_import.add_argument('--replace', action='store_true', + help='Replace each target .env file wholesale (backup is taken)') + env_import.add_argument('--dry-run', action='store_true', + help='Show planned diff without writing') + env_import.add_argument('--identity', action='append', default=[], + metavar='FILE', dest='identities', + help=("age / OpenSSH private key file (repeatable). " + "Default: ~/.ssh/id_ed25519, then ~/.ssh/id_rsa " + "(first existing one)")) + env_import.add_argument('--passphrase-env', metavar='VAR', default=None, + help='Read passphrase from environment variable VAR') + env_import.add_argument('--passphrase-stdin', action='store_true', + help='Read passphrase from the first line of stdin') + env_import.add_argument('--include-project', action='append', default=None, + metavar='NAME', dest='include_projects', + help='Limit to specified project (repeatable)') + env_import.add_argument('--exclude-project', action='append', default=[], + metavar='NAME', dest='exclude_projects', + help='Exclude project (repeatable)') + env_import.add_argument('--no-global', action='store_true', + help='Do not import $DEVBASE_ROOT/.env') + env_import.add_argument('--no-metadata', action='store_true', + help='Do not import $DEVBASE_ROOT/.env.sources.yml ' + '(default behavior is reference-only copy; this fully ignores it)') + env_import.add_argument('--merge-metadata', action='store_true', + help='Merge new source entries into existing .env.sources.yml ' + '(machine-specific fields are preserved as-is from bundle; ' + 'run `devbase env sync` after import to refresh)') + env_import.add_argument('--backup-dir', metavar='DIR', default=None, + help='Override backup directory ' + '(default: $DEVBASE_ROOT/backups/env-import/)') + env_import.add_argument('--keep-last', type=int, default=10, metavar='N', + help='Keep only the last N backup directories (default: 10, 0 to disable)') + def _add_plugin_parser(subparsers): """Plugin group parser""" diff --git a/lib/devbase/commands/env.py b/lib/devbase/commands/env.py index a324fbd..3b43598 100644 --- a/lib/devbase/commands/env.py +++ b/lib/devbase/commands/env.py @@ -34,6 +34,7 @@ def cmd_env(devbase_root: Path, args) -> int: 'edit': lambda: cmd_env_edit(devbase_root), 'project': lambda: cmd_env_project(devbase_root), 'export': lambda: cmd_env_export(devbase_root, args), + 'import': lambda: cmd_env_import(devbase_root, args), } handler = handlers.get(subcmd) @@ -401,6 +402,33 @@ def cmd_env_export(devbase_root: Path, args) -> int: return export(devbase_root, opts) +def cmd_env_import(devbase_root: Path, args) -> int: + """devbase env import""" + from devbase.env.io_import import ImportOptions, import_bundle + + replace_keys_arg = getattr(args, 'replace_keys', '') or '' + replace_keys = [k.strip() for k in replace_keys_arg.split(',') if k.strip()] + + opts = ImportOptions( + source=getattr(args, 'source'), + merge=getattr(args, 'merge', 'keep-existing'), + replace_keys=replace_keys, + replace=getattr(args, 'replace', False), + dry_run=getattr(args, 'dry_run', False), + identities=list(getattr(args, 'identities', []) or []), + passphrase_env=getattr(args, 'passphrase_env', None), + passphrase_stdin=getattr(args, 'passphrase_stdin', False), + include_projects=getattr(args, 'include_projects', None), + exclude_projects=list(getattr(args, 'exclude_projects', []) or []), + include_global=not getattr(args, 'no_global', False), + include_metadata=not getattr(args, 'no_metadata', False), + merge_metadata=getattr(args, 'merge_metadata', False), + backup_dir=getattr(args, 'backup_dir', None), + keep_last=getattr(args, 'keep_last', 10), + ) + return import_bundle(devbase_root, opts) + + def _update_source_metadata(devbase_root: Path, env_file: EnvFile) -> None: """ソースメタデータを更新する""" sources = SourcesManager(devbase_root) diff --git a/lib/devbase/env/io_export.py b/lib/devbase/env/io_export.py index 2da2c2c..101ceb2 100644 --- a/lib/devbase/env/io_export.py +++ b/lib/devbase/env/io_export.py @@ -2,8 +2,10 @@ from __future__ import annotations +import getpass import os import re +import sys from dataclasses import dataclass, field from datetime import datetime from pathlib import Path @@ -69,11 +71,13 @@ def _read_passphrase(opts: ExportOptions) -> Optional[str]: ) return value if opts.passphrase_stdin: - import sys - # tty で対話実行している場合、ユーザーが「ハングしている」と誤解しないよう - # stderr へプロンプトを出してから stdin を待つ (パイプ入力時は出さない)。 + # tty で対話実行している場合は getpass.getpass でエコー抑止 + # (パイプ入力時は echo の概念がないので従来どおり stdin.readline で読む)。 if sys.stdin.isatty(): - print("passphrase: ", end='', file=sys.stderr, flush=True) + try: + return getpass.getpass("passphrase: ", stream=sys.stderr) + except EOFError as e: + raise ExportError("stdin からパスフレーズを読み取れませんでした") from e line = sys.stdin.readline() if not line: raise ExportError("stdin からパスフレーズを読み取れませんでした") diff --git a/lib/devbase/env/io_import.py b/lib/devbase/env/io_import.py new file mode 100644 index 0000000..5f025b1 --- /dev/null +++ b/lib/devbase/env/io_import.py @@ -0,0 +1,711 @@ +"""devbase env import の高レベル実装 + +責務: + - SOURCE (file / stdio) の読み込み + - age 復号 (バンドルが暗号化されていれば) + - tar.gz バンドルの展開と sha256 / manifest version の検証 (bundle.unpack) + - --merge / --replace-keys / --replace のセマンティクスで .env 群を更新 + - .env.sources.yml は既定で上書きせず参照用コピーのみ (--merge-metadata で + 新規 source のみ追加) + - 2 フェーズ書き出し (prepare → commit) で部分適用を最小化 + - --backup-dir / --keep-last N で backup を GC + - --dry-run で差分プレビュー +""" + +from __future__ import annotations + +import getpass +import os +import re +import shutil +import sys +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional, Sequence, Tuple + +import yaml + +from devbase.errors import DevbaseError +from devbase.log import get_logger + +from devbase.env import bundle as _bundle +from devbase.env import cipher as _cipher +from devbase.env import storage as _storage +from devbase.env.store import EnvEntry, EnvFile + +logger = get_logger(__name__) + +# gzip magic. tar.gz バンドルは先頭 2 byte が 0x1f 0x8b。age 暗号化済みは +# テキストヘッダ "age-encryption.org/v1\n" で始まるため magic で識別できる。 +_GZIP_MAGIC = b'\x1f\x8b' + +_MERGE_MODES = ('keep-existing', 'prefer-incoming') + +# _make_backup_dir が生成するタイムスタンプ形式のみを GC 対象にする。 +# 以下のいずれかにマッチするディレクトリのみ削除する: +# YYYYMMDD-HHMMSS (旧フォーマット, 後方互換) +# YYYYMMDD-HHMMSS-NNNNNN (microsecond 付き) +# YYYYMMDD-HHMMSS-NNNNNN-NN (同一マイクロ秒内の連番付き) +# これ以外のディレクトリは devbase が作ったものではないので削除しない +# (--backup-dir 親に無関係なディレクトリがあっても安全)。 +_BACKUP_DIR_NAME_RE = re.compile(r'^\d{8}-\d{6}(?:-\d{6}(?:-\d+)?)?$') + + +class ImportError(DevbaseError): + """import エラー""" + + +@dataclass +class ImportOptions: + source: str + merge: str = 'keep-existing' + replace_keys: List[str] = field(default_factory=list) + replace: bool = False + dry_run: bool = False + identities: List[str] = field(default_factory=list) + passphrase_env: Optional[str] = None + passphrase_stdin: bool = False + include_projects: Optional[List[str]] = None + exclude_projects: List[str] = field(default_factory=list) + include_global: bool = True + include_metadata: bool = True + merge_metadata: bool = False + backup_dir: Optional[str] = None + keep_last: int = 10 + + +@dataclass +class _Plan: + """1 ファイル分の書き出し計画""" + target: Path + arcname: str + new_bytes: bytes + # 差分サマリ (dry-run / ログ用) + added_keys: List[str] = field(default_factory=list) + overwritten_keys: List[str] = field(default_factory=list) + skipped_keys: List[str] = field(default_factory=list) + # ファイル単位の操作種別 + op: str = 'merge' # 'merge' | 'replace' | 'create' | 'sources-merge' + + +def _read_passphrase(opts: ImportOptions) -> Optional[str]: + if opts.passphrase_env: + value = os.environ.get(opts.passphrase_env) + if not value: + raise ImportError(f"環境変数 {opts.passphrase_env} が空または未設定です") + return value + if opts.passphrase_stdin: + if sys.stdin.isatty(): + try: + return getpass.getpass("passphrase: ", stream=sys.stderr) + except EOFError as e: + raise ImportError("stdin からパスフレーズを読み取れませんでした") from e + line = sys.stdin.readline() + if not line: + raise ImportError("stdin からパスフレーズを読み取れませんでした") + return line.rstrip('\n') + return None + + +def _resolve_identities(specs: Sequence[str]) -> List[str]: + if specs: + return list(specs) + for path in _cipher.default_identity_paths(): + if path.exists(): + logger.info("identity 既定鍵を使用: %s", path) + return [str(path)] + return [] + + +def _decrypt_if_needed(blob: bytes, opts: ImportOptions) -> bytes: + """先頭バイトで暗号化済みかを判定して必要なら復号する""" + if blob[:2] == _GZIP_MAGIC: + # 平文 tar.gz。鍵指定があっても無視せず警告にとどめる + if opts.identities or opts.passphrase_env or opts.passphrase_stdin: + logger.warning( + "バンドルは平文ですが identity / passphrase が指定されています " + "(使用されません)" + ) + return blob + + passphrase = _read_passphrase(opts) + if passphrase is not None: + return _cipher.decrypt(blob, passphrase=passphrase) + + identities = _resolve_identities(opts.identities) + if not identities: + raise ImportError( + "バンドルは暗号化されていますが復号キーが指定されていません。\n" + " --identity FILE age / OpenSSH 秘密鍵ファイル\n" + " --passphrase-env VAR 環境変数からパスフレーズ取得\n" + " --passphrase-stdin stdin の最初の行をパスフレーズとして使用\n" + " ~/.ssh/id_ed25519 または ~/.ssh/id_rsa があれば " + "--identity 省略時の既定として使用されます (ed25519 優先)" + ) + return _cipher.decrypt(blob, identities=identities) + + +def _filter_members(members: Dict[str, bytes], + opts: ImportOptions) -> Dict[str, bytes]: + """include/exclude 指定で展開済みメンバーを絞り込む""" + included = set(opts.include_projects) if opts.include_projects else None + excluded = set(opts.exclude_projects) + result: Dict[str, bytes] = {} + + proj_re = re.compile(r'^env/projects/([^/]+)/\.env$') + + for arcname, data in members.items(): + if arcname == 'env/global.env': + if not opts.include_global: + continue + result[arcname] = data + continue + if arcname == 'env/sources.yml': + if not opts.include_metadata: + continue + result[arcname] = data + continue + m = proj_re.match(arcname) + if m: + name = m.group(1) + if name in excluded: + continue + if included is not None and name not in included: + continue + result[arcname] = data + continue + # 他の形式は manifest 検証で拒否されているはずだが念のため + logger.debug("未対応の arcname を無視します: %s", arcname) + return result + + +def _target_for(arcname: str, devbase_root: Path) -> Path: + if arcname == 'env/global.env': + return devbase_root / '.env' + if arcname == 'env/sources.yml': + return devbase_root / '.env.sources.yml' + m = re.match(r'^env/projects/([^/]+)/\.env$', arcname) + if m: + return devbase_root / 'projects' / m.group(1) / '.env' + raise ImportError(f"未対応のバンドルエントリ: {arcname}") + + +def _merge_into_existing_bytes(existing_bytes: bytes, + merged: Dict[str, str]) -> bytes: + """既存 ``.env`` のコメント / 空行 / キー順を保持したまま、 + ``merged`` の内容で値を上書きしてバイト列化する。 + + ``merged`` のうち既存ファイルに無いキーは末尾に追加する。 + 既存ファイルにあって ``merged`` から削除されているキーは、entries からも除外して + 出力する (現状の merge ロジック上、削除されるケースは無いが安全側で対応)。 + + PR #15 gemini 指摘: ``EnvFile.dump_bytes`` で再シリアライズすると ``key=value`` + だけの出力になりコメント・空行が失われる。これを避けるため entries ベースで + 再構成する。 + """ + entries = EnvFile.parse_entries(existing_bytes) + seen: set = set() + out_entries: List[EnvEntry] = [] + for e in entries: + if e.kind == 'kv' and e.key is not None: + if e.key in merged: + # 値を merge 後のものに差し替え (key 順 / コメントは保持) + out_entries.append(EnvEntry( + kind='kv', raw=e.raw, key=e.key, value=merged[e.key] + )) + seen.add(e.key) + # merged から除外されているキーは entries からも落とす + else: + out_entries.append(e) + # 既存に無かった新規キーは末尾に append (定常的な key 順を維持するため sorted) + for key in sorted(k for k in merged if k not in seen): + out_entries.append(EnvEntry( + kind='kv', raw='', key=key, value=merged[key] + )) + return EnvFile.dump_entries_bytes(out_entries) + + +def _build_merge_plan( + target: Path, + arcname: str, + incoming_bytes: bytes, + existing_bytes: bytes, + target_exists: bool, + merged: Dict[str, str], + added: List[str], + overwritten: List[str], + skipped: List[str], +) -> _Plan: + """merge 系 (replace_keys / keep-existing / prefer-incoming) 共通の _Plan 生成。 + + - 新規作成時 (``target_exists`` が False) は ``incoming_bytes`` をそのまま採用して + 二重エスケープを回避 (PR #15 codex 指摘) + - 既存ファイルへの merge 時は ``_merge_into_existing_bytes`` でコメント / 空行 / + キー順を保持したまま値を差し替える (PR #15 gemini 指摘)。 + ``existing`` (key=value dict) の空判定で create/merge を決めると、コメント / + 空行のみで構成された既存 .env が ``incoming_bytes`` で上書きされてコメントが + 失われるため、ファイル実体の有無で判定する (PR #15 round5 指摘)。 + """ + new_bytes = (_merge_into_existing_bytes(existing_bytes, merged) + if target_exists else incoming_bytes) + return _Plan( + target=target, + arcname=arcname, + new_bytes=new_bytes, + added_keys=sorted(added), + overwritten_keys=sorted(overwritten), + skipped_keys=sorted(skipped), + op='merge' if target_exists else 'create', + ) + + +def _plan_env_merge(target: Path, incoming_bytes: bytes, + opts: ImportOptions, arcname: str) -> _Plan: + """1 つの .env に対する merge / replace 計画を作る + + 既存ファイルが無い (= create) ケースでは、バンドル側の ``incoming_bytes`` を + そのまま採用する。``EnvFile.dump_bytes`` で再シリアライズすると、export 側で + 既に escape された値を parse_bytes 経由でも完全に round-trip できる前提が + 崩れた瞬間に二重エスケープが発生するためである (PR #15 codex 指摘)。 + + 既存ファイルが存在する merge 経路では ``_merge_into_existing_bytes`` で + 既存のコメント / 空行 / キー順を保持したまま値だけ差し替える + (PR #15 gemini 指摘)。 + + 各分岐で重複していた ``new_bytes`` 生成と ``_Plan`` 構築は + ``_build_merge_plan`` に括り出している (PR #15 gemini round4 指摘)。 + """ + incoming = EnvFile.parse_bytes(incoming_bytes) + existing: Dict[str, str] = {} + existing_bytes: bytes = b'' + target_exists = target.exists() + if target_exists: + existing_bytes = target.read_bytes() + existing = EnvFile.parse_bytes(existing_bytes) + + if opts.replace: + added = sorted(set(incoming) - set(existing)) + overwritten = sorted(k for k in incoming if k in existing and incoming[k] != existing[k]) + # replace は バンドル側の値で完全に置き換える (merge 経路と別系統) + # op 判定は ``existing`` (key=value dict) ではなくファイル実体の有無で行う: + # コメントのみの既存 .env を 'create' と誤判定しないため (PR #15 round5 指摘)。 + return _Plan( + target=target, + arcname=arcname, + new_bytes=incoming_bytes, + added_keys=added, + overwritten_keys=overwritten, + skipped_keys=[], + op='replace' if target_exists else 'create', + ) + + merged: Dict[str, str] = dict(existing) + added: List[str] = [] + overwritten: List[str] = [] + skipped: List[str] = [] + + if opts.replace_keys: + replace_set = set(opts.replace_keys) + for key, value in incoming.items(): + if key in replace_set: + if key in existing: + if existing[key] != value: + overwritten.append(key) + merged[key] = value + else: + added.append(key) + merged[key] = value + else: + # --replace-keys 指定外のキーは keep-existing 相当: + # 既存にあれば残し、無ければ新規追加 (skipped は overwrite を + # 抑止した = 上書きしなかったキーのみ)。 + if key in existing: + if existing[key] != value: + skipped.append(key) + else: + added.append(key) + merged[key] = value + elif opts.merge == 'keep-existing': + for key, value in incoming.items(): + if key in existing: + skipped.append(key) + else: + merged[key] = value + added.append(key) + elif opts.merge == 'prefer-incoming': + for key, value in incoming.items(): + if key in existing: + if existing[key] != value: + overwritten.append(key) + merged[key] = value + else: + merged[key] = value + added.append(key) + else: + raise ImportError(f"不明な --merge モード: {opts.merge!r}") + + return _build_merge_plan( + target=target, + arcname=arcname, + incoming_bytes=incoming_bytes, + existing_bytes=existing_bytes, + target_exists=target_exists, + merged=merged, + added=added, + overwritten=overwritten, + skipped=skipped, + ) + + +def _plan_sources(target: Path, incoming_bytes: bytes, + opts: ImportOptions) -> Optional[_Plan]: + """.env.sources.yml の取り扱い計画 + + 既定: 上書きせず None を返す (backup_dir に参照用コピーのみ書く)。 + --merge-metadata: 新規 source エントリのみ追加した内容で更新する。 + """ + if not opts.merge_metadata: + # 上書きしないので _Plan は返さない。参照用 copy は run() 側で処理。 + return None + + try: + incoming = yaml.safe_load(incoming_bytes) or {} + except yaml.YAMLError as e: + raise ImportError(f"バンドルの sources.yml が壊れています: {e}") from e + if not isinstance(incoming, dict): + raise ImportError("バンドルの sources.yml が dict ではありません") + incoming_sources = incoming.get('sources') or {} + if not isinstance(incoming_sources, dict): + raise ImportError("バンドルの sources.yml の sources が dict ではありません") + + existing: Dict = {} + if target.exists(): + try: + existing = yaml.safe_load(target.read_bytes()) or {} + except yaml.YAMLError as e: + raise ImportError( + f"既存の {target.name} のパースに失敗しました: {e}" + ) from e + if not isinstance(existing, dict): + existing = {} + existing.setdefault('sources', {}) + if not isinstance(existing['sources'], dict): + existing['sources'] = {} + + added: List[str] = [] + merged_sources = dict(existing['sources']) + for name, entry in incoming_sources.items(): + if name in merged_sources: + continue + merged_sources[name] = entry + added.append(name) + + if not added: + return None # 変化なし + + existing['sources'] = merged_sources + new_bytes = yaml.safe_dump( + existing, default_flow_style=False, allow_unicode=True + ).encode('utf-8') + return _Plan( + target=target, + arcname='env/sources.yml', + new_bytes=new_bytes, + added_keys=sorted(added), + overwritten_keys=[], + skipped_keys=[], + op='sources-merge', + ) + + +def _make_backup_dir(devbase_root: Path, opts: ImportOptions) -> Path: + """バックアップディレクトリを作成する。 + + 秒精度のみだと同一秒に 2 回 import を走らせたときに同じディレクトリを再利用して + 前回バックアップを上書きしてしまうため、microsecond + 連番を付与して衝突を回避する + (PR #15 codex 指摘)。 + """ + if opts.backup_dir: + base = Path(opts.backup_dir).expanduser() + else: + base = devbase_root / 'backups' / 'env-import' + base.mkdir(parents=True, exist_ok=True) + + now = datetime.now() + stem = now.strftime('%Y%m%d-%H%M%S-%f') # microsecond まで + path = base / stem + if not path.exists(): + path.mkdir(parents=True) + return path + # 同一マイクロ秒に複数回走った場合の安全弁: 連番を付与 + for n in range(1, 1000): + candidate = base / f'{stem}-{n:02d}' + if not candidate.exists(): + candidate.mkdir(parents=True) + return candidate + raise ImportError( + f"backup ディレクトリの衝突回避に失敗しました (base={base}, stem={stem})" + ) + + +def _backup_existing(plans: Sequence[_Plan], sources_copy: Optional[Tuple[Path, bytes]], + backup_dir: Path, devbase_root: Path) -> None: + """phase 1 前に既存ファイルの内容を backup_dir にコピーする""" + for plan in plans: + if not plan.target.exists(): + continue + try: + relative = plan.target.relative_to(devbase_root) + except ValueError: + relative = Path(plan.target.name) + dst = backup_dir / relative + dst.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(plan.target, dst) + + # バンドルに含まれていた sources.yml の参照用コピー (上書きしないケース) + if sources_copy is not None: + target, data = sources_copy + dst = backup_dir / 'sources.yml.imported' + dst.parent.mkdir(parents=True, exist_ok=True) + dst.write_bytes(data) + try: + os.chmod(dst, 0o600) + except OSError: + pass + + +def _write_atomic(plan: _Plan) -> Path: + """phase 1: 新内容を .import.tmp として書き出す (0600)""" + tmp = plan.target.with_suffix(plan.target.suffix + '.import.tmp') + tmp.parent.mkdir(parents=True, exist_ok=True) + if tmp.exists(): + # 過去の失敗の残骸を掃除 + try: + tmp.unlink() + except OSError: + pass + flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC + fd = os.open(tmp, flags, 0o600) + try: + with os.fdopen(fd, 'wb') as f: + f.write(plan.new_bytes) + except BaseException: + try: + os.close(fd) + except OSError: + pass + raise + try: + os.chmod(tmp, 0o600) + except OSError: + pass + return tmp + + +def _commit(plans_and_tmps: List[Tuple[_Plan, Path]], backup_dir: Path, + devbase_root: Path) -> List[Path]: + """phase 2: tmp → target に rename。 + + 途中失敗時は best-effort で rollback したうえで、まだ rename されていない + 残りの ``.import.tmp`` ファイルもクリーンアップする (PR #15 gemini 指摘)。 + """ + committed: List[Tuple[_Plan, Path]] = [] + remaining_tmps = [tmp for _, tmp in plans_and_tmps] + try: + for idx, (plan, tmp) in enumerate(plans_and_tmps): + os.replace(tmp, plan.target) + try: + os.chmod(plan.target, 0o600) + except OSError: + pass + committed.append((plan, plan.target)) + # rename 済みの tmp は残らないが、リストから除外して後続 cleanup を簡潔に + remaining_tmps[idx] = None # type: ignore[call-overload] + except OSError as e: + logger.error("commit フェーズで失敗しました: %s", e) + try: + _rollback(committed, backup_dir, devbase_root) + finally: + # rename 前で残っている .import.tmp を後始末 + _cleanup_tmps([t for t in remaining_tmps if t is not None]) + raise ImportError(f"commit フェーズで失敗しました: {e}") from e + return [t for _, t in committed] + + +def _rollback(committed: Sequence[Tuple[_Plan, Path]], backup_dir: Path, + devbase_root: Path) -> None: + """best-effort ロールバック: + - 既存上書き (backup あり) → backup から復元 + - backup が無いケース → 元ファイルが存在しなかった (= 新規作成) と + みなして unlink し、元の「不在」状態に戻す。``op='create'`` だけでなく + ``op='sources-merge'`` で sources.yml を新規作成したケースもここで + unlink する (PR #15 gemini 指摘)。 + + ``_backup_existing`` は target が存在した場合のみ backup を作る。よって + 「backup が無い」事実は「元ファイルが存在しなかった」ことを示している。 + """ + for plan, target in committed: + try: + relative = target.relative_to(devbase_root) + except ValueError: + relative = Path(target.name) + src = backup_dir / relative + if src.exists(): + try: + shutil.copy2(src, target) + logger.warning("rollback: %s を %s から復元しました", target, src) + except OSError as e: + logger.error("rollback 失敗: %s -> %s: %s", src, target, e) + else: + # 元ファイル不在 → 新規作成された target を unlink して元の状態に戻す + try: + target.unlink() + logger.warning("rollback: 新規作成された %s を削除しました", target) + except FileNotFoundError: + pass + except OSError as e: + logger.error("rollback unlink 失敗: %s: %s", target, e) + + +def _cleanup_tmps(tmps: Sequence[Path]) -> None: + for tmp in tmps: + try: + if tmp.exists(): + tmp.unlink() + except OSError: + pass + + +def _gc_backups(backup_dir: Path, keep_last: int) -> None: + """backup_dir の親ディレクトリ内の古い backup を keep_last 個まで残して GC する。 + + 安全性のため、削除対象は devbase が生成するタイムスタンプ形式 + (YYYYMMDD-HHMMSS) のディレクトリに限定する。--backup-dir で指定された + 親ディレクトリに無関係なファイル/ディレクトリがあっても、それらは触らない。 + """ + if keep_last <= 0: + return + parent = backup_dir.parent + if not parent.is_dir(): + return + siblings = sorted( + (p for p in parent.iterdir() + if p.is_dir() and _BACKUP_DIR_NAME_RE.match(p.name)), + key=lambda p: p.name, + ) + if len(siblings) <= keep_last: + return + to_remove = siblings[:-keep_last] + for d in to_remove: + try: + shutil.rmtree(d) + logger.info("古い backup を削除しました: %s", d) + except OSError as e: + logger.warning("backup 削除に失敗 (%s): %s", d, e) + + +def _log_plans(plans: Sequence[_Plan], dry_run: bool) -> None: + prefix = "[dry-run] " if dry_run else "" + for plan in plans: + logger.info( + "%s%s: %s (+%d add / ~%d overwrite / -%d skip)", + prefix, plan.op, plan.target, + len(plan.added_keys), len(plan.overwritten_keys), len(plan.skipped_keys), + ) + if plan.added_keys: + logger.info(" added: %s", ", ".join(plan.added_keys)) + if plan.overwritten_keys: + logger.info(" overwrite: %s", ", ".join(plan.overwritten_keys)) + if plan.skipped_keys: + logger.info(" skip (existing kept): %s", ", ".join(plan.skipped_keys)) + + +def import_bundle(devbase_root: Path, opts: ImportOptions) -> int: + """import 本体。CLI ハンドラから呼ばれる""" + # 引数の早期検証 + if opts.merge not in _MERGE_MODES: + raise ImportError( + f"--merge の値が不正です: {opts.merge!r} (許可: {', '.join(_MERGE_MODES)})" + ) + if opts.replace and opts.replace_keys: + raise ImportError("--replace と --replace-keys は併用できません") + if opts.passphrase_stdin and opts.source == '-': + raise ImportError( + "SOURCE='-' (stdin) と --passphrase-stdin は併用できません " + "(stdin が衝突します)" + ) + if opts.passphrase_env and opts.passphrase_stdin: + raise ImportError("--passphrase-env と --passphrase-stdin は併用できません") + + # SOURCE 読み込み + backend = _storage.resolve(opts.source) + blob = backend.read_bytes(opts.source) + logger.debug("読み込みサイズ: %d bytes", len(blob)) + + # 復号 (必要なら) + 展開 + manifest 検証 (sha256 / version) + tar_blob = _decrypt_if_needed(blob, opts) + manifest, members = _bundle.unpack(tar_blob) + logger.info("バンドル version=%s, 生成=%s, devbase=%s", + manifest.get('version'), manifest.get('created_at'), + manifest.get('devbase_version')) + + filtered = _filter_members(members, opts) + if not filtered: + raise ImportError( + "import 対象がありません " + "(--no-global / --include-project の指定とバンドル内容を確認してください)" + ) + + # 計画作成 + plans: List[_Plan] = [] + sources_reference: Optional[Tuple[Path, bytes]] = None + for arcname, data in sorted(filtered.items()): + target = _target_for(arcname, devbase_root) + if arcname == 'env/sources.yml': + plan = _plan_sources(target, data, opts) + if plan is not None: + plans.append(plan) + else: + # 既定動作: 上書きしないので参照用 copy のみバックアップする + sources_reference = (target, data) + else: + plans.append(_plan_env_merge(target, data, opts, arcname)) + + _log_plans(plans, opts.dry_run) + if sources_reference is not None and not opts.merge_metadata: + logger.info( + "%ssources.yml は上書きしません (--merge-metadata 指定時のみ更新, " + "参照用コピーを backup ディレクトリに残します)", + "[dry-run] " if opts.dry_run else "", + ) + + if opts.dry_run: + logger.info("[dry-run] 書き込みは行いません") + return 0 + + if not plans and sources_reference is None: + logger.info("変更はありません") + return 0 + + # backup → phase 1 (tmp 書き出し) → phase 2 (rename) + backup_dir = _make_backup_dir(devbase_root, opts) + logger.info("backup ディレクトリ: %s", backup_dir) + _backup_existing(plans, sources_reference, backup_dir, devbase_root) + + tmps: List[Path] = [] + plans_and_tmps: List[Tuple[_Plan, Path]] = [] + try: + for plan in plans: + tmp = _write_atomic(plan) + tmps.append(tmp) + plans_and_tmps.append((plan, tmp)) + except Exception: + _cleanup_tmps(tmps) + raise + + _commit(plans_and_tmps, backup_dir, devbase_root) + logger.info("import 完了: %d ファイル更新", len(plans)) + + _gc_backups(backup_dir, opts.keep_last) + return 0 diff --git a/lib/devbase/env/store.py b/lib/devbase/env/store.py index 057fe0e..13eb6b3 100644 --- a/lib/devbase/env/store.py +++ b/lib/devbase/env/store.py @@ -1,9 +1,13 @@ """Environment variable file store""" +from __future__ import annotations + import os +import re import shutil +from dataclasses import dataclass from pathlib import Path -from typing import Optional, Dict, Union +from typing import Optional, Dict, Union, List from devbase.log import get_logger @@ -49,6 +53,52 @@ def collect_key(env_file, key, *, auto_value=None, prompt=None, mask_after=10): return False +@dataclass +class EnvEntry: + """``.env`` ファイルの 1 行を表すトークン。 + + - ``kind='kv'`` のとき ``key`` / ``value`` が有効 (``raw`` は元の行全体) + - ``kind='comment'`` / ``kind='blank'`` のとき ``raw`` のみ有効 + + コメント・空行を保持してマージ出力するために使う (PR #15 gemini 指摘)。 + """ + kind: str # 'kv' | 'comment' | 'blank' + raw: str = '' + key: Optional[str] = None + value: Optional[str] = None + + +# ``EnvFile.dump_bytes`` / :meth:`EnvFile.save` で値を quote する閾値となる文字集合。 +# シェル ``source`` 時に展開・解釈されうる metachar をすべて含める。``$`` を含む値も +# ``\$`` にエスケープして出力するため、ここで quoting 対象として捕捉する +# (PR #15 gemini 指摘)。 +_NEEDS_QUOTE_CHARS = (' ', '"', "'", '$', '`', '\\', '<', '>', '|', '&', ';', + '(', ')', '#') + + +def _escape_double_quoted(value: str) -> str: + """``"..."`` 内で安全な escape を施す。 + + - ``\\`` → ``\\\\`` + - ``"`` → ``\\"`` + - ``\n`` → ``\\n`` (改行をリテラル化) + - ``$`` → ``\\$`` (``.env`` を ``source`` した際の変数展開を抑止) + """ + return (value.replace('\\', '\\\\') + .replace('"', '\\"') + .replace('\n', '\\n') + .replace('$', '\\$')) + + +# ``_unescape_double_quoted`` 用の逆引きテーブル。``re.sub(r'\\.', ...)`` で +# マッチした 2 文字を 1 文字に置換する。未知エスケープ (例: ``\x``) は +# バックスラッシュごとそのまま残す必要があるため、``dict.get`` の default に +# マッチ文字列自身を返す。末尾単独 ``\`` は ``\\.`` のドットが 2 文字目を +# 要求するため自然にマッチせず、そのまま保持される。 +_DOUBLE_QUOTE_UNESCAPES = {'\\\\': '\\', '\\"': '"', '\\n': '\n', '\\$': '$'} +_DOUBLE_QUOTE_UNESCAPE_RE = re.compile(r'\\.') + + class EnvFile: """ .envファイルの読み書き・バックアップ・バリデーションを管理する。 @@ -59,48 +109,114 @@ def __init__(self, file_path: Union[str, Path]): self._data: Dict[str, str] = {} self._loaded = False + @staticmethod + def parse_bytes(data: bytes) -> Dict[str, str]: + """bytes 列を load と同じ規則でパースして dict を返す (ファイル不要) + + ``save`` / :meth:`EnvFile.dump_bytes` の inverse として振る舞うため、 + ダブルクオート内の ``\\\\`` / ``\\"`` / ``\\n`` / ``\\$`` を unescape する。 + formatter と round-trip 整合性が取れていないと、parse → format で + 二重エスケープが発生する (PR #15 codex 指摘)。 + """ + result: Dict[str, str] = {} + for entry in EnvFile.parse_entries(data): + if entry.kind == 'kv' and entry.key is not None: + result[entry.key] = entry.value or '' + return result + + @staticmethod + def parse_entries(data: bytes) -> List[EnvEntry]: + """``.env`` の各行をトークン化して返す。 + + コメント (``#`` 始まり) と空行は ``EnvEntry(kind='comment'|'blank', raw=...)`` + として保持される。これにより merge 出力時に元のコメント/空白構造を残せる + (PR #15 gemini 指摘)。 + """ + entries: List[EnvEntry] = [] + for raw_line in data.decode('utf-8').splitlines(): + stripped = raw_line.strip() + if not stripped: + entries.append(EnvEntry(kind='blank', raw=raw_line)) + continue + if stripped.startswith('#'): + entries.append(EnvEntry(kind='comment', raw=raw_line)) + continue + if '=' not in stripped: + # ``key=value`` 形式でない行は (滅多に無いが) 原文保持する + entries.append(EnvEntry(kind='comment', raw=raw_line)) + continue + key, _, value = stripped.partition('=') + key = key.strip() + value = value.strip() + if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"): + quote = value[0] + value = value[1:-1] + if quote == '"': + value = EnvFile._unescape_double_quoted(value) + entries.append(EnvEntry(kind='kv', raw=raw_line, key=key, value=value)) + return entries + + @staticmethod + def _unescape_double_quoted(s: str) -> str: + """``save`` が double-quote 値に対して施した escape を 1 パスで戻す。 + + 単純な逐次 ``replace`` は ``"\\\\n"`` (リテラル ``\\\\`` + ``n``) と + ``"\\n"`` (改行) の区別が付かないため、``\\`` を一括で捉える + ``re.sub`` + 逆引き辞書で処理する。 + """ + return _DOUBLE_QUOTE_UNESCAPE_RE.sub( + lambda m: _DOUBLE_QUOTE_UNESCAPES.get(m.group(0), m.group(0)), s + ) + + @staticmethod + def _format_kv_line(key: str, value: str) -> str: + """1 つの ``key=value`` を ``.env`` 行 (末尾 ``\\n`` 含む) にフォーマットする""" + needs_quote = ( + '\n' in value + or any(c in value for c in _NEEDS_QUOTE_CHARS) + ) + if needs_quote: + return f'{key}="{_escape_double_quoted(value)}"\n' + return f'{key}={value}\n' + + @staticmethod + def dump_bytes(data: Dict[str, str]) -> bytes: + """``save`` と同一フォーマットで dict をバイト列化する (ファイル不要)。 + + ``io_import`` 側でも merge 結果を bytes として持つ必要があるため、 + フォーマット規則を 1 箇所 (このメソッド) に集約する (PR #15 gemini 指摘)。 + """ + lines = [EnvFile._format_kv_line(k, data[k]) for k in sorted(data)] + return ''.join(lines).encode('utf-8') + + @staticmethod + def dump_entries_bytes(entries: List[EnvEntry]) -> bytes: + """``parse_entries`` で得た entries を ``.env`` バイト列に戻す。 + + ``kv`` エントリは現在の ``value`` を ``dump_bytes`` と同じ規則で再フォーマット + する。``comment`` / ``blank`` は ``raw`` をそのまま保持して出力する。 + """ + lines: List[str] = [] + for e in entries: + if e.kind == 'kv' and e.key is not None: + lines.append(EnvFile._format_kv_line(e.key, e.value or '')) + else: + lines.append(e.raw + '\n') + return ''.join(lines).encode('utf-8') + def load(self) -> Dict[str, str]: """ファイルを読み込みkey=valueをパースする""" - self._data = {} - if not self.file_path.exists(): - self._loaded = True - return self._data - - with open(self.file_path, 'r', encoding='utf-8') as f: - for line in f: - line = line.strip() - - if not line or line.startswith('#'): - continue - - if '=' not in line: - continue - - key, _, value = line.partition('=') - key = key.strip() - value = value.strip() - - if value and value[0] == value[-1] and value[0] in ('"', "'"): - value = value[1:-1] - - self._data[key] = value - + self._data = {} + else: + self._data = self.parse_bytes(self.file_path.read_bytes()) self._loaded = True return self._data def save(self) -> None: """現在のデータを.envファイルに保存する""" self.file_path.parent.mkdir(parents=True, exist_ok=True) - - with open(self.file_path, 'w', encoding='utf-8') as f: - for key, value in sorted(self._data.items()): - if '\n' in value or any(c in value for c in (' ', '"', "'", '$', '`', '\\', '<', '>', '|', '&', ';', '(', ')', '#')): - value = value.replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n') - f.write(f'{key}="{value}"\n') - else: - f.write(f'{key}={value}\n') - + self.file_path.write_bytes(self.dump_bytes(self._data)) os.chmod(self.file_path, 0o600) def backup(self) -> Optional[Path]: diff --git a/tests/cli/test_env_export.py b/tests/cli/test_env_export.py index 18b89fe..dd44b47 100644 --- a/tests/cli/test_env_export.py +++ b/tests/cli/test_env_export.py @@ -93,29 +93,58 @@ def test_export_rejects_both_passphrase_env_and_stdin(fake_root): dest="/dev/null", passphrase_env="X", passphrase_stdin=True)) -def test_read_passphrase_shows_prompt_on_tty(monkeypatch, capsys): - """tty 入力時は stderr に 'passphrase: ' プロンプトを表示する""" - fake_stdin = io.StringIO("hunter2\n") +def test_read_passphrase_uses_getpass_on_tty(monkeypatch): + """tty 入力時は getpass.getpass を使い stdin.readline は呼ばない (エコー抑止)""" + fake_stdin = io.StringIO("should-not-be-read\n") monkeypatch.setattr(fake_stdin, "isatty", lambda: True, raising=False) monkeypatch.setattr("sys.stdin", fake_stdin) + calls = {} + + def fake_getpass(prompt='', stream=None): + calls['prompt'] = prompt + calls['stream'] = stream + return "hunter2" + + monkeypatch.setattr("devbase.env.io_export.getpass.getpass", fake_getpass) + pw = _read_passphrase(ExportOptions(passphrase_stdin=True)) assert pw == "hunter2" - err = capsys.readouterr().err - assert "passphrase: " in err + assert calls['prompt'] == "passphrase: " + assert fake_stdin.read() == "should-not-be-read\n" # stdin は消費されていない -def test_read_passphrase_no_prompt_on_pipe(monkeypatch, capsys): - """パイプ (非 tty) 入力時はプロンプトを出さない""" +def test_read_passphrase_falls_back_to_stdin_on_pipe(monkeypatch, capsys): + """パイプ (非 tty) 入力時は getpass を使わず stdin.readline で読む""" fake_stdin = io.StringIO("hunter2\n") monkeypatch.setattr(fake_stdin, "isatty", lambda: False, raising=False) monkeypatch.setattr("sys.stdin", fake_stdin) + def fail_getpass(*args, **kwargs): + raise AssertionError("getpass.getpass should not be called for piped stdin") + + monkeypatch.setattr("devbase.env.io_export.getpass.getpass", fail_getpass) + pw = _read_passphrase(ExportOptions(passphrase_stdin=True)) assert pw == "hunter2" assert "passphrase" not in capsys.readouterr().err +def test_read_passphrase_tty_eof_raises_export_error(monkeypatch): + """tty で getpass が EOFError を投げた場合は ExportError に変換される""" + fake_stdin = io.StringIO("") + monkeypatch.setattr(fake_stdin, "isatty", lambda: True, raising=False) + monkeypatch.setattr("sys.stdin", fake_stdin) + + def raise_eof(*args, **kwargs): + raise EOFError() + + monkeypatch.setattr("devbase.env.io_export.getpass.getpass", raise_eof) + + with pytest.raises(ExportError, match="パスフレーズを読み取れません"): + _read_passphrase(ExportOptions(passphrase_stdin=True)) + + def test_export_with_passphrase_env(fake_root, tmp_path, monkeypatch): dest = tmp_path / "out.dbenv" monkeypatch.setenv("DEVBASE_TEST_PASS", "s3cr3t") diff --git a/tests/cli/test_env_import.py b/tests/cli/test_env_import.py new file mode 100644 index 0000000..f509895 --- /dev/null +++ b/tests/cli/test_env_import.py @@ -0,0 +1,941 @@ +"""devbase env import の統合テスト (擬似 DEVBASE_ROOT で round-trip / merge / replace / dry-run)""" + +from __future__ import annotations + +import io +import os +from pathlib import Path +from typing import Tuple + +import pyrage +import pytest + +from devbase.env import bundle, cipher +from devbase.env.io_export import ExportOptions, export +from devbase.env.io_import import ( + ImportError as ImportBundleError, + ImportOptions, + _read_passphrase, + import_bundle, +) + + +@pytest.fixture +def fake_root(tmp_path): + """export 用の擬似 DEVBASE_ROOT (PR1 と同じ構造)""" + root = tmp_path / "src-root" + (root / "projects" / "alpha").mkdir(parents=True) + (root / "projects" / "beta").mkdir(parents=True) + (root / ".env").write_text("AWS_CONFIG_BASE64=AAAA\nGLOBAL=1\n") + (root / ".env.sources.yml").write_text( + "sources:\n aws:\n type: tar_base64\n hash: deadbeef\n" + ) + (root / "projects" / "alpha" / ".env").write_text("ALPHA_API_KEY=xyz\n") + (root / "projects" / "beta" / ".env").write_text("BETA_DB_PASSWORD=p\n") + return root + + +@pytest.fixture +def dest_root(tmp_path): + """import 先の擬似 DEVBASE_ROOT (空 or 既存ファイルあり)""" + root = tmp_path / "dst-root" + (root / "projects" / "alpha").mkdir(parents=True) + (root / "projects" / "beta").mkdir(parents=True) + return root + + +@pytest.fixture +def age_keys(tmp_path): + identity = pyrage.x25519.Identity.generate() + pub_file = tmp_path / "age.pub" + pub_file.write_text(str(identity.to_public()) + "\n") + id_file = tmp_path / "age.key" + id_file.write_text(str(identity)) + return pub_file, id_file + + +def _export_bundle(fake_root: Path, age_keys: Tuple[Path, Path], + tmp_path: Path) -> Path: + pub_file, _ = age_keys + dest = tmp_path / "out.dbenv" + rc = export(fake_root, ExportOptions( + dest=str(dest), recipients=[f"@{pub_file}"])) + assert rc == 0 + return dest + + +def test_import_roundtrip_creates_files_with_0600(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + assert rc == 0 + + # global と各 project の .env が復元されている + assert (dest_root / ".env").read_text() == "AWS_CONFIG_BASE64=AAAA\nGLOBAL=1\n" + assert (dest_root / "projects" / "alpha" / ".env").read_text() == "ALPHA_API_KEY=xyz\n" + assert (dest_root / "projects" / "beta" / ".env").read_text() == "BETA_DB_PASSWORD=p\n" + + # パーミッションが 0600 + assert (dest_root / ".env").stat().st_mode & 0o777 == 0o600 + assert (dest_root / "projects" / "alpha" / ".env").stat().st_mode & 0o777 == 0o600 + + # sources.yml は既定では上書きしないので存在しない + assert not (dest_root / ".env.sources.yml").exists() + # backup ディレクトリに参照用 sources.yml.imported が残る + backup_root = dest_root / "backups" / "env-import" + assert backup_root.is_dir() + sub = next(backup_root.iterdir()) + assert (sub / "sources.yml.imported").exists() + + +def test_import_dry_run_does_not_modify(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + # 既存ファイルを置く + (dest_root / ".env").write_text("EXISTING=keep\n") + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], dry_run=True)) + assert rc == 0 + + # 元の .env は変更されていない + assert (dest_root / ".env").read_text() == "EXISTING=keep\n" + # backup も作られない + assert not (dest_root / "backups").exists() + + +def test_import_keep_existing_only_adds_new_keys(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + (dest_root / ".env").write_text("AWS_CONFIG_BASE64=OLD\nKEEP=this\n") + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + assert rc == 0 + + text = (dest_root / ".env").read_text() + # 既存の AWS_CONFIG_BASE64 は OLD のまま (keep-existing) + assert "AWS_CONFIG_BASE64=OLD" in text + # 新規キー GLOBAL=1 は追加される + assert "GLOBAL=1" in text + # 既存キー KEEP は残る + assert "KEEP=this" in text + + +def test_import_prefer_incoming_overwrites_existing(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + (dest_root / ".env").write_text("AWS_CONFIG_BASE64=OLD\nKEEP=this\n") + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + merge='prefer-incoming')) + assert rc == 0 + + text = (dest_root / ".env").read_text() + # バンドル側で上書きされる + assert "AWS_CONFIG_BASE64=AAAA" in text + # incoming に無い既存キーは残る + assert "KEEP=this" in text + + +def test_import_replace_keys_only_overwrites_specified(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + (dest_root / ".env").write_text("AWS_CONFIG_BASE64=OLD\nGLOBAL=KEEP\n") + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + replace_keys=['AWS_CONFIG_BASE64'])) + assert rc == 0 + + text = (dest_root / ".env").read_text() + assert "AWS_CONFIG_BASE64=AAAA" in text # 上書きされる + assert "GLOBAL=KEEP" in text # 指定外なので keep + + +def test_import_replace_takes_backup_and_replaces(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + (dest_root / ".env").write_text("OLD=value\n") + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], replace=True)) + assert rc == 0 + + text = (dest_root / ".env").read_text() + assert "AWS_CONFIG_BASE64=AAAA" in text + assert "GLOBAL=1" in text + assert "OLD=value" not in text # 完全に置き換わる + + backup_root = dest_root / "backups" / "env-import" + sub = next(backup_root.iterdir()) + assert (sub / ".env").read_text() == "OLD=value\n" + + +def test_import_rejects_replace_with_replace_keys(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + with pytest.raises(ImportBundleError, match="--replace と --replace-keys"): + import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + replace=True, replace_keys=['A'])) + + +def test_import_rejects_stdin_with_passphrase_stdin(dest_root): + with pytest.raises(ImportBundleError, match="SOURCE='-'"): + import_bundle(dest_root, ImportOptions( + source='-', passphrase_stdin=True)) + + +def test_import_rejects_both_passphrase_env_and_stdin(dest_root): + with pytest.raises(ImportBundleError, match="--passphrase-env"): + import_bundle(dest_root, ImportOptions( + source='/dev/null', passphrase_env='X', passphrase_stdin=True)) + + +def test_read_passphrase_uses_getpass_on_tty(monkeypatch): + """tty 入力時は getpass.getpass を使い stdin.readline は呼ばない (エコー抑止)""" + fake_stdin = io.StringIO("should-not-be-read\n") + monkeypatch.setattr(fake_stdin, "isatty", lambda: True, raising=False) + monkeypatch.setattr("sys.stdin", fake_stdin) + + calls = {} + + def fake_getpass(prompt='', stream=None): + calls['prompt'] = prompt + calls['stream'] = stream + return "hunter2" + + monkeypatch.setattr("devbase.env.io_import.getpass.getpass", fake_getpass) + + pw = _read_passphrase(ImportOptions(source='/dev/null', passphrase_stdin=True)) + assert pw == "hunter2" + assert calls['prompt'] == "passphrase: " + assert fake_stdin.read() == "should-not-be-read\n" # stdin は消費されていない + + +def test_read_passphrase_falls_back_to_stdin_on_pipe(monkeypatch, capsys): + """パイプ (非 tty) 入力時は getpass を使わず stdin.readline で読む""" + fake_stdin = io.StringIO("piped-pass\n") + monkeypatch.setattr(fake_stdin, "isatty", lambda: False, raising=False) + monkeypatch.setattr("sys.stdin", fake_stdin) + + def fail_getpass(*args, **kwargs): + raise AssertionError("getpass.getpass should not be called for piped stdin") + + monkeypatch.setattr("devbase.env.io_import.getpass.getpass", fail_getpass) + + pw = _read_passphrase(ImportOptions(source='/dev/null', passphrase_stdin=True)) + assert pw == "piped-pass" + assert "passphrase" not in capsys.readouterr().err + + +def test_read_passphrase_tty_eof_raises_import_error(monkeypatch): + """tty で getpass が EOFError を投げた場合は ImportError に変換される""" + fake_stdin = io.StringIO("") + monkeypatch.setattr(fake_stdin, "isatty", lambda: True, raising=False) + monkeypatch.setattr("sys.stdin", fake_stdin) + + def raise_eof(*args, **kwargs): + raise EOFError() + + monkeypatch.setattr("devbase.env.io_import.getpass.getpass", raise_eof) + + with pytest.raises(ImportBundleError, match="パスフレーズを読み取れません"): + _read_passphrase(ImportOptions(source='/dev/null', passphrase_stdin=True)) + + +def test_import_rejects_unknown_manifest_version(fake_root, dest_root, age_keys, tmp_path): + """manifest.version が SUPPORTED_MANIFEST_VERSION より大きいバンドルは拒否される""" + import gzip + import io as _io + import tarfile + import yaml + + pub_file, id_file = age_keys + # 通常 export + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + # 復号して中身を書き換えてから age で再暗号化する + plain = cipher.decrypt(bundle_path.read_bytes(), identities=[str(id_file)]) + + # tar.gz を再構築して manifest.version=999 にする + buf_in = _io.BytesIO(plain) + tin = tarfile.open(fileobj=buf_in, mode='r:gz') + out = _io.BytesIO() + with gzip.GzipFile(fileobj=out, mode='wb', mtime=0) as gz: + with tarfile.open(fileobj=gz, mode='w', format=tarfile.PAX_FORMAT) as tout: + for info in tin.getmembers(): + data = tin.extractfile(info).read() + if info.name == bundle.MANIFEST_NAME: + manifest = yaml.safe_load(data) + manifest['version'] = 999 + data = yaml.safe_dump(manifest, sort_keys=False).encode('utf-8') + ti = tarfile.TarInfo(name=info.name) + ti.size = len(data) + ti.mtime = 0 + ti.mode = 0o600 + tout.addfile(ti, _io.BytesIO(data)) + tin.close() + + bad_plain = out.getvalue() + bad = pyrage.encrypt(bad_plain, + [pyrage.ssh.Recipient.from_str(pub_file.read_text().strip())] + if pub_file.read_text().strip().startswith('ssh-') + else [pyrage.x25519.Recipient.from_str(pub_file.read_text().strip())]) + bad_path = tmp_path / "bad.dbenv" + bad_path.write_bytes(bad) + + with pytest.raises(bundle.BundleError, match="サポートされていません"): + import_bundle(dest_root, ImportOptions( + source=str(bad_path), identities=[str(id_file)])) + + +def test_import_preserves_lf_line_endings(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + # CRLF を排除した想定: export → import で LF が保持されること + (fake_root / ".env").write_text("A=1\nB=2\n") + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + assert rc == 0 + raw = (dest_root / ".env").read_bytes() + assert b'\r' not in raw + assert raw.endswith(b'\n') + + +def test_import_keep_last_gc_removes_old_backups(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + backup_root = dest_root / "backups" / "env-import" + # 既存の古い backup を 5 個事前作成する (タイムスタンプ命名規則に合わせる) + backup_root.mkdir(parents=True) + for i in range(5): + (backup_root / f"20260101-00000{i}").mkdir() + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], keep_last=3)) + assert rc == 0 + + remaining = sorted(p.name for p in backup_root.iterdir()) + assert len(remaining) == 3 + # 最新 3 個に絞られる: 既存の 20260101-000003, 000004, 加えて新規 backup + assert remaining[-1].startswith('20') + + +def test_import_include_project_filter(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + include_projects=['alpha'])) + assert rc == 0 + assert (dest_root / "projects" / "alpha" / ".env").exists() + assert not (dest_root / "projects" / "beta" / ".env").exists() + + +def test_import_plaintext_bundle(fake_root, dest_root, tmp_path): + """--force-unencrypted で出力した平文 tar.gz もそのまま import できる""" + dest = tmp_path / "out.dbenv.tar.gz" + rc = export(fake_root, ExportOptions(dest=str(dest), force_unencrypted=True)) + assert rc == 0 + + rc = import_bundle(dest_root, ImportOptions(source=str(dest))) + assert rc == 0 + assert (dest_root / ".env").exists() + + +def test_import_merge_metadata_adds_only_new_sources(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + # 既存 sources.yml を用意 (aws のみ。bundle 側も aws を持つ) + (dest_root / ".env.sources.yml").write_text( + "sources:\n aws:\n type: tar_base64\n hash: existinghash\n" + ) + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + merge_metadata=True)) + assert rc == 0 + + import yaml as _yaml + data = _yaml.safe_load((dest_root / ".env.sources.yml").read_text()) + # 既存 aws は維持される (hash=existinghash のまま) + assert data['sources']['aws']['hash'] == 'existinghash' + + +def test_import_no_metadata_skips_sources_yml(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + include_metadata=False)) + assert rc == 0 + # 参照用コピーも作られない (filter で除外されるため) + backup_root = dest_root / "backups" / "env-import" + sub = next(backup_root.iterdir()) + assert not (sub / "sources.yml.imported").exists() + + +def test_import_replace_keys_adds_unspecified_new_keys(fake_root, dest_root, age_keys, tmp_path): + """--replace-keys 指定外でも、既存ファイルに無い incoming キーは追加される + (CLI help 'other keys behave like keep-existing' に整合)""" + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + # 既存は AWS_CONFIG_BASE64 のみ。incoming は AWS_CONFIG_BASE64 + GLOBAL=1 + (dest_root / ".env").write_text("AWS_CONFIG_BASE64=OLD\n") + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + replace_keys=['AWS_CONFIG_BASE64'])) + assert rc == 0 + + text = (dest_root / ".env").read_text() + assert "AWS_CONFIG_BASE64=AAAA" in text # 指定キーは上書き + assert "GLOBAL=1" in text # 指定外でも既存に無い新規キーは追加される (keep-existing 相当) + + +def test_rollback_unlinks_newly_created_files_on_commit_failure( + fake_root, dest_root, age_keys, tmp_path, monkeypatch): + """commit フェーズ途中失敗時、元ファイル不在で新規作成された target は unlink され、 + 部分適用状態が残らないこと""" + from devbase.env import io_import as _io_import + + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + # dest には元ファイルが一切無い (= 全 plan op='create') + assert not (dest_root / ".env").exists() + assert not (dest_root / "projects" / "alpha" / ".env").exists() + + # 2 つ目以降の os.replace で失敗させる + original_replace = os.replace + call_count = {'n': 0} + + def failing_replace(src, dst): + call_count['n'] += 1 + if call_count['n'] >= 2: + raise OSError("simulated commit failure") + return original_replace(src, dst) + + monkeypatch.setattr(_io_import.os, 'replace', failing_replace) + + with pytest.raises(_io_import.ImportError, match="commit フェーズで失敗"): + import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + + # rollback で新規作成 (op='create') の .env は削除されていること + assert not (dest_root / ".env").exists() + # まだ commit されていない target ももちろん存在しない + assert not (dest_root / "projects" / "beta" / ".env").exists() + + +def test_gc_backups_only_removes_timestamp_dirs(fake_root, dest_root, age_keys, tmp_path): + """--backup-dir 指定時でも、devbase が作った timestamp 形式以外のディレクトリは + GC で削除されない""" + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + custom_backup_root = tmp_path / "user-backups" + custom_backup_root.mkdir() + # 関係ないディレクトリ + unrelated = custom_backup_root / "important-user-data" + unrelated.mkdir() + (unrelated / "keep.txt").write_text("must not be deleted") + # 関係ないファイル + unrelated_file = custom_backup_root / "readme.txt" + unrelated_file.write_text("must not be deleted") + # devbase 命名の古い backup を keep_last 超に置く + for i in range(5): + (custom_backup_root / f"20240101-00000{i}").mkdir() + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + backup_dir=str(custom_backup_root), keep_last=3)) + assert rc == 0 + + # 無関係なディレクトリ/ファイルは残る + assert unrelated.exists() + assert (unrelated / "keep.txt").exists() + assert unrelated_file.exists() + # timestamp 形式は keep_last=3 まで絞られる (新規 backup 含む) + timestamp_dirs = sorted( + p.name for p in custom_backup_root.iterdir() + if p.is_dir() and p.name not in ('important-user-data',) + ) + assert len(timestamp_dirs) == 3 + + +def test_import_passphrase_env_roundtrip(fake_root, dest_root, tmp_path, monkeypatch): + dest = tmp_path / "out.dbenv" + monkeypatch.setenv("DEVBASE_TEST_PASS", "s3cr3t") + rc = export(fake_root, ExportOptions( + dest=str(dest), passphrase_env="DEVBASE_TEST_PASS")) + assert rc == 0 + + rc = import_bundle(dest_root, ImportOptions( + source=str(dest), passphrase_env="DEVBASE_TEST_PASS")) + assert rc == 0 + assert (dest_root / ".env").exists() + + +def test_import_preserves_escaped_values_no_double_escape( + dest_root, age_keys, tmp_path): + """値に backslash / quote / newline / spaces が含まれていても + export → import で二重エスケープされないことを保証する (PR #15 codex 指摘)""" + _, id_file = age_keys + pub_file, _ = age_keys + + # 特殊文字を含む .env を持つ source root を構築 + src_root = tmp_path / "esc-src" + (src_root / "projects" / "alpha").mkdir(parents=True) + raw_env = ( + 'BACKSLASH="a\\\\b"\n' # 値: a\b (3 chars) + 'QUOTE_IN_VALUE="he said \\"hi\\""\n' # 値: he said "hi" + 'WITH_NEWLINE="line1\\nline2"\n' # 値: line1line2 + 'WITH_SPACE="value with space"\n' # 値: value with space + 'PLAIN=simple\n' # 値: simple + ) + (src_root / ".env").write_text(raw_env) + (src_root / "projects" / "alpha" / ".env").write_text( + 'ALPHA_BACK="a\\\\b"\n' + ) + + bundle_path = tmp_path / "esc.dbenv" + rc = export(src_root, ExportOptions( + dest=str(bundle_path), recipients=[f"@{pub_file}"])) + assert rc == 0 + + # 新規作成 (dest 側に既存ファイル無し) + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + assert rc == 0 + + # 新規作成時は incoming_bytes をそのまま使うので元バイト列と一致する + assert (dest_root / ".env").read_text() == raw_env + + # EnvFile から読んだ際に escape が正しく解釈されること (parse_bytes round-trip) + from devbase.env.store import EnvFile + parsed = EnvFile.parse_bytes((dest_root / ".env").read_bytes()) + assert parsed['BACKSLASH'] == 'a\\b' + assert parsed['QUOTE_IN_VALUE'] == 'he said "hi"' + assert parsed['WITH_NEWLINE'] == 'line1\nline2' + assert parsed['WITH_SPACE'] == 'value with space' + assert parsed['PLAIN'] == 'simple' + + +def test_import_merge_round_trips_escaped_values( + dest_root, age_keys, tmp_path): + """既存ファイルがあって merge する場合でも、parse → format の round-trip で + 値が壊れない (二重エスケープしない)""" + _, id_file = age_keys + pub_file, _ = age_keys + + src_root = tmp_path / "esc-src2" + (src_root / "projects" / "alpha").mkdir(parents=True) + (src_root / ".env").write_text('NEW_BACK="a\\\\b"\n') + + bundle_path = tmp_path / "esc2.dbenv" + rc = export(src_root, ExportOptions( + dest=str(bundle_path), recipients=[f"@{pub_file}"])) + assert rc == 0 + + # dest に既存ファイルを置く (merge 経路に入る) + (dest_root / ".env").write_text('EXISTING="x\\\\y"\n') + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + merge='prefer-incoming')) + assert rc == 0 + + from devbase.env.store import EnvFile + parsed = EnvFile.parse_bytes((dest_root / ".env").read_bytes()) + # 二重エスケープされていないので、parse 後の値は元の 3 文字 "a\\b" + assert parsed['NEW_BACK'] == 'a\\b' + assert parsed['EXISTING'] == 'x\\y' + + +def test_rollback_unlinks_newly_created_sources_yml( + fake_root, dest_root, age_keys, tmp_path, monkeypatch): + """sources.yml を --merge-metadata で新規作成中に commit 失敗すると、 + ロールバックで sources.yml が削除されること (PR #15 gemini 指摘)""" + from devbase.env import io_import as _io_import + + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + # dest には sources.yml が無い状態。--merge-metadata で新規作成パスに入る + assert not (dest_root / ".env.sources.yml").exists() + + # commit 中に sources.yml の rename だけ失敗させる (最後のファイル) + original_replace = os.replace + + def failing_replace(src, dst): + if str(dst).endswith('.env.sources.yml'): + raise OSError("simulated commit failure on sources.yml") + return original_replace(src, dst) + + monkeypatch.setattr(_io_import.os, 'replace', failing_replace) + + with pytest.raises(_io_import.ImportError, match="commit"): + import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + merge_metadata=True)) + + # sources.yml はもともと存在しなかったので、ロールバックで unlink されているはず + assert not (dest_root / ".env.sources.yml").exists() + + +def test_commit_failure_cleans_remaining_import_tmp_files( + fake_root, dest_root, age_keys, tmp_path, monkeypatch): + """_commit 失敗時に、まだ rename されていない .import.tmp ファイルが残らないこと + (PR #15 gemini 指摘)""" + from devbase.env import io_import as _io_import + + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + original_replace = os.replace + call_count = {'n': 0} + + def failing_replace(src, dst): + call_count['n'] += 1 + if call_count['n'] >= 2: + raise OSError("simulated commit failure") + return original_replace(src, dst) + + monkeypatch.setattr(_io_import.os, 'replace', failing_replace) + + with pytest.raises(_io_import.ImportError, match="commit"): + import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + + # 残骸の .import.tmp ファイルが無いこと + leftover = list(dest_root.rglob('*.import.tmp')) + assert leftover == [], f"残骸の tmp が残っている: {leftover}" + + +def test_backup_dir_collision_avoidance(fake_root, dest_root, age_keys, tmp_path): + """同じプロセス内で連続して import を実行しても、backup ディレクトリ名が衝突せず + 前回バックアップを上書きしないこと (PR #15 codex 指摘)""" + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + # 1 回目 + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + assert rc == 0 + # 2 回目 (同一プロセス内, おそらく同一秒) + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + assert rc == 0 + + backup_root = dest_root / "backups" / "env-import" + subdirs = sorted(p.name for p in backup_root.iterdir() if p.is_dir()) + # 2 つの異なる backup ディレクトリが残っていること + assert len(subdirs) == 2, f"backup が衝突して 1 つになっている: {subdirs}" + + +def test_envfile_parse_bytes_round_trip_with_escapes(): + """``EnvFile.parse_bytes`` が ``save`` が施す escape を正しく逆変換すること + (PR #15 codex 指摘の double-escape 回避テスト)""" + from devbase.env.store import EnvFile + + # 直接 EnvFile.save と同じ規則で encode したものを parse_bytes で復元 + raw = ( + 'BACKSLASH="a\\\\b"\n' # a\b + 'QUOTED="he said \\"hi\\""\n' # he said "hi" + 'NL="x\\ny"\n' # xy + 'PLAIN=simple\n' + 'EMPTY=""\n' # empty string with quotes + ) + parsed = EnvFile.parse_bytes(raw.encode('utf-8')) + assert parsed['BACKSLASH'] == 'a\\b' + assert parsed['QUOTED'] == 'he said "hi"' + assert parsed['NL'] == 'x\ny' + assert parsed['PLAIN'] == 'simple' + assert parsed['EMPTY'] == '' + + # 「リテラル ``\\n``」(2 文字: backslash + 'n') を含む値も区別できること + # save は ``a\\nb`` (3 chars) を ``"a\\\\nb"`` に変換するので、これを parse_bytes + # に通せば元の 3 文字に戻る + raw2 = 'LITERAL="a\\\\nb"\n' + parsed2 = EnvFile.parse_bytes(raw2.encode('utf-8')) + assert parsed2['LITERAL'] == 'a\\nb' # backslash + 'n' + 'b' (3 chars) + + +def test_envfile_dollar_escape_round_trip(): + """``$`` を含む値は ``\\$`` にエスケープされ、``source`` 時に変数展開されない + (PR #15 gemini 指摘)""" + from devbase.env.store import EnvFile + + # dump → parse の round-trip で値が保たれる + data = { + 'DOLLAR': '$HOME', # 単純な変数展開を含む + 'PRICE': 'cost is $100', # 値内の $ + 'ESCAPED_LIKE': 'a\\$b', # backslash + $ の組み合わせ + 'PLAIN_NUM': '12345', # quote 不要なケース + } + dumped = EnvFile.dump_bytes(data) + text = dumped.decode('utf-8') + # $ が裸 (バックスラッシュ無し) で出力されていないこと + # ($ の直前は必ず \ である or 行の終端 / 別の \\) + for line in text.splitlines(): + if '=' in line and '"' in line: + # ダブルクオート内に裸の $ があるかチェック + _, _, val = line.partition('=') + # 値部分の $ をすべて検査: 直前の文字が \\ であること + for idx, ch in enumerate(val): + if ch == '$': + assert idx > 0 and val[idx - 1] == '\\', ( + f"unescaped $ in dump: {line!r}" + ) + parsed = EnvFile.parse_bytes(dumped) + assert parsed == data + + +def test_env_import_merge_preserves_comments_and_blanks( + fake_root, dest_root, age_keys, tmp_path): + """merge 経路で既存 ``.env`` のコメントと空行が保持されること (PR #15 gemini 指摘)""" + _, id_file = age_keys + pub_file, _ = age_keys + + # incoming bundle: AWS_CONFIG_BASE64=AAAA + GLOBAL=1 + src_root = tmp_path / "comment-src" + src_root.mkdir() + (src_root / ".env").write_text("AWS_CONFIG_BASE64=AAAA\nGLOBAL=1\n") + bundle_path = tmp_path / "comment.dbenv" + rc = export(src_root, ExportOptions( + dest=str(bundle_path), recipients=[f"@{pub_file}"])) + assert rc == 0 + + # 既存 dest .env にコメント・空行・既存キーを配置 + existing_text = ( + "# Top-level header comment\n" + "\n" + "# AWS section\n" + "AWS_CONFIG_BASE64=OLD\n" + "\n" + "# user-managed key\n" + "KEEP=this\n" + ) + (dest_root / ".env").write_text(existing_text) + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + merge='prefer-incoming')) + assert rc == 0 + + out = (dest_root / ".env").read_text() + # コメント・空行が保持されている + assert "# Top-level header comment" in out + assert "# AWS section" in out + assert "# user-managed key" in out + # 既存値は prefer-incoming で AAAA に書き換わる + assert "AWS_CONFIG_BASE64=AAAA" in out + # 既存にしか無かった KEEP は維持 + assert "KEEP=this" in out + # incoming にしか無かった GLOBAL は末尾に追加 + assert "GLOBAL=1" in out + # 空行も最低 1 つ残っている + assert "\n\n" in out + + +def test_env_import_keep_existing_preserves_comments( + fake_root, dest_root, age_keys, tmp_path): + """keep-existing 経路でもコメントが保持されること""" + _, id_file = age_keys + pub_file, _ = age_keys + + src_root = tmp_path / "keep-src" + src_root.mkdir() + (src_root / ".env").write_text("INCOMING_KEY=incoming\n") + bundle_path = tmp_path / "keep.dbenv" + rc = export(src_root, ExportOptions( + dest=str(bundle_path), recipients=[f"@{pub_file}"])) + assert rc == 0 + + (dest_root / ".env").write_text( + "# This comment must survive\nKEEP_ME=v\n" + ) + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + assert rc == 0 + + out = (dest_root / ".env").read_text() + assert "# This comment must survive" in out + assert "KEEP_ME=v" in out + assert "INCOMING_KEY=incoming" in out + + +def test_env_import_dollar_value_is_escaped_after_merge( + fake_root, dest_root, age_keys, tmp_path): + """``$`` を含む値が merge 後の ``.env`` 上でエスケープされていること + (シェルで source した時の変数展開を防ぐ / PR #15 gemini 指摘)""" + _, id_file = age_keys + pub_file, _ = age_keys + + src_root = tmp_path / "dollar-src" + src_root.mkdir() + # 値に $ を含む。export 側 (EnvFile.save 形式) で書き出される + (src_root / ".env").write_text('PRICE="cost is \\$100"\n') + bundle_path = tmp_path / "dollar.dbenv" + rc = export(src_root, ExportOptions( + dest=str(bundle_path), recipients=[f"@{pub_file}"])) + assert rc == 0 + + # 既存 dest .env (merge 経路に入る) + (dest_root / ".env").write_text("EXISTING=keep\n") + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + merge='prefer-incoming')) + assert rc == 0 + + raw_text = (dest_root / ".env").read_text() + # 裸の $ が現れていないこと (\ の直後でなければ NG) + for line in raw_text.splitlines(): + if 'PRICE' not in line: + continue + # 値の中の $ は必ず \\ の直後 + _, _, val = line.partition('=') + for idx, ch in enumerate(val): + if ch == '$': + assert idx > 0 and val[idx - 1] == '\\', ( + f"unescaped $ in merged .env: {line!r}" + ) + + from devbase.env.store import EnvFile + parsed = EnvFile.parse_bytes((dest_root / ".env").read_bytes()) + assert parsed['PRICE'] == 'cost is $100' + assert parsed['EXISTING'] == 'keep' + + +# --- PR #15 round5: コメント / 空行のみの既存 .env が create 扱いされて +# 上書きされないこと (`existing` dict が空でも target.exists() で merge に入る)。 + +def _setup_comment_only_dest(dest_root: Path) -> str: + """key=value を含まずコメント / 空行のみで構成された既存 .env を作る""" + text = ( + "# user-managed header (no kv yet)\n" + "\n" + "# section: aws\n" + "\n" + ) + (dest_root / ".env").write_text(text) + os.chmod(dest_root / ".env", 0o600) + return text + + +def _build_simple_bundle(tmp_path: Path, pub_file: Path, + name: str = "comment-only") -> Path: + src_root = tmp_path / f"{name}-src" + src_root.mkdir() + (src_root / ".env").write_text("INCOMING_KEY=incoming\n") + bundle_path = tmp_path / f"{name}.dbenv" + rc = export(src_root, ExportOptions( + dest=str(bundle_path), recipients=[f"@{pub_file}"])) + assert rc == 0 + return bundle_path + + +@pytest.mark.parametrize("merge_mode", ["prefer-incoming", "keep-existing"]) +def test_env_import_comment_only_existing_preserves_comments_on_merge( + fake_root, dest_root, age_keys, tmp_path, merge_mode): + """コメント / 空行のみの既存 .env が ``existing`` 辞書の空判定で create 扱いに + なって上書きされ、ヘッダコメントが消失しないこと (PR #15 round5 指摘)""" + _, id_file = age_keys + pub_file, _ = age_keys + bundle_path = _build_simple_bundle(tmp_path, pub_file, "comment-merge") + + _setup_comment_only_dest(dest_root) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + merge=merge_mode)) + assert rc == 0 + + out = (dest_root / ".env").read_text() + assert "# user-managed header (no kv yet)" in out + assert "# section: aws" in out + assert "INCOMING_KEY=incoming" in out + + +def test_env_import_comment_only_existing_preserves_comments_on_replace_keys( + fake_root, dest_root, age_keys, tmp_path): + """--replace-keys 経路でも、コメントのみの既存 .env が create 扱いされず + コメントが保持されること (PR #15 round5 指摘)""" + _, id_file = age_keys + pub_file, _ = age_keys + bundle_path = _build_simple_bundle(tmp_path, pub_file, "comment-rk") + + _setup_comment_only_dest(dest_root) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + replace_keys=["INCOMING_KEY"])) + assert rc == 0 + + out = (dest_root / ".env").read_text() + assert "# user-managed header (no kv yet)" in out + assert "# section: aws" in out + assert "INCOMING_KEY=incoming" in out + + +def test_env_import_comment_only_existing_replace_reports_op_replace( + fake_root, dest_root, age_keys, tmp_path, caplog): + """--replace 経路では incoming で完全上書きするが、その op は 'create' では + なく 'replace' として報告されること (PR #15 round5 指摘)。 + + --replace の意味論として既存内容は捨てるため、コメント保持は要件ではないが、 + ログ上 ``create`` と表示されるとロールバック挙動など他の経路 (= 新規作成は + backup を取らない) と判別できなくなるため、op 表記の正確性を確認する。 + """ + import logging + _, id_file = age_keys + pub_file, _ = age_keys + bundle_path = _build_simple_bundle(tmp_path, pub_file, "comment-replace") + + _setup_comment_only_dest(dest_root) + + with caplog.at_level(logging.INFO, logger="devbase.env.io_import"): + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + replace=True)) + assert rc == 0 + + # 'replace: ' のような行が出ているはず。少なくとも 'create:' 表記で + # 出力されていないことを確認 (op_replace の正しさ)。 + log_text = "\n".join(r.message for r in caplog.records) + assert "replace: " in log_text, log_text + # backup には元のコメントのみの .env が記録されている (存在判定が正しければ + # _backup_existing が target を見つけてコピーするため) + backup_root = dest_root / "backups" / "env-import" + assert backup_root.is_dir() + snapshots = [p for p in backup_root.iterdir() if p.is_dir()] + assert len(snapshots) >= 1 + backed = (snapshots[0] / ".env").read_text() + assert "# user-managed header (no kv yet)" in backed diff --git a/tests/cli/test_prefix_resolution.py b/tests/cli/test_prefix_resolution.py index 0a7574e..06f849a 100644 --- a/tests/cli/test_prefix_resolution.py +++ b/tests/cli/test_prefix_resolution.py @@ -48,3 +48,24 @@ def test_expand_argv_env_ex_resolves_to_export(monkeypatch): monkeypatch.setattr(sys, "argv", ["devbase", "env", "ex"]) cli._expand_argv() assert sys.argv == ["devbase", "env", "export"] + + +def test_expand_argv_env_i_resolves_to_init(monkeypatch): + """`devbase env i` は `import` 追加後も `init` に解決される (後方互換 / PR #15 round5)""" + monkeypatch.setattr(sys, "argv", ["devbase", "env", "i"]) + cli._expand_argv() + assert sys.argv == ["devbase", "env", "init"] + + +def test_expand_argv_env_im_resolves_to_import(monkeypatch): + """`devbase env im` は唯一の候補なので `import` に解決される""" + monkeypatch.setattr(sys, "argv", ["devbase", "env", "im"]) + cli._expand_argv() + assert sys.argv == ["devbase", "env", "import"] + + +def test_expand_argv_env_in_resolves_to_init(monkeypatch): + """`devbase env in` も唯一の候補 (`init`) に解決される""" + monkeypatch.setattr(sys, "argv", ["devbase", "env", "in"]) + cli._expand_argv() + assert sys.argv == ["devbase", "env", "init"]