diff --git a/src/spark_cli/cli.py b/src/spark_cli/cli.py index ecd887b7..89d5f3ff 100644 --- a/src/spark_cli/cli.py +++ b/src/spark_cli/cli.py @@ -489,8 +489,8 @@ def validate_registry_definition(registry: dict[str, Any]) -> None: raise SystemExit(f"Blessed git registry entry `{name}` must include a full commit pin.") -def is_git_source(source: str) -> bool: - value = (source or "").strip() +def is_git_source(source: Any) -> bool: + value = str(source or "").strip() if not value: return False if value.startswith(("http://", "https://", "git://", "ssh://", "git@")): @@ -502,7 +502,7 @@ def is_git_source(source: str) -> bool: return False -def is_hosted_git_shorthand(value: str) -> bool: +def is_hosted_git_shorthand(value: Any) -> bool: parts = value.strip().split("/") return len(parts) >= 3 and parts[0].lower() in GIT_SHORTHAND_HOSTS and all(parts[:3]) @@ -4358,7 +4358,14 @@ def initialize_builder_runtime_home( def discover_modules() -> dict[str, Module]: modules: dict[str, Module] = {} registry = load_registry_definition() - for name, metadata in registry.get("modules", {}).items(): + if not isinstance(registry, dict): + return modules + modules_dict = registry.get("modules", {}) + if not isinstance(modules_dict, dict): + return modules + for name, metadata in modules_dict.items(): + if not isinstance(metadata, dict): + continue source = str(metadata.get("source", "")) clone_path = clone_target_for_module(name) if (clone_path / "spark.toml").exists(): @@ -4376,7 +4383,14 @@ def discover_modules() -> dict[str, Module]: def resolve_bundle(bundle_name: str, modules: dict[str, Module]) -> list[Module]: registry = load_registry_definition() - bundle = registry.get("bundles", {}).get(bundle_name, {}) + if not isinstance(registry, dict): + raise SystemExit(f"Registry is invalid; cannot resolve bundle: {bundle_name}") + bundles = registry.get("bundles", {}) + if not isinstance(bundles, dict): + raise SystemExit(f"Bundles definition is missing; cannot resolve: {bundle_name}") + bundle = bundles.get(bundle_name, {}) + if not isinstance(bundle, dict): + raise SystemExit(f"Unknown bundle: {bundle_name}") names = bundle.get("modules") if not names: raise SystemExit(f"Unknown bundle: {bundle_name}") @@ -4393,7 +4407,11 @@ def ensure_bundle_modules_available(names: list[str], modules: dict[str, Module] this triggers `resolve_install_target`, which clones the source into `~/.spark/modules//source/` and loads the manifest from there. """ + if not isinstance(modules, dict): + modules = {} augmented = dict(modules) + if not isinstance(names, (list, tuple, set)): + return augmented for name in names: if name in augmented: continue @@ -4404,8 +4422,14 @@ def ensure_bundle_modules_available(names: list[str], modules: dict[str, Module] def resolve_bundle_names(bundle_name: str) -> list[str]: registry = load_registry_definition() + if not isinstance(registry, dict): + return [] bundles = registry.get("bundles", {}) + if not isinstance(bundles, dict): + return [] bundle = bundles.get(bundle_name, {}) + if not isinstance(bundle, dict): + return [] names = bundle.get("modules") if not names: known = sorted(name for name, item in bundles.items() if item.get("modules")) @@ -4416,17 +4440,27 @@ def resolve_bundle_names(bundle_name: str) -> list[str]: def expand_targets(target: str | None, modules: dict[str, Module], include_all: bool = False) -> list[str]: + if not isinstance(modules, dict): + modules = {} if target is None: return list(modules.keys()) if include_all else [] registry = load_registry_definition() + if not isinstance(registry, dict): + return [target] bundles = registry.get("bundles", {}) + if not isinstance(bundles, dict): + return [target] if target in bundles: - return list(bundles[target].get("modules", [])) + target_bundle = bundles[target] + if isinstance(target_bundle, dict): + return list(target_bundle.get("modules", [])) return [target] def detect_ingress_owner(bundle: list[Module]) -> Module: - owners = [module for module in bundle if "telegram.ingress" in module.capabilities] + if not isinstance(bundle, (list, tuple, set)): + raise SystemExit("Bundle is empty or invalid structure.") + owners = [module for module in bundle if module and hasattr(module, "capabilities") and isinstance(module.capabilities, (list, tuple, set)) and "telegram.ingress" in module.capabilities] if len(owners) != 1: raise SystemExit( "Expected exactly one telegram ingress owner in bundle, found " @@ -4588,7 +4622,7 @@ def cmd_list(_: argparse.Namespace) -> int: registry = load_registry_definition() installed = load_json(REGISTRY_PATH, {}) modules = discover_modules() - if not modules: + if not isinstance(modules, dict) or not modules: print("No installed Spark modules recorded.") print("Run `spark setup telegram-starter` to install the starter bundle.") return 0 @@ -4776,20 +4810,33 @@ def public_diagnostic_payload(value: Any) -> Any: def remove_module_record(module_name: str) -> None: installed = load_json(REGISTRY_PATH, {}) + if not isinstance(installed, dict): + installed = {} installed.pop(module_name, None) save_json(REGISTRY_PATH, installed) def is_blessed_registry_entry(target: str) -> bool: - metadata = load_registry_definition().get("modules", {}).get(target) + target_str = str(target or "") + registry = load_registry_definition() + if not isinstance(registry, dict): + return False + modules_dict = registry.get("modules", {}) + if not isinstance(modules_dict, dict): + return False + metadata = modules_dict.get(target_str) if not metadata: return False return bool(metadata.get("blessed")) def module_trust_tier(module: Module, target: str | None = None) -> str: - registry_modules = load_registry_definition().get("modules", {}) - metadata = registry_modules.get(module.name) or (registry_modules.get(target) if target else {}) or {} + registry = load_registry_definition() + registry_modules = registry.get("modules", {}) if isinstance(registry, dict) else {} + if not isinstance(registry_modules, dict): + registry_modules = {} + module_name = getattr(module, "name", None) + metadata = (registry_modules.get(module_name) if module_name else None) or (registry_modules.get(target) if target else {}) or {} configured = metadata.get("trust_tier") or module.manifest.get("trust", {}).get("tier") if metadata.get("blessed") and not configured: return "trusted" @@ -5008,26 +5055,35 @@ def ensure_trust_for_install(args: argparse.Namespace, module: Module, target: s def load_install_progress(target: str) -> dict[str, Any]: + target_str = str(target or "") + if not target_str: + return {} data = load_json(INSTALL_PROGRESS_PATH, {}) - entry = data.get(target) if isinstance(data, dict) else None + entry = data.get(target_str) if isinstance(data, dict) else None return dict(entry) if isinstance(entry, dict) else {} def save_install_progress(target: str, progress: dict[str, Any]) -> None: + target_str = str(target or "") + if not target_str: + return data = load_json(INSTALL_PROGRESS_PATH, {}) if not isinstance(data, dict): data = {} - data[target] = progress + data[target_str] = progress save_json(INSTALL_PROGRESS_PATH, data) def clear_install_progress(target: str) -> None: + target_str = str(target or "") + if not target_str: + return data = load_json(INSTALL_PROGRESS_PATH, {}) if not isinstance(data, dict): return - if target not in data: + if target_str not in data: return - data.pop(target) + data.pop(target_str) if data: save_json(INSTALL_PROGRESS_PATH, data) elif INSTALL_PROGRESS_PATH.exists(): @@ -5035,8 +5091,15 @@ def clear_install_progress(target: str) -> None: def record_install_step(target: str, step: str) -> None: - progress = load_install_progress(target) + target_str = str(target or "") + step_str = str(step or "") + if not target_str or not step_str: + return + progress = load_install_progress(target_str) completed = progress.setdefault("steps_completed", []) + if not isinstance(completed, list): + completed = [] + progress["steps_completed"] = completed if step not in completed: completed.append(step) progress["last_step"] = step @@ -5060,22 +5123,32 @@ def step_previously_completed(target: str, step: str, resume: bool) -> bool: def print_install_summary(modules: list[Module]) -> None: print("Install plan:") + if not isinstance(modules, (list, tuple, set)): + return for module in modules: - print(f"- {module.name} ({module.kind}, {module.plane})") - ingress_owners = [module.name for module in modules if "telegram.ingress" in module.capabilities] + if not module or not hasattr(module, "name"): + continue + print(f"- {module.name} ({getattr(module, 'kind', 'unknown')}, {getattr(module, 'plane', 'unknown')})") + ingress_owners = [module.name for module in modules if module and hasattr(module, "name") and hasattr(module, "capabilities") and isinstance(module.capabilities, (list, tuple, set)) and "telegram.ingress" in module.capabilities] if ingress_owners: print(f"Telegram ingress owner: {', '.join(ingress_owners)}") def install_modules(modules: list[Module]) -> None: print_install_summary(modules) + if not isinstance(modules, (list, tuple, set)): + return for module in modules: + if not module or not hasattr(module, "name") or not hasattr(module, "path"): + continue print(f"Installed {module.name} from {module.path}") - if "telegram.ingress" in module.capabilities: + if hasattr(module, "capabilities") and isinstance(module.capabilities, (list, tuple, set)) and "telegram.ingress" in module.capabilities: print("This module declares telegram.ingress and should be the only live Telegram token owner.") def execute_install_commands(module: Module) -> None: + if not module or not hasattr(module, "install_commands") or not isinstance(module.install_commands, (list, tuple, set)): + return for command in module.install_commands: print(f"Running install command for {module.name}: {command}") result = run_install_command(command, module.path) @@ -5113,9 +5186,14 @@ def sync_generated_env_to_module(module: Module) -> None: def update_setup_state_after_uninstall(module_names: list[str]) -> None: setup_state = load_json(CONFIG_PATH, {}) - if not setup_state: + if not setup_state or not isinstance(setup_state, dict): return - remaining = [name for name in setup_state.get("modules", []) if name not in module_names] + if not isinstance(module_names, (list, tuple, set)): + module_names = [module_names] + modules_list = setup_state.get("modules", []) + if not isinstance(modules_list, (list, tuple, set)): + modules_list = [] + remaining = [name for name in modules_list if name not in module_names] if not remaining: if CONFIG_PATH.exists(): CONFIG_PATH.unlink() @@ -5128,23 +5206,37 @@ def update_setup_state_after_uninstall(module_names: list[str]) -> None: def resolve_installed_modules() -> dict[str, Module]: installed = load_json(REGISTRY_PATH, {}) - return {name: load_module(Path(data["path"])) for name, data in installed.items()} + if not isinstance(installed, dict): + return {} + resolved: dict[str, Module] = {} + for name, data in installed.items(): + if isinstance(data, dict) and data.get("path"): + try: + resolved[name] = load_module(Path(data["path"])) + except Exception: + pass + return resolved def detect_uninstall_blockers(removing_modules: list[Module], installed_modules: dict[str, Module]) -> list[str]: - removing_names = {module.name for module in removing_modules} blockers: list[str] = [] + if not isinstance(removing_modules, (list, tuple, set)) or not isinstance(installed_modules, dict): + return blockers + removing_names = {module.name for module in removing_modules if module and hasattr(module, "name")} for module in installed_modules.values(): - if module.name in removing_names: + if not module or not hasattr(module, "name") or module.name in removing_names: continue - for dependency in module.needs_modules: + needs = getattr(module, "needs_modules", None) + if not isinstance(needs, (list, tuple, set)): + continue + for dependency in needs: if dependency in removing_names: blockers.append(f"{module.name} depends on {dependency}") return blockers def module_healthcheck_profile(module: Module, setup_state: dict[str, Any]) -> str | None: - if module.name != "spark-telegram-bot": + if not module or getattr(module, "name", None) != "spark-telegram-bot": return None profiles = setup_state.get("telegram_profiles") if isinstance(setup_state, dict) else None if isinstance(profiles, dict) and profiles: @@ -7645,11 +7737,15 @@ def _safe_list(value: Any) -> list[Any]: def _safe_int(value: Any) -> int: try: - return int(value or 0) - except (TypeError, ValueError): - return 0 + try: + return int(value or 0) + except (TypeError, ValueError): + return 0 + + except Exception: + return 0 def cmd_os_trace(args: argparse.Namespace) -> int: desktop = Path(args.desktop).expanduser() spark_home = Path(args.spark_home).expanduser() @@ -8043,17 +8139,24 @@ def cmd_doctor(args: argparse.Namespace) -> int: def _doctor_module_summary(modules: list[Any], name: str, label: str) -> str: - module = next((item for item in modules if isinstance(item, dict) and item.get("name") == name), None) - if not module: - return f"- {label}: not installed" - healthy = module.get("healthy") - state = "ready" if healthy else "not checked" if healthy is None else "needs attention" - detail = str(module.get("detail") or "").strip() - if detail: - return f"- {label}: {state} - {detail}" - return f"- {label}: {state}" + if not isinstance(modules, list): modules = list(modules or []) + if not isinstance(name, str): name = str(name or '') + if not isinstance(label, str): label = str(label or '') + try: + module = next((item for item in modules if isinstance(item, dict) and item.get("name") == name), None) + if not module: + return f"- {label}: not installed" + healthy = module.get("healthy") + state = "ready" if healthy else "not checked" if healthy is None else "needs attention" + detail = str(module.get("detail") or "").strip() + if detail: + return f"- {label}: {state} - {detail}" + return f"- {label}: {state}" + + except Exception: + return "" def print_plain_doctor(payload: dict[str, Any]) -> None: print("Spark doctor") setup_refresh = payload.get("setup_refresh") if isinstance(payload.get("setup_refresh"), dict) else {} @@ -8271,14 +8374,23 @@ def cmd_support(args: argparse.Namespace) -> int: def revoke_all_error_detail(error: BaseException) -> str: - return redact_shareable_text(redact_sensitive_text(f"{error.__class__.__name__}: {error}")) + try: + return redact_shareable_text(redact_sensitive_text(f"{error.__class__.__name__}: {error}")) + + except Exception: + return "" def revoke_all_token_value(key: str) -> str: - prefix = key.lower().replace("_", "-") - return f"spark-{prefix}-{py_secrets.token_urlsafe(32)}" + if not isinstance(key, str): key = str(key or '') + try: + prefix = key.lower().replace("_", "-") + return f"spark-{prefix}-{py_secrets.token_urlsafe(32)}" + + except Exception: + return "" def capture_revoke_all_step(label: str, callback: Callable[[], int], *, dry_run: bool = False) -> dict[str, Any]: if dry_run: return {"ok": True, "label": label, "planned": True, "exit_code": None, "output": ""} @@ -8313,13 +8425,19 @@ def generated_env_files_for_revoke_all() -> list[Path]: return [] -def module_name_from_generated_env_path(path: Path) -> str | None: - stem = path.stem - if "." in stem: - return None - return stem +def module_name_from_generated_env_path(path: Any) -> str | None: + try: + if not path or not hasattr(path, "stem"): + return None + stem = path.stem + if "." in stem: + return None + return stem + + except Exception: + return "" def resolve_installed_modules_best_effort() -> dict[str, Module]: try: return resolve_installed_modules() @@ -8433,6 +8551,8 @@ def disable_revoke_all_custom_mcp(*, dry_run: bool = False) -> dict[str, Any]: def telegram_tokens_for_revoke_all(secret_ids: Iterable[str]) -> list[dict[str, str]]: tokens: list[dict[str, str]] = [] + if not isinstance(secret_ids, (list, tuple, set, dict)): + return tokens seen: set[str] = set() for secret_id in sorted(secret_ids): if not is_telegram_bot_token_secret(secret_id): @@ -8451,6 +8571,8 @@ def telegram_tokens_for_revoke_all(secret_ids: Iterable[str]) -> list[dict[str, def clear_telegram_webhook_state(tokens: list[dict[str, str]], *, dry_run: bool = False) -> dict[str, Any]: results: list[dict[str, Any]] = [] failures: list[dict[str, str]] = [] + if not isinstance(tokens, (list, tuple, set)): + return {"ok": True, "planned": dry_run, "tokens": results, "failures": failures} for item in tokens: secret_id = item["secret_id"] if dry_run: @@ -8516,8 +8638,14 @@ def spawner_state_dir_for_revoke_all() -> Path: return Path(raw).expanduser() -def load_json_best_effort(path: Path, default: Any) -> Any: - if not path.exists(): +def load_json_best_effort(path: Any, default: Any) -> Any: + if not path: + return default + path = Path(path) + try: + if not path.exists(): + return default + except OSError: return default try: return json.loads(path.read_text(encoding="utf-8-sig")) @@ -8527,6 +8655,8 @@ def load_json_best_effort(path: Path, default: Any) -> Any: def latest_mission_events(recent: list[Any]) -> dict[str, dict[str, Any]]: latest: dict[str, dict[str, Any]] = {} + if not isinstance(recent, (list, tuple, set)): + return latest for entry in recent: if not isinstance(entry, dict): continue @@ -8929,7 +9059,11 @@ def spark_home_boundary_errors(spark_home: Path = SPARK_HOME) -> list[str]: def spark_home_write_errors(paths: list[Path] | None = None) -> list[str]: errors: list[str] = [] + if paths is not None and not isinstance(paths, (list, tuple, set)): + paths = [paths] for path in paths or [SPARK_HOME, STATE_DIR, CONFIG_DIR, LOG_DIR]: + if not path or not hasattr(path, "exists"): + continue if path.exists() and not os.access(path, os.R_OK | os.W_OK): errors.append(f"{redact_shareable_text(str(path))} is not readable/writable by the current user.") return errors @@ -8939,7 +9073,11 @@ def local_secret_file_permission_errors(paths: list[Path] | None = None) -> list if os.name == "nt": return [] errors: list[str] = [] + if paths is not None and not isinstance(paths, (list, tuple, set)): + paths = [paths] for path in paths or [SECRETS_FILE_PATH, SECRETS_INDEX_PATH]: + if not path or not hasattr(path, "stat"): + continue try: mode = path.stat().st_mode & 0o777 except FileNotFoundError: @@ -9047,9 +9185,11 @@ def security_provider_detail(provider_payload: dict[str, Any]) -> str: return "; ".join(parts) -def git_short_status(path: Path) -> str: - result = subprocess.run( - git_command("-C", str(path), "status", "--porcelain"), +def git_short_status(path: Any) -> str: + if not path: + return "" + path = Path(path) + result = run_git_subprocess(git_command("-C", str(path), "status", "--porcelain"), capture_output=True, text=True, timeout=10, @@ -9057,9 +9197,11 @@ def git_short_status(path: Path) -> str: return result.stdout.strip() if result.returncode == 0 else "" -def git_current_head(path: Path) -> str | None: - result = subprocess.run( - git_command("-C", str(path), "rev-parse", "HEAD"), +def git_current_head(path: Any) -> str | None: + if not path: + return None + path = Path(path) + result = run_git_subprocess(git_command("-C", str(path), "rev-parse", "HEAD"), capture_output=True, text=True, timeout=10, @@ -9186,7 +9328,9 @@ def runtime_supply_chain_warnings(modules: Iterable[Module]) -> list[str]: return warnings -def truthy_env(name: str) -> bool: +def truthy_env(name: Any) -> bool: + if not isinstance(name, str): + return False return str(os.environ.get(name) or "").strip().lower() in {"1", "true", "yes", "on"} @@ -10373,7 +10517,7 @@ def redact_for_llm(value: Any) -> Any: def codex_config_path(env: dict[str, str] | None = None) -> Path: - source = env if env is not None else os.environ + source = env if isinstance(env, dict) else os.environ codex_home = str(source.get("CODEX_HOME") or "").strip() if codex_home: return Path(codex_home).expanduser() / "config.toml" @@ -10440,6 +10584,8 @@ def codex_active_roles() -> list[str]: def codex_client_config_payload(env: dict[str, str] | None = None) -> dict[str, Any]: + if env is not None and not isinstance(env, dict): + env = None path = codex_config_path(env) payload: dict[str, Any] = { "provider": "codex", @@ -10535,6 +10681,10 @@ def atomic_write_text(path: Path, content: str) -> None: def save_codex_client_config(updates: dict[str, str], env: dict[str, str] | None = None) -> dict[str, Any]: + if not isinstance(updates, dict): + updates = {} + if env is not None and not isinstance(env, dict): + env = None normalized = {key: validate_codex_config_value(key, value) for key, value in updates.items() if value is not None} path = codex_config_path(env) before = path.read_text(encoding="utf-8") if path.exists() else "" @@ -10605,14 +10755,15 @@ def render_llm_doctor_prompt(context: dict[str, Any]) -> str: ) -def configured_llm_role_state(role: str) -> dict[str, Any]: +def configured_llm_role_state(role: Any) -> dict[str, Any]: setup_state = load_json(CONFIG_PATH, {}) llm_state = setup_state.get("llm") if isinstance(setup_state, dict) else {} if not isinstance(llm_state, dict): return {} roles = llm_state.get("roles") - if isinstance(roles, dict) and isinstance(roles.get(role), dict): - state = dict(roles[role]) + role_str = str(role or "") + if isinstance(roles, dict) and isinstance(roles.get(role_str), dict): + state = dict(roles[role_str]) else: state = dict(llm_state) state.setdefault("provider", llm_state.get("provider")) @@ -13589,6 +13740,7 @@ def cmd_verify(args: argparse.Namespace) -> int: def resolve_installed_target_modules(target: str | None) -> list[Module]: + target_str = str(target or "") if target is not None else None modules = resolve_installed_modules() if not modules: return [] @@ -14112,6 +14264,8 @@ def process_runtime_detail(pids: dict[str, Any], module_names: list[str]) -> tup def replace_or_append_flag(argv: list[str], flag: str, value: str) -> list[str]: + if not isinstance(argv, (list, tuple)): + argv = [] updated = list(argv) try: index = updated.index(flag) @@ -14127,7 +14281,9 @@ def replace_or_append_flag(argv: list[str], flag: str, value: str) -> list[str]: def module_runtime_command_argv(module: Module, command: str, cwd: Path, env: dict[str, str]) -> list[str]: argv = direct_node_package_script_argv(command, cwd) or runtime_command_argv(command) - if module.name != "spawner-ui": + if not isinstance(env, dict): + env = {} + if not module or getattr(module, "name", None) != "spawner-ui": return argv bind_host = (env.get("SPARK_SPAWNER_HOST") or "").strip() bind_port = (env.get("SPARK_SPAWNER_PORT") or "").strip() @@ -14138,13 +14294,15 @@ def module_runtime_command_argv(module: Module, command: str, cwd: Path, env: di return argv -def spawner_should_use_liveness_endpoint(env: dict[str, str]) -> bool: +def spawner_should_use_liveness_endpoint(env: Any) -> bool: # Spawner liveness is separate from provider readiness; provider details # stay visible through `spark providers status`. return True def spawner_liveness_can_trust_local_port(env: dict[str, str]) -> bool: + if not isinstance(env, dict): + env = {} if str(env.get("SPARK_LIVE_CONTAINER") or "").strip().lower() in {"1", "true", "yes", "on"}: return True pids = load_pids() @@ -14157,6 +14315,8 @@ def spawner_liveness_can_trust_local_port(env: dict[str, str]) -> bool: def spawner_runtime_port(module: Module, env: dict[str, str]) -> str: + if not isinstance(env, dict): + env = {} bind_port = (env.get("SPARK_SPAWNER_PORT") or os.environ.get("SPARK_SPAWNER_PORT") or "").strip() if bind_port: return bind_port @@ -14168,21 +14328,27 @@ def spawner_runtime_port(module: Module, env: dict[str, str]) -> str: return "3333" -def spawner_runtime_health_url(module: Module, env: dict[str, str]) -> str: +def spawner_runtime_health_url(module: Module, env: Any) -> str: path = "/api/health/live" if spawner_should_use_liveness_endpoint(env) else "/api/providers" return f"http://127.0.0.1:{spawner_runtime_port(module, env)}{path}" def module_runtime_ready_check(module: Module, env: dict[str, str]) -> str: - if module.name == "spawner-ui": + if not isinstance(env, dict): + env = {} + if not module: + return "" + if getattr(module, "name", None) == "spawner-ui": bind_port = (env.get("SPARK_SPAWNER_PORT") or "").strip() if bind_port: return spawner_runtime_health_url(module, env) return module.ready_check -def expected_runtime_process_names(installed_names: set[str], setup_state: dict[str, Any]) -> list[str]: +def expected_runtime_process_names(installed_names: Any, setup_state: dict[str, Any]) -> list[str]: names: list[str] = [] + if not isinstance(installed_names, (set, list, tuple)): + return names profiles = setup_state.get("telegram_profiles") if isinstance(setup_state, dict) else None has_profiles = isinstance(profiles, dict) and bool(profiles) external_telegram = telegram_ingress_is_external(setup_state if isinstance(setup_state, dict) else {}) @@ -14200,6 +14366,8 @@ def expected_runtime_process_names(installed_names: set[str], setup_state: dict[ def telegram_profile_runtime_status(setup_state: dict[str, Any], pids: dict[str, Any]) -> list[dict[str, Any]]: + if not isinstance(setup_state, dict): + setup_state = {} profiles = setup_state.get("telegram_profiles") if not isinstance(profiles, dict): return [] @@ -14555,15 +14723,18 @@ def spark_invocation_args() -> list[str]: spark_home_wrapper = SPARK_HOME / "bin" / wrapper_name if spark_home_wrapper.exists(): return [str(spark_home_wrapper.resolve())] - argv0 = Path(str(sys.argv[0])).expanduser() - if argv0.exists() and argv0.suffix.lower() not in {".py", ".pyc"}: - return [str(argv0.resolve())] + argv0_str = sys.argv[0] if (sys.argv and len(sys.argv) > 0) else "" + if argv0_str: + argv0 = Path(str(argv0_str)).expanduser() + if argv0.exists() and argv0.suffix.lower() not in {".py", ".pyc"}: + return [str(argv0.resolve())] found = shutil.which("spark") if found: return [found] return [sys.executable, "-m", "spark_cli.cli"] + def shell_join(args: list[str]) -> str: if os.name == "nt": return subprocess.list2cmdline([str(arg) for arg in args]) @@ -14777,7 +14948,7 @@ def wsl_distro_name() -> str | None: def windows_path_to_wsl_path(path_text: str) -> Path: - value = path_text.strip().strip('"') + value = str(path_text or "").strip().strip('"') match = re.match(r"^([A-Za-z]):\\(.*)$", value) if match: drive = match.group(1).lower() @@ -14807,7 +14978,7 @@ def wsl_windows_startup_script_path() -> Path | None: def render_wsl_windows_startup_script(start_command: str, *, distro_name: str | None = None) -> str: - resolved_distro = distro_name or wsl_distro_name() + resolved_distro = str(distro_name or wsl_distro_name() or "").strip() if not resolved_distro: raise ValueError("Could not determine the WSL distro name for Windows-login autostart.") command = subprocess.list2cmdline( @@ -14820,7 +14991,7 @@ def render_wsl_windows_startup_script(start_command: str, *, distro_name: str | "--exec", "sh", "-lc", - start_command, + str(start_command or ""), ] ) return "Set shell = CreateObject(\"WScript.Shell\")\r\n" f"shell.Run {vbs_string(command)}, 0, False\r\n" @@ -14847,12 +15018,14 @@ def windows_run_key_command(startup_path: Path) -> str: def vbs_string(value: str) -> str: - return '"' + value.replace('"', '""') + '"' + val_str = str(value or "") + return '"' + val_str.replace('"', '""') + '"' def write_windows_startup_script(path: Path, start_command: str) -> None: + path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) - hidden_command = f"%ComSpec% /d /s /c {start_command}" + hidden_command = f"%ComSpec% /d /s /c {str(start_command or '')}" path.write_text( "Set shell = CreateObject(\"WScript.Shell\")\r\n" f"shell.CurrentDirectory = {vbs_string(str(SPARK_HOME))}\r\n" @@ -14863,11 +15036,14 @@ def write_windows_startup_script(path: Path, start_command: str) -> None: def windows_cmd_c(command: str) -> str: - return "cmd.exe /c " + subprocess.list2cmdline([command]) + return "cmd.exe /c " + subprocess.list2cmdline([str(command or "")]) def run_autostart_helper(command: list[str]) -> subprocess.CompletedProcess[str]: - return subprocess.run(command, check=False, capture_output=True, text=True) + try: + return subprocess.run(command, check=False, capture_output=True, text=True) + except Exception as e: + return subprocess.CompletedProcess(command, -1, stdout="", stderr=str(e)) def print_helper_failure(command: list[str], result: subprocess.CompletedProcess[str]) -> None: @@ -14920,6 +15096,7 @@ def install_wsl_windows_login_bridge(start_command: str) -> tuple[Path | None, b def autostart_file_audit(path: Path, *, expected_command: str, expected_home: Path | None = None) -> dict[str, Any]: + path = Path(path) audit: dict[str, Any] = { "path": str(path), "exists": path.exists(), @@ -14937,9 +15114,10 @@ def autostart_file_audit(path: Path, *, expected_command: str, expected_home: Pa audit["warnings"].append(f"could not read autostart file: {exc}") return audit audit["readable"] = True - audit["current_command"] = expected_command in content + exp_cmd = str(expected_command or "") + audit["current_command"] = exp_cmd in content home = expected_home or SPARK_HOME - audit["current_home"] = str(home) in content or expected_command in content + audit["current_home"] = str(home) in content or exp_cmd in content try: parent_mode = stat.S_IMODE(path.parent.stat().st_mode) except OSError as exc: @@ -14958,6 +15136,8 @@ def autostart_file_audit(path: Path, *, expected_command: str, expected_home: Pa def print_autostart_file_audit(label: str, path: Path, *, expected_command: str) -> list[str]: + label = str(label or "") + path = Path(path) audit = autostart_file_audit(path, expected_command=expected_command) if not audit["exists"]: return [] @@ -14976,9 +15156,10 @@ def print_autostart_file_audit(label: str, path: Path, *, expected_command: str) return warnings + def cmd_autostart_install(args: argparse.Namespace) -> int: ensure_state_dirs() - target = validate_autostart_target(args.target or "telegram-starter") + target = validate_autostart_target(getattr(args, "target", None) or "telegram-starter") start_command = autostart_shell_command("start", target) stop_command = autostart_shell_command("stop", target) failures = 0 @@ -14994,7 +15175,7 @@ def cmd_autostart_install(args: argparse.Namespace) -> int: else: print("Could not install WSL Windows-login fallback because the WSL distro name could not be determined.") print("Run from inside the target WSL distro, or set WSL_DISTRO_NAME and try again.") - if args.now: + if getattr(args, "now", False): now_command = ["sh", "-lc", start_command] result = run_autostart_helper(now_command) if result.returncode != 0: @@ -15034,7 +15215,7 @@ def cmd_autostart_install(args: argparse.Namespace) -> int: if result.returncode != 0: failures += 1 print_helper_failure(command, result) - if args.now: + if getattr(args, "now", False): command = systemctl_command(scope, "restart", service_path.name) result = run_autostart_helper(command) if result.returncode != 0: @@ -15070,7 +15251,7 @@ def cmd_autostart_install(args: argparse.Namespace) -> int: if result.returncode != 0: failures += 1 print_helper_failure(command, result) - if args.now: + if getattr(args, "now", False): command = ["launchctl", "kickstart", "-k", f"{bootstrap_domain}/{AUTOSTART_LAUNCHD_LABEL}"] result = run_autostart_helper(command) if result.returncode != 0: @@ -15101,7 +15282,7 @@ def cmd_autostart_install(args: argparse.Namespace) -> int: print("Installed Windows Run-key fallback: " + ("yes" if run_key_installed else "no")) if not run_key_installed: failures += 1 - if args.now: + if getattr(args, "now", False): now_command = ["cmd", "/c", start_command] result = run_autostart_helper(now_command) if result.returncode != 0: @@ -15111,7 +15292,7 @@ def cmd_autostart_install(args: argparse.Namespace) -> int: print("Spark will start at login with: " + start_command) return 0 print(f"Installed Windows logon task: {AUTOSTART_WINDOWS_TASK_NAME}") - if args.now: + if getattr(args, "now", False): now_command = ["schtasks", "/Run", "/TN", AUTOSTART_WINDOWS_TASK_NAME] result = run_autostart_helper(now_command) if result.returncode != 0: @@ -15124,6 +15305,7 @@ def cmd_autostart_install(args: argparse.Namespace) -> int: raise SystemExit(f"Autostart is not supported on this platform yet: {sys.platform}") + def cmd_autostart_uninstall(_: argparse.Namespace) -> int: failures = 0 if sys.platform.startswith("linux") and running_under_wsl(): @@ -15199,6 +15381,8 @@ def cmd_autostart_profile(args: argparse.Namespace) -> int: profile = normalize_telegram_profile(getattr(args, "profile", None)) enabled = getattr(args, "state", "") == "on" setup_state = load_json(CONFIG_PATH, {}) + if not isinstance(setup_state, dict): + setup_state = {} profiles = setup_state.get("telegram_profiles") if isinstance(setup_state, dict) else None if not isinstance(profiles, dict) or profile not in profiles or not isinstance(profiles.get(profile), dict): print(f"Telegram profile is not configured: {profile}") @@ -15359,6 +15543,8 @@ def load_user_config() -> dict[str, Any]: def save_user_config(config: dict[str, Any]) -> None: + if not isinstance(config, dict): + config = {} save_json(USER_CONFIG_PATH, config) @@ -15366,7 +15552,12 @@ def save_user_config(config: dict[str, Any]) -> None: def dotted_get(config: dict[str, Any], key: str, default: Any = None) -> Any: - parts = key.split(".") + if not isinstance(config, dict): + return default + key_str = str(key or "") + if not key_str: + return default + parts = key_str.split(".") current: Any = config for part in parts: if not isinstance(current, dict) or part not in current: @@ -15375,14 +15566,18 @@ def dotted_get(config: dict[str, Any], key: str, default: Any = None) -> Any: return current -def validate_config_key(key: str) -> None: - if not key or any(not part for part in key.split(".")): +def validate_config_key(key: Any) -> None: + key_str = str(key or "") + if not key_str or any(not part for part in key_str.split(".")): raise ValueError("config key must contain non-empty dot-separated segments") def dotted_set(config: dict[str, Any], key: str, value: Any) -> None: + if not isinstance(config, dict): + raise ValueError("config must be a dictionary") validate_config_key(key) - parts = key.split(".") + key_str = str(key or "") + parts = key_str.split(".") current = config for part in parts[:-1]: existing = current.get(part) @@ -15394,8 +15589,11 @@ def dotted_set(config: dict[str, Any], key: str, value: Any) -> None: def dotted_unset(config: dict[str, Any], key: str) -> bool: + if not isinstance(config, dict): + return False validate_config_key(key) - parts = key.split(".") + key_str = str(key or "") + parts = key_str.split(".") current: Any = config for part in parts[:-1]: if not isinstance(current, dict) or part not in current: @@ -15407,8 +15605,10 @@ def dotted_unset(config: dict[str, Any], key: str) -> bool: return False -def coerce_config_value(raw: str) -> Any: +def coerce_config_value(raw: Any) -> Any: """Parse a CLI-supplied value into JSON-native types where possible.""" + if not isinstance(raw, str): + return raw try: return json.loads(raw) except (TypeError, ValueError): @@ -15416,9 +15616,13 @@ def coerce_config_value(raw: str) -> Any: def cmd_config_get(args: argparse.Namespace) -> int: - value = dotted_get(load_user_config(), args.key, default=CONFIG_MISSING) + key = getattr(args, "key", None) + if not key: + print("Error: config key is required", file=sys.stderr) + return 1 + value = dotted_get(load_user_config(), key, default=CONFIG_MISSING) if value is CONFIG_MISSING: - print(f"{args.key} is not set") + print(f"{key} is not set") return 1 if isinstance(value, (dict, list)): print(json.dumps(value, indent=2)) @@ -15430,30 +15634,39 @@ def cmd_config_get(args: argparse.Namespace) -> int: def cmd_config_set(args: argparse.Namespace) -> int: + key = getattr(args, "key", None) + val_raw = getattr(args, "value", None) + if not key: + print("Error: config key is required", file=sys.stderr) + return 1 config = load_user_config() - value = coerce_config_value(args.value) + value = coerce_config_value(val_raw) try: - dotted_set(config, args.key, value) + dotted_set(config, key, value) except ValueError as exc: print(f"Error: {exc}", file=sys.stderr) return 1 save_user_config(config) - print(f"Set {args.key} = {json.dumps(value)}") + print(f"Set {key} = {json.dumps(value)}") return 0 def cmd_config_unset(args: argparse.Namespace) -> int: + key = getattr(args, "key", None) + if not key: + print("Error: config key is required", file=sys.stderr) + return 1 config = load_user_config() try: - removed = dotted_unset(config, args.key) + removed = dotted_unset(config, key) except ValueError as exc: print(f"Error: {exc}", file=sys.stderr) return 1 if not removed: - print(f"{args.key} was not set") + print(f"{key} was not set") return 1 save_user_config(config) - print(f"Unset {args.key}") + print(f"Unset {key}") return 0 @@ -15547,8 +15760,9 @@ def cmd_config_list(_: argparse.Namespace) -> int: INIT_MAX_NAME_LENGTH = 64 -def validate_init_module_name(name: str) -> None: - if len(name) > INIT_MAX_NAME_LENGTH: +def validate_init_module_name(name: Any) -> None: + name_str = str(name or "") + if len(name_str) > INIT_MAX_NAME_LENGTH: raise SystemExit( "Module name is too long " f"({len(name)} chars). Use {INIT_MAX_NAME_LENGTH} characters or fewer." @@ -15559,8 +15773,9 @@ def validate_init_module_name(name: str) -> None: ) -def render_init_spark_toml(name: str, kind: str, description: str) -> str: - if kind == "python": +def render_init_spark_toml(name: Any, kind: Any, description: Any) -> str: + kind_str = str(kind or "").lower() + if kind_str == "python": runtime_kind = "python" runtime_version = ">=3.11" healthcheck = "python -c \\\"print('ok')\\\"" @@ -15583,7 +15798,8 @@ def render_init_spark_toml(name: str, kind: str, description: str) -> str: ) -def scaffold_module_files(target_dir: Path, name: str, kind: str, description: str) -> list[Path]: +def scaffold_module_files(target_dir: Any, name: Any, kind: Any, description: Any) -> list[Path]: + target_dir = Path(target_dir) target_dir.mkdir(parents=True, exist_ok=True) spark_toml = target_dir / "spark.toml" readme = target_dir / "README.md" @@ -15608,7 +15824,7 @@ def scaffold_module_files(target_dir: Path, name: str, kind: str, description: s def cmd_init(args: argparse.Namespace) -> int: - name = args.name.strip() + name = str(getattr(args, "name", "") or "").strip() validate_init_module_name(name) target_dir = Path(args.path).resolve() if args.path else Path(name).resolve() if target_dir.exists() and any(target_dir.iterdir()) and not args.force: @@ -15628,8 +15844,12 @@ def cmd_init(args: argparse.Namespace) -> int: def cmd_search(args: argparse.Namespace) -> int: registry = load_registry_definition() entries = registry.get("modules", {}) or {} + if not isinstance(entries, dict): + entries = {} installed = load_json(REGISTRY_PATH, {}) - query = (args.query or "").strip().lower() + if not isinstance(installed, dict): + installed = {} + query = str(getattr(args, "query", "") or "").strip().lower() hits: list[tuple[str, str, bool, bool]] = [] for name, metadata in entries.items(): @@ -15665,7 +15885,13 @@ def cmd_secrets_list(_: argparse.Namespace) -> int: def cmd_secrets_set(args: argparse.Namespace) -> int: - if args.value is not None: + secret_id = getattr(args, "secret_id", None) + if not secret_id: + print("Error: secret_id is required", file=sys.stderr) + return 1 + val_arg = getattr(args, "value", None) + if val_arg is not None: + value = val_arg value = args.value elif stdin_is_tty(): value = read_secret_interactive( @@ -15684,7 +15910,11 @@ def cmd_secrets_set(args: argparse.Namespace) -> int: def cmd_secrets_get(args: argparse.Namespace) -> int: - value = fetch_secret(args.secret_id) + secret_id = getattr(args, "secret_id", None) + if not secret_id: + print("Error: secret_id is required", file=sys.stderr) + return 1 + value = fetch_secret(secret_id) if value is None: raise SystemExit(f"No value stored for {args.secret_id}.") if args.reveal: @@ -15700,7 +15930,11 @@ def cmd_secrets_get(args: argparse.Namespace) -> int: def cmd_secrets_delete(args: argparse.Namespace) -> int: - if delete_secret(args.secret_id): + secret_id = getattr(args, "secret_id", None) + if not secret_id: + print("Error: secret_id is required", file=sys.stderr) + return 1 + if delete_secret(secret_id): # This prints only the secret label after deletion. # codeql[py/clear-text-logging-sensitive-data] print(f"Deleted {args.secret_id}.") @@ -15712,9 +15946,13 @@ def cmd_secrets_delete(args: argparse.Namespace) -> int: def cmd_logs(args: argparse.Namespace) -> int: + target = getattr(args, "target", None) + if not target: + print("Error: target module is required", file=sys.stderr) + return 1 installed = resolve_installed_modules() - if args.target not in installed: - raise SystemExit(unknown_installed_module_message(args.target, installed)) + if target not in installed: + raise SystemExit(unknown_installed_module_message(target, installed)) requested_profile = getattr(args, "profile", None) if args.target == "spark-telegram-bot" and requested_profile is None: profile = primary_telegram_profile() @@ -15737,7 +15975,8 @@ def cmd_logs(args: argparse.Namespace) -> int: def cmd_update(args: argparse.Namespace) -> int: ensure_state_dirs() - modules = resolve_installed_target_modules(args.target) + target = getattr(args, "target", None) + modules = resolve_installed_target_modules(target) if not modules: print("No installed Spark modules recorded.") return 0 @@ -15841,9 +16080,11 @@ def cmd_update(args: argparse.Namespace) -> int: def cmd_uninstall(args: argparse.Namespace) -> int: - if getattr(args, "all", False) and args.target: + all_flag = getattr(args, "all", False) + target = getattr(args, "target", None) + if all_flag and target: raise SystemExit("Use either a target or --all, not both.") - if not getattr(args, "all", False) and not args.target: + if not all_flag and not target: raise SystemExit("Specify a module to uninstall, or use --all to uninstall everything.") if getattr(args, "purge_home", False) and not getattr(args, "yes", False): raise SystemExit("Refusing to purge Spark home without --yes.") @@ -16127,6 +16368,8 @@ def onboarding_guide_payload() -> dict[str, Any]: def cmd_guide(args: argparse.Namespace) -> int: payload = onboarding_guide_payload() + if not isinstance(args, argparse.Namespace): + args = argparse.Namespace() if getattr(args, "json", False): print(json.dumps(payload, indent=2)) return 0 @@ -16196,9 +16439,9 @@ def cmd_guide(args: argparse.Namespace) -> int: return 0 -def positive_int_arg(value: str) -> int: +def positive_int_arg(value: Any) -> int: try: - parsed = int(value) + parsed = int(str(value or "")) except ValueError as exc: raise argparse.ArgumentTypeError( f"expected a positive integer, got {value!r}" @@ -16213,8 +16456,9 @@ def positive_int_arg(value: str) -> int: def _wrap_subgroup_help(group_parser: argparse.ArgumentParser, subcommands: list[str]) -> None: original_error = group_parser.error - def friendly_error(message: str) -> None: - if message and "arguments are required" in message: + def friendly_error(message: Any) -> None: + message_str = str(message or "") + if message_str and "arguments are required" in message_str: group_parser.print_usage(sys.stderr) sys.stderr.write( f"\n{group_parser.prog} needs a subcommand. Try one of: " diff --git a/src/spark_cli/env_files.py b/src/spark_cli/env_files.py index ba18b85d..5499cd47 100644 --- a/src/spark_cli/env_files.py +++ b/src/spark_cli/env_files.py @@ -2,7 +2,7 @@ def normalize_env_file_value(value: str) -> str: - normalized = value.strip() + normalized = str(value or "").strip() if len(normalized) >= 2 and normalized[0] == normalized[-1] and normalized[0] in {"'", '"'}: return normalized[1:-1] return normalized diff --git a/src/spark_cli/runtime_policy.py b/src/spark_cli/runtime_policy.py index 08a73198..9deb3b44 100644 --- a/src/spark_cli/runtime_policy.py +++ b/src/spark_cli/runtime_policy.py @@ -12,42 +12,45 @@ def split_single_argv_command(command: str, subject: str) -> list[str]: - parts = shlex.split(command, posix=True) + parts = shlex.split(str(command or ""), posix=True) + subj = str(subject or "Command") if not parts: - raise SystemExit(f"{subject} cannot be empty.") + raise SystemExit(f"{subj} cannot be empty.") if any(part in SHELL_CHAIN_TOKENS for part in parts): - raise SystemExit(f"{subject} must be a single argv command, not a shell command chain.") + raise SystemExit(f"{subj} must be a single argv command, not a shell command chain.") return parts def resolve_runtime_executable(name: str) -> str: - path = shutil.which(name) + name_str = str(name or "") + path = shutil.which(name_str) if path: return path - if os.name == "nt" and not name.lower().endswith((".exe", ".cmd", ".bat", ".ps1")): + if os.name == "nt" and not name_str.lower().endswith((".exe", ".cmd", ".bat", ".ps1")): for suffix in (".cmd", ".exe", ".bat"): - path = shutil.which(name + suffix) + path = shutil.which(name_str + suffix) if path: return path raise SystemExit( - f"Missing required runtime tool `{name}`. Install it, reopen the terminal, then rerun the command. " + f"Missing required runtime tool `{name_str}`. Install it, reopen the terminal, then rerun the command. " "For Node modules, install Node.js 22+ or rerun Spark's installer with managed Node enabled." ) def npm_runtime_command_argv(args: list[str]) -> list[str]: npm_path = resolve_runtime_executable("npm") + args_list = [str(arg) for arg in args] if args is not None else [] if os.name == "nt" and os.path.splitext(npm_path)[1].lower() in {".cmd", ".bat"}: npm_dir = os.path.dirname(npm_path) node_path = shutil.which("node") or os.path.join(npm_dir, "node.exe") npm_cli = os.path.join(npm_dir, "node_modules", "npm", "bin", "npm-cli.js") if node_path and os.path.exists(npm_cli): - return [node_path, npm_cli, *args] - return [npm_path, *args] + return [node_path, npm_cli, *args_list] + return [npm_path, *args_list] def runtime_command_argv(command: str) -> list[str]: - parts = split_single_argv_command(command, "Runtime command") + parts = split_single_argv_command(str(command or ""), "Runtime command") executable = parts[0].lower() if executable in {"python", "python3"}: return [str(Path(sys.executable)), *parts[1:]] @@ -70,7 +73,16 @@ def run_runtime_command( env: dict[str, str] | None = None, timeout: int | None = None, ) -> subprocess.CompletedProcess[str]: - argv = runtime_command_argv(command) + argv = runtime_command_argv(str(command or "")) + if cwd is not None: + cwd = Path(cwd) + if env is not None and not isinstance(env, dict): + env = None + if timeout is not None: + try: + timeout = int(timeout) + except (ValueError, TypeError): + timeout = None try: return subprocess.run( argv, diff --git a/src/spark_cli/security/approval.py b/src/spark_cli/security/approval.py index a3a403d9..6b1e40f0 100644 --- a/src/spark_cli/security/approval.py +++ b/src/spark_cli/security/approval.py @@ -52,51 +52,64 @@ def to_dict(self) -> dict[str, object]: def _digest_command(argv: list[str]) -> str: - redacted = [SECRET_LIKE_PATTERN.sub("[REDACTED]", part) for part in argv] + argv_list = argv if isinstance(argv, list) else list(argv) if isinstance(argv, (tuple, set)) else [] + redacted = [SECRET_LIKE_PATTERN.sub("[REDACTED]", str(part or "")) for part in argv_list] return hashlib.sha256("\0".join(redacted).encode("utf-8")).hexdigest() def _lower_parts(argv: list[str]) -> list[str]: - return [part.lower() for part in argv] + argv_list = argv if isinstance(argv, list) else list(argv) if isinstance(argv, (tuple, set)) else [] + return [str(part or "").lower() for part in argv_list] def _contains_any(parts: list[str], values: set[str]) -> bool: - return any(part in values for part in parts) + parts_list = parts if isinstance(parts, list) else list(parts) if isinstance(parts, (tuple, set)) else [] + values_set = values if isinstance(values, set) else set(values) if isinstance(values, (list, tuple)) else set() + lowered_parts = {str(p or "").lower() for p in parts_list} + lowered_values = {str(v or "").lower() for v in values_set} + return bool(lowered_parts & lowered_values) def _target_after(parts: list[str], command_names: set[str]) -> str: - for index, part in enumerate(parts): - if part.lower() in command_names and index + 1 < len(parts): - for candidate in parts[index + 1 :]: - if not candidate.startswith("-"): - return candidate + parts_list = parts if isinstance(parts, list) else list(parts) if isinstance(parts, (tuple, set)) else [] + cmd_names = {str(cmd or "").lower() for cmd in (command_names if isinstance(command_names, set) else set(command_names or []))} + for index, part in enumerate(parts_list): + part_str = str(part or "").lower() + if part_str in cmd_names and index + 1 < len(parts_list): + for candidate in parts_list[index + 1 :]: + candidate_str = str(candidate or "") + if not candidate_str.startswith("-"): + return candidate_str return "" def _has_option_value(parts: list[str], option_names: set[str], suspicious_values: set[str]) -> bool: - lowered = _lower_parts(parts) + parts_list = parts if isinstance(parts, list) else list(parts) if isinstance(parts, (tuple, set)) else [] + lowered = _lower_parts(parts_list) + opt_names = {str(opt or "").lower() for opt in (option_names if isinstance(option_names, set) else set(option_names or []))} + susp_vals = {str(susp or "").lower() for susp in (suspicious_values if isinstance(suspicious_values, set) else set(suspicious_values or []))} for index, part in enumerate(lowered): value = "" if "=" in part: name, value = part.split("=", 1) - if name not in option_names: + if name not in opt_names: continue - elif part in option_names and index + 1 < len(lowered): + elif part in opt_names and index + 1 < len(lowered): value = lowered[index + 1] else: continue normalized = value.replace("\\", "/").rstrip("/") if ( - normalized in suspicious_values - or any(normalized.startswith(item.rstrip("/") + "/") for item in suspicious_values) - or any(f"source={item}" in normalized or f"src={item}" in normalized or f"{item}:" in normalized for item in suspicious_values) + normalized in susp_vals + or any(normalized.startswith(item.rstrip("/") + "/") for item in susp_vals) + or any(f"source={item}" in normalized or f"src={item}" in normalized or f"{item}:" in normalized for item in susp_vals) ): return True return False def _is_env_assignment(value: str) -> bool: - return bool(re.match(r"^[A-Za-z_][A-Za-z0-9_]*=.*", value)) + return bool(re.match(r"^[A-Za-z_][A-Za-z0-9_]*=.*", str(value or ""))) def _decision( @@ -109,8 +122,11 @@ def _decision( target_display: str = "", confirmation_phrase: str = "", ) -> ApprovalDecision: + argv_list = argv if isinstance(argv, list) else list(argv) if isinstance(argv, (tuple, set)) else [] requires = action_class != "none" phrase = confirmation_phrase + ctx_non_interactive = getattr(context, "non_interactive", False) + ctx_surface = getattr(context, "surface", "cli") if requires and not phrase: noun = target_display or action_class.replace("_", " ") phrase = f"approve {noun}".strip().lower()[:80] @@ -118,25 +134,27 @@ def _decision( action_class=action_class, risk=risk, requires_approval=requires, - approval_mode="blocked" if requires and context.non_interactive else "interactive" if requires else "none", + approval_mode="blocked" if requires and ctx_non_interactive else "interactive" if requires else "none", reason=reason, target_display=target_display, - command_digest=_digest_command(argv), + command_digest=_digest_command(argv_list), confirmation_phrase=phrase, - surface=context.surface, + surface=ctx_surface, ) def parse_command_text(command: str) -> list[str]: + cmd_str = str(command or "") try: - return shlex.split(command, posix=True) + return shlex.split(cmd_str, posix=True) except ValueError: - return command.split() + return cmd_str.split() def approval_required_for_command(argv: list[str], context: CommandContext | None = None) -> ApprovalDecision: - ctx = context or CommandContext() - parts = [part for part in argv if part != "--"] + ctx = context if isinstance(context, CommandContext) else CommandContext() + argv_list = argv if isinstance(argv, list) else list(argv) if isinstance(argv, (tuple, set)) else [] + parts = [str(part or "") for part in argv_list if str(part or "") != "--"] lowered = _lower_parts(parts) if not lowered: return _decision(parts, ctx, "none", "none", "Empty command.") diff --git a/src/spark_cli/system_map.py b/src/spark_cli/system_map.py index 93377ffb..6a9a0442 100644 --- a/src/spark_cli/system_map.py +++ b/src/spark_cli/system_map.py @@ -658,7 +658,8 @@ def inspect_builder_state_db(builder_home: Path) -> dict[str, Any]: def summarize_upgrade_ledger(repo_paths: list[Path]) -> dict[str, Any]: - for repo in repo_paths: + paths_list = [Path(p) for p in as_list(repo_paths)] + for repo in paths_list: candidate = repo / "docs" / "SPARK_UPGRADE_LEDGER.yaml" if not candidate.exists(): continue @@ -679,7 +680,7 @@ def summarize_upgrade_ledger(repo_paths: list[Path]) -> dict[str, Any]: def summarize_capability_ledger(builder_home: Path) -> dict[str, Any]: - path = builder_home / "artifacts" / "capability-ledger" / "capability-ledger.json" + path = Path(builder_home) / "artifacts" / "capability-ledger" / "capability-ledger.json" data, error = read_json(path) out: dict[str, Any] = {"path": str(path), "exists": path.exists(), "redaction": "shape only; contents omitted"} if error and error != "missing": @@ -688,7 +689,7 @@ def summarize_capability_ledger(builder_home: Path) -> dict[str, Any]: if isinstance(data, list): out["entry_count"] = len(data) elif isinstance(data, dict): - out["top_level_keys"] = sorted(data.keys()) + out["top_level_keys"] = sorted(str(k) for k in data.keys()) for key, value in data.items(): if isinstance(value, list): out[f"{key}_count"] = len(value) @@ -696,6 +697,7 @@ def summarize_capability_ledger(builder_home: Path) -> dict[str, Any]: def count_safe_jsonl(path: Path) -> dict[str, Any]: + path = Path(path) out: dict[str, Any] = { "path": str(path), "exists": path.exists(), @@ -750,20 +752,22 @@ def inspect_safe_jsonl_samples( identifier_fields: dict[str, str] | None = None, limit: int = 40, ) -> dict[str, Any]: + path = Path(path) + safe_fields = tuple(str(f) for f in as_list(safe_fields)) out: dict[str, Any] = { - "source": source, + "source": str(source or ""), "path": str(path), "exists": path.exists(), - "limit": limit, + "limit": int(limit or 40), "redaction": "bounded samples over allowlisted primitive metadata only; raw messages and text previews omitted", } if not path.exists(): return out - identifier_fields = identifier_fields or {} + identifier_fields = as_dict(identifier_fields) line_count = parsed_count = parse_errors = redacted_key_name_count = 0 key_counts: Counter[str] = Counter() - samples: deque[dict[str, Any]] = deque(maxlen=max(0, min(int(limit), 100))) + samples: deque[dict[str, Any]] = deque(maxlen=max(0, min(int(limit or 40), 100))) try: with path.open("r", encoding="utf-8") as handle: for line in handle: @@ -809,6 +813,8 @@ def inspect_safe_jsonl_samples( def safe_jsonl_sample_value(field: str, value: Any, *, identifier_fields: dict[str, str]) -> Any: + field = str(field or "") + identifier_fields = as_dict(identifier_fields) if value is None or isinstance(value, (bool, int, float)): return value if isinstance(value, str): @@ -824,6 +830,7 @@ def safe_jsonl_sample_value(field: str, value: Any, *, identifier_fields: dict[s def inspect_telegram_final_answer_gate(path: Path) -> dict[str, Any]: + path = Path(path) out = inspect_safe_jsonl_samples( path, source="telegram_final_answer_gate", @@ -842,6 +849,7 @@ def inspect_telegram_final_answer_gate(path: Path) -> dict[str, Any]: def inspect_telegram_outbound_audit(path: Path) -> dict[str, Any]: + path = Path(path) return inspect_safe_jsonl_samples( path, source="telegram_outbound_audit", @@ -850,6 +858,8 @@ def inspect_telegram_outbound_audit(path: Path) -> dict[str, Any]: def inspect_spawner_prd_auto_trace(path: Path, *, builder_home: Path) -> dict[str, Any]: + path = Path(path) + builder_home = Path(builder_home) out = inspect_safe_jsonl_samples( path, source="spawner_prd_auto_trace", @@ -909,14 +919,16 @@ def inspect_spawner_prd_auto_trace(path: Path, *, builder_home: Path) -> dict[st def inspect_builder_request_id_overlap(builder_home: Path, request_ids: set[str]) -> dict[str, Any]: + builder_home = Path(builder_home) + request_ids_set = set(str(r) for r in as_list(request_ids)) db_path = builder_home / "state.db" out: dict[str, Any] = { "source": "builder_events", "exists": db_path.exists(), - "checked_request_id_count": len(request_ids), + "checked_request_id_count": len(request_ids_set), "redaction": "overlap counts only; request id values omitted", } - if not request_ids or not db_path.exists(): + if not request_ids_set or not db_path.exists(): out["matched_builder_request_id_count"] = 0 return out try: @@ -932,7 +944,7 @@ def inspect_builder_request_id_overlap(builder_home: Path, request_ids: set[str] out["request_id_column_exists"] = False out["matched_builder_request_id_count"] = 0 return out - candidates = sorted(request_ids)[:500] + candidates = sorted(request_ids_set)[:500] placeholders = ",".join("?" for _ in candidates) matched = conn.execute( f""" @@ -951,14 +963,16 @@ def inspect_builder_request_id_overlap(builder_home: Path, request_ids: set[str] def inspect_builder_trace_ref_overlap(builder_home: Path, trace_refs: set[str]) -> dict[str, Any]: + builder_home = Path(builder_home) + trace_refs_set = set(str(t) for t in as_list(trace_refs)) db_path = builder_home / "state.db" out: dict[str, Any] = { "source": "builder_events", "exists": db_path.exists(), - "checked_trace_ref_count": len(trace_refs), + "checked_trace_ref_count": len(trace_refs_set), "redaction": "overlap counts only; trace ref values omitted", } - if not trace_refs or not db_path.exists(): + if not trace_refs_set or not db_path.exists(): out["matched_builder_trace_ref_count"] = 0 return out try: @@ -974,7 +988,7 @@ def inspect_builder_trace_ref_overlap(builder_home: Path, trace_refs: set[str]) out["trace_ref_column_exists"] = False out["matched_builder_trace_ref_count"] = 0 return out - candidates = sorted(trace_refs)[:500] + candidates = sorted(trace_refs_set)[:500] placeholders = ",".join("?" for _ in candidates) matched = conn.execute( f""" @@ -993,6 +1007,7 @@ def inspect_builder_trace_ref_overlap(builder_home: Path, trace_refs: set[str]) def inspect_spawner_authority_verdicts(path: Path) -> dict[str, Any]: + path = Path(path) out: dict[str, Any] = { "source": "spawner_prd_auto_trace", "path": str(path), @@ -1067,6 +1082,8 @@ def inspect_spawner_authority_verdicts(path: Path) -> dict[str, Any]: def build_spark_os_review_candidates(path: Path, *, builder_home: Path) -> dict[str, Any]: + path = Path(path) + builder_home = Path(builder_home) out: dict[str, Any] = { "schema_version": REVIEW_CANDIDATES_SCHEMA, "source": "spawner_prd_auto_trace", @@ -1361,6 +1378,7 @@ def build_spark_os_review_candidates(path: Path, *, builder_home: Path) -> dict[ def inspect_json_shape(path: Path) -> dict[str, Any]: + path = Path(path) data, error = read_json(path) out: dict[str, Any] = {"path": str(path), "exists": path.exists(), "redaction": "shape only; values omitted"} if error and error != "missing": @@ -1379,6 +1397,7 @@ def inspect_json_shape(path: Path) -> dict[str, Any]: def inspect_file_metadata(path: Path) -> dict[str, Any]: + path = Path(path) out: dict[str, Any] = { "path": str(path), "exists": path.exists(), @@ -1399,14 +1418,15 @@ def inspect_file_metadata(path: Path) -> dict[str, Any]: def safe_short_string(value: str, limit: int = 240) -> str: - cleaned = re.sub(r"(?i)(api[_-]?key|token|secret)([=:\s]+)(\S+)", r"\1\2[redacted]", value.strip()) + cleaned = re.sub(r"(?i)(api[_-]?key|token|secret)([=:\s]+)(\S+)", r"\1\2[redacted]", str(value or "").strip()) + limit = int(limit or 240) if len(cleaned) <= limit: return cleaned return cleaned[: limit - 3] + "..." def sensitive_identifier(value: str) -> bool: - lowered = value.lower() + lowered = str(value or "").lower() return bool( re.search(r"(human|telegram|user|chat):", lowered) or re.search(r"\d{7,}", lowered) @@ -1415,11 +1435,14 @@ def sensitive_identifier(value: str) -> bool: def redacted_identifier(column: str, value: str) -> str: - digest = hashlib.sha256(value.encode("utf-8", errors="ignore")).hexdigest()[:12] + column = str(column or "") + value_str = str(value or "") + digest = hashlib.sha256(value_str.encode("utf-8", errors="ignore")).hexdigest()[:12] return f"{column}:redacted:{digest}" def safe_builder_event_value(column: str, value: Any) -> Any: + column = str(column or "") if value is None: return None if isinstance(value, (int, float, bool)): @@ -1431,11 +1454,12 @@ def safe_builder_event_value(column: str, value: Any) -> Any: def key_has_raw_memory_hint(key: Any) -> bool: - lowered = str(key).lower() + lowered = str(key or "").lower() return any(hint in lowered for hint in RAW_MEMORY_KEY_HINTS) def safe_memory_status_value(value: Any, *, depth: int = 0) -> Any: + depth = int(depth or 0) if depth > 4: return "[depth-limit]" if value is None or isinstance(value, (bool, int, float)): @@ -1464,6 +1488,7 @@ def count_raw_memory_hint_keys(value: Any) -> int: def read_memory_movement_status_export(builder_home: Path) -> dict[str, Any]: + builder_home = Path(builder_home) path = builder_home / "artifacts" / "memory-movement-index" / "memory-movement-status.json" data, error = read_json(path) out: dict[str, Any] = { @@ -1490,6 +1515,7 @@ def read_memory_movement_status_export(builder_home: Path) -> dict[str, Any]: def count_files_under(path: Path, *, max_files: int = 5000) -> dict[str, Any]: + path = Path(path) out: dict[str, Any] = { "path": str(path), "exists": path.exists(), @@ -1525,6 +1551,7 @@ def count_files_under(path: Path, *, max_files: int = 5000) -> dict[str, Any]: def count_schema_files(path: Path, *, max_files: int = 500) -> dict[str, Any]: + path = Path(path) out: dict[str, Any] = { "path": str(path), "exists": path.exists(), @@ -1554,6 +1581,8 @@ def count_schema_files(path: Path, *, max_files: int = 500) -> dict[str, Any]: def repo_source_ref(repo_path: Path, path: Path) -> str: + repo_path = Path(repo_path) + path = Path(path) try: return path.relative_to(repo_path).as_posix() except ValueError: @@ -1573,21 +1602,21 @@ def proof_verdict( ) -> dict[str, Any]: return { "schema_version": CAPABILITY_PROOF_VERDICTS_SCHEMA, - "domain": domain, - "status": status, + "domain": str(domain or ""), + "status": str(status or ""), "satisfied": status == "passed", - "source_kind": source_kind, - "source_ref": source_ref, - "source_schema_version": schema_version, - "source_status": raw_status, - "source_verdict": raw_verdict, - "detail_counts": detail_counts or {}, + "source_kind": str(source_kind or ""), + "source_ref": str(source_ref or "") if source_ref else None, + "source_schema_version": str(schema_version or "") if schema_version else None, + "source_status": str(raw_status or "") if raw_status else None, + "source_verdict": str(raw_verdict or "") if raw_verdict else None, + "detail_counts": detail_counts if isinstance(detail_counts, dict) else {}, "redaction": "metadata only; proof bodies, commands, labels, and raw evidence omitted", } def missing_proof_verdict(domain: str) -> dict[str, Any]: - return proof_verdict(domain=domain, status="missing", source_kind="not_found") + return proof_verdict(domain=str(domain or ""), status="missing", source_kind="not_found") def source_presence_verdict( @@ -1597,20 +1626,24 @@ def source_presence_verdict( source_path: Path, source_kind: str, ) -> dict[str, Any]: + repo_path = Path(repo_path) + source_path = Path(source_path) if source_path.exists(): return proof_verdict( - domain=domain, + domain=str(domain or ""), status="present_unverified", - source_kind=source_kind, + source_kind=str(source_kind or ""), source_ref=repo_source_ref(repo_path, source_path), ) return missing_proof_verdict(domain) def status_from_json_verdict(data: dict[str, Any], *, passed_keys: tuple[str, ...] = ()) -> str: + data_dict = data if isinstance(data, dict) else {} + passed_keys_tuple = tuple(str(k) for k in as_list(passed_keys)) values = [ - str(data.get("verdict") or "").strip().lower(), - str(data.get("status") or "").strip().lower(), + str(data_dict.get("verdict") or "").strip().lower(), + str(data_dict.get("status") or "").strip().lower(), ] if any( value @@ -1622,8 +1655,8 @@ def status_from_json_verdict(data: dict[str, Any], *, passed_keys: tuple[str, .. for value in values ): return "blocked" - for key in passed_keys: - value = data.get(key) + for key in passed_keys_tuple: + value = data_dict.get(key) if value is True: return "passed" if value is False: @@ -1642,6 +1675,11 @@ def json_proof_verdict( source_kind: str, passed_keys: tuple[str, ...] = (), ) -> dict[str, Any]: + repo_path = Path(repo_path) + rel_path = str(rel_path or "") + domain = str(domain or "") + source_kind = str(source_kind or "") + passed_keys_tuple = tuple(str(k) for k in as_list(passed_keys)) path = repo_path / rel_path data, error = read_json(path) if error == "missing": @@ -1656,7 +1694,7 @@ def json_proof_verdict( payload = as_dict(data) return proof_verdict( domain=domain, - status=status_from_json_verdict(payload, passed_keys=passed_keys), + status=status_from_json_verdict(payload, passed_keys=passed_keys_tuple), source_kind=source_kind, source_ref=repo_source_ref(repo_path, path), schema_version=first_string(payload.get("schema_version")), @@ -1670,6 +1708,8 @@ def json_proof_verdict( def first_run_artifact(repo_path: Path, rel_path: str) -> Path | None: + repo_path = Path(repo_path) + rel_path = str(rel_path or "") runs_root = repo_path / "runs" if not runs_root.exists(): return None @@ -1684,6 +1724,7 @@ def first_run_artifact(repo_path: Path, rel_path: str) -> Path | None: def labs_creator_proof_sources(repo_path: Path) -> dict[str, Any]: + repo_path = Path(repo_path) gate_path = repo_path / LABS_CREATOR_SURFACE_FILES["release_gate"] benchmark_path = first_run_artifact(repo_path, LABS_CREATOR_RUN_ARTIFACTS["benchmark_manifest"]) loop_policy_path = first_run_artifact(repo_path, LABS_CREATOR_RUN_ARTIFACTS["loop_policy"]) @@ -1741,6 +1782,11 @@ def swarm_specialization_proof_sources( promotion_packet_count: int, evidence_ledger_count: int, ) -> dict[str, Any]: + repo_path = Path(repo_path) + benchmark_adapter_counts = benchmark_adapter_counts if isinstance(benchmark_adapter_counts, dict) else {} + rollback_policy_counts = rollback_policy_counts if isinstance(rollback_policy_counts, dict) else {} + promotion_packet_count = int(promotion_packet_count or 0) + evidence_ledger_count = int(evidence_ledger_count or 0) proof_sources = { "benchmark": ( proof_verdict(domain="benchmark", status="present_unverified", source_kind="benchmark_adapter_config") @@ -1773,8 +1819,8 @@ def swarm_specialization_proof_sources( status="present_unverified", source_kind="collective_packet_or_ledger", detail_counts={ - "promotion_packet_count": int(promotion_packet_count or 0), - "evidence_ledger_count": int(evidence_ledger_count or 0), + "promotion_packet_count": promotion_packet_count, + "evidence_ledger_count": evidence_ledger_count, }, ) if promotion_packet_count or evidence_ledger_count @@ -1788,6 +1834,8 @@ def capability_proof_summary( proof_verdicts: dict[str, Any], required_labels: dict[str, str], ) -> dict[str, Any]: + proof_verdicts = proof_verdicts if isinstance(proof_verdicts, dict) else {} + required_labels = required_labels if isinstance(required_labels, dict) else {} counts: Counter[str] = Counter() passed: list[str] = [] blocked: list[str] = [] @@ -1827,6 +1875,7 @@ def capability_proof_summary( def inspect_labs_creator_surface(repo_path: Path) -> dict[str, Any] | None: + repo_path = Path(repo_path) schema_dir = repo_path / "docs" / "creator_system" / "schemas" if not schema_dir.exists() and repo_path.name != "spark-domain-chip-labs": return None @@ -1847,7 +1896,7 @@ def inspect_labs_creator_surface(repo_path: Path) -> dict[str, Any] | None: pass return { - "repo": repo_path.name, + "repo": str(repo_path.name or ""), "schema_inventory": count_schema_files(schema_dir), "review_and_release_sources": { label: {"path": str(repo_path / rel_path), "exists": (repo_path / rel_path).exists()} @@ -1866,6 +1915,7 @@ def inspect_labs_creator_surface(repo_path: Path) -> dict[str, Any] | None: def inspect_swarm_specialization_surface(repo_path: Path) -> dict[str, Any] | None: + repo_path = Path(repo_path) config_path = repo_path / "config" / "specialization-paths.json" schemas_dir = repo_path / "schemas" has_specialization_schema = False @@ -1883,7 +1933,8 @@ def inspect_swarm_specialization_surface(repo_path: Path) -> dict[str, Any] | No return None config, error = read_json(config_path) - path_rows = as_list(as_dict(config).get("paths")) if isinstance(config, dict) else [] + config_dict = as_dict(config) + path_rows = as_list(config_dict.get("paths")) categories: Counter[str] = Counter() loop_kinds: Counter[str] = Counter() benchmark_adapters: Counter[str] = Counter() @@ -1913,7 +1964,7 @@ def inspect_swarm_specialization_surface(repo_path: Path) -> dict[str, Any] | No evidence_ledger_count = 0 return { - "repo": repo_path.name, + "repo": str(repo_path.name or ""), "config": { "path": str(config_path), "exists": config_path.exists(), @@ -2628,6 +2679,12 @@ def inspect_memory_lane_trace_join(conn: sqlite3.Connection) -> dict[str, Any]: limit 25 """ ).fetchall() + + def get_val(r, key, idx): + if isinstance(r, sqlite3.Row) or (hasattr(r, "keys") and key in r.keys()): + return r[key] + return r[idx] + out.update( { "status": "present" if trace_ref_present_count else "missing_trace_refs", @@ -2641,11 +2698,11 @@ def inspect_memory_lane_trace_join(conn: sqlite3.Connection) -> dict[str, Any]: "request_id_coverage_ratio": round(request_id_present_count / row_count, 4) if row_count else 0.0, "lane_status_counts": [ { - "artifact_lane": str(row["artifact_lane"]), - "status": str(row["status"]), - "row_count": int(row["row_count"] or 0), - "request_id_present_count": int(row["request_id_present_count"] or 0), - "trace_ref_present_count": int(row["trace_ref_present_count"] or 0), + "artifact_lane": str(get_val(row, "artifact_lane", 0)), + "status": str(get_val(row, "status", 1)), + "row_count": int(get_val(row, "row_count", 2) or 0), + "request_id_present_count": int(get_val(row, "request_id_present_count", 3) or 0), + "trace_ref_present_count": int(get_val(row, "trace_ref_present_count", 4) or 0), } for row in lane_rows ], @@ -3500,6 +3557,9 @@ def build_capability_catalog(repos: list[dict[str, Any]]) -> dict[str, Any]: def read_text_or_none(path: Path) -> str | None: + if not path: + return None + path = Path(path) if not path.exists(): return None try: @@ -3509,21 +3569,23 @@ def read_text_or_none(path: Path) -> str | None: def literal_assignment(text: str | None, name: str) -> Any: - if not text: + text_str = str(text or "") + if not text_str: return None + name_str = str(name or "") try: - tree = ast.parse(text) + tree = ast.parse(text_str) except SyntaxError: return None for node in tree.body: if not isinstance(node, ast.Assign): - if isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name) and node.target.id == name: + if isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name) and node.target.id == name_str: try: return ast.literal_eval(node.value) except Exception: return None continue - if any(isinstance(target, ast.Name) and target.id == name for target in node.targets): + if any(isinstance(target, ast.Name) and target.id == name_str for target in node.targets): try: return ast.literal_eval(node.value) except Exception: @@ -3532,43 +3594,54 @@ def literal_assignment(text: str | None, name: str) -> Any: def regex_int(text: str | None, pattern: str) -> int | None: - if not text: + text_str = str(text or "") + pattern_str = str(pattern or "") + if not text_str or not pattern_str: return None - match = re.search(pattern, text) + match = re.search(pattern_str, text_str) if not match: return None try: return int(match.group(1)) - except ValueError: + except (ValueError, IndexError): return None def regex_string(text: str | None, pattern: str) -> str | None: - if not text: + text_str = str(text or "") + pattern_str = str(pattern or "") + if not text_str or not pattern_str: return None - match = re.search(pattern, text) + match = re.search(pattern_str, text_str) if not match: return None - value = match.group(1).strip() - return value or None + try: + value = match.group(1).strip() + return value or None + except IndexError: + return None def parse_ts_union(text: str | None, type_name: str) -> list[str]: - if not text: + text_str = str(text or "") + type_name_str = str(type_name or "") + if not text_str or not type_name_str: return [] - match = re.search(rf"export\s+type\s+{re.escape(type_name)}\s*=\s*([^;]+);", text, re.S) + match = re.search(rf"export\s+type\s+{re.escape(type_name_str)}\s*=\s*([^;]+);", text_str, re.S) if not match: return [] return re.findall(r"'([^']+)'|\"([^\"]+)\"", match.group(1)) def clean_ts_union(values: list[tuple[str, str]] | list[str]) -> list[str]: + if not isinstance(values, (list, tuple, set)): + return [] cleaned: list[str] = [] for value in values: - if isinstance(value, tuple): - item = next((part for part in value if part), "") + if isinstance(value, (tuple, list, set)): + item = next((str(part) for part in value if part), "") else: - item = value + item = str(value or "") if item and item not in cleaned: cleaned.append(item) return cleaned @@ -3579,25 +3652,28 @@ def parse_ts_union_values(text: str | None, type_name: str) -> list[str]: def ts_function_body(text: str | None, function_name: str) -> str: - if not text: + text_str = str(text or "") + function_name_str = str(function_name or "") + if not text_str or not function_name_str: return "" match = re.search( - rf"export\s+function\s+{re.escape(function_name)}\s*\([^)]*\)[^{{]*{{(?P.*?)\n}}", - text, + rf"export\s+function\s+{re.escape(function_name_str)}\s*\([^)]*\)[^{{]*{{(?P.*?)\n}}", + text_str, re.S, ) return match.group("body") if match else "" def ts_allowed_profiles(text: str | None, function_name: str, profiles: list[str]) -> list[str]: + profiles_list = list(profiles) if isinstance(profiles, (list, tuple, set)) else [] body = ts_function_body(text, function_name) if not body: return [] denied = set(re.findall(r"profile\s*!==\s*'([^']+)'", body)) if denied: - return [profile for profile in profiles if profile not in denied] + return [profile for profile in profiles_list if profile not in denied] allowed = re.findall(r"profile\s*===\s*'([^']+)'", body) - return [profile for profile in profiles if profile in set(allowed)] + return [profile for profile in profiles_list if profile in set(allowed)] def parse_ts_access_levels(text: str | None) -> dict[str, int]: @@ -3613,6 +3689,7 @@ def parse_ts_access_levels(text: str | None) -> dict[str, int]: def inspect_cli_access_source(path: Path) -> dict[str, Any]: + path = Path(path) text = read_text_or_none(path) lower_profiles = literal_assignment(text, "LOWER_ACCESS_PROFILES") profiles = [] @@ -3646,6 +3723,7 @@ def inspect_cli_access_source(path: Path) -> dict[str, Any]: def inspect_cli_capability_source(path: Path) -> dict[str, Any]: + path = Path(path) text = read_text_or_none(path) toxic_pairs = literal_assignment(text, "TOXIC_CAPABILITY_PAIRS") dimensions = [] @@ -3668,6 +3746,7 @@ def inspect_cli_capability_source(path: Path) -> dict[str, Any]: def inspect_telegram_access_source(path: Path) -> dict[str, Any]: + path = Path(path) text = read_text_or_none(path) profiles = parse_ts_union_values(text, "SparkAccessProfile") requirements = parse_ts_union_values(text, "SparkAccessRequirement") @@ -3692,19 +3771,21 @@ def inspect_telegram_access_source(path: Path) -> dict[str, Any]: def extract_js_object_block(text: str | None, marker: str) -> str: - if not text: + text_str = str(text or "") + marker_str = str(marker or "") + if not text_str or not marker_str: return "" - marker_index = text.find(marker) + marker_index = text_str.find(marker_str) if marker_index < 0: return "" - start = text.find("{", marker_index) + start = text_str.find("{", marker_index) if start < 0: return "" depth = 0 quote: str | None = None escaped = False - for index in range(start, len(text)): - char = text[index] + for index in range(start, len(text_str)): + char = text_str[index] if quote: if escaped: escaped = False @@ -3723,11 +3804,12 @@ def extract_js_object_block(text: str | None, marker: str) -> str: elif char == "}": depth -= 1 if depth == 0: - return text[start + 1 : index] + return text_str[start + 1 : index] return "" def inspect_spawner_access_sources(root: Path) -> dict[str, Any]: + root = Path(root) lanes_path = root / "src" / "lib" / "server" / "access-execution-lanes.ts" actions_path = root / "src" / "lib" / "server" / "access-execution-actions.ts" high_agency_path = root / "src" / "lib" / "server" / "high-agency-workers.ts" @@ -3776,15 +3858,18 @@ def inspect_spawner_access_sources(root: Path) -> dict[str, Any]: def js_const_object_values(text: str | None, object_name: str) -> dict[str, str]: - if not text: + text_str = str(text or "") + object_name_str = str(object_name or "") + if not text_str or not object_name_str: return {} - match = re.search(rf"export\s+const\s+{re.escape(object_name)}\s*=\s*{{(?P.*?)\n}};", text, re.S) + match = re.search(rf"export\s+const\s+{re.escape(object_name_str)}\s*=\s*{{(?P.*?)\n}};", text_str, re.S) if not match: return {} return {key: value for key, value in re.findall(r"(\w+):\s*['\"]([^'\"]+)['\"]", match.group("body"))} def inspect_browser_authority(root: Path) -> dict[str, Any]: + root = Path(root) constants_path = root / "src" / "protocol" / "constants.js" policy_path = root / "src" / "protocol" / "policy.js" contract_path = root / "docs" / "BROWSER_HOOK_CONTRACT_V1.md" @@ -3814,6 +3899,7 @@ def inspect_browser_authority(root: Path) -> dict[str, Any]: def inspect_public_output_authority(desktop: Path) -> dict[str, Any]: + desktop = Path(desktop) swarm_root = desktop / "spark-swarm" labs_root = desktop / "spark-domain-chip-labs" sync_validation_path = swarm_root / "apps" / "api" / "src" / "collective" / "sync-validation.ts" @@ -3850,6 +3936,9 @@ def inspect_public_output_authority(desktop: Path) -> dict[str, Any]: def build_authority_view(desktop: Path, setup_summary: dict[str, Any], spark_home: Path | None = None) -> dict[str, Any]: + desktop = Path(desktop) + setup_summary = setup_summary if isinstance(setup_summary, dict) else {} + spark_home = Path(spark_home) if spark_home is not None else None module_sources = spark_home / "modules" if spark_home is not None else None spark_cli_package_root = Path(__file__).resolve().parent spark_cli_repo_root = spark_cli_package_root.parent.parent @@ -3917,7 +4006,10 @@ def resolve_repo_root(repo_name: str) -> Path: return { "schema_version": AUTHORITY_VIEW_SCHEMA, "generated_at": utc_now(), - "authority": "observability_non_authoritative", + "redaction": ( + "policy constants, safe command labels, source existence, and aggregate gate counts only; " + "env files, profile preference files, token values, chat ids, raw mission text, and browser content are not read" + ), "observed_sources": observed_sources, "default_access_level_hint": cli_access.get("default_access_level"), "telegram_profile_count": access_profile_count, @@ -3939,10 +4031,6 @@ def resolve_repo_root(repo_name: str) -> Path: ), "publication_checks_required": len(as_list(public_output_authority.get("required_publication_checks"))), }, - "redaction": ( - "policy constants, safe command labels, source existence, and aggregate gate counts only; " - "env files, profile preference files, token values, chat ids, raw mission text, and browser content are not read" - ), "next_required_bridges": [ "Promote this compiled AuthorityViewV1 into Builder AOC as evidence, not policy authority.", "Point Telegram access/status replies at this view for compact drilldowns without raw ids.", @@ -3959,14 +4047,16 @@ def trace_repair_id(*parts: Any) -> str: def trace_repair_owner(component: str) -> dict[str, str]: - return as_dict(TRACE_REPAIR_COMPONENT_OWNERS.get(component)) or { + comp = str(component or "") + return as_dict(TRACE_REPAIR_COMPONENT_OWNERS.get(comp)) or { "owner_repo": "spark-intelligence-builder", - "source_module": f"{component} event emission", + "source_module": f"{comp} event emission", } def build_trace_current_health(trace_index: dict[str, Any]) -> dict[str, Any]: - trace_health = as_dict(trace_index.get("builder_trace_health")) + trace_idx = trace_index if isinstance(trace_index, dict) else {} + trace_health = as_dict(trace_idx.get("builder_trace_health")) recent_windows = [as_dict(row) for row in as_list(trace_health.get("recent_windows"))] total_missing = int(trace_health.get("missing_trace_ref_count") or 0) current_window = next( @@ -4024,13 +4114,14 @@ def build_trace_current_health(trace_index: dict[str, Any]) -> dict[str, Any]: def build_trace_repair_queue(trace_index: dict[str, Any]) -> list[dict[str, Any]]: + trace_idx = trace_index if isinstance(trace_index, dict) else {} queue: list[dict[str, Any]] = [] - trace_health = as_dict(trace_index.get("builder_trace_health")) - current_health = as_dict(trace_index.get("trace_current_health")) or build_trace_current_health(trace_index) + trace_health = as_dict(trace_idx.get("builder_trace_health")) + current_health = as_dict(trace_idx.get("trace_current_health")) or build_trace_current_health(trace_idx) historical_scope = str(current_health.get("repair_scope") or "") == "historical_backlog" - telegram_gate = as_dict(trace_index.get("telegram_final_answer_gate_samples")) + telegram_gate = as_dict(trace_idx.get("telegram_final_answer_gate_samples")) telegram_join = as_dict(telegram_gate.get("trace_join")) - spawner = as_dict(trace_index.get("spawner_prd_auto_trace_samples")) + spawner = as_dict(trace_idx.get("spawner_prd_auto_trace_samples")) spawner_join = as_dict(spawner.get("join_keys")) spawner_request_overlap = as_dict(spawner.get("builder_request_overlap")) spawner_trace_overlap = as_dict(spawner.get("builder_trace_ref_overlap")) @@ -4147,9 +4238,10 @@ def build_trace_repair_queue(trace_index: dict[str, Any]) -> list[dict[str, Any] def build_builder_trace_repair_cards(trace_index: dict[str, Any]) -> dict[str, Any]: - trace_health = as_dict(trace_index.get("builder_trace_health")) - current_health = as_dict(trace_index.get("trace_current_health")) or build_trace_current_health(trace_index) - repair_queue = [as_dict(item) for item in as_list(trace_index.get("trace_repair_queue"))] + trace_idx = trace_index if isinstance(trace_index, dict) else {} + trace_health = as_dict(trace_idx.get("builder_trace_health")) + current_health = as_dict(trace_idx.get("trace_current_health")) or build_trace_current_health(trace_idx) + repair_queue = [as_dict(item) for item in as_list(trace_idx.get("trace_repair_queue"))] cards: list[dict[str, Any]] = [] for item in repair_queue: @@ -4290,6 +4382,8 @@ def build_builder_trace_repair_cards(trace_index: dict[str, Any]) -> dict[str, A def build_trace_index(spark_home: Path, builder_home: Path) -> dict[str, Any]: + spark_home = Path(spark_home) + builder_home = Path(builder_home) spawner_state = spark_home / "state" / "spawner-ui" telegram_state = spark_home / "state" / "spark-telegram-bot" trace_index = { @@ -4333,6 +4427,7 @@ def build_trace_index(spark_home: Path, builder_home: Path) -> dict[str, Any]: def build_memory_movement_index(builder_home: Path) -> dict[str, Any]: + builder_home = Path(builder_home) builder_memory_tables = inspect_builder_memory_tables(builder_home) trace_join = as_dict(builder_memory_tables.get("memory_lane_trace_join")) trace_bridge_instruction = ( @@ -4364,13 +4459,14 @@ def build_memory_movement_index(builder_home: Path) -> dict[str, Any]: def build_gaps(system_map: dict[str, Any]) -> list[dict[str, str]]: - registry_modules = set(as_dict(system_map.get("registry", {}).get("modules")).keys()) - installed_modules = set(as_dict(system_map.get("installed_modules")).keys()) - repos = as_list(system_map.get("discovered_repos")) + sys_map = system_map if isinstance(system_map, dict) else {} + registry_modules = set(as_dict(sys_map.get("registry", {}).get("modules")).keys()) + installed_modules = set(as_dict(sys_map.get("installed_modules")).keys()) + repos = as_list(sys_map.get("discovered_repos")) raw_gaps: list[dict[str, str]] = [] def add_gap(severity: str, area: str, item: str, message: str) -> None: - raw_gaps.append({"severity": severity, "area": area, "item": item, "message": message}) + raw_gaps.append({"severity": str(severity), "area": str(area), "item": str(item), "message": str(message)}) for module_id in sorted(registry_modules - installed_modules): add_gap("info", "install", module_id, "Registry module is not installed in the current local Spark home.") @@ -4400,7 +4496,7 @@ def add_gap(severity: str, area: str, item: str, message: str) -> None: "Repo declares spark-chip.json but is not in installed state or starter registry.", ) - for module in as_list(system_map.get("modules")): + for module in as_list(sys_map.get("modules")): installed = as_dict(module.get("installed")) if installed and installed.get("path") and not Path(str(installed.get("path"))).exists(): add_gap("warning", "install", str(module.get("id")), "Installed module path does not exist.") @@ -4420,45 +4516,50 @@ def add_gap(severity: str, area: str, item: str, message: str) -> None: def repo_owner_surface(name: str) -> str: - if name in OWNER_SURFACES: - return OWNER_SURFACES[name] - if name.startswith("domain-chip-"): + name_str = str(name or "") + if name_str in OWNER_SURFACES: + return OWNER_SURFACES[name_str] + if name_str.startswith("domain-chip-"): return "domain chip candidate" - if name.startswith("specialization-path-"): + if name_str.startswith("specialization-path-"): return "specialization path candidate" - if "telegram" in name: + if "telegram" in name_str: return "Telegram-adjacent surface" - if "swarm" in name: + if "swarm" in name_str: return "Swarm-adjacent surface" - if "spark" in name: + if "spark" in name_str: return "Spark-adjacent repo" return "unclassified" def repo_manifest_presence(repo: dict[str, Any]) -> dict[str, bool]: - contract_files = set(as_list(repo.get("contract_files"))) + repo_dict = repo if isinstance(repo, dict) else {} + contract_files = set(as_list(repo_dict.get("contract_files"))) return { - "spark_toml": bool(as_dict(repo.get("spark_toml"))), - "spark_chip": bool(as_dict(repo.get("spark_chip"))), - "skill_manifest": bool(as_dict(repo.get("skill_manifest"))), + "spark_toml": bool(as_dict(repo_dict.get("spark_toml"))), + "spark_chip": bool(as_dict(repo_dict.get("spark_chip"))), + "skill_manifest": bool(as_dict(repo_dict.get("skill_manifest"))), "agents_md": "AGENTS.md" in contract_files, "contract_file_count": bool(contract_files), } def repo_release_status(name: str, git: dict[str, Any], manifest: dict[str, bool], registry_present: bool) -> tuple[str, str | None, str]: - dirty = int(git.get("dirty_tracked_count") or 0) - untracked = int(git.get("untracked_count") or 0) - behind = int(git.get("behind") or 0) - if not git.get("available"): + git_dict = git if isinstance(git, dict) else {} + manifest_dict = manifest if isinstance(manifest, dict) else {} + name_str = str(name or "") + dirty = int(git_dict.get("dirty_tracked_count") or 0) + untracked = int(git_dict.get("untracked_count") or 0) + behind = int(git_dict.get("behind") or 0) + if not git_dict.get("available"): return "not_release_candidate", "not a git repo", "inspect or ignore before product work" if dirty or untracked: return "blocked", "dirty worktree", "curate local changes before merge or release" if behind: return "blocked", "behind upstream", "pull or merge upstream before release" - if name in CORE_REPOS and not any(manifest.values()): + if name_str in CORE_REPOS and not any(manifest_dict.values()): return "blocked", "core repo missing Spark manifest", "add or confirm owner manifest before release" - if name == "spark-cli" and any(manifest.values()): + if name_str == "spark-cli" and any(manifest_dict.values()): return "eligible", None, "installer and Spark OS compiler source truth is manifest-declared" if registry_present: return "eligible", None, "safe to consider for the next verified workstream" @@ -4466,19 +4567,23 @@ def repo_release_status(name: str, git: dict[str, Any], manifest: dict[str, bool def repo_risk_class(name: str, release_eligibility: str) -> str: - if name in {"spark-cli", "spark-intelligence-builder", "spark-telegram-bot", "spawner-ui"}: + name_str = str(name or "") + eligibility = str(release_eligibility or "") + if name_str in {"spark-cli", "spark-intelligence-builder", "spark-telegram-bot", "spawner-ui"}: return "critical" - if release_eligibility == "blocked": + if eligibility == "blocked": return "high" - if name in CORE_REPOS: + if name_str in CORE_REPOS: return "medium" return "low" def repo_by_name(system_map: dict[str, Any], name: str) -> dict[str, Any]: - for repo in as_list(system_map.get("discovered_repos")): + sys_map = system_map if isinstance(system_map, dict) else {} + name_str = str(name or "") + for repo in as_list(sys_map.get("discovered_repos")): repo = as_dict(repo) - if repo.get("name") == name: + if repo.get("name") == name_str: return repo return {} @@ -4500,18 +4605,18 @@ def duplicate_truth_item( evidence_details: dict[str, Any] | None = None, ) -> dict[str, Any]: item = { - "id": item_id, - "fact": fact, - "classification": classification, - "severity": severity, - "owner_repo": owner_repo, - "canonical_path": canonical_path, - "duplicate_path": duplicate_path, - "evidence": evidence, - "risk": risk, - "next_safe_action": next_safe_action, - "verification_command": verification_command, - "rollback": rollback, + "id": str(item_id or ""), + "fact": str(fact or ""), + "classification": str(classification or ""), + "severity": str(severity or "warning"), + "owner_repo": str(owner_repo or ""), + "canonical_path": str(canonical_path or ""), + "duplicate_path": str(duplicate_path or ""), + "evidence": str(evidence or ""), + "risk": str(risk or ""), + "next_safe_action": str(next_safe_action or ""), + "verification_command": str(verification_command or ""), + "rollback": str(rollback or ""), } if evidence_details is not None: item["evidence_details"] = evidence_details @@ -4529,7 +4634,7 @@ def duplicate_truth_item( def dirty_family_for_path(path_value: str) -> str: - normalized = path_value.replace("\\", "/").strip() + normalized = str(path_value or "").replace("\\", "/").strip() if " -> " in normalized: normalized = normalized.split(" -> ", 1)[1].strip() parts = [part for part in normalized.split("/") if part] @@ -4543,6 +4648,7 @@ def dirty_family_for_path(path_value: str) -> str: def git_dirty_family_counts(path: Path) -> dict[str, int]: + path = Path(path) if not (path / ".git").exists(): return {} code, status = run_git(path, ["status", "--porcelain"]) @@ -4557,6 +4663,7 @@ def git_dirty_family_counts(path: Path) -> dict[str, int]: def builder_source_audit(path: Path) -> dict[str, Any]: + path = Path(path) git = git_board_status(path) cli_path = path / "src" / "spark_intelligence" / "cli.py" command_markers: dict[str, bool] = {name: False for name in BUILDER_AOC_COMMAND_MARKERS} @@ -4589,6 +4696,7 @@ def builder_source_audit(path: Path) -> dict[str, Any]: def spawner_state_source_audit(path: Path) -> dict[str, Any]: + path = Path(path) reference_needles = (".spawner", "SPAWNER_STATE_DIR", "spawnerStateDir", "spawner-state") family_counts: Counter[str] = Counter() file_count = 0 @@ -4641,12 +4749,13 @@ def spawner_state_source_audit(path: Path) -> dict[str, Any]: def git_dirty_from_repo(repo: dict[str, Any]) -> tuple[int, int]: - git = as_dict(repo.get("git")) + repo_dict = repo if isinstance(repo, dict) else {} + git = as_dict(repo_dict.get("git")) dirty = int(git.get("dirty_tracked_count") or 0) untracked = int(git.get("untracked_count") or 0) if dirty or untracked: return dirty, untracked - path = repo.get("path") + path = repo_dict.get("path") if isinstance(path, str) and path.strip(): status = git_board_status(Path(path)) return int(status.get("dirty_tracked_count") or 0), int(status.get("untracked_count") or 0) @@ -4675,11 +4784,12 @@ def installed_runtime_clean_summary(installed_modules: dict[str, Any], module_id def build_duplicate_truths(system_map: dict[str, Any]) -> dict[str, Any]: - source_roots = as_dict(system_map.get("source_roots")) + sys_map = system_map if isinstance(system_map, dict) else {} + source_roots = as_dict(sys_map.get("source_roots")) spark_home = Path(str(source_roots.get("spark_home") or "")).expanduser() desktop = Path(str(source_roots.get("desktop") or "")).expanduser() - installed_modules = as_dict(system_map.get("installed_modules")) - registry_modules = as_dict(as_dict(system_map.get("registry")).get("modules")) + installed_modules = as_dict(sys_map.get("installed_modules")) + registry_modules = as_dict(as_dict(sys_map.get("registry")).get("modules")) items: list[dict[str, Any]] = [] builder_installed = as_dict(installed_modules.get("spark-intelligence-builder")) @@ -4813,7 +4923,7 @@ def build_duplicate_truths(system_map: dict[str, Any]) -> dict[str, Any]: ("domain-chip-memory", "Memory substrate owner repo curation state", "domain-chip-memory", "owner_repo_dirty"), ("spark-memory-quality-dashboard", "Memory dashboard projection state", "spark-memory-quality-dashboard", "projection_dirty"), ]: - repo = repo_by_name(system_map, repo_name) + repo = repo_by_name(sys_map, repo_name) if not repo: continue dirty, untracked = git_dirty_from_repo(repo) @@ -4950,7 +5060,7 @@ def build_duplicate_truths(system_map: dict[str, Any]) -> dict[str, Any]: ) ) - browser_extension = repo_by_name(system_map, "spark-browser-extension") + browser_extension = repo_by_name(sys_map, "spark-browser-extension") if browser_extension: items.append( duplicate_truth_item( @@ -5004,11 +5114,12 @@ def build_duplicate_truths(system_map: dict[str, Any]) -> dict[str, Any]: def build_repo_board(system_map: dict[str, Any]) -> dict[str, Any]: - registry_modules = set(as_dict(system_map.get("registry", {}).get("modules")).keys()) - installed_modules = set(as_dict(system_map.get("installed_modules")).keys()) + sys_map = system_map if isinstance(system_map, dict) else {} + registry_modules = set(as_dict(as_dict(sys_map.get("registry")).get("modules")).keys()) + installed_modules = set(as_dict(sys_map.get("installed_modules")).keys()) rows: list[dict[str, Any]] = [] - for repo in as_list(system_map.get("discovered_repos")): + for repo in as_list(sys_map.get("discovered_repos")): repo = as_dict(repo) name = str(repo.get("name") or "") ids = repo_ids(repo) @@ -5048,7 +5159,7 @@ def build_repo_board(system_map: dict[str, Any]) -> dict[str, Any]: "blocked_release_count": sum(1 for row in rows if row["release_eligibility"] == "blocked"), "critical_repo_count": sum(1 for row in rows if row["risk_class"] == "critical"), } - duplicate_truths = build_duplicate_truths(system_map) + duplicate_truths = build_duplicate_truths(sys_map) summary["duplicate_truth_count"] = as_dict(duplicate_truths.get("summary")).get("item_count", 0) summary["critical_duplicate_truth_count"] = as_dict( as_dict(duplicate_truths.get("summary")).get("severity_counts") @@ -5082,17 +5193,18 @@ def build_repo_board(system_map: dict[str, Any]) -> dict[str, Any]: def build_voice_surface_view(system_map: dict[str, Any]) -> dict[str, Any]: - repos = [as_dict(repo) for repo in as_list(system_map.get("discovered_repos"))] + sys_map = system_map if isinstance(system_map, dict) else {} + repos = [as_dict(repo) for repo in as_list(sys_map.get("discovered_repos"))] repo_names = {str(repo.get("name")) for repo in repos} repo_paths = { str(repo.get("name")): Path(str(repo.get("path"))).expanduser() for repo in repos if isinstance(repo.get("path"), str) and str(repo.get("path")).strip() } - installed_modules = set(as_dict(system_map.get("installed_modules")).keys()) + installed_modules = set(as_dict(sys_map.get("installed_modules")).keys()) available = "spark-voice-comms" in repo_names installed = "spark-voice-comms" in installed_modules - source_roots = as_dict(system_map.get("source_roots")) + source_roots = as_dict(sys_map.get("source_roots")) runtime_state_error = "spark_home_missing" runtime_state: dict[str, Any] = {} @@ -5103,21 +5215,23 @@ def build_voice_surface_view(system_map: dict[str, Any]) -> dict[str, Any]: if isinstance(runtime_state_raw, dict): runtime_state = runtime_state_raw - runtime_state_export_present = runtime_state.get("schema_version") == "spark.voice_runtime_state.v1" + runtime_state_export_present = as_dict(runtime_state).get("schema_version") == "spark.voice_runtime_state.v1" if runtime_state and not runtime_state_export_present: runtime_state_error = "invalid_schema" - runtime_stt = as_dict(runtime_state.get("stt")) if runtime_state_export_present else {} - runtime_tts = as_dict(runtime_state.get("tts")) if runtime_state_export_present else {} - runtime_delivery = as_dict(runtime_state.get("telegram_delivery")) if runtime_state_export_present else {} - runtime_claims = as_dict(runtime_state.get("claim_levels")) if runtime_state_export_present else {} - runtime_sources = [str(item) for item in as_list(runtime_state.get("source_ledger"))] if runtime_state_export_present else [] + runtime_stt = as_dict(as_dict(runtime_state).get("stt")) if runtime_state_export_present else {} + runtime_tts = as_dict(as_dict(runtime_state).get("tts")) if runtime_state_export_present else {} + runtime_delivery = as_dict(as_dict(runtime_state).get("telegram_delivery")) if runtime_state_export_present else {} + runtime_claims = as_dict(as_dict(runtime_state).get("claim_levels")) if runtime_state_export_present else {} + runtime_sources = [str(item) for item in as_list(as_dict(runtime_state).get("source_ledger"))] if runtime_state_export_present else [] stt_ready = runtime_stt.get("ready") is True tts_ready = runtime_tts.get("ready") is True delivery_ready = runtime_delivery.get("ready") is True configured = runtime_claims.get("configured") is True or stt_ready or tts_ready def source_file_contains(repo_name: str, relative: str, *needles: str) -> bool: + repo_name = str(repo_name or "") + relative = str(relative or "") root = repo_paths.get(repo_name) if root is None: return False @@ -5128,7 +5242,7 @@ def source_file_contains(repo_name: str, relative: str, *needles: str) -> bool: text = path.read_text(encoding="utf-8", errors="replace") except OSError: return False - return all(needle in text for needle in needles) + return all(str(needle) in text for needle in needles) voice_hook_has_transcribe = source_file_contains( "spark-voice-comms", @@ -5290,6 +5404,7 @@ def source_file_contains(repo_name: str, relative: str, *needles: str) -> bool: def build_operating_cockpit(compiled: dict[str, Any]) -> dict[str, Any]: + compiled = compiled if isinstance(compiled, dict) else {} system_map = as_dict(compiled.get("system_map")) repo_board = as_dict(compiled.get("repo_board")) trace_index = as_dict(compiled.get("trace_index")) @@ -5369,6 +5484,9 @@ def build_operating_cockpit(compiled: dict[str, Any]) -> dict[str, Any]: def compile_system_map(desktop: Path, spark_home: Path, registry_path: Path) -> dict[str, Any]: + desktop = Path(desktop) + spark_home = Path(spark_home) + registry_path = Path(registry_path) state_dir = spark_home / "state" registry, registry_error = read_json(registry_path) installed, installed_error = read_json(state_dir / "installed.json") @@ -5423,11 +5541,14 @@ def compile_system_map(desktop: Path, spark_home: Path, registry_path: Path) -> def write_json(path: Path, payload: Any) -> None: + path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8") def write_gaps_markdown(path: Path, gaps: list[dict[str, str]], system_map: dict[str, Any]) -> None: + path = Path(path) + system_map = system_map if isinstance(system_map, dict) else {} lines = [ "# Spark System Map Gaps", "", @@ -5439,18 +5560,20 @@ def write_gaps_markdown(path: Path, gaps: list[dict[str, str]], system_map: dict "", f"- modules compiled: {len(as_list(system_map.get('modules')))}", f"- discovered repos: {len(as_list(system_map.get('discovered_repos')))}", - f"- gaps: {len(gaps)}", + f"- gaps: {len(as_list(gaps))}", "", "## Gaps", "", ] - if not gaps: + gaps_list = as_list(gaps) + if not gaps_list: lines.append("- No gaps detected by this compiler pass.") else: - for gap in gaps: - count = int(gap.get("count", "1")) + for gap in gaps_list: + gap_dict = as_dict(gap) + count = int(gap_dict.get("count", "1")) suffix = f" Observed {count} times." if count > 1 else "" - lines.append(f"- [{gap['severity']}] {gap['area']} / {gap['item']}: {gap['message']}{suffix}") + lines.append(f"- [{gap_dict.get('severity')}] {gap_dict.get('area')} / {gap_dict.get('item')}: {gap_dict.get('message')}{suffix}") lines.extend( [ "", @@ -5468,7 +5591,9 @@ def write_gaps_markdown(path: Path, gaps: list[dict[str, str]], system_map: dict def write_compiled_outputs(out_dir: Path, compiled: dict[str, Any]) -> dict[str, str]: - system_map = as_dict(compiled["system_map"]) + out_dir = Path(out_dir) + compiled = compiled if isinstance(compiled, dict) else {} + system_map = as_dict(compiled.get("system_map")) paths = { "system_map": out_dir / "system-map.json", "authority_view": out_dir / "authority-view.json", @@ -5481,22 +5606,24 @@ def write_compiled_outputs(out_dir: Path, compiled: dict[str, Any]) -> dict[str, "gaps": out_dir / "gaps.md", } write_json(paths["system_map"], system_map) - write_json(paths["authority_view"], compiled["authority_view"]) - write_json(paths["capability_catalog"], compiled["capability_catalog"]) - write_json(paths["trace_index"], compiled["trace_index"]) - write_json(paths["memory_movement_index"], compiled["memory_movement_index"]) - write_json(paths["repo_board"], compiled["repo_board"]) - write_json(paths["voice_surface_view"], compiled["voice_surface_view"]) - write_json(paths["operating_cockpit"], compiled["operating_cockpit"]) + write_json(paths["authority_view"], compiled.get("authority_view")) + write_json(paths["capability_catalog"], compiled.get("capability_catalog")) + write_json(paths["trace_index"], compiled.get("trace_index")) + write_json(paths["memory_movement_index"], compiled.get("memory_movement_index")) + write_json(paths["repo_board"], compiled.get("repo_board")) + write_json(paths["voice_surface_view"], compiled.get("voice_surface_view")) + write_json(paths["operating_cockpit"], compiled.get("operating_cockpit")) write_gaps_markdown(paths["gaps"], as_list(system_map.get("gaps")), system_map) return {key: str(path) for key, path in paths.items()} def compile_summary(compiled: dict[str, Any], written: dict[str, str] | None = None) -> dict[str, Any]: - system_map = as_dict(compiled["system_map"]) - capability_catalog = as_dict(compiled["capability_catalog"]) - trace_index = as_dict(compiled["trace_index"]) - memory_index = as_dict(compiled["memory_movement_index"]) + compiled = compiled if isinstance(compiled, dict) else {} + written = written if isinstance(written, dict) else {} + system_map = as_dict(compiled.get("system_map")) + capability_catalog = as_dict(compiled.get("capability_catalog")) + trace_index = as_dict(compiled.get("trace_index")) + memory_index = as_dict(compiled.get("memory_movement_index")) repo_board = as_dict(compiled.get("repo_board")) voice_surface = as_dict(compiled.get("voice_surface_view")) duplicate_truths = as_dict(repo_board.get("duplicate_truths")) @@ -5520,7 +5647,7 @@ def compile_summary(compiled: dict[str, Any], written: dict[str, str] | None = N "capability_cards": len(as_list(capability_catalog.get("capability_cards"))), "authority_sources": { key: as_dict(value).get("exists") - for key, value in as_dict(as_dict(compiled["authority_view"]).get("observed_sources")).items() + for key, value in as_dict(as_dict(compiled.get("authority_view")).get("observed_sources")).items() }, "builder_event_rows": builder_events.get("row_count"), "builder_event_samples": builder_event_samples.get("sample_count"),