diff --git a/execution_context.py b/execution_context.py new file mode 100644 index 00000000..faee3414 --- /dev/null +++ b/execution_context.py @@ -0,0 +1,284 @@ +#!/usr/bin/env python3 +""" +Execution Context for Orthogonal Engineering. + +Provides a deterministic, content-addressed execution pipeline with: +- Canonical content normalization +- Merkle-root based state hashing +- Audit logging with UUIDs +- Execute/Simulate modes +- Invariant verification and self-repair + +Author: Orthogonal Engineering +PR: #64 +Version: 1.0.0 +""" + +import hashlib +import json +import uuid +from copy import deepcopy + + +# --------------------------------------------------------------------------- +# Execution Context +# --------------------------------------------------------------------------- + +class ExecutionContext: + """ + Holds the mutable state for a single deterministic execution run. + + Attributes: + seed: Opaque seed string used to tag this run. + manifest: Dict containing at least ``root_hash`` (expected Merkle root). + commands: Ordered list of command dicts to be applied. + repo_state: Dict mapping file paths to their current content. + current_hash: Most-recently computed Merkle root (str or None). + audit_log: Ordered list of operation records written by log_operation. + """ + + def __init__(self, seed: str, manifest: dict, commands: list): + self.seed = seed + self.manifest = manifest + self.commands = commands + self.repo_state: dict = {} + self.current_hash: str | None = None + self.audit_log: list = [] + + +# --------------------------------------------------------------------------- +# Canonicalization +# --------------------------------------------------------------------------- + +def sort_json_keys(obj): + """Recursively sort dictionary keys for canonical JSON serialization.""" + if isinstance(obj, dict): + return {k: sort_json_keys(obj[k]) for k in sorted(obj.keys())} + if isinstance(obj, list): + return [sort_json_keys(e) for e in obj] + return obj + + +def canonicalize(content: str) -> bytes: + """ + Canonicalize *content* to a stable byte representation. + + Steps: + 1. Normalize line endings to ``\\n``. + 2. Strip trailing whitespace from every line. + 3. If the result is valid JSON, re-serialize with sorted keys and no + extra whitespace. + 4. Return the result encoded as UTF-8 bytes. + + Args: + content: Raw text content of any file. + + Returns: + Canonical UTF-8 bytes. + """ + content = content.replace('\r\n', '\n').replace('\r', '\n') + content = '\n'.join(line.rstrip() for line in content.split('\n')) + try: + data = json.loads(content) + content = json.dumps(sort_json_keys(data), separators=(',', ':')) + except json.JSONDecodeError: + pass + return content.encode('utf-8') + + +# --------------------------------------------------------------------------- +# Hashing +# --------------------------------------------------------------------------- + +def sha256(data: bytes) -> str: + """Return the hex-encoded SHA-256 digest of *data*.""" + return hashlib.sha256(data).hexdigest() + + +def merkle_root(files: dict) -> str: + """ + Compute a binary Merkle root over the values of *files*. + + Leaves are SHA-256 hashes of the canonical form of each file's content, + taken in insertion order. Odd nodes at any level are duplicated (Bitcoin + convention). An empty dict returns the SHA-256 of the empty byte string. + + Args: + files: Dict mapping file paths to their text content. + + Returns: + Hex-encoded SHA-256 Merkle root. + """ + if not files: + return sha256(b'') + leaves = [sha256(canonicalize(content)) for content in files.values()] + while len(leaves) > 1: + next_level = [] + for i in range(0, len(leaves), 2): + left = leaves[i] + right = leaves[i + 1] if i + 1 < len(leaves) else left + next_level.append(sha256((left + right).encode())) + leaves = next_level + return leaves[0] + + +# --------------------------------------------------------------------------- +# Audit Logging +# --------------------------------------------------------------------------- + +def log_operation(ctx: ExecutionContext, command: dict, mode: str, output_hash: str) -> None: + """ + Append one operation record to *ctx.audit_log*. + + Each record carries a fresh UUID so that replays can be distinguished even + when the command content is identical. + + Args: + ctx: Active execution context. + command: The command dict that was (or was not) applied. + mode: ``"MODE_1_EXECUTE"`` or ``"MODE_2_SIMULATE"``. + output_hash: Merkle root of ``ctx.repo_state`` after the operation. + """ + ctx.audit_log.append({ + "operation_id": str(uuid.uuid4()), + "mode": mode, + "command": command, + "output_hash": output_hash, + "metadata": {}, + }) + + +# --------------------------------------------------------------------------- +# Modes +# --------------------------------------------------------------------------- + +def execute_command(ctx: ExecutionContext, command: dict, mode: str) -> None: + """ + Apply *command* to *ctx* according to *mode*, then log the operation. + + ``MODE_1_EXECUTE`` — writes ``command['content']`` to ``command['file']`` + in ``ctx.repo_state``. + ``MODE_2_SIMULATE`` — performs no state mutation; only logs the operation. + + Args: + ctx: Active execution context. + command: Dict with at least ``'file'`` and ``'content'`` keys. + mode: Execution mode string. + """ + if mode == "MODE_1_EXECUTE": + ctx.repo_state[command['file']] = command['content'] + # MODE_2_SIMULATE: intentionally no state mutation + output_hash = merkle_root(ctx.repo_state) + ctx.current_hash = output_hash + log_operation(ctx, command, mode, output_hash) + + +# --------------------------------------------------------------------------- +# Verification & Repair +# --------------------------------------------------------------------------- + +MAX_REPAIR_ATTEMPTS = 3 + + +def verify_invariants(ctx: ExecutionContext) -> bool: + """ + Return ``True`` iff the current Merkle root matches the manifest. + + Args: + ctx: Active execution context. + + Returns: + True if ``merkle_root(ctx.repo_state) == ctx.manifest['root_hash']``. + """ + return merkle_root(ctx.repo_state) == ctx.manifest['root_hash'] + + +def canonicalize_repo(ctx: ExecutionContext) -> dict: + """ + Re-canonicalize every file in *ctx.repo_state* in place. + + Args: + ctx: Active execution context (mutated). + + Returns: + The updated ``ctx.repo_state``. + """ + new_state = {} + for k, v in ctx.repo_state.items(): + new_state[k] = canonicalize(v).decode('utf-8') + ctx.repo_state = new_state + return ctx.repo_state + + +def enter_mode_0_halt(ctx: ExecutionContext) -> bool: + """ + Enter MODE_0_HALT — irrecoverable integrity failure. + + Args: + ctx: Active execution context (unused but kept for future extension). + + Returns: + Always ``False``. + """ + return False + + +def integrity_loop(ctx: ExecutionContext) -> bool: + """ + Verify invariants, attempting up to MAX_REPAIR_ATTEMPTS canonicalization + passes before entering MODE_0_HALT. + + Args: + ctx: Active execution context. + + Returns: + True if invariants are satisfied; False if all repair attempts fail. + """ + attempts = 0 + while attempts < MAX_REPAIR_ATTEMPTS: + if verify_invariants(ctx): + return True + canonicalize_repo(ctx) + attempts += 1 + return enter_mode_0_halt(ctx) + + +# --------------------------------------------------------------------------- +# Test Harness +# --------------------------------------------------------------------------- + +def test_determinism(seed: str, manifest: dict, commands: list) -> bool: + """ + Run the command sequence 5 times and assert that all Merkle roots agree. + + Each run gets an independent deep-copy of *commands* to prevent state + leakage between runs. + + Args: + seed: Seed string passed to each ExecutionContext. + manifest: Manifest dict passed to each ExecutionContext. + commands: Command list to apply in each run. + + Returns: + True if all 5 runs produce the same Merkle root; False otherwise. + """ + results = [] + for _ in range(5): + ctx = ExecutionContext(seed, manifest, deepcopy(commands)) + for cmd in commands: + execute_command(ctx, cmd, "MODE_1_EXECUTE") + results.append(merkle_root(ctx.repo_state)) + return len(set(results)) == 1 + + +# --------------------------------------------------------------------------- +# Example / Standalone Entry Point +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + manifest = {"root_hash": sha256(b"")} + commands = [{"file": "a.txt", "content": "Hello"}] + ctx = ExecutionContext("seed123", manifest, commands) + for cmd in commands: + execute_command(ctx, cmd, "MODE_1_EXECUTE") + integrity_loop(ctx) diff --git a/tests/test_determinism.py b/tests/test_determinism.py new file mode 100644 index 00000000..def02235 --- /dev/null +++ b/tests/test_determinism.py @@ -0,0 +1,382 @@ +#!/usr/bin/env python3 +""" +Determinism Tests for the ExecutionContext pipeline. + +Validates that the ExecutionContext, canonicalization, Merkle-root hashing, +command execution, and integrity-loop all behave deterministically across +repeated runs and are consistent with fixed expected values. + +Tests: + D_001 Canonicalize is idempotent + D_002 sha256 of fixed bytes matches expected digest + D_003 merkle_root of empty dict returns sha256(b'') + D_004 merkle_root of single file matches expected value + D_005 merkle_root of two files is order-sensitive (stable) + D_006 MODE_1_EXECUTE mutates repo_state; MODE_2_SIMULATE does not + D_007 execute_command always appends exactly one audit-log entry + D_008 test_determinism returns True for a fixed command sequence + D_009 verify_invariants returns True when manifest matches repo state + D_010 integrity_loop returns True after canonicalization repairs mismatches + D_011 integrity_loop returns False when invariants cannot be satisfied + +Author: Orthogonal Engineering +PR: #64 +Version: 1.0.0 +""" + +import sys +import os +from copy import deepcopy +from pathlib import Path + +# --------------------------------------------------------------------------- +# Import the module under test +# --------------------------------------------------------------------------- +_REPO_ROOT = Path(__file__).parent.parent +sys.path.insert(0, str(_REPO_ROOT)) + +from execution_context import ( + ExecutionContext, + canonicalize, + sha256, + merkle_root, + execute_command, + log_operation, + verify_invariants, + canonicalize_repo, + integrity_loop, + enter_mode_0_halt, + test_determinism, + MAX_REPAIR_ATTEMPTS, +) + + +# --------------------------------------------------------------------------- +# Helper +# --------------------------------------------------------------------------- + +def _violation(test_id: str, message: str) -> AssertionError: + """Build a richly annotated AssertionError for CI log traceability.""" + frame = sys._getframe(1) + location = f"{frame.f_code.co_filename}:{frame.f_lineno}" + detail = ( + f"\n[DETERMINISM FAILURE]\n" + f" Test : {test_id}\n" + f" OS : {sys.platform}\n" + f" Python : {sys.version}\n" + f" Location : {location}\n" + f" Detail : {message}" + ) + return AssertionError(detail) + + +# --------------------------------------------------------------------------- +# D_001 canonicalize is idempotent +# --------------------------------------------------------------------------- + +def run_d001_canonicalize_idempotent(): + """D_001: canonicalize(canonicalize(x).decode()) == canonicalize(x).""" + samples = [ + "Hello World", + "line1 \nline2\t\nline3", + '{"b": 1, "a": 2}', + "\r\nWindows\r\nLine\r\nEndings\r\n", + "", + ] + for s in samples: + first = canonicalize(s) + second = canonicalize(first.decode('utf-8')) + if first != second: + raise _violation( + "D_001", + f"canonicalize not idempotent for input={s!r}: " + f"first={first!r} second={second!r}", + ) + + +# --------------------------------------------------------------------------- +# D_002 sha256 of fixed bytes matches expected digest +# --------------------------------------------------------------------------- + +def run_d002_sha256_known_vector(): + """D_002: sha256(b'') matches NIST reference value.""" + expected = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + got = sha256(b"") + if got != expected: + raise _violation( + "D_002", + f"sha256(b'') mismatch: expected={expected} got={got}", + ) + + expected2 = "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824" + got2 = sha256(b"hello") + if got2 != expected2: + raise _violation( + "D_002", + f"sha256(b'hello') mismatch: expected={expected2} got={got2}", + ) + + +# --------------------------------------------------------------------------- +# D_003 merkle_root of empty dict +# --------------------------------------------------------------------------- + +def run_d003_merkle_root_empty(): + """D_003: merkle_root({}) == sha256(b'').""" + expected = sha256(b"") + got = merkle_root({}) + if got != expected: + raise _violation( + "D_003", + f"merkle_root({{}}) mismatch: expected={expected} got={got}", + ) + + +# --------------------------------------------------------------------------- +# D_004 merkle_root of single file +# --------------------------------------------------------------------------- + +def run_d004_merkle_root_single_file(): + """D_004: merkle_root with one entry equals sha256(canonicalize(content)).""" + content = "Hello" + expected = sha256(canonicalize(content)) + got = merkle_root({"a.txt": content}) + if got != expected: + raise _violation( + "D_004", + f"merkle_root single file mismatch: expected={expected} got={got}", + ) + + +# --------------------------------------------------------------------------- +# D_005 merkle_root of two files is stable +# --------------------------------------------------------------------------- + +def run_d005_merkle_root_two_files_stable(): + """D_005: merkle_root of two files is consistent across multiple calls.""" + files = {"a.txt": "Alpha", "b.txt": "Beta"} + root1 = merkle_root(files) + root2 = merkle_root(files) + if root1 != root2: + raise _violation( + "D_005", + f"merkle_root not stable: run1={root1} run2={root2}", + ) + if len(root1) != 64: + raise _violation( + "D_005", + f"merkle_root unexpected length: {len(root1)} (expected 64 hex chars)", + ) + + +# --------------------------------------------------------------------------- +# D_006 MODE_1_EXECUTE mutates; MODE_2_SIMULATE does not +# --------------------------------------------------------------------------- + +def run_d006_execute_vs_simulate(): + """D_006: MODE_1_EXECUTE writes to repo_state; MODE_2_SIMULATE does not.""" + manifest = {"root_hash": sha256(b"")} + cmd = {"file": "x.txt", "content": "data"} + + # MODE_1_EXECUTE should mutate + ctx_exec = ExecutionContext("seed", manifest, [cmd]) + execute_command(ctx_exec, cmd, "MODE_1_EXECUTE") + if "x.txt" not in ctx_exec.repo_state: + raise _violation("D_006", "MODE_1_EXECUTE did not write to repo_state") + + # MODE_2_SIMULATE must not mutate + ctx_sim = ExecutionContext("seed", manifest, [cmd]) + execute_command(ctx_sim, cmd, "MODE_2_SIMULATE") + if ctx_sim.repo_state: + raise _violation( + "D_006", + f"MODE_2_SIMULATE mutated repo_state: {ctx_sim.repo_state}", + ) + + +# --------------------------------------------------------------------------- +# D_007 execute_command always appends one audit-log entry +# --------------------------------------------------------------------------- + +def run_d007_audit_log_entry(): + """D_007: every execute_command call appends exactly one audit-log record.""" + manifest = {"root_hash": sha256(b"")} + cmd = {"file": "f.txt", "content": "v"} + ctx = ExecutionContext("seed", manifest, [cmd]) + + for mode in ("MODE_1_EXECUTE", "MODE_2_SIMULATE"): + before = len(ctx.audit_log) + execute_command(ctx, cmd, mode) + after = len(ctx.audit_log) + if after != before + 1: + raise _violation( + "D_007", + f"audit_log length mismatch after {mode}: before={before} after={after}", + ) + entry = ctx.audit_log[-1] + for field in ("operation_id", "mode", "command", "output_hash", "metadata"): + if field not in entry: + raise _violation( + "D_007", + f"audit_log entry missing field '{field}': {entry}", + ) + if entry["mode"] != mode: + raise _violation( + "D_007", + f"audit_log entry mode mismatch: expected={mode} got={entry['mode']}", + ) + + +# --------------------------------------------------------------------------- +# D_008 test_determinism returns True for a fixed command sequence +# --------------------------------------------------------------------------- + +def run_d008_test_determinism_true(): + """D_008: test_determinism returns True for a fixed seed and command list.""" + manifest = {"root_hash": sha256(b"")} + commands = [{"file": "a.txt", "content": "Hello"}] + result = test_determinism("seed123", manifest, commands) + if not result: + raise _violation( + "D_008", + "test_determinism returned False for a simple deterministic command", + ) + + +# --------------------------------------------------------------------------- +# D_009 verify_invariants: True when manifest matches repo state +# --------------------------------------------------------------------------- + +def run_d009_verify_invariants_true(): + """D_009: verify_invariants returns True when manifest root_hash matches.""" + cmd = {"file": "a.txt", "content": "Hello"} + ctx = ExecutionContext("seed", {"root_hash": sha256(b"")}, [cmd]) + execute_command(ctx, cmd, "MODE_1_EXECUTE") + expected_root = merkle_root(ctx.repo_state) + ctx.manifest["root_hash"] = expected_root + if not verify_invariants(ctx): + raise _violation( + "D_009", + "verify_invariants returned False despite matching manifest root_hash", + ) + + +def run_d009_verify_invariants_false(): + """D_009b: verify_invariants returns False when manifest root_hash mismatches.""" + cmd = {"file": "a.txt", "content": "Hello"} + ctx = ExecutionContext("seed", {"root_hash": "deadbeef" * 8}, [cmd]) + execute_command(ctx, cmd, "MODE_1_EXECUTE") + if verify_invariants(ctx): + raise _violation( + "D_009", + "verify_invariants returned True despite mismatching manifest root_hash", + ) + + +# --------------------------------------------------------------------------- +# D_010 integrity_loop succeeds when manifest matches +# --------------------------------------------------------------------------- + +def run_d010_integrity_loop_success(): + """D_010: integrity_loop returns True when repo state matches manifest.""" + cmd = {"file": "a.txt", "content": "Hello"} + ctx = ExecutionContext("seed", {"root_hash": sha256(b"")}, [cmd]) + execute_command(ctx, cmd, "MODE_1_EXECUTE") + ctx.manifest["root_hash"] = merkle_root(ctx.repo_state) + result = integrity_loop(ctx) + if not result: + raise _violation( + "D_010", + "integrity_loop returned False despite matching manifest", + ) + + +def run_d010_integrity_loop_repairs(): + """D_010b: integrity_loop repairs CRLF content and succeeds.""" + crlf_content = "Hello\r\nWorld" + canonical_content = canonicalize(crlf_content).decode('utf-8') + expected_root = merkle_root({"a.txt": canonical_content}) + + ctx = ExecutionContext("seed", {"root_hash": expected_root}, []) + # Insert raw (non-canonical) content directly — simulating a dirty import + ctx.repo_state["a.txt"] = crlf_content + result = integrity_loop(ctx) + if not result: + raise _violation( + "D_010", + "integrity_loop failed to repair CRLF content", + ) + + +# --------------------------------------------------------------------------- +# D_011 integrity_loop enters MODE_0_HALT when invariants cannot be satisfied +# --------------------------------------------------------------------------- + +def run_d011_integrity_loop_halt(): + """D_011: integrity_loop returns False when root_hash is permanently wrong.""" + cmd = {"file": "a.txt", "content": "Hello"} + ctx = ExecutionContext("seed", {"root_hash": "0" * 64}, [cmd]) + execute_command(ctx, cmd, "MODE_1_EXECUTE") + # manifest root_hash is '000...0' — will never match + result = integrity_loop(ctx) + if result is not False: + raise _violation( + "D_011", + f"integrity_loop should have returned False (MODE_0_HALT), got {result!r}", + ) + + +# --------------------------------------------------------------------------- +# Test registry & entry point +# --------------------------------------------------------------------------- + +ALL_TESTS = [ + run_d001_canonicalize_idempotent, + run_d002_sha256_known_vector, + run_d003_merkle_root_empty, + run_d004_merkle_root_single_file, + run_d005_merkle_root_two_files_stable, + run_d006_execute_vs_simulate, + run_d007_audit_log_entry, + run_d008_test_determinism_true, + run_d009_verify_invariants_true, + run_d009_verify_invariants_false, + run_d010_integrity_loop_success, + run_d010_integrity_loop_repairs, + run_d011_integrity_loop_halt, +] + + +def main() -> int: + print("=" * 72) + print("EXECUTION CONTEXT DETERMINISM TESTS") + print(f"OS: {sys.platform}") + print(f"Python: {sys.version}") + print("=" * 72) + + failures = [] + + for fn in ALL_TESTS: + try: + fn() + print(f" PASS {fn.__name__}") + except AssertionError as exc: + print(f" FAIL {fn.__name__}: {exc}") + failures.append(fn.__name__) + except Exception as exc: + print(f" ERROR {fn.__name__}: {exc}") + failures.append(fn.__name__) + + print("=" * 72) + if failures: + print(f"RESULT: {len(failures)} test(s) FAILED") + for name in failures: + print(f" - {name}") + return 1 + + print("RESULT: ALL DETERMINISM TESTS PASSED") + return 0 + + +if __name__ == "__main__": + sys.exit(main())