From 5809377971c91fa63dba6696fe233c238258d226 Mon Sep 17 00:00:00 2001 From: "takemi.ohama" Date: Thu, 21 May 2026 20:16:42 +0900 Subject: [PATCH 01/10] =?UTF-8?q?chore:=20PLAN03-1=20PR2=20Draft=20PR=20?= =?UTF-8?q?=E4=BD=9C=E6=88=90=20(import=20=E5=AE=9F=E8=A3=85)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit From 381dfed7ad641afa63cb5fcf628b0430ee650474 Mon Sep 17 00:00:00 2001 From: "takemi.ohama" Date: Thu, 21 May 2026 20:26:41 +0900 Subject: [PATCH 02/10] feat(env): PLAN03-1 PR2 devbase env import (Local + Stdio) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `devbase env import` を追加し、export で生成したバンドル (age 暗号化済み or 平文 tar.gz) から `.env` 群を復元できるようにする。 主な機能: - merge セマンティクス: --merge keep-existing (既定) / prefer-incoming - --replace-keys: 指定キーのみ上書き - --replace: 対象 .env を丸ごと差し替え (backup 取得) - --dry-run: 差分プレビュー (書き込みなし) - 2 フェーズ書き出し (prepare → commit) で部分適用を最小化、失敗時は backup から best-effort で rollback - --backup-dir / --keep-last N (既定 10) で backup を GC - .env.sources.yml は既定で上書きせず参照用コピーのみ、--merge-metadata で 新規 source エントリのみ追加、--no-metadata で完全無視 - 暗号化判定: gzip magic で平文 / age 暗号化を識別 - 引数バリデーション: SOURCE='-' と --passphrase-stdin の併用拒否、 --passphrase-env と --passphrase-stdin の併用拒否、--replace と --replace-keys の併用拒否 - 既定 identity: ~/.ssh/id_ed25519 → ~/.ssh/id_rsa の順で探索 テスト (tests/cli/test_env_import.py, 17 ケース): - export → import の round-trip、0600 permission 保持、LF 保持 - merge モード別の挙動 (keep-existing / prefer-incoming / replace-keys / replace)、dry-run が変更しないこと、replace 時の backup 作成 - --include-project / --no-metadata / --merge-metadata の挙動 - 未知 manifest version のバンドル拒否、--keep-last による古い backup GC - 平文バンドル import、passphrase round-trip Co-Authored-By: Claude Opus 4.7 (1M context) --- lib/devbase/cli.py | 51 ++- lib/devbase/commands/env.py | 28 ++ lib/devbase/env/io_import.py | 619 +++++++++++++++++++++++++++++++++++ tests/cli/test_env_import.py | 353 ++++++++++++++++++++ 4 files changed, 1050 insertions(+), 1 deletion(-) create mode 100644 lib/devbase/env/io_import.py create mode 100644 tests/cli/test_env_import.py diff --git a/lib/devbase/cli.py b/lib/devbase/cli.py index 1da2160..cc9fc28 100644 --- a/lib/devbase/cli.py +++ b/lib/devbase/cli.py @@ -36,7 +36,7 @@ # Subcommand map for prefix resolution: {(aliases...): [subcmds]} SUBCMD_MAP = { ('container', 'ct'): ['up', 'down', 'ps', 'login', 'logs', 'scale', 'build'], - ('env',): ['init', 'sync', 'list', 'set', 'get', 'delete', 'edit', 'project', 'export'], + ('env',): ['init', 'sync', 'list', 'set', 'get', 'delete', 'edit', 'project', 'export', 'import'], ('plugin', 'pl'): ['list', 'install', 'uninstall', 'update', 'info', 'sync', 'repo'], ('snapshot', 'ss'): ['create', 'list', 'restore', 'copy', 'delete', 'rotate'], } @@ -149,6 +149,55 @@ def _add_env_parser(subparsers): help='Write as plaintext tar.gz (rejected by default; ' 'warns when sensitive keys are detected)') + env_import = env_sub.add_parser( + 'import', + help='Import .env files from a bundle (age-encrypted or plaintext tar.gz)', + ) + env_import.add_argument('source', + help="Bundle path or '-' for stdin") + env_import.add_argument('--merge', choices=['keep-existing', 'prefer-incoming'], + default='keep-existing', + help=("Key-level merge mode. keep-existing (default) keeps " + "existing keys and adds new ones; prefer-incoming " + "overwrites with bundle values")) + env_import.add_argument('--replace-keys', metavar='KEYS', default='', + help=("Comma-separated keys to force-overwrite from bundle " + "(other keys behave like keep-existing). " + "Cannot be combined with --replace")) + env_import.add_argument('--replace', action='store_true', + help='Replace each target .env file wholesale (backup is taken)') + env_import.add_argument('--dry-run', action='store_true', + help='Show planned diff without writing') + env_import.add_argument('--identity', action='append', default=[], + metavar='FILE', dest='identities', + help=("age / OpenSSH private key file (repeatable). " + "Default: ~/.ssh/id_ed25519, then ~/.ssh/id_rsa " + "(first existing one)")) + env_import.add_argument('--passphrase-env', metavar='VAR', default=None, + help='Read passphrase from environment variable VAR') + env_import.add_argument('--passphrase-stdin', action='store_true', + help='Read passphrase from the first line of stdin') + env_import.add_argument('--include-project', action='append', default=None, + metavar='NAME', dest='include_projects', + help='Limit to specified project (repeatable)') + env_import.add_argument('--exclude-project', action='append', default=[], + metavar='NAME', dest='exclude_projects', + help='Exclude project (repeatable)') + env_import.add_argument('--no-global', action='store_true', + help='Do not import $DEVBASE_ROOT/.env') + env_import.add_argument('--no-metadata', action='store_true', + help='Do not import $DEVBASE_ROOT/.env.sources.yml ' + '(default behavior is reference-only copy; this fully ignores it)') + env_import.add_argument('--merge-metadata', action='store_true', + help='Merge new source entries into existing .env.sources.yml ' + '(machine-specific fields are preserved as-is from bundle; ' + 'run `devbase env sync` after import to refresh)') + env_import.add_argument('--backup-dir', metavar='DIR', default=None, + help='Override backup directory ' + '(default: $DEVBASE_ROOT/backups/env-import/)') + env_import.add_argument('--keep-last', type=int, default=10, metavar='N', + help='Keep only the last N backup directories (default: 10, 0 to disable)') + def _add_plugin_parser(subparsers): """Plugin group parser""" diff --git a/lib/devbase/commands/env.py b/lib/devbase/commands/env.py index a324fbd..3b43598 100644 --- a/lib/devbase/commands/env.py +++ b/lib/devbase/commands/env.py @@ -34,6 +34,7 @@ def cmd_env(devbase_root: Path, args) -> int: 'edit': lambda: cmd_env_edit(devbase_root), 'project': lambda: cmd_env_project(devbase_root), 'export': lambda: cmd_env_export(devbase_root, args), + 'import': lambda: cmd_env_import(devbase_root, args), } handler = handlers.get(subcmd) @@ -401,6 +402,33 @@ def cmd_env_export(devbase_root: Path, args) -> int: return export(devbase_root, opts) +def cmd_env_import(devbase_root: Path, args) -> int: + """devbase env import""" + from devbase.env.io_import import ImportOptions, import_bundle + + replace_keys_arg = getattr(args, 'replace_keys', '') or '' + replace_keys = [k.strip() for k in replace_keys_arg.split(',') if k.strip()] + + opts = ImportOptions( + source=getattr(args, 'source'), + merge=getattr(args, 'merge', 'keep-existing'), + replace_keys=replace_keys, + replace=getattr(args, 'replace', False), + dry_run=getattr(args, 'dry_run', False), + identities=list(getattr(args, 'identities', []) or []), + passphrase_env=getattr(args, 'passphrase_env', None), + passphrase_stdin=getattr(args, 'passphrase_stdin', False), + include_projects=getattr(args, 'include_projects', None), + exclude_projects=list(getattr(args, 'exclude_projects', []) or []), + include_global=not getattr(args, 'no_global', False), + include_metadata=not getattr(args, 'no_metadata', False), + merge_metadata=getattr(args, 'merge_metadata', False), + backup_dir=getattr(args, 'backup_dir', None), + keep_last=getattr(args, 'keep_last', 10), + ) + return import_bundle(devbase_root, opts) + + def _update_source_metadata(devbase_root: Path, env_file: EnvFile) -> None: """ソースメタデータを更新する""" sources = SourcesManager(devbase_root) diff --git a/lib/devbase/env/io_import.py b/lib/devbase/env/io_import.py new file mode 100644 index 0000000..6d6f975 --- /dev/null +++ b/lib/devbase/env/io_import.py @@ -0,0 +1,619 @@ +"""devbase env import の高レベル実装 + +責務: + - SOURCE (file / stdio) の読み込み + - age 復号 (バンドルが暗号化されていれば) + - tar.gz バンドルの展開と sha256 / manifest version の検証 (bundle.unpack) + - --merge / --replace-keys / --replace のセマンティクスで .env 群を更新 + - .env.sources.yml は既定で上書きせず参照用コピーのみ (--merge-metadata で + 新規 source のみ追加) + - 2 フェーズ書き出し (prepare → commit) で部分適用を最小化 + - --backup-dir / --keep-last N で backup を GC + - --dry-run で差分プレビュー +""" + +from __future__ import annotations + +import os +import re +import shutil +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional, Sequence, Tuple + +import yaml + +from devbase.errors import DevbaseError +from devbase.log import get_logger + +from devbase.env import bundle as _bundle +from devbase.env import cipher as _cipher +from devbase.env import storage as _storage +from devbase.env.store import EnvFile + +logger = get_logger(__name__) + +# gzip magic. tar.gz バンドルは先頭 2 byte が 0x1f 0x8b。age 暗号化済みは +# テキストヘッダ "age-encryption.org/v1\n" で始まるため magic で識別できる。 +_GZIP_MAGIC = b'\x1f\x8b' + +_MERGE_MODES = ('keep-existing', 'prefer-incoming') + + +class ImportError(DevbaseError): + """import エラー""" + + +@dataclass +class ImportOptions: + source: str + merge: str = 'keep-existing' + replace_keys: List[str] = field(default_factory=list) + replace: bool = False + dry_run: bool = False + identities: List[str] = field(default_factory=list) + passphrase_env: Optional[str] = None + passphrase_stdin: bool = False + include_projects: Optional[List[str]] = None + exclude_projects: List[str] = field(default_factory=list) + include_global: bool = True + include_metadata: bool = True + merge_metadata: bool = False + backup_dir: Optional[str] = None + keep_last: int = 10 + + +@dataclass +class _Plan: + """1 ファイル分の書き出し計画""" + target: Path + arcname: str + new_bytes: bytes + # 差分サマリ (dry-run / ログ用) + added_keys: List[str] = field(default_factory=list) + overwritten_keys: List[str] = field(default_factory=list) + skipped_keys: List[str] = field(default_factory=list) + # ファイル単位の操作種別 + op: str = 'merge' # 'merge' | 'replace' | 'create' | 'sources-merge' + + +def _read_passphrase(opts: ImportOptions) -> Optional[str]: + if opts.passphrase_env: + value = os.environ.get(opts.passphrase_env) + if not value: + raise ImportError(f"環境変数 {opts.passphrase_env} が空または未設定です") + return value + if opts.passphrase_stdin: + import sys + if sys.stdin.isatty(): + print("passphrase: ", end='', file=sys.stderr, flush=True) + line = sys.stdin.readline() + if not line: + raise ImportError("stdin からパスフレーズを読み取れませんでした") + return line.rstrip('\n') + return None + + +def _resolve_identities(specs: Sequence[str]) -> List[str]: + if specs: + return list(specs) + for path in _cipher.default_identity_paths(): + if path.exists(): + logger.info("identity 既定鍵を使用: %s", path) + return [str(path)] + return [] + + +def _decrypt_if_needed(blob: bytes, opts: ImportOptions) -> bytes: + """先頭バイトで暗号化済みかを判定して必要なら復号する""" + if blob[:2] == _GZIP_MAGIC: + # 平文 tar.gz。鍵指定があっても無視せず警告にとどめる + if opts.identities or opts.passphrase_env or opts.passphrase_stdin: + logger.warning( + "バンドルは平文ですが identity / passphrase が指定されています " + "(使用されません)" + ) + return blob + + passphrase = _read_passphrase(opts) + if passphrase is not None: + return _cipher.decrypt(blob, passphrase=passphrase) + + identities = _resolve_identities(opts.identities) + if not identities: + raise ImportError( + "バンドルは暗号化されていますが復号キーが指定されていません。\n" + " --identity FILE age / OpenSSH 秘密鍵ファイル\n" + " --passphrase-env VAR 環境変数からパスフレーズ取得\n" + " --passphrase-stdin stdin の最初の行をパスフレーズとして使用\n" + " ~/.ssh/id_ed25519 または ~/.ssh/id_rsa があれば " + "--identity 省略時の既定として使用されます (ed25519 優先)" + ) + return _cipher.decrypt(blob, identities=identities) + + +def _filter_members(members: Dict[str, bytes], + opts: ImportOptions) -> Dict[str, bytes]: + """include/exclude 指定で展開済みメンバーを絞り込む""" + included = set(opts.include_projects) if opts.include_projects else None + excluded = set(opts.exclude_projects) + result: Dict[str, bytes] = {} + + proj_re = re.compile(r'^env/projects/([^/]+)/\.env$') + + for arcname, data in members.items(): + if arcname == 'env/global.env': + if not opts.include_global: + continue + result[arcname] = data + continue + if arcname == 'env/sources.yml': + if not opts.include_metadata: + continue + result[arcname] = data + continue + m = proj_re.match(arcname) + if m: + name = m.group(1) + if name in excluded: + continue + if included is not None and name not in included: + continue + result[arcname] = data + continue + # 他の形式は manifest 検証で拒否されているはずだが念のため + logger.debug("未対応の arcname を無視します: %s", arcname) + return result + + +def _target_for(arcname: str, devbase_root: Path) -> Path: + if arcname == 'env/global.env': + return devbase_root / '.env' + if arcname == 'env/sources.yml': + return devbase_root / '.env.sources.yml' + m = re.match(r'^env/projects/([^/]+)/\.env$', arcname) + if m: + return devbase_root / 'projects' / m.group(1) / '.env' + raise ImportError(f"未対応のバンドルエントリ: {arcname}") + + +def _parse_env_bytes(data: bytes) -> Dict[str, str]: + """EnvFile と同じ規則で bytes をパースする (一時ファイル経由で再利用)""" + import tempfile + with tempfile.NamedTemporaryFile( + prefix='devbase-env-import-', suffix='.env', delete=False, mode='wb' + ) as tmp: + tmp.write(data) + tmp_path = Path(tmp.name) + try: + ef = EnvFile(tmp_path) + ef.load() + return ef.get_all() + finally: + try: + tmp_path.unlink() + except OSError: + pass + + +def _format_env_bytes(data: Dict[str, str]) -> bytes: + """EnvFile.save と同じフォーマットで dict をバイト列化する""" + lines: List[str] = [] + for key in sorted(data): + value = data[key] + needs_quote = ( + '\n' in value + or any(c in value for c in (' ', '"', "'", '$', '`', '\\', + '<', '>', '|', '&', ';', + '(', ')', '#')) + ) + if needs_quote: + quoted = (value.replace('\\', '\\\\') + .replace('"', '\\"') + .replace('\n', '\\n')) + lines.append(f'{key}="{quoted}"\n') + else: + lines.append(f'{key}={value}\n') + return ''.join(lines).encode('utf-8') + + +def _plan_env_merge(target: Path, incoming_bytes: bytes, + opts: ImportOptions, arcname: str) -> _Plan: + """1 つの .env に対する merge / replace 計画を作る""" + incoming = _parse_env_bytes(incoming_bytes) + existing: Dict[str, str] = {} + if target.exists(): + existing = _parse_env_bytes(target.read_bytes()) + + if opts.replace: + added = sorted(set(incoming) - set(existing)) + overwritten = sorted(k for k in incoming if k in existing and incoming[k] != existing[k]) + # replace は バンドル側の値で完全に置き換える + return _Plan( + target=target, + arcname=arcname, + new_bytes=incoming_bytes, + added_keys=added, + overwritten_keys=overwritten, + skipped_keys=[], + op='replace' if existing else 'create', + ) + + if opts.replace_keys: + merged = dict(existing) + added: List[str] = [] + overwritten: List[str] = [] + skipped: List[str] = [] + replace_set = set(opts.replace_keys) + for key, value in incoming.items(): + if key in replace_set: + if key in existing: + if existing[key] != value: + overwritten.append(key) + merged[key] = value + else: + added.append(key) + merged[key] = value + else: + # --replace-keys 指定外のキーは既存を優先 (= keep-existing 相当) + if key not in existing: + skipped.append(key) + return _Plan( + target=target, + arcname=arcname, + new_bytes=_format_env_bytes(merged), + added_keys=sorted(added), + overwritten_keys=sorted(overwritten), + skipped_keys=sorted(skipped), + op='merge' if existing else 'create', + ) + + if opts.merge == 'keep-existing': + merged = dict(existing) + added: List[str] = [] + skipped: List[str] = [] + for key, value in incoming.items(): + if key in existing: + skipped.append(key) + else: + merged[key] = value + added.append(key) + return _Plan( + target=target, + arcname=arcname, + new_bytes=_format_env_bytes(merged), + added_keys=sorted(added), + overwritten_keys=[], + skipped_keys=sorted(skipped), + op='merge' if existing else 'create', + ) + + if opts.merge == 'prefer-incoming': + merged = dict(existing) + added: List[str] = [] + overwritten: List[str] = [] + for key, value in incoming.items(): + if key in existing: + if existing[key] != value: + overwritten.append(key) + merged[key] = value + else: + merged[key] = value + added.append(key) + return _Plan( + target=target, + arcname=arcname, + new_bytes=_format_env_bytes(merged), + added_keys=sorted(added), + overwritten_keys=sorted(overwritten), + skipped_keys=[], + op='merge' if existing else 'create', + ) + + raise ImportError(f"不明な --merge モード: {opts.merge!r}") + + +def _plan_sources(target: Path, incoming_bytes: bytes, + opts: ImportOptions) -> Optional[_Plan]: + """.env.sources.yml の取り扱い計画 + + 既定: 上書きせず None を返す (backup_dir に参照用コピーのみ書く)。 + --merge-metadata: 新規 source エントリのみ追加した内容で更新する。 + """ + if not opts.merge_metadata: + # 上書きしないので _Plan は返さない。参照用 copy は run() 側で処理。 + return None + + try: + incoming = yaml.safe_load(incoming_bytes) or {} + except yaml.YAMLError as e: + raise ImportError(f"バンドルの sources.yml が壊れています: {e}") from e + if not isinstance(incoming, dict): + raise ImportError("バンドルの sources.yml が dict ではありません") + incoming_sources = incoming.get('sources') or {} + if not isinstance(incoming_sources, dict): + raise ImportError("バンドルの sources.yml の sources が dict ではありません") + + existing: Dict = {} + if target.exists(): + try: + existing = yaml.safe_load(target.read_bytes()) or {} + except yaml.YAMLError as e: + raise ImportError( + f"既存の {target.name} のパースに失敗しました: {e}" + ) from e + if not isinstance(existing, dict): + existing = {} + existing.setdefault('sources', {}) + if not isinstance(existing['sources'], dict): + existing['sources'] = {} + + added: List[str] = [] + merged_sources = dict(existing['sources']) + for name, entry in incoming_sources.items(): + if name in merged_sources: + continue + merged_sources[name] = entry + added.append(name) + + if not added: + return None # 変化なし + + existing['sources'] = merged_sources + new_bytes = yaml.safe_dump( + existing, default_flow_style=False, allow_unicode=True + ).encode('utf-8') + return _Plan( + target=target, + arcname='env/sources.yml', + new_bytes=new_bytes, + added_keys=sorted(added), + overwritten_keys=[], + skipped_keys=[], + op='sources-merge', + ) + + +def _make_backup_dir(devbase_root: Path, opts: ImportOptions) -> Path: + if opts.backup_dir: + base = Path(opts.backup_dir).expanduser() + else: + base = devbase_root / 'backups' / 'env-import' + ts = datetime.now().strftime('%Y%m%d-%H%M%S') + path = base / ts + path.mkdir(parents=True, exist_ok=True) + return path + + +def _backup_existing(plans: Sequence[_Plan], sources_copy: Optional[Tuple[Path, bytes]], + backup_dir: Path, devbase_root: Path) -> None: + """phase 1 前に既存ファイルの内容を backup_dir にコピーする""" + for plan in plans: + if not plan.target.exists(): + continue + try: + relative = plan.target.relative_to(devbase_root) + except ValueError: + relative = Path(plan.target.name) + dst = backup_dir / relative + dst.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(plan.target, dst) + + # バンドルに含まれていた sources.yml の参照用コピー (上書きしないケース) + if sources_copy is not None: + target, data = sources_copy + dst = backup_dir / 'sources.yml.imported' + dst.parent.mkdir(parents=True, exist_ok=True) + dst.write_bytes(data) + try: + os.chmod(dst, 0o600) + except OSError: + pass + + +def _write_atomic(plan: _Plan) -> Path: + """phase 1: 新内容を .import.tmp として書き出す (0600)""" + tmp = plan.target.with_suffix(plan.target.suffix + '.import.tmp') + tmp.parent.mkdir(parents=True, exist_ok=True) + if tmp.exists(): + # 過去の失敗の残骸を掃除 + try: + tmp.unlink() + except OSError: + pass + flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC + fd = os.open(tmp, flags, 0o600) + try: + with os.fdopen(fd, 'wb') as f: + f.write(plan.new_bytes) + except BaseException: + try: + os.close(fd) + except OSError: + pass + raise + try: + os.chmod(tmp, 0o600) + except OSError: + pass + return tmp + + +def _commit(plans_and_tmps: List[Tuple[_Plan, Path]], backup_dir: Path, + devbase_root: Path) -> List[Path]: + """phase 2: tmp → target に rename。途中失敗時は best-effort で rollback""" + committed: List[Path] = [] + try: + for plan, tmp in plans_and_tmps: + os.replace(tmp, plan.target) + try: + os.chmod(plan.target, 0o600) + except OSError: + pass + committed.append(plan.target) + except OSError as e: + logger.error("commit フェーズで失敗しました: %s", e) + _rollback(committed, backup_dir, devbase_root) + raise ImportError(f"commit フェーズで失敗しました: {e}") from e + return committed + + +def _rollback(committed: Sequence[Path], backup_dir: Path, + devbase_root: Path) -> None: + """best-effort ロールバック: backup_dir から各 target を復元する""" + for target in committed: + try: + relative = target.relative_to(devbase_root) + except ValueError: + relative = Path(target.name) + src = backup_dir / relative + if src.exists(): + try: + shutil.copy2(src, target) + logger.warning("rollback: %s を %s から復元しました", target, src) + except OSError as e: + logger.error("rollback 失敗: %s -> %s: %s", src, target, e) + else: + logger.error( + "rollback 用 backup が見つかりません: %s " + "(元ファイルが存在しなかった可能性があります)", + src, + ) + + +def _cleanup_tmps(tmps: Sequence[Path]) -> None: + for tmp in tmps: + try: + if tmp.exists(): + tmp.unlink() + except OSError: + pass + + +def _gc_backups(backup_dir: Path, keep_last: int) -> None: + """backup_dir の親ディレクトリ内の古い backup を keep_last 個まで残して GC する""" + if keep_last <= 0: + return + parent = backup_dir.parent + if not parent.is_dir(): + return + siblings = sorted( + (p for p in parent.iterdir() if p.is_dir()), + key=lambda p: p.name, + ) + if len(siblings) <= keep_last: + return + to_remove = siblings[:-keep_last] + for d in to_remove: + try: + shutil.rmtree(d) + logger.info("古い backup を削除しました: %s", d) + except OSError as e: + logger.warning("backup 削除に失敗 (%s): %s", d, e) + + +def _log_plans(plans: Sequence[_Plan], dry_run: bool) -> None: + prefix = "[dry-run] " if dry_run else "" + for plan in plans: + logger.info( + "%s%s: %s (+%d add / ~%d overwrite / -%d skip)", + prefix, plan.op, plan.target, + len(plan.added_keys), len(plan.overwritten_keys), len(plan.skipped_keys), + ) + if plan.added_keys: + logger.info(" added: %s", ", ".join(plan.added_keys)) + if plan.overwritten_keys: + logger.info(" overwrite: %s", ", ".join(plan.overwritten_keys)) + if plan.skipped_keys: + logger.info(" skip (existing kept): %s", ", ".join(plan.skipped_keys)) + + +def import_bundle(devbase_root: Path, opts: ImportOptions) -> int: + """import 本体。CLI ハンドラから呼ばれる""" + # 引数の早期検証 + if opts.merge not in _MERGE_MODES: + raise ImportError( + f"--merge の値が不正です: {opts.merge!r} (許可: {', '.join(_MERGE_MODES)})" + ) + if opts.replace and opts.replace_keys: + raise ImportError("--replace と --replace-keys は併用できません") + if opts.passphrase_stdin and opts.source == '-': + raise ImportError( + "SOURCE='-' (stdin) と --passphrase-stdin は併用できません " + "(stdin が衝突します)" + ) + if opts.passphrase_env and opts.passphrase_stdin: + raise ImportError("--passphrase-env と --passphrase-stdin は併用できません") + + # SOURCE 読み込み + backend = _storage.resolve(opts.source) + blob = backend.read_bytes(opts.source) + logger.debug("読み込みサイズ: %d bytes", len(blob)) + + # 復号 (必要なら) + 展開 + manifest 検証 (sha256 / version) + tar_blob = _decrypt_if_needed(blob, opts) + manifest, members = _bundle.unpack(tar_blob) + logger.info("バンドル version=%s, 生成=%s, devbase=%s", + manifest.get('version'), manifest.get('created_at'), + manifest.get('devbase_version')) + + filtered = _filter_members(members, opts) + if not filtered: + raise ImportError( + "import 対象がありません " + "(--no-global / --include-project の指定とバンドル内容を確認してください)" + ) + + # 計画作成 + plans: List[_Plan] = [] + sources_reference: Optional[Tuple[Path, bytes]] = None + for arcname, data in sorted(filtered.items()): + target = _target_for(arcname, devbase_root) + if arcname == 'env/sources.yml': + plan = _plan_sources(target, data, opts) + if plan is not None: + plans.append(plan) + else: + # 既定動作: 上書きしないので参照用 copy のみバックアップする + sources_reference = (target, data) + else: + plans.append(_plan_env_merge(target, data, opts, arcname)) + + _log_plans(plans, opts.dry_run) + if sources_reference is not None and not opts.merge_metadata: + logger.info( + "%ssources.yml は上書きしません (--merge-metadata 指定時のみ更新, " + "参照用コピーを backup ディレクトリに残します)", + "[dry-run] " if opts.dry_run else "", + ) + + if opts.dry_run: + logger.info("[dry-run] 書き込みは行いません") + return 0 + + if not plans and sources_reference is None: + logger.info("変更はありません") + return 0 + + # backup → phase 1 (tmp 書き出し) → phase 2 (rename) + backup_dir = _make_backup_dir(devbase_root, opts) + logger.info("backup ディレクトリ: %s", backup_dir) + _backup_existing(plans, sources_reference, backup_dir, devbase_root) + + tmps: List[Path] = [] + plans_and_tmps: List[Tuple[_Plan, Path]] = [] + try: + for plan in plans: + tmp = _write_atomic(plan) + tmps.append(tmp) + plans_and_tmps.append((plan, tmp)) + except Exception: + _cleanup_tmps(tmps) + raise + + _commit(plans_and_tmps, backup_dir, devbase_root) + logger.info("import 完了: %d ファイル更新", len(plans)) + + _gc_backups(backup_dir, opts.keep_last) + return 0 diff --git a/tests/cli/test_env_import.py b/tests/cli/test_env_import.py new file mode 100644 index 0000000..a5506f0 --- /dev/null +++ b/tests/cli/test_env_import.py @@ -0,0 +1,353 @@ +"""devbase env import の統合テスト (擬似 DEVBASE_ROOT で round-trip / merge / replace / dry-run)""" + +from __future__ import annotations + +import io +import os +from pathlib import Path +from typing import Tuple + +import pyrage +import pytest + +from devbase.env import bundle, cipher +from devbase.env.io_export import ExportOptions, export +from devbase.env.io_import import ( + ImportError as ImportBundleError, + ImportOptions, + import_bundle, +) + + +@pytest.fixture +def fake_root(tmp_path): + """export 用の擬似 DEVBASE_ROOT (PR1 と同じ構造)""" + root = tmp_path / "src-root" + (root / "projects" / "alpha").mkdir(parents=True) + (root / "projects" / "beta").mkdir(parents=True) + (root / ".env").write_text("AWS_CONFIG_BASE64=AAAA\nGLOBAL=1\n") + (root / ".env.sources.yml").write_text( + "sources:\n aws:\n type: tar_base64\n hash: deadbeef\n" + ) + (root / "projects" / "alpha" / ".env").write_text("ALPHA_API_KEY=xyz\n") + (root / "projects" / "beta" / ".env").write_text("BETA_DB_PASSWORD=p\n") + return root + + +@pytest.fixture +def dest_root(tmp_path): + """import 先の擬似 DEVBASE_ROOT (空 or 既存ファイルあり)""" + root = tmp_path / "dst-root" + (root / "projects" / "alpha").mkdir(parents=True) + (root / "projects" / "beta").mkdir(parents=True) + return root + + +@pytest.fixture +def age_keys(tmp_path): + identity = pyrage.x25519.Identity.generate() + pub_file = tmp_path / "age.pub" + pub_file.write_text(str(identity.to_public()) + "\n") + id_file = tmp_path / "age.key" + id_file.write_text(str(identity)) + return pub_file, id_file + + +def _export_bundle(fake_root: Path, age_keys: Tuple[Path, Path], + tmp_path: Path) -> Path: + pub_file, _ = age_keys + dest = tmp_path / "out.dbenv" + rc = export(fake_root, ExportOptions( + dest=str(dest), recipients=[f"@{pub_file}"])) + assert rc == 0 + return dest + + +def test_import_roundtrip_creates_files_with_0600(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + assert rc == 0 + + # global と各 project の .env が復元されている + assert (dest_root / ".env").read_text() == "AWS_CONFIG_BASE64=AAAA\nGLOBAL=1\n" + assert (dest_root / "projects" / "alpha" / ".env").read_text() == "ALPHA_API_KEY=xyz\n" + assert (dest_root / "projects" / "beta" / ".env").read_text() == "BETA_DB_PASSWORD=p\n" + + # パーミッションが 0600 + assert (dest_root / ".env").stat().st_mode & 0o777 == 0o600 + assert (dest_root / "projects" / "alpha" / ".env").stat().st_mode & 0o777 == 0o600 + + # sources.yml は既定では上書きしないので存在しない + assert not (dest_root / ".env.sources.yml").exists() + # backup ディレクトリに参照用 sources.yml.imported が残る + backup_root = dest_root / "backups" / "env-import" + assert backup_root.is_dir() + sub = next(backup_root.iterdir()) + assert (sub / "sources.yml.imported").exists() + + +def test_import_dry_run_does_not_modify(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + # 既存ファイルを置く + (dest_root / ".env").write_text("EXISTING=keep\n") + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], dry_run=True)) + assert rc == 0 + + # 元の .env は変更されていない + assert (dest_root / ".env").read_text() == "EXISTING=keep\n" + # backup も作られない + assert not (dest_root / "backups").exists() + + +def test_import_keep_existing_only_adds_new_keys(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + (dest_root / ".env").write_text("AWS_CONFIG_BASE64=OLD\nKEEP=this\n") + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + assert rc == 0 + + text = (dest_root / ".env").read_text() + # 既存の AWS_CONFIG_BASE64 は OLD のまま (keep-existing) + assert "AWS_CONFIG_BASE64=OLD" in text + # 新規キー GLOBAL=1 は追加される + assert "GLOBAL=1" in text + # 既存キー KEEP は残る + assert "KEEP=this" in text + + +def test_import_prefer_incoming_overwrites_existing(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + (dest_root / ".env").write_text("AWS_CONFIG_BASE64=OLD\nKEEP=this\n") + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + merge='prefer-incoming')) + assert rc == 0 + + text = (dest_root / ".env").read_text() + # バンドル側で上書きされる + assert "AWS_CONFIG_BASE64=AAAA" in text + # incoming に無い既存キーは残る + assert "KEEP=this" in text + + +def test_import_replace_keys_only_overwrites_specified(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + (dest_root / ".env").write_text("AWS_CONFIG_BASE64=OLD\nGLOBAL=KEEP\n") + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + replace_keys=['AWS_CONFIG_BASE64'])) + assert rc == 0 + + text = (dest_root / ".env").read_text() + assert "AWS_CONFIG_BASE64=AAAA" in text # 上書きされる + assert "GLOBAL=KEEP" in text # 指定外なので keep + + +def test_import_replace_takes_backup_and_replaces(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + (dest_root / ".env").write_text("OLD=value\n") + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], replace=True)) + assert rc == 0 + + text = (dest_root / ".env").read_text() + assert "AWS_CONFIG_BASE64=AAAA" in text + assert "GLOBAL=1" in text + assert "OLD=value" not in text # 完全に置き換わる + + backup_root = dest_root / "backups" / "env-import" + sub = next(backup_root.iterdir()) + assert (sub / ".env").read_text() == "OLD=value\n" + + +def test_import_rejects_replace_with_replace_keys(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + with pytest.raises(ImportBundleError, match="--replace と --replace-keys"): + import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + replace=True, replace_keys=['A'])) + + +def test_import_rejects_stdin_with_passphrase_stdin(dest_root): + with pytest.raises(ImportBundleError, match="SOURCE='-'"): + import_bundle(dest_root, ImportOptions( + source='-', passphrase_stdin=True)) + + +def test_import_rejects_both_passphrase_env_and_stdin(dest_root): + with pytest.raises(ImportBundleError, match="--passphrase-env"): + import_bundle(dest_root, ImportOptions( + source='/dev/null', passphrase_env='X', passphrase_stdin=True)) + + +def test_import_rejects_unknown_manifest_version(fake_root, dest_root, age_keys, tmp_path): + """manifest.version が SUPPORTED_MANIFEST_VERSION より大きいバンドルは拒否される""" + import gzip + import io as _io + import tarfile + import yaml + + pub_file, id_file = age_keys + # 通常 export + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + # 復号して中身を書き換えてから age で再暗号化する + plain = cipher.decrypt(bundle_path.read_bytes(), identities=[str(id_file)]) + + # tar.gz を再構築して manifest.version=999 にする + buf_in = _io.BytesIO(plain) + tin = tarfile.open(fileobj=buf_in, mode='r:gz') + out = _io.BytesIO() + with gzip.GzipFile(fileobj=out, mode='wb', mtime=0) as gz: + with tarfile.open(fileobj=gz, mode='w', format=tarfile.PAX_FORMAT) as tout: + for info in tin.getmembers(): + data = tin.extractfile(info).read() + if info.name == bundle.MANIFEST_NAME: + manifest = yaml.safe_load(data) + manifest['version'] = 999 + data = yaml.safe_dump(manifest, sort_keys=False).encode('utf-8') + ti = tarfile.TarInfo(name=info.name) + ti.size = len(data) + ti.mtime = 0 + ti.mode = 0o600 + tout.addfile(ti, _io.BytesIO(data)) + tin.close() + + bad_plain = out.getvalue() + bad = pyrage.encrypt(bad_plain, + [pyrage.ssh.Recipient.from_str(pub_file.read_text().strip())] + if pub_file.read_text().strip().startswith('ssh-') + else [pyrage.x25519.Recipient.from_str(pub_file.read_text().strip())]) + bad_path = tmp_path / "bad.dbenv" + bad_path.write_bytes(bad) + + with pytest.raises(bundle.BundleError, match="サポートされていません"): + import_bundle(dest_root, ImportOptions( + source=str(bad_path), identities=[str(id_file)])) + + +def test_import_preserves_lf_line_endings(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + # CRLF を排除した想定: export → import で LF が保持されること + (fake_root / ".env").write_text("A=1\nB=2\n") + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + assert rc == 0 + raw = (dest_root / ".env").read_bytes() + assert b'\r' not in raw + assert raw.endswith(b'\n') + + +def test_import_keep_last_gc_removes_old_backups(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + backup_root = dest_root / "backups" / "env-import" + # 既存の古い backup を 5 個事前作成する (タイムスタンプ命名規則に合わせる) + backup_root.mkdir(parents=True) + for i in range(5): + (backup_root / f"20260101-00000{i}").mkdir() + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], keep_last=3)) + assert rc == 0 + + remaining = sorted(p.name for p in backup_root.iterdir()) + assert len(remaining) == 3 + # 最新 3 個に絞られる: 既存の 20260101-000003, 000004, 加えて新規 backup + assert remaining[-1].startswith('20') + + +def test_import_include_project_filter(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + include_projects=['alpha'])) + assert rc == 0 + assert (dest_root / "projects" / "alpha" / ".env").exists() + assert not (dest_root / "projects" / "beta" / ".env").exists() + + +def test_import_plaintext_bundle(fake_root, dest_root, tmp_path): + """--force-unencrypted で出力した平文 tar.gz もそのまま import できる""" + dest = tmp_path / "out.dbenv.tar.gz" + rc = export(fake_root, ExportOptions(dest=str(dest), force_unencrypted=True)) + assert rc == 0 + + rc = import_bundle(dest_root, ImportOptions(source=str(dest))) + assert rc == 0 + assert (dest_root / ".env").exists() + + +def test_import_merge_metadata_adds_only_new_sources(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + # 既存 sources.yml を用意 (aws のみ。bundle 側も aws を持つ) + (dest_root / ".env.sources.yml").write_text( + "sources:\n aws:\n type: tar_base64\n hash: existinghash\n" + ) + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + merge_metadata=True)) + assert rc == 0 + + import yaml as _yaml + data = _yaml.safe_load((dest_root / ".env.sources.yml").read_text()) + # 既存 aws は維持される (hash=existinghash のまま) + assert data['sources']['aws']['hash'] == 'existinghash' + + +def test_import_no_metadata_skips_sources_yml(fake_root, dest_root, age_keys, tmp_path): + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + include_metadata=False)) + assert rc == 0 + # 参照用コピーも作られない (filter で除外されるため) + backup_root = dest_root / "backups" / "env-import" + sub = next(backup_root.iterdir()) + assert not (sub / "sources.yml.imported").exists() + + +def test_import_passphrase_env_roundtrip(fake_root, dest_root, tmp_path, monkeypatch): + dest = tmp_path / "out.dbenv" + monkeypatch.setenv("DEVBASE_TEST_PASS", "s3cr3t") + rc = export(fake_root, ExportOptions( + dest=str(dest), passphrase_env="DEVBASE_TEST_PASS")) + assert rc == 0 + + rc = import_bundle(dest_root, ImportOptions( + source=str(dest), passphrase_env="DEVBASE_TEST_PASS")) + assert rc == 0 + assert (dest_root / ".env").exists() From 62363d72250a5d3923dc49db6536b0cfc60800b0 Mon Sep 17 00:00:00 2001 From: "takemi.ohama" Date: Thu, 21 May 2026 21:04:09 +0900 Subject: [PATCH 03/10] =?UTF-8?q?fix(env):=20import=20=E3=81=AE=20merge/ro?= =?UTF-8?q?llback/GC=20=E5=AE=89=E5=85=A8=E6=80=A7=E3=82=92=E6=94=B9?= =?UTF-8?q?=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit cross-review round 1 で指摘されたデータ損失・部分適用・merge 不整合を修正: - --replace-keys 指定外でも既存に無い incoming 新規キーは追加するように修正 (CLI help の "other keys behave like keep-existing" に整合) - _rollback で op='create' なターゲットは backup が無くても unlink し、 commit 途中失敗時の部分適用残骸を消す - _gc_backups は devbase が生成する timestamp 形式 (YYYYMMDD-HHMMSS) の ディレクトリのみを削除対象にし、--backup-dir 親に置かれた無関係なファイル /ディレクトリを誤って消さないようにする - EnvFile.parse_bytes(data) を新設し、io_import の bytes パースを 一時ファイル経由から直接パースに置き換え (I/O 削減 + 例外安全) - 上記 3 件分の回帰テストを追加 Refs: codex review #4336666744 / gemini review #4336672519 --- lib/devbase/env/io_import.py | 69 ++++++++++++++++----------- lib/devbase/env/store.py | 45 +++++++++--------- tests/cli/test_env_import.py | 91 ++++++++++++++++++++++++++++++++++++ 3 files changed, 154 insertions(+), 51 deletions(-) diff --git a/lib/devbase/env/io_import.py b/lib/devbase/env/io_import.py index 6d6f975..852f025 100644 --- a/lib/devbase/env/io_import.py +++ b/lib/devbase/env/io_import.py @@ -40,6 +40,11 @@ _MERGE_MODES = ('keep-existing', 'prefer-incoming') +# _make_backup_dir が生成するタイムスタンプ形式 (YYYYMMDD-HHMMSS) のみを +# GC 対象にする。これ以外のディレクトリは devbase が作ったものではないので +# 削除しない (--backup-dir 親に無関係なディレクトリがあっても安全)。 +_BACKUP_DIR_NAME_RE = re.compile(r'^\d{8}-\d{6}$') + class ImportError(DevbaseError): """import エラー""" @@ -179,22 +184,8 @@ def _target_for(arcname: str, devbase_root: Path) -> Path: def _parse_env_bytes(data: bytes) -> Dict[str, str]: - """EnvFile と同じ規則で bytes をパースする (一時ファイル経由で再利用)""" - import tempfile - with tempfile.NamedTemporaryFile( - prefix='devbase-env-import-', suffix='.env', delete=False, mode='wb' - ) as tmp: - tmp.write(data) - tmp_path = Path(tmp.name) - try: - ef = EnvFile(tmp_path) - ef.load() - return ef.get_all() - finally: - try: - tmp_path.unlink() - except OSError: - pass + """EnvFile と同じ規則で bytes をパースする (一時ファイル不要)""" + return EnvFile.parse_bytes(data) def _format_env_bytes(data: Dict[str, str]) -> bytes: @@ -256,9 +247,15 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, added.append(key) merged[key] = value else: - # --replace-keys 指定外のキーは既存を優先 (= keep-existing 相当) - if key not in existing: - skipped.append(key) + # --replace-keys 指定外のキーは keep-existing 相当: + # 既存にあれば残し、無ければ新規追加 (skipped は overwrite を + # 抑止した = 上書きしなかったキーのみ)。 + if key in existing: + if existing[key] != value: + skipped.append(key) + else: + added.append(key) + merged[key] = value return _Plan( target=target, arcname=arcname, @@ -443,7 +440,7 @@ def _write_atomic(plan: _Plan) -> Path: def _commit(plans_and_tmps: List[Tuple[_Plan, Path]], backup_dir: Path, devbase_root: Path) -> List[Path]: """phase 2: tmp → target に rename。途中失敗時は best-effort で rollback""" - committed: List[Path] = [] + committed: List[Tuple[_Plan, Path]] = [] try: for plan, tmp in plans_and_tmps: os.replace(tmp, plan.target) @@ -451,18 +448,21 @@ def _commit(plans_and_tmps: List[Tuple[_Plan, Path]], backup_dir: Path, os.chmod(plan.target, 0o600) except OSError: pass - committed.append(plan.target) + committed.append((plan, plan.target)) except OSError as e: logger.error("commit フェーズで失敗しました: %s", e) _rollback(committed, backup_dir, devbase_root) raise ImportError(f"commit フェーズで失敗しました: {e}") from e - return committed + return [t for _, t in committed] -def _rollback(committed: Sequence[Path], backup_dir: Path, +def _rollback(committed: Sequence[Tuple[_Plan, Path]], backup_dir: Path, devbase_root: Path) -> None: - """best-effort ロールバック: backup_dir から各 target を復元する""" - for target in committed: + """best-effort ロールバック: + - 既存上書き (backup あり) → backup から復元 + - 新規作成 (op='create', backup なし) → unlink して元の「不在」状態に戻す + """ + for plan, target in committed: try: relative = target.relative_to(devbase_root) except ValueError: @@ -474,6 +474,15 @@ def _rollback(committed: Sequence[Path], backup_dir: Path, logger.warning("rollback: %s を %s から復元しました", target, src) except OSError as e: logger.error("rollback 失敗: %s -> %s: %s", src, target, e) + elif plan.op == 'create': + # 元ファイルが無かった新規作成 → 削除して元の状態に戻す + try: + target.unlink() + logger.warning("rollback: 新規作成された %s を削除しました", target) + except FileNotFoundError: + pass + except OSError as e: + logger.error("rollback unlink 失敗: %s: %s", target, e) else: logger.error( "rollback 用 backup が見つかりません: %s " @@ -492,14 +501,20 @@ def _cleanup_tmps(tmps: Sequence[Path]) -> None: def _gc_backups(backup_dir: Path, keep_last: int) -> None: - """backup_dir の親ディレクトリ内の古い backup を keep_last 個まで残して GC する""" + """backup_dir の親ディレクトリ内の古い backup を keep_last 個まで残して GC する。 + + 安全性のため、削除対象は devbase が生成するタイムスタンプ形式 + (YYYYMMDD-HHMMSS) のディレクトリに限定する。--backup-dir で指定された + 親ディレクトリに無関係なファイル/ディレクトリがあっても、それらは触らない。 + """ if keep_last <= 0: return parent = backup_dir.parent if not parent.is_dir(): return siblings = sorted( - (p for p in parent.iterdir() if p.is_dir()), + (p for p in parent.iterdir() + if p.is_dir() and _BACKUP_DIR_NAME_RE.match(p.name)), key=lambda p: p.name, ) if len(siblings) <= keep_last: diff --git a/lib/devbase/env/store.py b/lib/devbase/env/store.py index 057fe0e..7d62db5 100644 --- a/lib/devbase/env/store.py +++ b/lib/devbase/env/store.py @@ -59,33 +59,30 @@ def __init__(self, file_path: Union[str, Path]): self._data: Dict[str, str] = {} self._loaded = False + @staticmethod + def parse_bytes(data: bytes) -> Dict[str, str]: + """bytes 列を load と同じ規則でパースして dict を返す (ファイル不要)""" + result: Dict[str, str] = {} + for raw_line in data.decode('utf-8').splitlines(): + line = raw_line.strip() + if not line or line.startswith('#'): + continue + if '=' not in line: + continue + key, _, value = line.partition('=') + key = key.strip() + value = value.strip() + if value and value[0] == value[-1] and value[0] in ('"', "'"): + value = value[1:-1] + result[key] = value + return result + def load(self) -> Dict[str, str]: """ファイルを読み込みkey=valueをパースする""" - self._data = {} - if not self.file_path.exists(): - self._loaded = True - return self._data - - with open(self.file_path, 'r', encoding='utf-8') as f: - for line in f: - line = line.strip() - - if not line or line.startswith('#'): - continue - - if '=' not in line: - continue - - key, _, value = line.partition('=') - key = key.strip() - value = value.strip() - - if value and value[0] == value[-1] and value[0] in ('"', "'"): - value = value[1:-1] - - self._data[key] = value - + self._data = {} + else: + self._data = self.parse_bytes(self.file_path.read_bytes()) self._loaded = True return self._data diff --git a/tests/cli/test_env_import.py b/tests/cli/test_env_import.py index a5506f0..acfe28f 100644 --- a/tests/cli/test_env_import.py +++ b/tests/cli/test_env_import.py @@ -340,6 +340,97 @@ def test_import_no_metadata_skips_sources_yml(fake_root, dest_root, age_keys, tm assert not (sub / "sources.yml.imported").exists() +def test_import_replace_keys_adds_unspecified_new_keys(fake_root, dest_root, age_keys, tmp_path): + """--replace-keys 指定外でも、既存ファイルに無い incoming キーは追加される + (CLI help 'other keys behave like keep-existing' に整合)""" + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + # 既存は AWS_CONFIG_BASE64 のみ。incoming は AWS_CONFIG_BASE64 + GLOBAL=1 + (dest_root / ".env").write_text("AWS_CONFIG_BASE64=OLD\n") + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + replace_keys=['AWS_CONFIG_BASE64'])) + assert rc == 0 + + text = (dest_root / ".env").read_text() + assert "AWS_CONFIG_BASE64=AAAA" in text # 指定キーは上書き + assert "GLOBAL=1" in text # 指定外でも既存に無い新規キーは追加される (keep-existing 相当) + + +def test_rollback_unlinks_newly_created_files_on_commit_failure( + fake_root, dest_root, age_keys, tmp_path, monkeypatch): + """commit フェーズ途中失敗時、元ファイル不在で新規作成された target は unlink され、 + 部分適用状態が残らないこと""" + from devbase.env import io_import as _io_import + + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + # dest には元ファイルが一切無い (= 全 plan op='create') + assert not (dest_root / ".env").exists() + assert not (dest_root / "projects" / "alpha" / ".env").exists() + + # 2 つ目以降の os.replace で失敗させる + original_replace = os.replace + call_count = {'n': 0} + + def failing_replace(src, dst): + call_count['n'] += 1 + if call_count['n'] >= 2: + raise OSError("simulated commit failure") + return original_replace(src, dst) + + monkeypatch.setattr(_io_import.os, 'replace', failing_replace) + + with pytest.raises(_io_import.ImportError, match="commit フェーズで失敗"): + import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + + # rollback で新規作成 (op='create') の .env は削除されていること + assert not (dest_root / ".env").exists() + # まだ commit されていない target ももちろん存在しない + assert not (dest_root / "projects" / "beta" / ".env").exists() + + +def test_gc_backups_only_removes_timestamp_dirs(fake_root, dest_root, age_keys, tmp_path): + """--backup-dir 指定時でも、devbase が作った timestamp 形式以外のディレクトリは + GC で削除されない""" + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + custom_backup_root = tmp_path / "user-backups" + custom_backup_root.mkdir() + # 関係ないディレクトリ + unrelated = custom_backup_root / "important-user-data" + unrelated.mkdir() + (unrelated / "keep.txt").write_text("must not be deleted") + # 関係ないファイル + unrelated_file = custom_backup_root / "readme.txt" + unrelated_file.write_text("must not be deleted") + # devbase 命名の古い backup を keep_last 超に置く + for i in range(5): + (custom_backup_root / f"20240101-00000{i}").mkdir() + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + backup_dir=str(custom_backup_root), keep_last=3)) + assert rc == 0 + + # 無関係なディレクトリ/ファイルは残る + assert unrelated.exists() + assert (unrelated / "keep.txt").exists() + assert unrelated_file.exists() + # timestamp 形式は keep_last=3 まで絞られる (新規 backup 含む) + timestamp_dirs = sorted( + p.name for p in custom_backup_root.iterdir() + if p.is_dir() and p.name not in ('important-user-data',) + ) + assert len(timestamp_dirs) == 3 + + def test_import_passphrase_env_roundtrip(fake_root, dest_root, tmp_path, monkeypatch): dest = tmp_path / "out.dbenv" monkeypatch.setenv("DEVBASE_TEST_PASS", "s3cr3t") From 61257371a5cd0eed67542637efc14428ee877892 Mon Sep 17 00:00:00 2001 From: "takemi.ohama" Date: Thu, 21 May 2026 21:18:48 +0900 Subject: [PATCH 04/10] =?UTF-8?q?fix(env):=20import=20=E3=81=AE=E4=BA=8C?= =?UTF-8?q?=E9=87=8D=E3=82=A8=E3=82=B9=E3=82=B1=E3=83=BC=E3=83=97=20/=20ro?= =?UTF-8?q?llback=20/=20tmp=20=E6=AE=8B=E9=AA=B8=20/=20completion=20?= =?UTF-8?q?=E3=82=92=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #15 round 2 のクロスレビュー指摘 (codex 3 件 + gemini 2 件) に対応。 * EnvFile.parse_bytes に double-quote 内 escape の逆変換 (state machine) を 追加し、save / _format_env_bytes との round-trip を成立させる。これにより backslash / quote / 改行を含む値が import → export を経て二重エスケープ される問題を解消する。 * _plan_env_merge の create パスでは _format_env_bytes による再シリアライズを 避け、incoming_bytes をそのまま採用する。バンドル側のバイト列を完全に 保持し、parse/format に潜む副作用を確実に排除する。 * _rollback で「backup が無い = 元ファイル不在」のケースを op に関係なく unlink するように変更。op='sources-merge' で sources.yml を新規作成した ロールバックでも残骸が残らなくなる。 * _commit 失敗時に、まだ rename されていない .import.tmp ファイルを try-finally で確実に削除する。 * _make_backup_dir に microsecond + 連番フォールバックを付与し、同一秒内に import が複数回走っても backup ディレクトリが衝突しない。 * etc/devbase-completion.bash と etc/_devbase の env サブコマンド一覧に import を追加し、各オプションも補完できるようにする。 テストでは parse_bytes round-trip / create 経路の byte preservation / sources.yml rollback unlink / tmp cleanup / backup 衝突回避を検証する。 Co-Authored-By: Claude Opus 4.7 (1M context) --- etc/_devbase | 19 ++++ etc/devbase-completion.bash | 7 +- lib/devbase/env/io_import.py | 92 ++++++++++++----- lib/devbase/env/store.py | 45 ++++++++- tests/cli/test_env_import.py | 186 +++++++++++++++++++++++++++++++++++ 5 files changed, 322 insertions(+), 27 deletions(-) diff --git a/etc/_devbase b/etc/_devbase index c5d0f14..b7944dd 100644 --- a/etc/_devbase +++ b/etc/_devbase @@ -74,6 +74,7 @@ _devbase() { 'edit:Open .env in editor' 'project:Setup project-specific variables' 'export:Export .env files as an encrypted bundle (age)' + 'import:Import .env bundle (age decrypt + merge)' ) plugin_subcommands=( @@ -163,6 +164,24 @@ _devbase() { '--passphrase-stdin[Read passphrase from stdin]' \ '--force-unencrypted[Write as plaintext tar.gz]' ;; + import) + _arguments \ + '1:source:_files' \ + '--merge[Merge strategy]:mode:(keep-existing prefer-incoming)' \ + '--replace-keys[Replace only these keys (comma-separated)]:keys:' \ + '--replace[Replace existing files entirely]' \ + '--dry-run[Preview changes without writing]' \ + '*--identity[age / OpenSSH private key file (repeatable)]:file:_files' \ + '--passphrase-env[Read passphrase from env var]:var:' \ + '--passphrase-stdin[Read passphrase from stdin]' \ + '*--include-project[Limit to specified project (repeatable)]:name:' \ + '*--exclude-project[Exclude project (repeatable)]:name:' \ + '--no-global[Do not import $DEVBASE_ROOT/.env]' \ + '--no-metadata[Do not import $DEVBASE_ROOT/.env.sources.yml]' \ + '--merge-metadata[Add only new sources entries from bundle]' \ + '--backup-dir[Override backup directory]:dir:_files -/' \ + '--keep-last[Keep only the last N backup directories]:n:' + ;; *) _describe -t env-commands 'env command' env_subcommands ;; diff --git a/etc/devbase-completion.bash b/etc/devbase-completion.bash index e7ef68c..81f6431 100644 --- a/etc/devbase-completion.bash +++ b/etc/devbase-completion.bash @@ -12,7 +12,7 @@ _devbase_completions() { local commands="init status shell-rc container ct env plugin pl snapshot ss up down login build ps help" local container_subcommands="up down ps login logs scale build" - local env_subcommands="init sync list set get delete edit project export" + local env_subcommands="init sync list set get delete edit project export import" local plugin_subcommands="list install uninstall update info sync repo" local repo_subcommands="add remove list refresh" local snapshot_subcommands="create list restore copy delete rotate" @@ -86,6 +86,11 @@ _devbase_completions() { COMPREPLY=($(compgen -W "--include-project --exclude-project --no-global --no-metadata --recipient --passphrase-env --passphrase-stdin --force-unencrypted" -- "$cur")) fi ;; + import) + if [[ "$cur" == -* ]]; then + COMPREPLY=($(compgen -W "--merge --replace-keys --replace --dry-run --identity --passphrase-env --passphrase-stdin --include-project --exclude-project --no-global --no-metadata --merge-metadata --backup-dir --keep-last" -- "$cur")) + fi + ;; esac fi # plugin subcommand arguments diff --git a/lib/devbase/env/io_import.py b/lib/devbase/env/io_import.py index 852f025..764825f 100644 --- a/lib/devbase/env/io_import.py +++ b/lib/devbase/env/io_import.py @@ -40,10 +40,14 @@ _MERGE_MODES = ('keep-existing', 'prefer-incoming') -# _make_backup_dir が生成するタイムスタンプ形式 (YYYYMMDD-HHMMSS) のみを -# GC 対象にする。これ以外のディレクトリは devbase が作ったものではないので -# 削除しない (--backup-dir 親に無関係なディレクトリがあっても安全)。 -_BACKUP_DIR_NAME_RE = re.compile(r'^\d{8}-\d{6}$') +# _make_backup_dir が生成するタイムスタンプ形式のみを GC 対象にする。 +# 以下のいずれかにマッチするディレクトリのみ削除する: +# YYYYMMDD-HHMMSS (旧フォーマット, 後方互換) +# YYYYMMDD-HHMMSS-NNNNNN (microsecond 付き) +# YYYYMMDD-HHMMSS-NNNNNN-NN (同一マイクロ秒内の連番付き) +# これ以外のディレクトリは devbase が作ったものではないので削除しない +# (--backup-dir 親に無関係なディレクトリがあっても安全)。 +_BACKUP_DIR_NAME_RE = re.compile(r'^\d{8}-\d{6}(?:-\d{6}(?:-\d+)?)?$') class ImportError(DevbaseError): @@ -211,7 +215,13 @@ def _format_env_bytes(data: Dict[str, str]) -> bytes: def _plan_env_merge(target: Path, incoming_bytes: bytes, opts: ImportOptions, arcname: str) -> _Plan: - """1 つの .env に対する merge / replace 計画を作る""" + """1 つの .env に対する merge / replace 計画を作る + + 既存ファイルが無い (= create) ケースでは、バンドル側の ``incoming_bytes`` を + そのまま採用する。``_format_env_bytes`` で再シリアライズすると、export 側で + 既に escape された値を parse_bytes 経由でも完全に round-trip できる前提が + 崩れた瞬間に二重エスケープが発生するためである (PR #15 codex 指摘)。 + """ incoming = _parse_env_bytes(incoming_bytes) existing: Dict[str, str] = {} if target.exists(): @@ -256,10 +266,12 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, else: added.append(key) merged[key] = value + # 新規作成時は incoming_bytes をそのまま保持して二重エスケープを回避 + new_bytes = incoming_bytes if not existing else _format_env_bytes(merged) return _Plan( target=target, arcname=arcname, - new_bytes=_format_env_bytes(merged), + new_bytes=new_bytes, added_keys=sorted(added), overwritten_keys=sorted(overwritten), skipped_keys=sorted(skipped), @@ -276,10 +288,11 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, else: merged[key] = value added.append(key) + new_bytes = incoming_bytes if not existing else _format_env_bytes(merged) return _Plan( target=target, arcname=arcname, - new_bytes=_format_env_bytes(merged), + new_bytes=new_bytes, added_keys=sorted(added), overwritten_keys=[], skipped_keys=sorted(skipped), @@ -298,10 +311,11 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, else: merged[key] = value added.append(key) + new_bytes = incoming_bytes if not existing else _format_env_bytes(merged) return _Plan( target=target, arcname=arcname, - new_bytes=_format_env_bytes(merged), + new_bytes=new_bytes, added_keys=sorted(added), overwritten_keys=sorted(overwritten), skipped_keys=[], @@ -373,14 +387,33 @@ def _plan_sources(target: Path, incoming_bytes: bytes, def _make_backup_dir(devbase_root: Path, opts: ImportOptions) -> Path: + """バックアップディレクトリを作成する。 + + 秒精度のみだと同一秒に 2 回 import を走らせたときに同じディレクトリを再利用して + 前回バックアップを上書きしてしまうため、microsecond + 連番を付与して衝突を回避する + (PR #15 codex 指摘)。 + """ if opts.backup_dir: base = Path(opts.backup_dir).expanduser() else: base = devbase_root / 'backups' / 'env-import' - ts = datetime.now().strftime('%Y%m%d-%H%M%S') - path = base / ts - path.mkdir(parents=True, exist_ok=True) - return path + base.mkdir(parents=True, exist_ok=True) + + now = datetime.now() + stem = now.strftime('%Y%m%d-%H%M%S-%f') # microsecond まで + path = base / stem + if not path.exists(): + path.mkdir(parents=True) + return path + # 同一マイクロ秒に複数回走った場合の安全弁: 連番を付与 + for n in range(1, 1000): + candidate = base / f'{stem}-{n:02d}' + if not candidate.exists(): + candidate.mkdir(parents=True) + return candidate + raise ImportError( + f"backup ディレクトリの衝突回避に失敗しました (base={base}, stem={stem})" + ) def _backup_existing(plans: Sequence[_Plan], sources_copy: Optional[Tuple[Path, bytes]], @@ -439,19 +472,30 @@ def _write_atomic(plan: _Plan) -> Path: def _commit(plans_and_tmps: List[Tuple[_Plan, Path]], backup_dir: Path, devbase_root: Path) -> List[Path]: - """phase 2: tmp → target に rename。途中失敗時は best-effort で rollback""" + """phase 2: tmp → target に rename。 + + 途中失敗時は best-effort で rollback したうえで、まだ rename されていない + 残りの ``.import.tmp`` ファイルもクリーンアップする (PR #15 gemini 指摘)。 + """ committed: List[Tuple[_Plan, Path]] = [] + remaining_tmps = [tmp for _, tmp in plans_and_tmps] try: - for plan, tmp in plans_and_tmps: + for idx, (plan, tmp) in enumerate(plans_and_tmps): os.replace(tmp, plan.target) try: os.chmod(plan.target, 0o600) except OSError: pass committed.append((plan, plan.target)) + # rename 済みの tmp は残らないが、リストから除外して後続 cleanup を簡潔に + remaining_tmps[idx] = None # type: ignore[call-overload] except OSError as e: logger.error("commit フェーズで失敗しました: %s", e) - _rollback(committed, backup_dir, devbase_root) + try: + _rollback(committed, backup_dir, devbase_root) + finally: + # rename 前で残っている .import.tmp を後始末 + _cleanup_tmps([t for t in remaining_tmps if t is not None]) raise ImportError(f"commit フェーズで失敗しました: {e}") from e return [t for _, t in committed] @@ -460,7 +504,13 @@ def _rollback(committed: Sequence[Tuple[_Plan, Path]], backup_dir: Path, devbase_root: Path) -> None: """best-effort ロールバック: - 既存上書き (backup あり) → backup から復元 - - 新規作成 (op='create', backup なし) → unlink して元の「不在」状態に戻す + - backup が無いケース → 元ファイルが存在しなかった (= 新規作成) と + みなして unlink し、元の「不在」状態に戻す。``op='create'`` だけでなく + ``op='sources-merge'`` で sources.yml を新規作成したケースもここで + unlink する (PR #15 gemini 指摘)。 + + ``_backup_existing`` は target が存在した場合のみ backup を作る。よって + 「backup が無い」事実は「元ファイルが存在しなかった」ことを示している。 """ for plan, target in committed: try: @@ -474,8 +524,8 @@ def _rollback(committed: Sequence[Tuple[_Plan, Path]], backup_dir: Path, logger.warning("rollback: %s を %s から復元しました", target, src) except OSError as e: logger.error("rollback 失敗: %s -> %s: %s", src, target, e) - elif plan.op == 'create': - # 元ファイルが無かった新規作成 → 削除して元の状態に戻す + else: + # 元ファイル不在 → 新規作成された target を unlink して元の状態に戻す try: target.unlink() logger.warning("rollback: 新規作成された %s を削除しました", target) @@ -483,12 +533,6 @@ def _rollback(committed: Sequence[Tuple[_Plan, Path]], backup_dir: Path, pass except OSError as e: logger.error("rollback unlink 失敗: %s: %s", target, e) - else: - logger.error( - "rollback 用 backup が見つかりません: %s " - "(元ファイルが存在しなかった可能性があります)", - src, - ) def _cleanup_tmps(tmps: Sequence[Path]) -> None: diff --git a/lib/devbase/env/store.py b/lib/devbase/env/store.py index 7d62db5..d2284bd 100644 --- a/lib/devbase/env/store.py +++ b/lib/devbase/env/store.py @@ -61,7 +61,13 @@ def __init__(self, file_path: Union[str, Path]): @staticmethod def parse_bytes(data: bytes) -> Dict[str, str]: - """bytes 列を load と同じ規則でパースして dict を返す (ファイル不要)""" + """bytes 列を load と同じ規則でパースして dict を返す (ファイル不要) + + ``save`` / :func:`devbase.env.io_import._format_env_bytes` の inverse として + 振る舞うため、ダブルクオート内の ``\\\\`` / ``\\"`` / ``\\n`` を unescape する。 + formatter と round-trip 整合性が取れていないと、parse → format で + 二重エスケープが発生する (PR #15 codex 指摘)。 + """ result: Dict[str, str] = {} for raw_line in data.decode('utf-8').splitlines(): line = raw_line.strip() @@ -72,11 +78,46 @@ def parse_bytes(data: bytes) -> Dict[str, str]: key, _, value = line.partition('=') key = key.strip() value = value.strip() - if value and value[0] == value[-1] and value[0] in ('"', "'"): + if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"): + quote = value[0] value = value[1:-1] + if quote == '"': + value = EnvFile._unescape_double_quoted(value) result[key] = value return result + @staticmethod + def _unescape_double_quoted(s: str) -> str: + """``save`` が double-quote 値に対して施した escape を 1 パスで戻す。 + + 単純な逐次 ``replace`` は ``"\\\\n"`` (リテラル ``\\\\`` + ``n``) と + ``"\\n"`` (改行) の区別が付かないため state machine で処理する。 + 未知のエスケープ文字 (``\\x`` 等) はバックスラッシュごとそのまま保持する。 + """ + out: list = [] + i = 0 + n = len(s) + while i < n: + c = s[i] + if c == '\\' and i + 1 < n: + nxt = s[i + 1] + if nxt == '\\': + out.append('\\') + i += 2 + elif nxt == '"': + out.append('"') + i += 2 + elif nxt == 'n': + out.append('\n') + i += 2 + else: + out.append(c) + i += 1 + else: + out.append(c) + i += 1 + return ''.join(out) + def load(self) -> Dict[str, str]: """ファイルを読み込みkey=valueをパースする""" if not self.file_path.exists(): diff --git a/tests/cli/test_env_import.py b/tests/cli/test_env_import.py index acfe28f..f18428b 100644 --- a/tests/cli/test_env_import.py +++ b/tests/cli/test_env_import.py @@ -442,3 +442,189 @@ def test_import_passphrase_env_roundtrip(fake_root, dest_root, tmp_path, monkeyp source=str(dest), passphrase_env="DEVBASE_TEST_PASS")) assert rc == 0 assert (dest_root / ".env").exists() + + +def test_import_preserves_escaped_values_no_double_escape( + dest_root, age_keys, tmp_path): + """値に backslash / quote / newline / spaces が含まれていても + export → import で二重エスケープされないことを保証する (PR #15 codex 指摘)""" + _, id_file = age_keys + pub_file, _ = age_keys + + # 特殊文字を含む .env を持つ source root を構築 + src_root = tmp_path / "esc-src" + (src_root / "projects" / "alpha").mkdir(parents=True) + raw_env = ( + 'BACKSLASH="a\\\\b"\n' # 値: a\b (3 chars) + 'QUOTE_IN_VALUE="he said \\"hi\\""\n' # 値: he said "hi" + 'WITH_NEWLINE="line1\\nline2"\n' # 値: line1line2 + 'WITH_SPACE="value with space"\n' # 値: value with space + 'PLAIN=simple\n' # 値: simple + ) + (src_root / ".env").write_text(raw_env) + (src_root / "projects" / "alpha" / ".env").write_text( + 'ALPHA_BACK="a\\\\b"\n' + ) + + bundle_path = tmp_path / "esc.dbenv" + rc = export(src_root, ExportOptions( + dest=str(bundle_path), recipients=[f"@{pub_file}"])) + assert rc == 0 + + # 新規作成 (dest 側に既存ファイル無し) + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + assert rc == 0 + + # 新規作成時は incoming_bytes をそのまま使うので元バイト列と一致する + assert (dest_root / ".env").read_text() == raw_env + + # EnvFile から読んだ際に escape が正しく解釈されること (parse_bytes round-trip) + from devbase.env.store import EnvFile + parsed = EnvFile.parse_bytes((dest_root / ".env").read_bytes()) + assert parsed['BACKSLASH'] == 'a\\b' + assert parsed['QUOTE_IN_VALUE'] == 'he said "hi"' + assert parsed['WITH_NEWLINE'] == 'line1\nline2' + assert parsed['WITH_SPACE'] == 'value with space' + assert parsed['PLAIN'] == 'simple' + + +def test_import_merge_round_trips_escaped_values( + dest_root, age_keys, tmp_path): + """既存ファイルがあって merge する場合でも、parse → format の round-trip で + 値が壊れない (二重エスケープしない)""" + _, id_file = age_keys + pub_file, _ = age_keys + + src_root = tmp_path / "esc-src2" + (src_root / "projects" / "alpha").mkdir(parents=True) + (src_root / ".env").write_text('NEW_BACK="a\\\\b"\n') + + bundle_path = tmp_path / "esc2.dbenv" + rc = export(src_root, ExportOptions( + dest=str(bundle_path), recipients=[f"@{pub_file}"])) + assert rc == 0 + + # dest に既存ファイルを置く (merge 経路に入る) + (dest_root / ".env").write_text('EXISTING="x\\\\y"\n') + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + merge='prefer-incoming')) + assert rc == 0 + + from devbase.env.store import EnvFile + parsed = EnvFile.parse_bytes((dest_root / ".env").read_bytes()) + # 二重エスケープされていないので、parse 後の値は元の 3 文字 "a\\b" + assert parsed['NEW_BACK'] == 'a\\b' + assert parsed['EXISTING'] == 'x\\y' + + +def test_rollback_unlinks_newly_created_sources_yml( + fake_root, dest_root, age_keys, tmp_path, monkeypatch): + """sources.yml を --merge-metadata で新規作成中に commit 失敗すると、 + ロールバックで sources.yml が削除されること (PR #15 gemini 指摘)""" + from devbase.env import io_import as _io_import + + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + # dest には sources.yml が無い状態。--merge-metadata で新規作成パスに入る + assert not (dest_root / ".env.sources.yml").exists() + + # commit 中に sources.yml の rename だけ失敗させる (最後のファイル) + original_replace = os.replace + + def failing_replace(src, dst): + if str(dst).endswith('.env.sources.yml'): + raise OSError("simulated commit failure on sources.yml") + return original_replace(src, dst) + + monkeypatch.setattr(_io_import.os, 'replace', failing_replace) + + with pytest.raises(_io_import.ImportError, match="commit"): + import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + merge_metadata=True)) + + # sources.yml はもともと存在しなかったので、ロールバックで unlink されているはず + assert not (dest_root / ".env.sources.yml").exists() + + +def test_commit_failure_cleans_remaining_import_tmp_files( + fake_root, dest_root, age_keys, tmp_path, monkeypatch): + """_commit 失敗時に、まだ rename されていない .import.tmp ファイルが残らないこと + (PR #15 gemini 指摘)""" + from devbase.env import io_import as _io_import + + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + original_replace = os.replace + call_count = {'n': 0} + + def failing_replace(src, dst): + call_count['n'] += 1 + if call_count['n'] >= 2: + raise OSError("simulated commit failure") + return original_replace(src, dst) + + monkeypatch.setattr(_io_import.os, 'replace', failing_replace) + + with pytest.raises(_io_import.ImportError, match="commit"): + import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + + # 残骸の .import.tmp ファイルが無いこと + leftover = list(dest_root.rglob('*.import.tmp')) + assert leftover == [], f"残骸の tmp が残っている: {leftover}" + + +def test_backup_dir_collision_avoidance(fake_root, dest_root, age_keys, tmp_path): + """同じプロセス内で連続して import を実行しても、backup ディレクトリ名が衝突せず + 前回バックアップを上書きしないこと (PR #15 codex 指摘)""" + _, id_file = age_keys + bundle_path = _export_bundle(fake_root, age_keys, tmp_path) + + # 1 回目 + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + assert rc == 0 + # 2 回目 (同一プロセス内, おそらく同一秒) + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + assert rc == 0 + + backup_root = dest_root / "backups" / "env-import" + subdirs = sorted(p.name for p in backup_root.iterdir() if p.is_dir()) + # 2 つの異なる backup ディレクトリが残っていること + assert len(subdirs) == 2, f"backup が衝突して 1 つになっている: {subdirs}" + + +def test_envfile_parse_bytes_round_trip_with_escapes(): + """``EnvFile.parse_bytes`` が ``save`` が施す escape を正しく逆変換すること + (PR #15 codex 指摘の double-escape 回避テスト)""" + from devbase.env.store import EnvFile + + # 直接 EnvFile.save と同じ規則で encode したものを parse_bytes で復元 + raw = ( + 'BACKSLASH="a\\\\b"\n' # a\b + 'QUOTED="he said \\"hi\\""\n' # he said "hi" + 'NL="x\\ny"\n' # xy + 'PLAIN=simple\n' + 'EMPTY=""\n' # empty string with quotes + ) + parsed = EnvFile.parse_bytes(raw.encode('utf-8')) + assert parsed['BACKSLASH'] == 'a\\b' + assert parsed['QUOTED'] == 'he said "hi"' + assert parsed['NL'] == 'x\ny' + assert parsed['PLAIN'] == 'simple' + assert parsed['EMPTY'] == '' + + # 「リテラル ``\\n``」(2 文字: backslash + 'n') を含む値も区別できること + # save は ``a\\nb`` (3 chars) を ``"a\\\\nb"`` に変換するので、これを parse_bytes + # に通せば元の 3 文字に戻る + raw2 = 'LITERAL="a\\\\nb"\n' + parsed2 = EnvFile.parse_bytes(raw2.encode('utf-8')) + assert parsed2['LITERAL'] == 'a\\nb' # backslash + 'n' + 'b' (3 chars) From fbfbe88080c979d8773d41a771d8933b5fdece50 Mon Sep 17 00:00:00 2001 From: "takemi.ohama" Date: Thu, 21 May 2026 21:30:19 +0900 Subject: [PATCH 05/10] =?UTF-8?q?fix(env):=20import=20=E3=81=AE=20merge=20?= =?UTF-8?q?=E3=81=A7=E3=82=B3=E3=83=A1=E3=83=B3=E3=83=88=E3=82=92=E4=BF=9D?= =?UTF-8?q?=E6=8C=81=E3=81=97=20$=20=E3=82=92=E3=82=A8=E3=82=B9=E3=82=B1?= =?UTF-8?q?=E3=83=BC=E3=83=97=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ``EnvFile.parse_entries`` を追加し、コメント / 空行 / kv を行単位で トークン化できるようにする (PR #15 gemini 指摘)。 - ``_merge_into_existing_bytes`` で既存 ``.env`` のコメント・空行・キー順を 保持したまま値を差し替えるよう merge 経路を変更。``EnvFile.dump_bytes`` への単純差し替えではコメントが失われていた。 - ダブルクオート値に含まれる ``$`` を ``\$`` にエスケープし、``parse_bytes`` 側でも ``\$`` を ``$`` に戻すように round-trip 対応 (シェル ``source`` 時の 意図しない変数展開を防止 / PR #15 gemini 指摘)。 - ``EnvFile.dump_bytes`` / ``EnvFile.dump_entries_bytes`` にフォーマット ロジックを集約し、``io_import._format_env_bytes`` を廃止。``EnvFile.save`` も ``dump_bytes`` 経由に統一して二重実装を解消 (PR #15 gemini 指摘)。 - 新規テスト: コメント保持マージ (prefer-incoming / keep-existing)、 ``$`` の round-trip、``EnvFile.dump_bytes`` のエスケープ仕様を追加。 --- lib/devbase/env/io_import.py | 74 ++++++++++------- lib/devbase/env/store.py | 127 ++++++++++++++++++++++++----- tests/cli/test_env_import.py | 151 +++++++++++++++++++++++++++++++++++ 3 files changed, 306 insertions(+), 46 deletions(-) diff --git a/lib/devbase/env/io_import.py b/lib/devbase/env/io_import.py index 764825f..9944dfe 100644 --- a/lib/devbase/env/io_import.py +++ b/lib/devbase/env/io_import.py @@ -30,7 +30,7 @@ from devbase.env import bundle as _bundle from devbase.env import cipher as _cipher from devbase.env import storage as _storage -from devbase.env.store import EnvFile +from devbase.env.store import EnvEntry, EnvFile logger = get_logger(__name__) @@ -187,30 +187,39 @@ def _target_for(arcname: str, devbase_root: Path) -> Path: raise ImportError(f"未対応のバンドルエントリ: {arcname}") -def _parse_env_bytes(data: bytes) -> Dict[str, str]: - """EnvFile と同じ規則で bytes をパースする (一時ファイル不要)""" - return EnvFile.parse_bytes(data) +def _merge_into_existing_bytes(existing_bytes: bytes, + merged: Dict[str, str]) -> bytes: + """既存 ``.env`` のコメント / 空行 / キー順を保持したまま、 + ``merged`` の内容で値を上書きしてバイト列化する。 + ``merged`` のうち既存ファイルに無いキーは末尾に追加する。 + 既存ファイルにあって ``merged`` から削除されているキーは、entries からも除外して + 出力する (現状の merge ロジック上、削除されるケースは無いが安全側で対応)。 -def _format_env_bytes(data: Dict[str, str]) -> bytes: - """EnvFile.save と同じフォーマットで dict をバイト列化する""" - lines: List[str] = [] - for key in sorted(data): - value = data[key] - needs_quote = ( - '\n' in value - or any(c in value for c in (' ', '"', "'", '$', '`', '\\', - '<', '>', '|', '&', ';', - '(', ')', '#')) - ) - if needs_quote: - quoted = (value.replace('\\', '\\\\') - .replace('"', '\\"') - .replace('\n', '\\n')) - lines.append(f'{key}="{quoted}"\n') + PR #15 gemini 指摘: ``EnvFile.dump_bytes`` で再シリアライズすると ``key=value`` + だけの出力になりコメント・空行が失われる。これを避けるため entries ベースで + 再構成する。 + """ + entries = EnvFile.parse_entries(existing_bytes) + seen: set = set() + out_entries: List[EnvEntry] = [] + for e in entries: + if e.kind == 'kv' and e.key is not None: + if e.key in merged: + # 値を merge 後のものに差し替え (key 順 / コメントは保持) + out_entries.append(EnvEntry( + kind='kv', raw=e.raw, key=e.key, value=merged[e.key] + )) + seen.add(e.key) + # merged から除外されているキーは entries からも落とす else: - lines.append(f'{key}={value}\n') - return ''.join(lines).encode('utf-8') + out_entries.append(e) + # 既存に無かった新規キーは末尾に append (定常的な key 順を維持するため sorted) + for key in sorted(k for k in merged if k not in seen): + out_entries.append(EnvEntry( + kind='kv', raw='', key=key, value=merged[key] + )) + return EnvFile.dump_entries_bytes(out_entries) def _plan_env_merge(target: Path, incoming_bytes: bytes, @@ -218,14 +227,20 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, """1 つの .env に対する merge / replace 計画を作る 既存ファイルが無い (= create) ケースでは、バンドル側の ``incoming_bytes`` を - そのまま採用する。``_format_env_bytes`` で再シリアライズすると、export 側で + そのまま採用する。``EnvFile.dump_bytes`` で再シリアライズすると、export 側で 既に escape された値を parse_bytes 経由でも完全に round-trip できる前提が 崩れた瞬間に二重エスケープが発生するためである (PR #15 codex 指摘)。 + + 既存ファイルが存在する merge 経路では ``_merge_into_existing_bytes`` で + 既存のコメント / 空行 / キー順を保持したまま値だけ差し替える + (PR #15 gemini 指摘)。 """ - incoming = _parse_env_bytes(incoming_bytes) + incoming = EnvFile.parse_bytes(incoming_bytes) existing: Dict[str, str] = {} + existing_bytes: bytes = b'' if target.exists(): - existing = _parse_env_bytes(target.read_bytes()) + existing_bytes = target.read_bytes() + existing = EnvFile.parse_bytes(existing_bytes) if opts.replace: added = sorted(set(incoming) - set(existing)) @@ -267,7 +282,8 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, added.append(key) merged[key] = value # 新規作成時は incoming_bytes をそのまま保持して二重エスケープを回避 - new_bytes = incoming_bytes if not existing else _format_env_bytes(merged) + new_bytes = (incoming_bytes if not existing + else _merge_into_existing_bytes(existing_bytes, merged)) return _Plan( target=target, arcname=arcname, @@ -288,7 +304,8 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, else: merged[key] = value added.append(key) - new_bytes = incoming_bytes if not existing else _format_env_bytes(merged) + new_bytes = (incoming_bytes if not existing + else _merge_into_existing_bytes(existing_bytes, merged)) return _Plan( target=target, arcname=arcname, @@ -311,7 +328,8 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, else: merged[key] = value added.append(key) - new_bytes = incoming_bytes if not existing else _format_env_bytes(merged) + new_bytes = (incoming_bytes if not existing + else _merge_into_existing_bytes(existing_bytes, merged)) return _Plan( target=target, arcname=arcname, diff --git a/lib/devbase/env/store.py b/lib/devbase/env/store.py index d2284bd..8e61fa8 100644 --- a/lib/devbase/env/store.py +++ b/lib/devbase/env/store.py @@ -1,9 +1,12 @@ """Environment variable file store""" +from __future__ import annotations + import os import shutil +from dataclasses import dataclass from pathlib import Path -from typing import Optional, Dict, Union +from typing import Optional, Dict, Union, List from devbase.log import get_logger @@ -49,6 +52,43 @@ def collect_key(env_file, key, *, auto_value=None, prompt=None, mask_after=10): return False +@dataclass +class EnvEntry: + """``.env`` ファイルの 1 行を表すトークン。 + + - ``kind='kv'`` のとき ``key`` / ``value`` が有効 (``raw`` は元の行全体) + - ``kind='comment'`` / ``kind='blank'`` のとき ``raw`` のみ有効 + + コメント・空行を保持してマージ出力するために使う (PR #15 gemini 指摘)。 + """ + kind: str # 'kv' | 'comment' | 'blank' + raw: str = '' + key: Optional[str] = None + value: Optional[str] = None + + +# ``EnvFile.dump_bytes`` / :meth:`EnvFile.save` で値を quote する閾値となる文字集合。 +# シェル ``source`` 時に展開・解釈されうる metachar をすべて含める。``$`` を含む値も +# ``\$`` にエスケープして出力するため、ここで quoting 対象として捕捉する +# (PR #15 gemini 指摘)。 +_NEEDS_QUOTE_CHARS = (' ', '"', "'", '$', '`', '\\', '<', '>', '|', '&', ';', + '(', ')', '#') + + +def _escape_double_quoted(value: str) -> str: + """``"..."`` 内で安全な escape を施す。 + + - ``\\`` → ``\\\\`` + - ``"`` → ``\\"`` + - ``\n`` → ``\\n`` (改行をリテラル化) + - ``$`` → ``\\$`` (``.env`` を ``source`` した際の変数展開を抑止) + """ + return (value.replace('\\', '\\\\') + .replace('"', '\\"') + .replace('\n', '\\n') + .replace('$', '\\$')) + + class EnvFile: """ .envファイルの読み書き・バックアップ・バリデーションを管理する。 @@ -63,19 +103,39 @@ def __init__(self, file_path: Union[str, Path]): def parse_bytes(data: bytes) -> Dict[str, str]: """bytes 列を load と同じ規則でパースして dict を返す (ファイル不要) - ``save`` / :func:`devbase.env.io_import._format_env_bytes` の inverse として - 振る舞うため、ダブルクオート内の ``\\\\`` / ``\\"`` / ``\\n`` を unescape する。 + ``save`` / :meth:`EnvFile.dump_bytes` の inverse として振る舞うため、 + ダブルクオート内の ``\\\\`` / ``\\"`` / ``\\n`` / ``\\$`` を unescape する。 formatter と round-trip 整合性が取れていないと、parse → format で 二重エスケープが発生する (PR #15 codex 指摘)。 """ result: Dict[str, str] = {} + for entry in EnvFile.parse_entries(data): + if entry.kind == 'kv' and entry.key is not None: + result[entry.key] = entry.value or '' + return result + + @staticmethod + def parse_entries(data: bytes) -> List[EnvEntry]: + """``.env`` の各行をトークン化して返す。 + + コメント (``#`` 始まり) と空行は ``EnvEntry(kind='comment'|'blank', raw=...)`` + として保持される。これにより merge 出力時に元のコメント/空白構造を残せる + (PR #15 gemini 指摘)。 + """ + entries: List[EnvEntry] = [] for raw_line in data.decode('utf-8').splitlines(): - line = raw_line.strip() - if not line or line.startswith('#'): + stripped = raw_line.strip() + if not stripped: + entries.append(EnvEntry(kind='blank', raw=raw_line)) continue - if '=' not in line: + if stripped.startswith('#'): + entries.append(EnvEntry(kind='comment', raw=raw_line)) continue - key, _, value = line.partition('=') + if '=' not in stripped: + # ``key=value`` 形式でない行は (滅多に無いが) 原文保持する + entries.append(EnvEntry(kind='comment', raw=raw_line)) + continue + key, _, value = stripped.partition('=') key = key.strip() value = value.strip() if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"): @@ -83,8 +143,8 @@ def parse_bytes(data: bytes) -> Dict[str, str]: value = value[1:-1] if quote == '"': value = EnvFile._unescape_double_quoted(value) - result[key] = value - return result + entries.append(EnvEntry(kind='kv', raw=raw_line, key=key, value=value)) + return entries @staticmethod def _unescape_double_quoted(s: str) -> str: @@ -110,6 +170,9 @@ def _unescape_double_quoted(s: str) -> str: elif nxt == 'n': out.append('\n') i += 2 + elif nxt == '$': + out.append('$') + i += 2 else: out.append(c) i += 1 @@ -118,6 +181,42 @@ def _unescape_double_quoted(s: str) -> str: i += 1 return ''.join(out) + @staticmethod + def _format_kv_line(key: str, value: str) -> str: + """1 つの ``key=value`` を ``.env`` 行 (末尾 ``\\n`` 含む) にフォーマットする""" + needs_quote = ( + '\n' in value + or any(c in value for c in _NEEDS_QUOTE_CHARS) + ) + if needs_quote: + return f'{key}="{_escape_double_quoted(value)}"\n' + return f'{key}={value}\n' + + @staticmethod + def dump_bytes(data: Dict[str, str]) -> bytes: + """``save`` と同一フォーマットで dict をバイト列化する (ファイル不要)。 + + ``io_import`` 側でも merge 結果を bytes として持つ必要があるため、 + フォーマット規則を 1 箇所 (このメソッド) に集約する (PR #15 gemini 指摘)。 + """ + lines = [EnvFile._format_kv_line(k, data[k]) for k in sorted(data)] + return ''.join(lines).encode('utf-8') + + @staticmethod + def dump_entries_bytes(entries: List[EnvEntry]) -> bytes: + """``parse_entries`` で得た entries を ``.env`` バイト列に戻す。 + + ``kv`` エントリは現在の ``value`` を ``dump_bytes`` と同じ規則で再フォーマット + する。``comment`` / ``blank`` は ``raw`` をそのまま保持して出力する。 + """ + lines: List[str] = [] + for e in entries: + if e.kind == 'kv' and e.key is not None: + lines.append(EnvFile._format_kv_line(e.key, e.value or '')) + else: + lines.append(e.raw + '\n') + return ''.join(lines).encode('utf-8') + def load(self) -> Dict[str, str]: """ファイルを読み込みkey=valueをパースする""" if not self.file_path.exists(): @@ -130,15 +229,7 @@ def load(self) -> Dict[str, str]: def save(self) -> None: """現在のデータを.envファイルに保存する""" self.file_path.parent.mkdir(parents=True, exist_ok=True) - - with open(self.file_path, 'w', encoding='utf-8') as f: - for key, value in sorted(self._data.items()): - if '\n' in value or any(c in value for c in (' ', '"', "'", '$', '`', '\\', '<', '>', '|', '&', ';', '(', ')', '#')): - value = value.replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n') - f.write(f'{key}="{value}"\n') - else: - f.write(f'{key}={value}\n') - + self.file_path.write_bytes(self.dump_bytes(self._data)) os.chmod(self.file_path, 0o600) def backup(self) -> Optional[Path]: diff --git a/tests/cli/test_env_import.py b/tests/cli/test_env_import.py index f18428b..aab2e32 100644 --- a/tests/cli/test_env_import.py +++ b/tests/cli/test_env_import.py @@ -628,3 +628,154 @@ def test_envfile_parse_bytes_round_trip_with_escapes(): raw2 = 'LITERAL="a\\\\nb"\n' parsed2 = EnvFile.parse_bytes(raw2.encode('utf-8')) assert parsed2['LITERAL'] == 'a\\nb' # backslash + 'n' + 'b' (3 chars) + + +def test_envfile_dollar_escape_round_trip(): + """``$`` を含む値は ``\\$`` にエスケープされ、``source`` 時に変数展開されない + (PR #15 gemini 指摘)""" + from devbase.env.store import EnvFile + + # dump → parse の round-trip で値が保たれる + data = { + 'DOLLAR': '$HOME', # 単純な変数展開を含む + 'PRICE': 'cost is $100', # 値内の $ + 'ESCAPED_LIKE': 'a\\$b', # backslash + $ の組み合わせ + 'PLAIN_NUM': '12345', # quote 不要なケース + } + dumped = EnvFile.dump_bytes(data) + text = dumped.decode('utf-8') + # $ が裸 (バックスラッシュ無し) で出力されていないこと + # ($ の直前は必ず \ である or 行の終端 / 別の \\) + for line in text.splitlines(): + if '=' in line and '"' in line: + # ダブルクオート内に裸の $ があるかチェック + _, _, val = line.partition('=') + # 値部分の $ をすべて検査: 直前の文字が \\ であること + for idx, ch in enumerate(val): + if ch == '$': + assert idx > 0 and val[idx - 1] == '\\', ( + f"unescaped $ in dump: {line!r}" + ) + parsed = EnvFile.parse_bytes(dumped) + assert parsed == data + + +def test_env_import_merge_preserves_comments_and_blanks( + fake_root, dest_root, age_keys, tmp_path): + """merge 経路で既存 ``.env`` のコメントと空行が保持されること (PR #15 gemini 指摘)""" + _, id_file = age_keys + pub_file, _ = age_keys + + # incoming bundle: AWS_CONFIG_BASE64=AAAA + GLOBAL=1 + src_root = tmp_path / "comment-src" + src_root.mkdir() + (src_root / ".env").write_text("AWS_CONFIG_BASE64=AAAA\nGLOBAL=1\n") + bundle_path = tmp_path / "comment.dbenv" + rc = export(src_root, ExportOptions( + dest=str(bundle_path), recipients=[f"@{pub_file}"])) + assert rc == 0 + + # 既存 dest .env にコメント・空行・既存キーを配置 + existing_text = ( + "# Top-level header comment\n" + "\n" + "# AWS section\n" + "AWS_CONFIG_BASE64=OLD\n" + "\n" + "# user-managed key\n" + "KEEP=this\n" + ) + (dest_root / ".env").write_text(existing_text) + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + merge='prefer-incoming')) + assert rc == 0 + + out = (dest_root / ".env").read_text() + # コメント・空行が保持されている + assert "# Top-level header comment" in out + assert "# AWS section" in out + assert "# user-managed key" in out + # 既存値は prefer-incoming で AAAA に書き換わる + assert "AWS_CONFIG_BASE64=AAAA" in out + # 既存にしか無かった KEEP は維持 + assert "KEEP=this" in out + # incoming にしか無かった GLOBAL は末尾に追加 + assert "GLOBAL=1" in out + # 空行も最低 1 つ残っている + assert "\n\n" in out + + +def test_env_import_keep_existing_preserves_comments( + fake_root, dest_root, age_keys, tmp_path): + """keep-existing 経路でもコメントが保持されること""" + _, id_file = age_keys + pub_file, _ = age_keys + + src_root = tmp_path / "keep-src" + src_root.mkdir() + (src_root / ".env").write_text("INCOMING_KEY=incoming\n") + bundle_path = tmp_path / "keep.dbenv" + rc = export(src_root, ExportOptions( + dest=str(bundle_path), recipients=[f"@{pub_file}"])) + assert rc == 0 + + (dest_root / ".env").write_text( + "# This comment must survive\nKEEP_ME=v\n" + ) + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)])) + assert rc == 0 + + out = (dest_root / ".env").read_text() + assert "# This comment must survive" in out + assert "KEEP_ME=v" in out + assert "INCOMING_KEY=incoming" in out + + +def test_env_import_dollar_value_is_escaped_after_merge( + fake_root, dest_root, age_keys, tmp_path): + """``$`` を含む値が merge 後の ``.env`` 上でエスケープされていること + (シェルで source した時の変数展開を防ぐ / PR #15 gemini 指摘)""" + _, id_file = age_keys + pub_file, _ = age_keys + + src_root = tmp_path / "dollar-src" + src_root.mkdir() + # 値に $ を含む。export 側 (EnvFile.save 形式) で書き出される + (src_root / ".env").write_text('PRICE="cost is \\$100"\n') + bundle_path = tmp_path / "dollar.dbenv" + rc = export(src_root, ExportOptions( + dest=str(bundle_path), recipients=[f"@{pub_file}"])) + assert rc == 0 + + # 既存 dest .env (merge 経路に入る) + (dest_root / ".env").write_text("EXISTING=keep\n") + os.chmod(dest_root / ".env", 0o600) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + merge='prefer-incoming')) + assert rc == 0 + + raw_text = (dest_root / ".env").read_text() + # 裸の $ が現れていないこと (\ の直後でなければ NG) + for line in raw_text.splitlines(): + if 'PRICE' not in line: + continue + # 値の中の $ は必ず \\ の直後 + _, _, val = line.partition('=') + for idx, ch in enumerate(val): + if ch == '$': + assert idx > 0 and val[idx - 1] == '\\', ( + f"unescaped $ in merged .env: {line!r}" + ) + + from devbase.env.store import EnvFile + parsed = EnvFile.parse_bytes((dest_root / ".env").read_bytes()) + assert parsed['PRICE'] == 'cost is $100' + assert parsed['EXISTING'] == 'keep' From 7ab664c1c17d4133daaa3332d94175644b42ea30 Mon Sep 17 00:00:00 2001 From: "takemi.ohama" Date: Thu, 21 May 2026 21:38:36 +0900 Subject: [PATCH 06/10] =?UTF-8?q?refactor(env):=20=5Fplan=5Fenv=5Fmerge=20?= =?UTF-8?q?=E3=81=AE=E9=87=8D=E8=A4=87=E3=82=92=20=5Fbuild=5Fmerge=5Fplan?= =?UTF-8?q?=20=E3=81=AB=E5=85=B1=E9=80=9A=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 各マージ戦略 (replace_keys / keep-existing / prefer-incoming) で個別に書かれていた new_bytes 生成と _Plan 構築を `_build_merge_plan` にまとめた。各分岐は merged / added / overwritten / skipped の計算だけを担当するように整理。 動作変更なし (PR #15 gemini round4 指摘 / 既存テスト 102 件 PASS)。 --- lib/devbase/env/io_import.py | 105 ++++++++++++++++++----------------- 1 file changed, 55 insertions(+), 50 deletions(-) diff --git a/lib/devbase/env/io_import.py b/lib/devbase/env/io_import.py index 9944dfe..f2b0b22 100644 --- a/lib/devbase/env/io_import.py +++ b/lib/devbase/env/io_import.py @@ -222,6 +222,37 @@ def _merge_into_existing_bytes(existing_bytes: bytes, return EnvFile.dump_entries_bytes(out_entries) +def _build_merge_plan( + target: Path, + arcname: str, + incoming_bytes: bytes, + existing_bytes: bytes, + existing: Dict[str, str], + merged: Dict[str, str], + added: List[str], + overwritten: List[str], + skipped: List[str], +) -> _Plan: + """merge 系 (replace_keys / keep-existing / prefer-incoming) 共通の _Plan 生成。 + + - 新規作成時は ``incoming_bytes`` をそのまま採用して二重エスケープを回避 + (PR #15 codex 指摘) + - 既存ファイルへの merge 時は ``_merge_into_existing_bytes`` でコメント / 空行 / + キー順を保持したまま値を差し替える (PR #15 gemini 指摘) + """ + new_bytes = (incoming_bytes if not existing + else _merge_into_existing_bytes(existing_bytes, merged)) + return _Plan( + target=target, + arcname=arcname, + new_bytes=new_bytes, + added_keys=sorted(added), + overwritten_keys=sorted(overwritten), + skipped_keys=sorted(skipped), + op='merge' if existing else 'create', + ) + + def _plan_env_merge(target: Path, incoming_bytes: bytes, opts: ImportOptions, arcname: str) -> _Plan: """1 つの .env に対する merge / replace 計画を作る @@ -234,6 +265,9 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, 既存ファイルが存在する merge 経路では ``_merge_into_existing_bytes`` で 既存のコメント / 空行 / キー順を保持したまま値だけ差し替える (PR #15 gemini 指摘)。 + + 各分岐で重複していた ``new_bytes`` 生成と ``_Plan`` 構築は + ``_build_merge_plan`` に括り出している (PR #15 gemini round4 指摘)。 """ incoming = EnvFile.parse_bytes(incoming_bytes) existing: Dict[str, str] = {} @@ -245,7 +279,7 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, if opts.replace: added = sorted(set(incoming) - set(existing)) overwritten = sorted(k for k in incoming if k in existing and incoming[k] != existing[k]) - # replace は バンドル側の値で完全に置き換える + # replace は バンドル側の値で完全に置き換える (merge 経路と別系統) return _Plan( target=target, arcname=arcname, @@ -256,11 +290,12 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, op='replace' if existing else 'create', ) + merged: Dict[str, str] = dict(existing) + added: List[str] = [] + overwritten: List[str] = [] + skipped: List[str] = [] + if opts.replace_keys: - merged = dict(existing) - added: List[str] = [] - overwritten: List[str] = [] - skipped: List[str] = [] replace_set = set(opts.replace_keys) for key, value in incoming.items(): if key in replace_set: @@ -281,45 +316,14 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, else: added.append(key) merged[key] = value - # 新規作成時は incoming_bytes をそのまま保持して二重エスケープを回避 - new_bytes = (incoming_bytes if not existing - else _merge_into_existing_bytes(existing_bytes, merged)) - return _Plan( - target=target, - arcname=arcname, - new_bytes=new_bytes, - added_keys=sorted(added), - overwritten_keys=sorted(overwritten), - skipped_keys=sorted(skipped), - op='merge' if existing else 'create', - ) - - if opts.merge == 'keep-existing': - merged = dict(existing) - added: List[str] = [] - skipped: List[str] = [] + elif opts.merge == 'keep-existing': for key, value in incoming.items(): if key in existing: skipped.append(key) else: merged[key] = value added.append(key) - new_bytes = (incoming_bytes if not existing - else _merge_into_existing_bytes(existing_bytes, merged)) - return _Plan( - target=target, - arcname=arcname, - new_bytes=new_bytes, - added_keys=sorted(added), - overwritten_keys=[], - skipped_keys=sorted(skipped), - op='merge' if existing else 'create', - ) - - if opts.merge == 'prefer-incoming': - merged = dict(existing) - added: List[str] = [] - overwritten: List[str] = [] + elif opts.merge == 'prefer-incoming': for key, value in incoming.items(): if key in existing: if existing[key] != value: @@ -328,19 +332,20 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, else: merged[key] = value added.append(key) - new_bytes = (incoming_bytes if not existing - else _merge_into_existing_bytes(existing_bytes, merged)) - return _Plan( - target=target, - arcname=arcname, - new_bytes=new_bytes, - added_keys=sorted(added), - overwritten_keys=sorted(overwritten), - skipped_keys=[], - op='merge' if existing else 'create', - ) + else: + raise ImportError(f"不明な --merge モード: {opts.merge!r}") - raise ImportError(f"不明な --merge モード: {opts.merge!r}") + return _build_merge_plan( + target=target, + arcname=arcname, + incoming_bytes=incoming_bytes, + existing_bytes=existing_bytes, + existing=existing, + merged=merged, + added=added, + overwritten=overwritten, + skipped=skipped, + ) def _plan_sources(target: Path, incoming_bytes: bytes, From 73e6697fd2c9f7e33b0a42c9b457684152147448 Mon Sep 17 00:00:00 2001 From: "takemi.ohama" Date: Thu, 21 May 2026 21:48:27 +0900 Subject: [PATCH 07/10] =?UTF-8?q?fix(env):=20=E3=82=B3=E3=83=A1=E3=83=B3?= =?UTF-8?q?=E3=83=88=E3=81=AE=E3=81=BF=E6=97=A2=E5=AD=98=20.env=20?= =?UTF-8?q?=E3=82=92=20merge=20=E7=B5=8C=E8=B7=AF=E3=81=AB=E9=80=9A?= =?UTF-8?q?=E3=81=99=20+=20env=20i=20=E7=9F=AD=E7=B8=AE=E7=B6=AD=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - _build_merge_plan / _plan_env_merge / replace ブランチの op 判定を existing (key=value dict) の有無から target.exists() に変更。 コメント / 空行のみで構成された既存 .env が「create」と誤判定されて incoming_bytes で上書きされ既存コメントが失われる問題を修正 (PR #15 round5 codex/gemini 指摘)。 - SUBCMD_PREFIX_PREFERENCES に i: init を追加。import 追加で devbase env i が init / import の両方にマッチして ambiguous に なっていたため、既存ショートカット (devbase env i → init) を維持する (PR #15 round5 codex 指摘)。 - 回帰テスト追加: - tests/cli/test_env_import.py: コメント/空行のみの既存 .env が prefer-incoming / keep-existing / replace-keys でコメント保持される こと。--replace ブランチでも op='replace' として報告され backup が 取られること - tests/cli/test_prefix_resolution.py: devbase env i → init, devbase env im → import, devbase env in → init Co-Authored-By: Claude Opus 4.7 (1M context) --- lib/devbase/cli.py | 3 + lib/devbase/env/io_import.py | 26 ++++--- tests/cli/test_env_import.py | 107 ++++++++++++++++++++++++++++ tests/cli/test_prefix_resolution.py | 21 ++++++ 4 files changed, 147 insertions(+), 10 deletions(-) diff --git a/lib/devbase/cli.py b/lib/devbase/cli.py index cc9fc28..d519db4 100644 --- a/lib/devbase/cli.py +++ b/lib/devbase/cli.py @@ -47,6 +47,9 @@ SUBCMD_PREFIX_PREFERENCES = { ('env',): { 'e': 'edit', + # `import` 追加で `i` が `init` / `import` の両方にマッチして ambiguous に + # なるため、既存ショートカット (`devbase env i` → `init`) を維持する。 + 'i': 'init', }, } diff --git a/lib/devbase/env/io_import.py b/lib/devbase/env/io_import.py index f2b0b22..972425c 100644 --- a/lib/devbase/env/io_import.py +++ b/lib/devbase/env/io_import.py @@ -227,7 +227,7 @@ def _build_merge_plan( arcname: str, incoming_bytes: bytes, existing_bytes: bytes, - existing: Dict[str, str], + target_exists: bool, merged: Dict[str, str], added: List[str], overwritten: List[str], @@ -235,13 +235,16 @@ def _build_merge_plan( ) -> _Plan: """merge 系 (replace_keys / keep-existing / prefer-incoming) 共通の _Plan 生成。 - - 新規作成時は ``incoming_bytes`` をそのまま採用して二重エスケープを回避 - (PR #15 codex 指摘) + - 新規作成時 (``target_exists`` が False) は ``incoming_bytes`` をそのまま採用して + 二重エスケープを回避 (PR #15 codex 指摘) - 既存ファイルへの merge 時は ``_merge_into_existing_bytes`` でコメント / 空行 / - キー順を保持したまま値を差し替える (PR #15 gemini 指摘) + キー順を保持したまま値を差し替える (PR #15 gemini 指摘)。 + ``existing`` (key=value dict) の空判定で create/merge を決めると、コメント / + 空行のみで構成された既存 .env が ``incoming_bytes`` で上書きされてコメントが + 失われるため、ファイル実体の有無で判定する (PR #15 round5 指摘)。 """ - new_bytes = (incoming_bytes if not existing - else _merge_into_existing_bytes(existing_bytes, merged)) + new_bytes = (_merge_into_existing_bytes(existing_bytes, merged) + if target_exists else incoming_bytes) return _Plan( target=target, arcname=arcname, @@ -249,7 +252,7 @@ def _build_merge_plan( added_keys=sorted(added), overwritten_keys=sorted(overwritten), skipped_keys=sorted(skipped), - op='merge' if existing else 'create', + op='merge' if target_exists else 'create', ) @@ -272,7 +275,8 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, incoming = EnvFile.parse_bytes(incoming_bytes) existing: Dict[str, str] = {} existing_bytes: bytes = b'' - if target.exists(): + target_exists = target.exists() + if target_exists: existing_bytes = target.read_bytes() existing = EnvFile.parse_bytes(existing_bytes) @@ -280,6 +284,8 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, added = sorted(set(incoming) - set(existing)) overwritten = sorted(k for k in incoming if k in existing and incoming[k] != existing[k]) # replace は バンドル側の値で完全に置き換える (merge 経路と別系統) + # op 判定は ``existing`` (key=value dict) ではなくファイル実体の有無で行う: + # コメントのみの既存 .env を 'create' と誤判定しないため (PR #15 round5 指摘)。 return _Plan( target=target, arcname=arcname, @@ -287,7 +293,7 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, added_keys=added, overwritten_keys=overwritten, skipped_keys=[], - op='replace' if existing else 'create', + op='replace' if target_exists else 'create', ) merged: Dict[str, str] = dict(existing) @@ -340,7 +346,7 @@ def _plan_env_merge(target: Path, incoming_bytes: bytes, arcname=arcname, incoming_bytes=incoming_bytes, existing_bytes=existing_bytes, - existing=existing, + target_exists=target_exists, merged=merged, added=added, overwritten=overwritten, diff --git a/tests/cli/test_env_import.py b/tests/cli/test_env_import.py index aab2e32..7c390e4 100644 --- a/tests/cli/test_env_import.py +++ b/tests/cli/test_env_import.py @@ -779,3 +779,110 @@ def test_env_import_dollar_value_is_escaped_after_merge( parsed = EnvFile.parse_bytes((dest_root / ".env").read_bytes()) assert parsed['PRICE'] == 'cost is $100' assert parsed['EXISTING'] == 'keep' + + +# --- PR #15 round5: コメント / 空行のみの既存 .env が create 扱いされて +# 上書きされないこと (`existing` dict が空でも target.exists() で merge に入る)。 + +def _setup_comment_only_dest(dest_root: Path) -> str: + """key=value を含まずコメント / 空行のみで構成された既存 .env を作る""" + text = ( + "# user-managed header (no kv yet)\n" + "\n" + "# section: aws\n" + "\n" + ) + (dest_root / ".env").write_text(text) + os.chmod(dest_root / ".env", 0o600) + return text + + +def _build_simple_bundle(tmp_path: Path, pub_file: Path, + name: str = "comment-only") -> Path: + src_root = tmp_path / f"{name}-src" + src_root.mkdir() + (src_root / ".env").write_text("INCOMING_KEY=incoming\n") + bundle_path = tmp_path / f"{name}.dbenv" + rc = export(src_root, ExportOptions( + dest=str(bundle_path), recipients=[f"@{pub_file}"])) + assert rc == 0 + return bundle_path + + +@pytest.mark.parametrize("merge_mode", ["prefer-incoming", "keep-existing"]) +def test_env_import_comment_only_existing_preserves_comments_on_merge( + fake_root, dest_root, age_keys, tmp_path, merge_mode): + """コメント / 空行のみの既存 .env が ``existing`` 辞書の空判定で create 扱いに + なって上書きされ、ヘッダコメントが消失しないこと (PR #15 round5 指摘)""" + _, id_file = age_keys + pub_file, _ = age_keys + bundle_path = _build_simple_bundle(tmp_path, pub_file, "comment-merge") + + _setup_comment_only_dest(dest_root) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + merge=merge_mode)) + assert rc == 0 + + out = (dest_root / ".env").read_text() + assert "# user-managed header (no kv yet)" in out + assert "# section: aws" in out + assert "INCOMING_KEY=incoming" in out + + +def test_env_import_comment_only_existing_preserves_comments_on_replace_keys( + fake_root, dest_root, age_keys, tmp_path): + """--replace-keys 経路でも、コメントのみの既存 .env が create 扱いされず + コメントが保持されること (PR #15 round5 指摘)""" + _, id_file = age_keys + pub_file, _ = age_keys + bundle_path = _build_simple_bundle(tmp_path, pub_file, "comment-rk") + + _setup_comment_only_dest(dest_root) + + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + replace_keys=["INCOMING_KEY"])) + assert rc == 0 + + out = (dest_root / ".env").read_text() + assert "# user-managed header (no kv yet)" in out + assert "# section: aws" in out + assert "INCOMING_KEY=incoming" in out + + +def test_env_import_comment_only_existing_replace_reports_op_replace( + fake_root, dest_root, age_keys, tmp_path, caplog): + """--replace 経路では incoming で完全上書きするが、その op は 'create' では + なく 'replace' として報告されること (PR #15 round5 指摘)。 + + --replace の意味論として既存内容は捨てるため、コメント保持は要件ではないが、 + ログ上 ``create`` と表示されるとロールバック挙動など他の経路 (= 新規作成は + backup を取らない) と判別できなくなるため、op 表記の正確性を確認する。 + """ + import logging + _, id_file = age_keys + pub_file, _ = age_keys + bundle_path = _build_simple_bundle(tmp_path, pub_file, "comment-replace") + + _setup_comment_only_dest(dest_root) + + with caplog.at_level(logging.INFO, logger="devbase.env.io_import"): + rc = import_bundle(dest_root, ImportOptions( + source=str(bundle_path), identities=[str(id_file)], + replace=True)) + assert rc == 0 + + # 'replace: ' のような行が出ているはず。少なくとも 'create:' 表記で + # 出力されていないことを確認 (op_replace の正しさ)。 + log_text = "\n".join(r.message for r in caplog.records) + assert "replace: " in log_text, log_text + # backup には元のコメントのみの .env が記録されている (存在判定が正しければ + # _backup_existing が target を見つけてコピーするため) + backup_root = dest_root / "backups" / "env-import" + assert backup_root.is_dir() + snapshots = [p for p in backup_root.iterdir() if p.is_dir()] + assert len(snapshots) >= 1 + backed = (snapshots[0] / ".env").read_text() + assert "# user-managed header (no kv yet)" in backed diff --git a/tests/cli/test_prefix_resolution.py b/tests/cli/test_prefix_resolution.py index 0a7574e..06f849a 100644 --- a/tests/cli/test_prefix_resolution.py +++ b/tests/cli/test_prefix_resolution.py @@ -48,3 +48,24 @@ def test_expand_argv_env_ex_resolves_to_export(monkeypatch): monkeypatch.setattr(sys, "argv", ["devbase", "env", "ex"]) cli._expand_argv() assert sys.argv == ["devbase", "env", "export"] + + +def test_expand_argv_env_i_resolves_to_init(monkeypatch): + """`devbase env i` は `import` 追加後も `init` に解決される (後方互換 / PR #15 round5)""" + monkeypatch.setattr(sys, "argv", ["devbase", "env", "i"]) + cli._expand_argv() + assert sys.argv == ["devbase", "env", "init"] + + +def test_expand_argv_env_im_resolves_to_import(monkeypatch): + """`devbase env im` は唯一の候補なので `import` に解決される""" + monkeypatch.setattr(sys, "argv", ["devbase", "env", "im"]) + cli._expand_argv() + assert sys.argv == ["devbase", "env", "import"] + + +def test_expand_argv_env_in_resolves_to_init(monkeypatch): + """`devbase env in` も唯一の候補 (`init`) に解決される""" + monkeypatch.setattr(sys, "argv", ["devbase", "env", "in"]) + cli._expand_argv() + assert sys.argv == ["devbase", "env", "init"] From 454425eec45e50e39879f4370d3e97e805910a3c Mon Sep 17 00:00:00 2001 From: "takemi.ohama" Date: Sat, 23 May 2026 19:33:29 +0900 Subject: [PATCH 08/10] =?UTF-8?q?fix(env):=20import=20=E3=81=AE=20sys=20?= =?UTF-8?q?=E3=82=92=E5=85=88=E9=A0=AD=20import=20=E3=81=AB=20/=20TTY=20?= =?UTF-8?q?=E6=99=82=20getpass.getpass=20=E3=81=A7=E3=82=A8=E3=82=B3?= =?UTF-8?q?=E3=83=BC=E6=8A=91=E6=AD=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit gemini round 6 で挙がった minor 指摘 2 件に対応: 1. lib/devbase/env/io_import.py:97 `import sys` がローカル import になっていたのを ファイル先頭の標準 import セクションに移動 (`import getpass` も同時追加)。 メンテナンス性向上、PEP 8 への準拠。 2. lib/devbase/env/io_import.py:100 TTY 入力時に `sys.stdin.readline()` を使って いたため、パスフレーズがそのまま画面にエコーバックされていた。 `getpass.getpass(prompt, stream=sys.stderr)` を使うように変更。 パイプ入力時 (非 TTY) は従来どおり `sys.stdin.readline()` で読み ( stdin リダイレクト経由のテストフックとして必要)、getpass の EOFError は `ImportError("stdin からパスフレーズを読み取れませんでした")` に変換する。 回帰テスト 3 件を追加 (tests/cli/test_env_import.py): - test_read_passphrase_uses_getpass_on_tty - test_read_passphrase_falls_back_to_stdin_on_pipe - test_read_passphrase_tty_eof_raises_import_error ローカル品質チェック: - uv run pytest tests/ -> 112 passed (107 + 3 + 既存 review round で +2) - uv run python -m compileall lib bin -> OK - uvx ruff check --select=E9,F63,F7,F82 lib -> All checks passed Co-Authored-By: Claude Opus 4.7 (1M context) --- lib/devbase/env/io_import.py | 8 ++++-- tests/cli/test_env_import.py | 53 ++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/lib/devbase/env/io_import.py b/lib/devbase/env/io_import.py index 972425c..5f025b1 100644 --- a/lib/devbase/env/io_import.py +++ b/lib/devbase/env/io_import.py @@ -14,9 +14,11 @@ from __future__ import annotations +import getpass import os import re import shutil +import sys from dataclasses import dataclass, field from datetime import datetime from pathlib import Path @@ -94,9 +96,11 @@ def _read_passphrase(opts: ImportOptions) -> Optional[str]: raise ImportError(f"環境変数 {opts.passphrase_env} が空または未設定です") return value if opts.passphrase_stdin: - import sys if sys.stdin.isatty(): - print("passphrase: ", end='', file=sys.stderr, flush=True) + try: + return getpass.getpass("passphrase: ", stream=sys.stderr) + except EOFError as e: + raise ImportError("stdin からパスフレーズを読み取れませんでした") from e line = sys.stdin.readline() if not line: raise ImportError("stdin からパスフレーズを読み取れませんでした") diff --git a/tests/cli/test_env_import.py b/tests/cli/test_env_import.py index 7c390e4..f509895 100644 --- a/tests/cli/test_env_import.py +++ b/tests/cli/test_env_import.py @@ -15,6 +15,7 @@ from devbase.env.io_import import ( ImportError as ImportBundleError, ImportOptions, + _read_passphrase, import_bundle, ) @@ -205,6 +206,58 @@ def test_import_rejects_both_passphrase_env_and_stdin(dest_root): source='/dev/null', passphrase_env='X', passphrase_stdin=True)) +def test_read_passphrase_uses_getpass_on_tty(monkeypatch): + """tty 入力時は getpass.getpass を使い stdin.readline は呼ばない (エコー抑止)""" + fake_stdin = io.StringIO("should-not-be-read\n") + monkeypatch.setattr(fake_stdin, "isatty", lambda: True, raising=False) + monkeypatch.setattr("sys.stdin", fake_stdin) + + calls = {} + + def fake_getpass(prompt='', stream=None): + calls['prompt'] = prompt + calls['stream'] = stream + return "hunter2" + + monkeypatch.setattr("devbase.env.io_import.getpass.getpass", fake_getpass) + + pw = _read_passphrase(ImportOptions(source='/dev/null', passphrase_stdin=True)) + assert pw == "hunter2" + assert calls['prompt'] == "passphrase: " + assert fake_stdin.read() == "should-not-be-read\n" # stdin は消費されていない + + +def test_read_passphrase_falls_back_to_stdin_on_pipe(monkeypatch, capsys): + """パイプ (非 tty) 入力時は getpass を使わず stdin.readline で読む""" + fake_stdin = io.StringIO("piped-pass\n") + monkeypatch.setattr(fake_stdin, "isatty", lambda: False, raising=False) + monkeypatch.setattr("sys.stdin", fake_stdin) + + def fail_getpass(*args, **kwargs): + raise AssertionError("getpass.getpass should not be called for piped stdin") + + monkeypatch.setattr("devbase.env.io_import.getpass.getpass", fail_getpass) + + pw = _read_passphrase(ImportOptions(source='/dev/null', passphrase_stdin=True)) + assert pw == "piped-pass" + assert "passphrase" not in capsys.readouterr().err + + +def test_read_passphrase_tty_eof_raises_import_error(monkeypatch): + """tty で getpass が EOFError を投げた場合は ImportError に変換される""" + fake_stdin = io.StringIO("") + monkeypatch.setattr(fake_stdin, "isatty", lambda: True, raising=False) + monkeypatch.setattr("sys.stdin", fake_stdin) + + def raise_eof(*args, **kwargs): + raise EOFError() + + monkeypatch.setattr("devbase.env.io_import.getpass.getpass", raise_eof) + + with pytest.raises(ImportBundleError, match="パスフレーズを読み取れません"): + _read_passphrase(ImportOptions(source='/dev/null', passphrase_stdin=True)) + + def test_import_rejects_unknown_manifest_version(fake_root, dest_root, age_keys, tmp_path): """manifest.version が SUPPORTED_MANIFEST_VERSION より大きいバンドルは拒否される""" import gzip From dfd0964d2b20796a2bcb1b2d52102e914ba88cb6 Mon Sep 17 00:00:00 2001 From: "takemi.ohama" Date: Sat, 23 May 2026 19:39:05 +0900 Subject: [PATCH 09/10] =?UTF-8?q?fix(env):=20export=20=E5=81=B4=E3=81=AB?= =?UTF-8?q?=E3=82=82=20TTY=20=E3=82=A8=E3=82=B3=E3=83=BC=E6=8A=91=E6=AD=A2?= =?UTF-8?q?=E3=81=A8=E5=85=88=E9=A0=AD=20import=20=E3=82=92=E9=81=A9?= =?UTF-8?q?=E7=94=A8=20(import=20=E3=81=A8=E5=AF=BE=E7=A7=B0=E5=8C=96)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #15 round 7 で io_import.py に入れた修正 (commit 454425e) を、対称な io_export.py 側にも適用する。両者は _read_passphrase の構造が同じで、 TTY エコー抑止と import 位置の問題も同じパターンで残っていた。 - lib/devbase/env/io_export.py: - ローカル `import sys` をファイル先頭の標準 import セクションに移動 - `import getpass` を追加 - TTY 入力時に `getpass.getpass("passphrase: ", stream=sys.stderr)` を 使うように変更。EOFError は ExportError("stdin からパスフレーズを 読み取れませんでした") に変換 - パイプ入力時 (非 TTY) は従来どおり `sys.stdin.readline()` を使用 - tests/cli/test_env_export.py: - 旧テスト test_read_passphrase_shows_prompt_on_tty / test_read_passphrase_no_prompt_on_pipe は `print` 経由の prompt 表示を 検証する構造だったため、getpass.getpass をモックする方式に書き換え - 追加: test_read_passphrase_uses_getpass_on_tty (TTY 時に getpass.getpass が呼ばれ stdin は消費されないこと) - 追加: test_read_passphrase_falls_back_to_stdin_on_pipe (非 TTY では getpass は呼ばれず stdin.readline 経路に入ること) - 追加: test_read_passphrase_tty_eof_raises_export_error (EOFError → ExportError 変換) ローカル品質チェック: - uv run pytest tests/ -> 113 passed - uvx ruff check --select=E9,F63,F7,F82 lib -> All checks passed Co-Authored-By: Claude Opus 4.7 (1M context) --- lib/devbase/env/io_export.py | 12 ++++++---- tests/cli/test_env_export.py | 43 ++++++++++++++++++++++++++++++------ 2 files changed, 44 insertions(+), 11 deletions(-) diff --git a/lib/devbase/env/io_export.py b/lib/devbase/env/io_export.py index 2da2c2c..101ceb2 100644 --- a/lib/devbase/env/io_export.py +++ b/lib/devbase/env/io_export.py @@ -2,8 +2,10 @@ from __future__ import annotations +import getpass import os import re +import sys from dataclasses import dataclass, field from datetime import datetime from pathlib import Path @@ -69,11 +71,13 @@ def _read_passphrase(opts: ExportOptions) -> Optional[str]: ) return value if opts.passphrase_stdin: - import sys - # tty で対話実行している場合、ユーザーが「ハングしている」と誤解しないよう - # stderr へプロンプトを出してから stdin を待つ (パイプ入力時は出さない)。 + # tty で対話実行している場合は getpass.getpass でエコー抑止 + # (パイプ入力時は echo の概念がないので従来どおり stdin.readline で読む)。 if sys.stdin.isatty(): - print("passphrase: ", end='', file=sys.stderr, flush=True) + try: + return getpass.getpass("passphrase: ", stream=sys.stderr) + except EOFError as e: + raise ExportError("stdin からパスフレーズを読み取れませんでした") from e line = sys.stdin.readline() if not line: raise ExportError("stdin からパスフレーズを読み取れませんでした") diff --git a/tests/cli/test_env_export.py b/tests/cli/test_env_export.py index 18b89fe..dd44b47 100644 --- a/tests/cli/test_env_export.py +++ b/tests/cli/test_env_export.py @@ -93,29 +93,58 @@ def test_export_rejects_both_passphrase_env_and_stdin(fake_root): dest="/dev/null", passphrase_env="X", passphrase_stdin=True)) -def test_read_passphrase_shows_prompt_on_tty(monkeypatch, capsys): - """tty 入力時は stderr に 'passphrase: ' プロンプトを表示する""" - fake_stdin = io.StringIO("hunter2\n") +def test_read_passphrase_uses_getpass_on_tty(monkeypatch): + """tty 入力時は getpass.getpass を使い stdin.readline は呼ばない (エコー抑止)""" + fake_stdin = io.StringIO("should-not-be-read\n") monkeypatch.setattr(fake_stdin, "isatty", lambda: True, raising=False) monkeypatch.setattr("sys.stdin", fake_stdin) + calls = {} + + def fake_getpass(prompt='', stream=None): + calls['prompt'] = prompt + calls['stream'] = stream + return "hunter2" + + monkeypatch.setattr("devbase.env.io_export.getpass.getpass", fake_getpass) + pw = _read_passphrase(ExportOptions(passphrase_stdin=True)) assert pw == "hunter2" - err = capsys.readouterr().err - assert "passphrase: " in err + assert calls['prompt'] == "passphrase: " + assert fake_stdin.read() == "should-not-be-read\n" # stdin は消費されていない -def test_read_passphrase_no_prompt_on_pipe(monkeypatch, capsys): - """パイプ (非 tty) 入力時はプロンプトを出さない""" +def test_read_passphrase_falls_back_to_stdin_on_pipe(monkeypatch, capsys): + """パイプ (非 tty) 入力時は getpass を使わず stdin.readline で読む""" fake_stdin = io.StringIO("hunter2\n") monkeypatch.setattr(fake_stdin, "isatty", lambda: False, raising=False) monkeypatch.setattr("sys.stdin", fake_stdin) + def fail_getpass(*args, **kwargs): + raise AssertionError("getpass.getpass should not be called for piped stdin") + + monkeypatch.setattr("devbase.env.io_export.getpass.getpass", fail_getpass) + pw = _read_passphrase(ExportOptions(passphrase_stdin=True)) assert pw == "hunter2" assert "passphrase" not in capsys.readouterr().err +def test_read_passphrase_tty_eof_raises_export_error(monkeypatch): + """tty で getpass が EOFError を投げた場合は ExportError に変換される""" + fake_stdin = io.StringIO("") + monkeypatch.setattr(fake_stdin, "isatty", lambda: True, raising=False) + monkeypatch.setattr("sys.stdin", fake_stdin) + + def raise_eof(*args, **kwargs): + raise EOFError() + + monkeypatch.setattr("devbase.env.io_export.getpass.getpass", raise_eof) + + with pytest.raises(ExportError, match="パスフレーズを読み取れません"): + _read_passphrase(ExportOptions(passphrase_stdin=True)) + + def test_export_with_passphrase_env(fake_root, tmp_path, monkeypatch): dest = tmp_path / "out.dbenv" monkeypatch.setenv("DEVBASE_TEST_PASS", "s3cr3t") From 46ebdae22f85db0ff60b64d5433c350793661469 Mon Sep 17 00:00:00 2001 From: "takemi.ohama" Date: Sat, 23 May 2026 19:43:56 +0900 Subject: [PATCH 10/10] =?UTF-8?q?refactor(env):=20=5Funescape=5Fdouble=5Fq?= =?UTF-8?q?uoted=20=E3=82=92=20re.sub=20+=20=E9=80=86=E5=BC=95=E3=81=8D?= =?UTF-8?q?=E8=BE=9E=E6=9B=B8=E3=81=AB=E6=9B=B8=E3=81=8D=E6=8F=9B=E3=81=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #15 ユーザーレビュー (lib/devbase/env/store.py:160) で挙がった 「pythonic に書き直してください」への対応。 before: バックスラッシュ + 次文字を 1 文字ずつ走査する hand-rolled state machine (25 行)。 after: re.compile(r'\\.') で \ を一括捕捉し、逆引き辞書 {'\\\\': '\\', '\\"': '"', '\\n': '\n', '\\$': '$'} で 1 行置換する re.sub 呼び出し 1 つに集約 (5 行)。 挙動は state machine と完全に同等: - 未知エスケープ (例: \x) は dict.get の default にマッチ文字列 自身を返すことでバックスラッシュごと保持 - 末尾の単独 \ は \\. のドットが 2 文字目を要求するため自然に マッチせず、そのまま保持 - \\\\n (リテラル \\ + n) と \\n (改行) も re.sub は左から非重複で マッチするため state machine と同じ結果 差分: 43 → 15 行 (net -28 行 削除 / +15 行 追加 = -13 行)。 既存テスト (test_envfile_parse_bytes_round_trip_with_escapes 等) で double-quote escape の round-trip は完全に検証済みのため新規テストは追加せず。 ローカル品質チェック: - uv run pytest tests/ -> 113 passed - uvx ruff check --select=E9,F63,F7,F82 lib -> All checks passed - uv run python -m compileall lib bin -> OK Co-Authored-By: Claude Opus 4.7 (1M context) --- lib/devbase/env/store.py | 43 ++++++++++++++-------------------------- 1 file changed, 15 insertions(+), 28 deletions(-) diff --git a/lib/devbase/env/store.py b/lib/devbase/env/store.py index 8e61fa8..13eb6b3 100644 --- a/lib/devbase/env/store.py +++ b/lib/devbase/env/store.py @@ -3,6 +3,7 @@ from __future__ import annotations import os +import re import shutil from dataclasses import dataclass from pathlib import Path @@ -89,6 +90,15 @@ def _escape_double_quoted(value: str) -> str: .replace('$', '\\$')) +# ``_unescape_double_quoted`` 用の逆引きテーブル。``re.sub(r'\\.', ...)`` で +# マッチした 2 文字を 1 文字に置換する。未知エスケープ (例: ``\x``) は +# バックスラッシュごとそのまま残す必要があるため、``dict.get`` の default に +# マッチ文字列自身を返す。末尾単独 ``\`` は ``\\.`` のドットが 2 文字目を +# 要求するため自然にマッチせず、そのまま保持される。 +_DOUBLE_QUOTE_UNESCAPES = {'\\\\': '\\', '\\"': '"', '\\n': '\n', '\\$': '$'} +_DOUBLE_QUOTE_UNESCAPE_RE = re.compile(r'\\.') + + class EnvFile: """ .envファイルの読み書き・バックアップ・バリデーションを管理する。 @@ -151,35 +161,12 @@ def _unescape_double_quoted(s: str) -> str: """``save`` が double-quote 値に対して施した escape を 1 パスで戻す。 単純な逐次 ``replace`` は ``"\\\\n"`` (リテラル ``\\\\`` + ``n``) と - ``"\\n"`` (改行) の区別が付かないため state machine で処理する。 - 未知のエスケープ文字 (``\\x`` 等) はバックスラッシュごとそのまま保持する。 + ``"\\n"`` (改行) の区別が付かないため、``\\`` を一括で捉える + ``re.sub`` + 逆引き辞書で処理する。 """ - out: list = [] - i = 0 - n = len(s) - while i < n: - c = s[i] - if c == '\\' and i + 1 < n: - nxt = s[i + 1] - if nxt == '\\': - out.append('\\') - i += 2 - elif nxt == '"': - out.append('"') - i += 2 - elif nxt == 'n': - out.append('\n') - i += 2 - elif nxt == '$': - out.append('$') - i += 2 - else: - out.append(c) - i += 1 - else: - out.append(c) - i += 1 - return ''.join(out) + return _DOUBLE_QUOTE_UNESCAPE_RE.sub( + lambda m: _DOUBLE_QUOTE_UNESCAPES.get(m.group(0), m.group(0)), s + ) @staticmethod def _format_kv_line(key: str, value: str) -> str: