diff --git a/bench/isolation/retrieval/aider-repomap-fidelity/bench.py b/bench/isolation/retrieval/aider-repomap-fidelity/bench.py new file mode 100644 index 0000000..9ee6245 --- /dev/null +++ b/bench/isolation/retrieval/aider-repomap-fidelity/bench.py @@ -0,0 +1,265 @@ +"""Aider-style repomap token-reduction measurement. + +Reads a Python codebase fixture, builds a repomap-compressed view (signatures ++ docstrings only, function bodies elided), counts tokens before/after, and +reports pct reduction. + +Two outputs: + primary_value: pct token reduction (compressed vs full) + symbol_coverage: fraction of public symbols preserved (sanity check — + should be 1.0 since the AST extractor is exhaustive) + +NO model invocation — pure deterministic measurement of the structural- +compression axis. The accuracy axis ("does the model still answer correctly +with the compressed view?") needs a model and lives in a future paired +sandbox. +""" + +from __future__ import annotations + +import ast +import json +import os +import statistics +import time +from pathlib import Path + +# Tokenizer with the same fallback pattern as sandbox-e +try: + import tiktoken # type: ignore + _TOKENIZER = tiktoken.get_encoding("cl100k_base") + + def count_tokens(text: str) -> int: + return len(_TOKENIZER.encode(text)) + + TOKENIZER_NAME = "cl100k_base" +except ImportError: + def count_tokens(text: str) -> int: + return max(1, len(text) // 4) + + TOKENIZER_NAME = "char-div-4-fallback" + + +# ---------------------------------------------------------------------- +# Repomap extractor — AST-based signature/docstring view +# ---------------------------------------------------------------------- + +def _is_public(name: str) -> bool: + """Public name = no leading underscore. Dunders also pass (e.g. 
__init__).""" + return not name.startswith("_") or (name.startswith("__") and name.endswith("__")) + + +def _get_signature_text(node: ast.FunctionDef | ast.AsyncFunctionDef, source: str) -> str: + """Extract the def-line(s) verbatim from source, up to the colon that + opens the body. + + Uses the lineno of the body's first statement to find the + boundary deterministically rather than re-formatting (which would lose + fidelity to the source's actual signature line breaks). + """ + src_lines = source.splitlines(keepends=True) + start_line = node.lineno - 1 # 0-indexed + if not node.body: + end_line = start_line + else: + end_line = node.body[0].lineno - 2 # last line of header is body[0].lineno - 1, then -1 for inclusive + if end_line < start_line: + end_line = start_line + return "".join(src_lines[start_line : end_line + 1]) + + +def _extract_docstring(node: ast.FunctionDef | ast.AsyncFunctionDef | ast.ClassDef) -> str | None: + """Return the first-line of a docstring if present, else None.""" + docstring = ast.get_docstring(node, clean=True) + if not docstring: + return None + # Repomap convention: first line only — preserve summary, drop details + return docstring.splitlines()[0] + + +def build_repomap_for_file(path: Path) -> tuple[str, list[str]]: + """Return (compressed_view, list_of_public_symbols). + + Compressed view shape per file: + # path/to/module.py + from x import y, z + IMPORTS_AS_USED # consolidated, no duplicates + CONSTANTS_AT_MODULE_LEVEL + + def public_func(arg: T, ...) -> R: + \"\"\"first-line docstring\"\"\" + ... + + class PublicClass: + \"\"\"first-line docstring\"\"\" + + def public_method(self, ...) -> R: + \"\"\"...\"\"\" + ... 
+ """ + source = path.read_text(encoding="utf-8") + tree = ast.parse(source) + + out_lines: list[str] = [f"# {path.name}"] + symbols: list[str] = [] + + # Module-level docstring + mod_doc = ast.get_docstring(tree, clean=True) + if mod_doc: + out_lines.append(f'"""{mod_doc.splitlines()[0]}"""') + + # Imports — preserve verbatim + for node in tree.body: + if isinstance(node, (ast.Import, ast.ImportFrom)): + out_lines.append(ast.unparse(node)) + + # Module-level functions + classes + for node in tree.body: + if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): + if not _is_public(node.name): + continue + symbols.append(node.name) + sig = _get_signature_text(node, source).rstrip() + doc = _extract_docstring(node) + out_lines.append("") + out_lines.append(sig) + if doc: + out_lines.append(f' """{doc}"""') + out_lines.append(" ...") + + elif isinstance(node, ast.ClassDef): + if not _is_public(node.name): + continue + symbols.append(node.name) + class_def_line = source.splitlines()[node.lineno - 1] + out_lines.append("") + out_lines.append(class_def_line) + cls_doc = _extract_docstring(node) + if cls_doc: + out_lines.append(f' """{cls_doc}"""') + for child in node.body: + if isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef)): + if not _is_public(child.name): + continue + symbols.append(f"{node.name}.{child.name}") + msig = _get_signature_text(child, source).rstrip() + mdoc = _extract_docstring(child) + out_lines.append("") + out_lines.append(msig) + if mdoc: + out_lines.append(f' """{mdoc}"""') + out_lines.append(" ...") + + return "\n".join(out_lines) + "\n", symbols + + +def list_public_symbols_full(path: Path) -> list[str]: + """Enumerate every public function + class (and class methods) in a file. + + Used as the GROUND TRUTH coverage check — the repomap extractor's + output should preserve every name in this list. 
+ """ + source = path.read_text(encoding="utf-8") + tree = ast.parse(source) + symbols: list[str] = [] + for node in tree.body: + if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): + if _is_public(node.name): + symbols.append(node.name) + elif isinstance(node, ast.ClassDef): + if _is_public(node.name): + symbols.append(node.name) + for child in node.body: + if isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef)): + if _is_public(child.name): + symbols.append(f"{node.name}.{child.name}") + return symbols + + +# ---------------------------------------------------------------------- +# Bench entry point +# ---------------------------------------------------------------------- + +def main() -> int: + fixture_root = Path(os.environ.get("FIXTURE_ROOT", "/workloads/codebase-fixture-python")) + if not fixture_root.exists(): + repo_fixture = ( + Path(__file__).resolve().parents[3] / "workloads" / "codebase-fixture-python" + ) + if repo_fixture.exists(): + fixture_root = repo_fixture + else: + print(f"ERROR: fixture not found at {fixture_root} or {repo_fixture}") + return 2 + + py_files = sorted(p for p in fixture_root.rglob("*.py") if p.is_file()) + if not py_files: + print(f"ERROR: no .py files under {fixture_root}") + return 2 + + started = time.monotonic() + + full_text_parts: list[str] = [] + repomap_parts: list[str] = [] + expected_symbols: list[str] = [] + extracted_symbols: list[str] = [] + per_file: list[dict] = [] + + for path in py_files: + rel = str(path.relative_to(fixture_root)) + full_content = path.read_text(encoding="utf-8") + full_with_header = f"# {rel}\n{full_content}" + full_text_parts.append(full_with_header) + + repomap_view, syms = build_repomap_for_file(path) + repomap_parts.append(repomap_view) + extracted_symbols.extend(f"{rel}::{s}" for s in syms) + + full_syms = list_public_symbols_full(path) + expected_symbols.extend(f"{rel}::{s}" for s in full_syms) + + per_file.append({ + "path": rel, + "full_tokens": 
count_tokens(full_with_header), + "repomap_tokens": count_tokens(repomap_view), + "public_symbols_extracted": len(syms), + "public_symbols_expected": len(full_syms), + }) + + full_corpus = "\n".join(full_text_parts) + repomap_corpus = "\n".join(repomap_parts) + full_tokens = count_tokens(full_corpus) + repomap_tokens = count_tokens(repomap_corpus) + pct_reduction = (1 - repomap_tokens / full_tokens) * 100 if full_tokens else 0.0 + + expected_set = set(expected_symbols) + extracted_set = set(extracted_symbols) + coverage = ( + len(extracted_set & expected_set) / len(expected_set) + if expected_set + else 1.0 + ) + missing_symbols = sorted(expected_set - extracted_set) + + elapsed = time.monotonic() - started + + output = { + "primary_value": pct_reduction, + "secondary_value": coverage, + "duration_seconds": elapsed, + "tokenizer": TOKENIZER_NAME, + "n_files": len(py_files), + "full_tokens_total": full_tokens, + "repomap_tokens_total": repomap_tokens, + "symbol_coverage": coverage, # alias for human readers + "missing_symbols": missing_symbols[:20], # cap for output size + "per_file": per_file, + } + + Path("outputs.json").write_text(json.dumps(output, indent=2), encoding="utf-8") + print(json.dumps({k: v for k, v in output.items() if k != "per_file"}, indent=2)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/bench/isolation/retrieval/aider-repomap-fidelity/docker-compose.yml b/bench/isolation/retrieval/aider-repomap-fidelity/docker-compose.yml new file mode 100644 index 0000000..a5dfece --- /dev/null +++ b/bench/isolation/retrieval/aider-repomap-fidelity/docker-compose.yml @@ -0,0 +1,15 @@ +services: + bench: + image: python:3.11-slim + volumes: + - ./:/work + - ../../../workloads:/workloads:ro + working_dir: /work + environment: + - FIXTURE_ROOT=/workloads/codebase-fixture-python + # Pure-stdlib repomap (Python ast module). tiktoken installs cleanly + # without build tools — same pattern as sandbox-e. 
+ command: + - sh + - -c + - "pip install --quiet tiktoken && python bench.py" diff --git a/bench/isolation/retrieval/aider-repomap-fidelity/expected.json b/bench/isolation/retrieval/aider-repomap-fidelity/expected.json index 11f7d7f..2a5ad56 100644 --- a/bench/isolation/retrieval/aider-repomap-fidelity/expected.json +++ b/bench/isolation/retrieval/aider-repomap-fidelity/expected.json @@ -1,24 +1,20 @@ { - "hypothesis_id": "aider-repomap-fidelity-vs-full-context", - "claim": "Aider-style repomap (NetworkX PageRank + tree-sitter compressed view) preserves >=85% of code-aware Q&A accuracy at 30% of the token budget vs sending the full repository as context. Tree-sitter compression cuts tokens ~70% while preserving structure.", - "metric": "code_qa_accuracy_at_30pct_tokens", + "hypothesis_id": "aider-repomap-token-reduction-and-symbol-coverage", + "claim": "An Aider-style repomap (signatures + first-line docstrings, function bodies elided, public symbols only) cuts repository token count by >=50% while preserving 100% of public symbols (functions, classes, methods) on a representative ~325 LOC Python codebase fixture. The accuracy axis (does the model still answer Q&A correctly) needs a model and lives in a future paired sandbox; this sandbox measures only the deterministic structural-compression axis.", + "metric": "token_reduction_pct", "thresholds": { - "confirm_at_least": 0.85, - "refute_below": 0.70 + "confirm_at_least": 50.0, + "refute_below": 30.0 }, - "secondary_metric": "tokens_used_p50", + "secondary_metric": "symbol_coverage", "secondary_thresholds": { - "confirm_at_most": 8000, - "refute_above": 15000 + "confirm_at_least": 1.0, + "refute_below": 0.99 }, - "workload": "code-qa-mid-size-repo.jsonl", - "source_for_claim": "Aider's repomap design + Repomix --compress benchmarks. 
Spec v0.3 row 24: 'Aider-style repomap pattern as generic compressed-view tool.'", - "comparison_anchor": "retrieval/full-context-baseline (when implemented — same Q&A on uncompressed repo dump)", - "decision_rule": "If CONFIRMED, repomap is the v1 default for code-context retrieval; row 24 lock holds. If REFUTED on accuracy, investigate PageRank parameter tuning before declaring repomap insufficient. If REFUTED on tokens, the 30% target is too aggressive and the spec's effective-context expansion math needs revisiting.", - "timeout_seconds": 1800, - "status": "INACTIVE", - "blocked_on": [ - "Code Q&A workload not yet curated in bench/workloads/", - "MCP layer's code-aware tools not yet wired (planned for v1+, not v1)" - ] + "workload": "codebase-fixture-python (10 modules, ~325 LOC; mylib + tests subtree)", + "source_for_claim": "Spec v0.3 row 24: 'Aider-style repomap pattern as generic compressed-view tool. Repomix --compress (tree-sitter) cuts tokens ~70% while preserving structure.' This sandbox proves the structural part deterministically.", + "comparison_anchor": "full-corpus baseline (concatenated raw .py files of the same fixture)", + "decision_rule": "If CONFIRMED on both primary (>=50% reduction) AND secondary (100% symbol coverage), the repomap recipe is sound for the structural axis. Row 24 lock holds for the deterministic claim. The accuracy claim still needs paired sandbox before row 24 graduates fully. If REFUTED on tokens, the recipe is too gentle (preserve fewer signatures or drop docstrings). If REFUTED on coverage (any public symbol missing), the AST extractor has a bug.", + "timeout_seconds": 300, + "status": "ACTIVE" } diff --git a/bench/workloads/_generate_repomap_fixture.py b/bench/workloads/_generate_repomap_fixture.py new file mode 100644 index 0000000..8c946a5 --- /dev/null +++ b/bench/workloads/_generate_repomap_fixture.py @@ -0,0 +1,424 @@ +"""Generate a representative Python codebase fixture for the aider-repomap sandbox. 
+ +Creates bench/workloads/codebase-fixture-python/ with ~10 modules totaling +~325 LOC — a realistic-sized small library. Each module has a mix of: + - public functions (with body + docstring) + - private helpers (underscore-prefix) + - classes with several methods + - constants / type aliases + - imports (some used, some shadowed for realism) + +The repomap test compares: + (a) full file contents — what the agent would see if context-stuffed + (b) repomap-compressed view (signatures + docstrings only) + +Run from repo root: + python bench/workloads/_generate_repomap_fixture.py +""" + +from __future__ import annotations + +from pathlib import Path +import textwrap + + +FIXTURE_ROOT = Path(__file__).resolve().parent / "codebase-fixture-python" + + +# Each entry: (relative_path, content) +MODULES: list[tuple[str, str]] = [ + ( + "mylib/__init__.py", + textwrap.dedent('''\ + """A small example library exposing a tiny CRUD service.""" + from .config import load_config, Config + from .store import MemoryStore, KeyError as StoreKeyError + from .api import create_app, Handler + + __all__ = ["load_config", "Config", "MemoryStore", "StoreKeyError", "create_app", "Handler"] + __version__ = "0.1.0" + '''), + ), + ( + "mylib/config.py", + textwrap.dedent('''\ + """Configuration loader. 
Reads TOML from disk with env override.""" + from __future__ import annotations + import os + from dataclasses import dataclass, field + from pathlib import Path + + try: + import tomllib + except ImportError: # pragma: no cover + import tomli as tomllib # type: ignore + + + @dataclass + class Config: + """Top-level config record.""" + host: str = "127.0.0.1" + port: int = 8080 + debug: bool = False + allowed_origins: list[str] = field(default_factory=list) + + + def load_config(path: str | Path = "config.toml") -> Config: + """Load config from TOML, applying MYLIB_* env overrides.""" + p = Path(path) + data: dict = {} + if p.exists(): + with p.open("rb") as f: + data = tomllib.load(f) + _apply_env_overrides(data) + return Config(**data) + + + def _apply_env_overrides(data: dict) -> None: + """Mutate data dict to honor MYLIB_HOST, MYLIB_PORT, MYLIB_DEBUG.""" + if (h := os.environ.get("MYLIB_HOST")): + data["host"] = h + if (p := os.environ.get("MYLIB_PORT")): + data["port"] = int(p) + if (d := os.environ.get("MYLIB_DEBUG")): + data["debug"] = d.lower() in ("1", "true", "yes") + '''), + ), + ( + "mylib/store.py", + textwrap.dedent('''\ + """In-memory key-value store with TTL support.""" + from __future__ import annotations + import time + from collections.abc import Iterator + + + class KeyError(Exception): + """Raised when a key is missing from the store.""" + + + class MemoryStore: + """Tiny in-memory KV store with optional per-key TTL.""" + + def __init__(self) -> None: + self._data: dict[str, tuple[object, float | None]] = {} + + def set(self, key: str, value: object, ttl_seconds: float | None = None) -> None: + """Set a key. Optionally expire after ttl_seconds.""" + expires_at = time.time() + ttl_seconds if ttl_seconds else None + self._data[key] = (value, expires_at) + + def get(self, key: str) -> object: + """Fetch a key. 
Raises KeyError if missing or expired.""" + if key not in self._data: + raise KeyError(key) + value, expires_at = self._data[key] + if expires_at is not None and time.time() > expires_at: + del self._data[key] + raise KeyError(key) + return value + + def delete(self, key: str) -> bool: + """Remove a key. Returns True if it existed.""" + return self._data.pop(key, None) is not None + + def keys(self) -> Iterator[str]: + """Iterate non-expired keys.""" + now = time.time() + for k, (_, exp) in list(self._data.items()): + if exp is None or now <= exp: + yield k + + def _gc(self) -> int: + """Garbage-collect expired entries. Returns count removed.""" + now = time.time() + expired = [k for k, (_, exp) in self._data.items() if exp and now > exp] + for k in expired: + del self._data[k] + return len(expired) + '''), + ), + ( + "mylib/api.py", + textwrap.dedent('''\ + """HTTP API surface — minimal sync handler protocol.""" + from __future__ import annotations + from typing import Protocol + from .config import Config + from .store import MemoryStore + + + class Handler(Protocol): + """Protocol any HTTP handler must satisfy.""" + + def handle(self, method: str, path: str, body: bytes) -> tuple[int, bytes]: + """Return (status_code, response_body).""" + + + def create_app(config: Config, store: MemoryStore) -> Handler: + """Wire the config + store into a Handler-conforming app instance.""" + return _AppImpl(config, store) + + + class _AppImpl: + """Internal — concrete app. 
Not exported.""" + + def __init__(self, config: Config, store: MemoryStore) -> None: + self.config = config + self.store = store + + def handle(self, method: str, path: str, body: bytes) -> tuple[int, bytes]: + if method == "GET" and path.startswith("/kv/"): + return self._handle_get(path[4:]) + if method == "PUT" and path.startswith("/kv/"): + return self._handle_put(path[4:], body) + return (404, b"not found") + + def _handle_get(self, key: str) -> tuple[int, bytes]: + try: + v = self.store.get(key) + except KeyError: + return (404, b"missing") + return (200, str(v).encode()) + + def _handle_put(self, key: str, body: bytes) -> tuple[int, bytes]: + self.store.set(key, body.decode()) + return (204, b"") + '''), + ), + ( + "mylib/auth.py", + textwrap.dedent('''\ + """Token-based auth middleware.""" + from __future__ import annotations + import hmac + import hashlib + import secrets + import time + + + def generate_token(secret: bytes, subject: str, ttl_seconds: int = 3600) -> str: + """Generate a signed token of form 'subject.expiry.signature'.""" + expiry = int(time.time()) + ttl_seconds + payload = f"{subject}.{expiry}".encode() + sig = hmac.new(secret, payload, hashlib.sha256).hexdigest()[:32] + return f"{subject}.{expiry}.{sig}" + + + def verify_token(secret: bytes, token: str) -> str | None: + """Verify a token and return its subject, or None if invalid/expired.""" + parts = token.split(".") + if len(parts) != 3: + return None + subject, expiry_str, sig = parts + try: + expiry = int(expiry_str) + except ValueError: + return None + if time.time() > expiry: + return None + payload = f"{subject}.{expiry}".encode() + expected = hmac.new(secret, payload, hashlib.sha256).hexdigest()[:32] + if not hmac.compare_digest(sig, expected): + return None + return subject + + + def random_secret(n_bytes: int = 32) -> bytes: + """Generate a cryptographically-strong random secret.""" + return secrets.token_bytes(n_bytes) + '''), + ), + ( + "mylib/log.py", + textwrap.dedent('''\ 
+ """Structured logging shim. Wraps stdlib `logging` with JSON output.""" + from __future__ import annotations + import json + import logging + import sys + from typing import Any + + + class JSONFormatter(logging.Formatter): + """Format log records as single-line JSON.""" + + def format(self, record: logging.LogRecord) -> str: + payload: dict[str, Any] = { + "ts": self.formatTime(record), + "level": record.levelname, + "logger": record.name, + "msg": record.getMessage(), + } + if record.exc_info: + payload["exc"] = self.formatException(record.exc_info) + return json.dumps(payload) + + + def configure(level: str = "INFO") -> logging.Logger: + """Configure root logger with JSON output to stderr.""" + root = logging.getLogger() + root.setLevel(level) + handler = logging.StreamHandler(sys.stderr) + handler.setFormatter(JSONFormatter()) + root.handlers.clear() + root.addHandler(handler) + return root + + + def get_logger(name: str) -> logging.Logger: + """Return a child logger inheriting the JSON formatter.""" + return logging.getLogger(name) + '''), + ), + ( + "mylib/util.py", + textwrap.dedent('''\ + """Misc utilities — string manipulation, time helpers.""" + from __future__ import annotations + import time + from collections.abc import Iterable + + + def chunked(items: Iterable, size: int) -> Iterable[list]: + """Yield successive size-N chunks from items.""" + buf: list = [] + for item in items: + buf.append(item) + if len(buf) >= size: + yield buf + buf = [] + if buf: + yield buf + + + def truncate_middle(s: str, max_len: int, marker: str = "…") -> str: + """Truncate the middle of s to fit max_len, preserving start + end.""" + if len(s) <= max_len: + return s + each = (max_len - len(marker)) // 2 + return s[:each] + marker + s[-each:] + + + def humanize_duration(seconds: float) -> str: + """Convert seconds to a human-readable duration string.""" + if seconds < 60: + return f"{seconds:.1f}s" + if seconds < 3600: + return f"{seconds / 60:.1f}m" + return f"{seconds / 
3600:.1f}h" + + + def utc_now_iso() -> str: + """Return current UTC time as ISO 8601 string.""" + return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + '''), + ), + ( + "tests/__init__.py", + textwrap.dedent('''\ + """Test package marker.""" + '''), + ), + ( + "tests/test_store.py", + textwrap.dedent('''\ + """Tests for MemoryStore.""" + from __future__ import annotations + import time + import pytest + from mylib.store import MemoryStore, KeyError as StoreKeyError + + + def test_set_get_basic(): + s = MemoryStore() + s.set("foo", "bar") + assert s.get("foo") == "bar" + + + def test_get_missing_raises(): + s = MemoryStore() + with pytest.raises(StoreKeyError): + s.get("nope") + + + def test_ttl_expires(): + s = MemoryStore() + s.set("k", "v", ttl_seconds=0.01) + time.sleep(0.05) + with pytest.raises(StoreKeyError): + s.get("k") + + + def test_delete_returns_true_when_existed(): + s = MemoryStore() + s.set("a", 1) + assert s.delete("a") is True + assert s.delete("a") is False + + + def test_keys_excludes_expired(): + s = MemoryStore() + s.set("alive", 1) + s.set("dead", 2, ttl_seconds=0.01) + time.sleep(0.05) + assert "alive" in list(s.keys()) + assert "dead" not in list(s.keys()) + '''), + ), + ( + "tests/test_auth.py", + textwrap.dedent('''\ + """Tests for auth tokens.""" + from __future__ import annotations + import time + import pytest + from mylib.auth import generate_token, verify_token, random_secret + + + def test_round_trip(): + secret = random_secret() + token = generate_token(secret, "alice", ttl_seconds=60) + assert verify_token(secret, token) == "alice" + + + def test_wrong_secret_returns_none(): + a = random_secret() + b = random_secret() + token = generate_token(a, "alice") + assert verify_token(b, token) is None + + + def test_expired_returns_none(): + secret = random_secret() + token = generate_token(secret, "alice", ttl_seconds=0) + time.sleep(0.05) + assert verify_token(secret, token) is None + + + def test_malformed_returns_none(): + secret 
= random_secret() + assert verify_token(secret, "garbage") is None + assert verify_token(secret, "x.y.z") is None + '''), + ), +] + + +def main() -> int: + if FIXTURE_ROOT.exists(): + # Wipe so we're idempotent + import shutil + + shutil.rmtree(FIXTURE_ROOT) + + for relpath, content in MODULES: + out = FIXTURE_ROOT / relpath + out.parent.mkdir(parents=True, exist_ok=True) + out.write_text(content, encoding="utf-8") + print(f"Wrote {len(MODULES)} files to {FIXTURE_ROOT}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/bench/workloads/codebase-fixture-python/mylib/__init__.py b/bench/workloads/codebase-fixture-python/mylib/__init__.py new file mode 100644 index 0000000..3fa837d --- /dev/null +++ b/bench/workloads/codebase-fixture-python/mylib/__init__.py @@ -0,0 +1,7 @@ +"""A small example library exposing a tiny CRUD service.""" +from .config import load_config, Config +from .store import MemoryStore, KeyError as StoreKeyError +from .api import create_app, Handler + +__all__ = ["load_config", "Config", "MemoryStore", "StoreKeyError", "create_app", "Handler"] +__version__ = "0.1.0" diff --git a/bench/workloads/codebase-fixture-python/mylib/api.py b/bench/workloads/codebase-fixture-python/mylib/api.py new file mode 100644 index 0000000..559d115 --- /dev/null +++ b/bench/workloads/codebase-fixture-python/mylib/api.py @@ -0,0 +1,43 @@ +"""HTTP API surface — minimal sync handler protocol.""" +from __future__ import annotations +from typing import Protocol +from .config import Config +from .store import MemoryStore + + +class Handler(Protocol): + """Protocol any HTTP handler must satisfy.""" + + def handle(self, method: str, path: str, body: bytes) -> tuple[int, bytes]: + """Return (status_code, response_body).""" + + +def create_app(config: Config, store: MemoryStore) -> Handler: + """Wire the config + store into a Handler-conforming app instance.""" + return _AppImpl(config, store) + + +class _AppImpl: + """Internal — concrete app. 
Not exported.""" + + def __init__(self, config: Config, store: MemoryStore) -> None: + self.config = config + self.store = store + + def handle(self, method: str, path: str, body: bytes) -> tuple[int, bytes]: + if method == "GET" and path.startswith("/kv/"): + return self._handle_get(path[4:]) + if method == "PUT" and path.startswith("/kv/"): + return self._handle_put(path[4:], body) + return (404, b"not found") + + def _handle_get(self, key: str) -> tuple[int, bytes]: + try: + v = self.store.get(key) + except KeyError: + return (404, b"missing") + return (200, str(v).encode()) + + def _handle_put(self, key: str, body: bytes) -> tuple[int, bytes]: + self.store.set(key, body.decode()) + return (204, b"") diff --git a/bench/workloads/codebase-fixture-python/mylib/auth.py b/bench/workloads/codebase-fixture-python/mylib/auth.py new file mode 100644 index 0000000..ec5d5bf --- /dev/null +++ b/bench/workloads/codebase-fixture-python/mylib/auth.py @@ -0,0 +1,38 @@ +"""Token-based auth middleware.""" +from __future__ import annotations +import hmac +import hashlib +import secrets +import time + + +def generate_token(secret: bytes, subject: str, ttl_seconds: int = 3600) -> str: + """Generate a signed token of form 'subject.expiry.signature'.""" + expiry = int(time.time()) + ttl_seconds + payload = f"{subject}.{expiry}".encode() + sig = hmac.new(secret, payload, hashlib.sha256).hexdigest()[:32] + return f"{subject}.{expiry}.{sig}" + + +def verify_token(secret: bytes, token: str) -> str | None: + """Verify a token and return its subject, or None if invalid/expired.""" + parts = token.split(".") + if len(parts) != 3: + return None + subject, expiry_str, sig = parts + try: + expiry = int(expiry_str) + except ValueError: + return None + if time.time() > expiry: + return None + payload = f"{subject}.{expiry}".encode() + expected = hmac.new(secret, payload, hashlib.sha256).hexdigest()[:32] + if not hmac.compare_digest(sig, expected): + return None + return subject + + +def 
random_secret(n_bytes: int = 32) -> bytes: + """Generate a cryptographically-strong random secret.""" + return secrets.token_bytes(n_bytes) diff --git a/bench/workloads/codebase-fixture-python/mylib/config.py b/bench/workloads/codebase-fixture-python/mylib/config.py new file mode 100644 index 0000000..3033cd4 --- /dev/null +++ b/bench/workloads/codebase-fixture-python/mylib/config.py @@ -0,0 +1,40 @@ +"""Configuration loader. Reads TOML from disk with env override.""" +from __future__ import annotations +import os +from dataclasses import dataclass, field +from pathlib import Path + +try: + import tomllib +except ImportError: # pragma: no cover + import tomli as tomllib # type: ignore + + +@dataclass +class Config: + """Top-level config record.""" + host: str = "127.0.0.1" + port: int = 8080 + debug: bool = False + allowed_origins: list[str] = field(default_factory=list) + + +def load_config(path: str | Path = "config.toml") -> Config: + """Load config from TOML, applying MYLIB_* env overrides.""" + p = Path(path) + data: dict = {} + if p.exists(): + with p.open("rb") as f: + data = tomllib.load(f) + _apply_env_overrides(data) + return Config(**data) + + +def _apply_env_overrides(data: dict) -> None: + """Mutate data dict to honor MYLIB_HOST, MYLIB_PORT, MYLIB_DEBUG.""" + if (h := os.environ.get("MYLIB_HOST")): + data["host"] = h + if (p := os.environ.get("MYLIB_PORT")): + data["port"] = int(p) + if (d := os.environ.get("MYLIB_DEBUG")): + data["debug"] = d.lower() in ("1", "true", "yes") diff --git a/bench/workloads/codebase-fixture-python/mylib/log.py b/bench/workloads/codebase-fixture-python/mylib/log.py new file mode 100644 index 0000000..2cc91d6 --- /dev/null +++ b/bench/workloads/codebase-fixture-python/mylib/log.py @@ -0,0 +1,37 @@ +"""Structured logging shim. 
Wraps stdlib `logging` with JSON output.""" +from __future__ import annotations +import json +import logging +import sys +from typing import Any + + +class JSONFormatter(logging.Formatter): + """Format log records as single-line JSON.""" + + def format(self, record: logging.LogRecord) -> str: + payload: dict[str, Any] = { + "ts": self.formatTime(record), + "level": record.levelname, + "logger": record.name, + "msg": record.getMessage(), + } + if record.exc_info: + payload["exc"] = self.formatException(record.exc_info) + return json.dumps(payload) + + +def configure(level: str = "INFO") -> logging.Logger: + """Configure root logger with JSON output to stderr.""" + root = logging.getLogger() + root.setLevel(level) + handler = logging.StreamHandler(sys.stderr) + handler.setFormatter(JSONFormatter()) + root.handlers.clear() + root.addHandler(handler) + return root + + +def get_logger(name: str) -> logging.Logger: + """Return a child logger inheriting the JSON formatter.""" + return logging.getLogger(name) diff --git a/bench/workloads/codebase-fixture-python/mylib/store.py b/bench/workloads/codebase-fixture-python/mylib/store.py new file mode 100644 index 0000000..671b2fd --- /dev/null +++ b/bench/workloads/codebase-fixture-python/mylib/store.py @@ -0,0 +1,49 @@ +"""In-memory key-value store with TTL support.""" +from __future__ import annotations +import time +from collections.abc import Iterator + + +class KeyError(Exception): + """Raised when a key is missing from the store.""" + + +class MemoryStore: + """Tiny in-memory KV store with optional per-key TTL.""" + + def __init__(self) -> None: + self._data: dict[str, tuple[object, float | None]] = {} + + def set(self, key: str, value: object, ttl_seconds: float | None = None) -> None: + """Set a key. Optionally expire after ttl_seconds.""" + expires_at = time.time() + ttl_seconds if ttl_seconds else None + self._data[key] = (value, expires_at) + + def get(self, key: str) -> object: + """Fetch a key. 
Raises KeyError if missing or expired.""" + if key not in self._data: + raise KeyError(key) + value, expires_at = self._data[key] + if expires_at is not None and time.time() > expires_at: + del self._data[key] + raise KeyError(key) + return value + + def delete(self, key: str) -> bool: + """Remove a key. Returns True if it existed.""" + return self._data.pop(key, None) is not None + + def keys(self) -> Iterator[str]: + """Iterate non-expired keys.""" + now = time.time() + for k, (_, exp) in list(self._data.items()): + if exp is None or now <= exp: + yield k + + def _gc(self) -> int: + """Garbage-collect expired entries. Returns count removed.""" + now = time.time() + expired = [k for k, (_, exp) in self._data.items() if exp and now > exp] + for k in expired: + del self._data[k] + return len(expired) diff --git a/bench/workloads/codebase-fixture-python/mylib/util.py b/bench/workloads/codebase-fixture-python/mylib/util.py new file mode 100644 index 0000000..9be0966 --- /dev/null +++ b/bench/workloads/codebase-fixture-python/mylib/util.py @@ -0,0 +1,38 @@ +"""Misc utilities — string manipulation, time helpers.""" +from __future__ import annotations +import time +from collections.abc import Iterable + + +def chunked(items: Iterable, size: int) -> Iterable[list]: + """Yield successive size-N chunks from items.""" + buf: list = [] + for item in items: + buf.append(item) + if len(buf) >= size: + yield buf + buf = [] + if buf: + yield buf + + +def truncate_middle(s: str, max_len: int, marker: str = "…") -> str: + """Truncate the middle of s to fit max_len, preserving start + end.""" + if len(s) <= max_len: + return s + each = (max_len - len(marker)) // 2 + return s[:each] + marker + s[-each:] + + +def humanize_duration(seconds: float) -> str: + """Convert seconds to a human-readable duration string.""" + if seconds < 60: + return f"{seconds:.1f}s" + if seconds < 3600: + return f"{seconds / 60:.1f}m" + return f"{seconds / 3600:.1f}h" + + +def utc_now_iso() -> str: + 
"""Return current UTC time as ISO 8601 string.""" + return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) diff --git a/bench/workloads/codebase-fixture-python/tests/__init__.py b/bench/workloads/codebase-fixture-python/tests/__init__.py new file mode 100644 index 0000000..cff7db5 --- /dev/null +++ b/bench/workloads/codebase-fixture-python/tests/__init__.py @@ -0,0 +1 @@ +"""Test package marker.""" diff --git a/bench/workloads/codebase-fixture-python/tests/test_auth.py b/bench/workloads/codebase-fixture-python/tests/test_auth.py new file mode 100644 index 0000000..d4fb7bd --- /dev/null +++ b/bench/workloads/codebase-fixture-python/tests/test_auth.py @@ -0,0 +1,31 @@ +"""Tests for auth tokens.""" +from __future__ import annotations +import time +import pytest +from mylib.auth import generate_token, verify_token, random_secret + + +def test_round_trip(): + secret = random_secret() + token = generate_token(secret, "alice", ttl_seconds=60) + assert verify_token(secret, token) == "alice" + + +def test_wrong_secret_returns_none(): + a = random_secret() + b = random_secret() + token = generate_token(a, "alice") + assert verify_token(b, token) is None + + +def test_expired_returns_none(): + secret = random_secret() + token = generate_token(secret, "alice", ttl_seconds=0) + time.sleep(0.05) + assert verify_token(secret, token) is None + + +def test_malformed_returns_none(): + secret = random_secret() + assert verify_token(secret, "garbage") is None + assert verify_token(secret, "x.y.z") is None diff --git a/bench/workloads/codebase-fixture-python/tests/test_store.py b/bench/workloads/codebase-fixture-python/tests/test_store.py new file mode 100644 index 0000000..fb2c0cc --- /dev/null +++ b/bench/workloads/codebase-fixture-python/tests/test_store.py @@ -0,0 +1,41 @@ +"""Tests for MemoryStore.""" +from __future__ import annotations +import time +import pytest +from mylib.store import MemoryStore, KeyError as StoreKeyError + + +def test_set_get_basic(): + s = MemoryStore() 
# ----------------------------------------------------------------------
# bench/workloads/codebase-fixture-python/tests/__init__.py
# Test package marker (the file holds only a docstring).
# ----------------------------------------------------------------------

import time
import pytest
from mylib.auth import generate_token, verify_token, random_secret
from mylib.store import MemoryStore, KeyError as StoreKeyError


# ----------------------------------------------------------------------
# bench/workloads/codebase-fixture-python/tests/test_auth.py
# Tests for auth tokens.
# ----------------------------------------------------------------------

def test_round_trip():
    """A token verified with the issuing secret yields its subject."""
    key = random_secret()
    issued = generate_token(key, "alice", ttl_seconds=60)
    subject = verify_token(key, issued)
    assert subject == "alice"


def test_wrong_secret_returns_none():
    """Verification with a different secret is rejected."""
    signing_key = random_secret()
    other_key = random_secret()
    issued = generate_token(signing_key, "alice")
    outcome = verify_token(other_key, issued)
    assert outcome is None


def test_expired_returns_none():
    """A zero-TTL token is no longer valid after a short wait."""
    key = random_secret()
    issued = generate_token(key, "alice", ttl_seconds=0)
    time.sleep(0.05)  # let the token's lifetime elapse
    outcome = verify_token(key, issued)
    assert outcome is None


def test_malformed_returns_none():
    """Strings that are not well-formed tokens verify to None."""
    key = random_secret()
    first = verify_token(key, "garbage")
    assert first is None
    second = verify_token(key, "x.y.z")
    assert second is None


# ----------------------------------------------------------------------
# bench/workloads/codebase-fixture-python/tests/test_store.py
# Tests for MemoryStore.
# ----------------------------------------------------------------------

def test_set_get_basic():
    """A stored value can be read back."""
    store = MemoryStore()
    store.set("foo", "bar")
    fetched = store.get("foo")
    assert fetched == "bar"


def test_get_missing_raises():
    """Reading an unknown key raises the store's KeyError."""
    store = MemoryStore()
    with pytest.raises(StoreKeyError):
        store.get("nope")


def test_ttl_expires():
    """An entry is unreadable once its TTL has elapsed."""
    store = MemoryStore()
    store.set("k", "v", ttl_seconds=0.01)
    time.sleep(0.05)  # comfortably longer than the TTL
    with pytest.raises(StoreKeyError):
        store.get("k")


def test_delete_returns_true_when_existed():
    """delete() reports whether the key was present."""
    store = MemoryStore()
    store.set("a", 1)
    first_attempt = store.delete("a")
    assert first_attempt is True
    second_attempt = store.delete("a")
    assert second_attempt is False


def test_keys_excludes_expired():
    """keys() lists live entries only."""
    store = MemoryStore()
    store.set("alive", 1)
    store.set("dead", 2, ttl_seconds=0.01)
    time.sleep(0.05)  # let "dead" expire
    first_listing = list(store.keys())
    assert "alive" in first_listing
    second_listing = list(store.keys())
    assert "dead" not in second_listing