From 76191ca55e5c183d41ad0b21d910be4072fe7e26 Mon Sep 17 00:00:00 2001 From: Zefan Cai <67849306+Zefan-Cai@users.noreply.github.com> Date: Tue, 31 Mar 2026 11:25:55 -0500 Subject: [PATCH] Add structured artifact index for workspace outputs --- README.md | 6 +- src/artifact_index.py | 312 +++++++++++++++++++++++++++++++++++ src/manager.py | 13 +- src/utils.py | 2 + src/writing_manifest.py | 59 ++++++- tests/test_artifact_index.py | 80 +++++++++ 6 files changed, 463 insertions(+), 9 deletions(-) create mode 100644 src/artifact_index.py create mode 100644 tests/test_artifact_index.py diff --git a/README.md b/README.md index b74d8ec..40aed5e 100644 --- a/README.md +++ b/README.md @@ -308,7 +308,7 @@ File boundaries: ## 📂 Run Layout -Each run contains `user_input.txt`, `memory.md`, `prompt_cache/`, `operator_state/`, `stages/`, `workspace/`, `logs.txt`, and `logs_raw.jsonl`. The substantive research payload lives in `workspace/`. +Each run contains `user_input.txt`, `memory.md`, `run_manifest.json`, `artifact_index.json`, `prompt_cache/`, `operator_state/`, `stages/`, `workspace/`, `logs.txt`, and `logs_raw.jsonl`. The substantive research payload lives in `workspace/`. ```mermaid flowchart TD @@ -338,6 +338,8 @@ Workspace directories: Other run state: - `memory.md`: approved cross-stage memory only. +- `run_manifest.json`: machine-readable run and stage lifecycle state. +- `artifact_index.json`: machine-readable index over `workspace/data`, `workspace/results`, and `workspace/figures`. - `prompt_cache/`: exact prompts used for stage attempts and repairs. - `operator_state/`: per-stage Claude session IDs. - `stages/`: draft and promoted stage summaries. @@ -416,7 +418,7 @@ Open work is tracked here so contributors can pick up clear, decoupled improveme - Cross-stage rollback and invalidation. When a later stage reveals that an earlier design decision is wrong, the workflow should be able to jump back to an earlier stage and mark downstream stages as stale. This is the biggest current control-flow gap. - Machine-readable run manifest. Add a single source of truth such as `run_manifest.json` to track stage status, approval state, stale dependencies, session IDs, and key artifact pointers. This should make both automation and future UI work much cleaner. - Continuation handoff compression. Add a short machine-generated stage handoff file that summarizes what is already correct, what is missing, and which files matter most. This should reduce context growth and make continuation more stable over long runs. -- Result schema and artifact indexing. Standardize `workspace/data/`, `workspace/results/`, and `workspace/figures/` around explicit schemas and generate an artifact index automatically. Later stages and the UI should consume structured metadata instead of scanning ad hoc files. +- ~~Result schema and artifact indexing.~~ Standardize `workspace/data/`, `workspace/results/`, and `workspace/figures/` around explicit schemas and generate an artifact index automatically. The workflow now writes `artifact_index.json`, carries basic inferred or declared schema metadata, and feeds the index into later-stage prompt context and the writing manifest. - Writing pipeline hardening. Turn Stage 07 into a reliable manuscript production pipeline with stable conference and journal-style paper structures, bibliography handling, table and figure inclusion, and reproducible PDF compilation. The goal is a submission-ready paper package, not just writing notes. - Review and dissemination package. Expand Stage 08 so it produces readiness checklists, threats-to-validity notes, artifact manifests, release notes, and external-facing research bundles. The final stage should feel like packaging a paper for real release, not just wrapping up text. - Frontend run dashboard. Build a lightweight UI that can browse runs, stage status, summaries, logs, artifacts, and validation failures. It should read from the run directory and manifest rather than introducing a database first. diff --git a/src/artifact_index.py b/src/artifact_index.py new file mode 100644 index 0000000..c75e159 --- /dev/null +++ b/src/artifact_index.py @@ -0,0 +1,312 @@ +from __future__ import annotations + +import csv +import json +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path + +from .utils import FIGURE_SUFFIXES, MACHINE_DATA_SUFFIXES, RESULT_SUFFIXES, RunPaths + + +@dataclass(frozen=True) +class ArtifactRecord: + category: str + rel_path: str + filename: str + suffix: str + size_bytes: int + updated_at: str + schema: dict[str, object] = field(default_factory=dict) + + def to_dict(self) -> dict[str, object]: + return { + "category": self.category, + "rel_path": self.rel_path, + "filename": self.filename, + "suffix": self.suffix, + "size_bytes": self.size_bytes, + "updated_at": self.updated_at, + "schema": self.schema, + } + + @classmethod + def from_dict(cls, payload: dict[str, object]) -> "ArtifactRecord": + return cls( + category=str(payload.get("category", "")).strip(), + rel_path=str(payload.get("rel_path", "")).strip(), + filename=str(payload.get("filename", "")).strip(), + suffix=str(payload.get("suffix", "")).strip(), + size_bytes=int(payload.get("size_bytes", 0)), + updated_at=str(payload.get("updated_at", "")).strip(), + schema=dict(payload.get("schema", {})), + ) + + +@dataclass(frozen=True) +class ArtifactIndex: + generated_at: str + artifact_count: int + counts_by_category: dict[str, int] + artifacts: list[ArtifactRecord] + + def to_dict(self) -> dict[str, object]: + return { + "generated_at": self.generated_at, + "artifact_count": self.artifact_count, + "counts_by_category": dict(self.counts_by_category), + "artifacts": [artifact.to_dict() for artifact in self.artifacts], + } + + @classmethod + def from_dict(cls, payload: dict[str, object]) -> "ArtifactIndex": + artifacts = [ + ArtifactRecord.from_dict(item) + for item in payload.get("artifacts", []) + if isinstance(item, dict) + ] + return cls( + generated_at=str(payload.get("generated_at", "")).strip(), + artifact_count=int(payload.get("artifact_count", len(artifacts))), + counts_by_category={ + str(key): int(value) + for key, value in dict(payload.get("counts_by_category", {})).items() + }, + artifacts=artifacts, + ) + + +def write_artifact_index(paths: RunPaths) -> ArtifactIndex: + artifacts = _scan_artifacts(paths) + counts_by_category = { + category: len([artifact for artifact in artifacts if artifact.category == category]) + for category in ("data", "results", "figures") + } + index = ArtifactIndex( + generated_at=datetime.now().isoformat(timespec="seconds"), + artifact_count=len(artifacts), + counts_by_category=counts_by_category, + artifacts=artifacts, + ) + paths.artifact_index.write_text( + json.dumps(index.to_dict(), indent=2, ensure_ascii=True) + "\n", + encoding="utf-8", + ) + return index + + +def ensure_artifact_index(paths: RunPaths) -> ArtifactIndex: + index = load_artifact_index(paths.artifact_index) + if index is not None: + return index + return write_artifact_index(paths) + + +def load_artifact_index(path: Path) -> ArtifactIndex | None: + if not path.exists(): + return None + payload = json.loads(path.read_text(encoding="utf-8")) + return ArtifactIndex.from_dict(payload) + + +def format_artifact_index_for_prompt(index: ArtifactIndex, max_entries_per_category: int = 5) -> str: + if not index.artifacts: + return "No structured data, result, or figure artifacts have been indexed yet." + + lines = [ + f"Artifact index generated at: {index.generated_at}", + f"Indexed artifacts: {index.artifact_count}", + ] + for category in ("data", "results", "figures"): + entries = [artifact for artifact in index.artifacts if artifact.category == category] + if not entries: + continue + lines.append(f"\n### {category.title()}") + for artifact in entries[:max_entries_per_category]: + schema_bits = _schema_summary(artifact.schema) + suffix_label = artifact.suffix.lstrip(".") or "file" + summary = f"- `{artifact.rel_path}` ({suffix_label}, {artifact.size_bytes} bytes)" + if schema_bits: + summary += f" | {schema_bits}" + lines.append(summary) + remaining = len(entries) - max_entries_per_category + if remaining > 0: + lines.append(f"- ... {remaining} more {category} artifacts indexed.") + + return "\n".join(lines) + + +def indexed_artifacts_for_category(index: ArtifactIndex, category: str) -> list[dict[str, object]]: + return [ + artifact.to_dict() + for artifact in index.artifacts + if artifact.category == category + ] + + +def _scan_artifacts(paths: RunPaths) -> list[ArtifactRecord]: + records: list[ArtifactRecord] = [] + for category, directory, suffixes in ( + ("data", paths.data_dir, MACHINE_DATA_SUFFIXES), + ("results", paths.results_dir, RESULT_SUFFIXES), + ("figures", paths.figures_dir, FIGURE_SUFFIXES), + ): + if not directory.exists(): + continue + for path in sorted(directory.rglob("*")): + if not path.is_file() or path.suffix.lower() not in suffixes: + continue + if path.name.endswith(".schema.json"): + continue + stat = path.stat() + records.append( + ArtifactRecord( + category=category, + rel_path=str(path.relative_to(paths.workspace_root)), + filename=path.name, + suffix=path.suffix.lower(), + size_bytes=stat.st_size, + updated_at=datetime.fromtimestamp(stat.st_mtime).isoformat(timespec="seconds"), + schema=_infer_schema(path, category, paths.workspace_root), + ) + ) + return records + + +def _infer_schema(path: Path, category: str, workspace_root: Path) -> dict[str, object]: + sidecar_path = path.parent / f"{path.name}.schema.json" + if sidecar_path.exists(): + try: + declared = json.loads(sidecar_path.read_text(encoding="utf-8")) + return { + "source": "declared", + "sidecar_path": str(sidecar_path.relative_to(workspace_root)), + "definition": declared, + } + except json.JSONDecodeError: + return { + "source": "declared", + "sidecar_path": str(sidecar_path.relative_to(workspace_root)), + "error": "invalid_json", + } + + suffix = path.suffix.lower() + if suffix == ".json": + return _infer_json_schema(path) + if suffix == ".jsonl": + return _infer_jsonl_schema(path) + if suffix in {".csv", ".tsv"}: + return _infer_tabular_schema(path, delimiter="\t" if suffix == ".tsv" else ",") + if suffix in {".yaml", ".yml"}: + return {"source": "inferred", "kind": "yaml_document"} + if suffix == ".parquet": + return {"source": "inferred", "kind": "parquet_table"} + if suffix == ".npz": + return {"source": "inferred", "kind": "numpy_archive"} + if suffix == ".npy": + return {"source": "inferred", "kind": "numpy_array"} + if category == "figures": + return {"source": "inferred", "kind": "figure", "format": suffix.lstrip(".")} + return {"source": "inferred", "kind": "file"} + + +def _infer_json_schema(path: Path) -> dict[str, object]: + try: + payload = json.loads(path.read_text(encoding="utf-8")) + except json.JSONDecodeError: + return {"source": "inferred", "kind": "json", "error": "invalid_json"} + + if isinstance(payload, dict): + return { + "source": "inferred", + "kind": "object", + "keys": sorted(str(key) for key in payload.keys())[:20], + } + if isinstance(payload, list): + item_keys: set[str] = set() + for item in payload[:20]: + if isinstance(item, dict): + item_keys.update(str(key) for key in item.keys()) + schema: dict[str, object] = { + "source": "inferred", + "kind": "array", + "item_count": len(payload), + } + if item_keys: + schema["item_keys"] = sorted(item_keys) + return schema + return { + "source": "inferred", + "kind": type(payload).__name__, + } + + +def _infer_jsonl_schema(path: Path) -> dict[str, object]: + row_count = 0 + keys: set[str] = set() + with path.open("r", encoding="utf-8") as handle: + for raw_line in handle: + line = raw_line.strip() + if not line: + continue + row_count += 1 + try: + payload = json.loads(line) + except json.JSONDecodeError: + return {"source": "inferred", "kind": "jsonl", "error": "invalid_jsonl"} + if isinstance(payload, dict): + keys.update(str(key) for key in payload.keys()) + + schema: dict[str, object] = { + "source": "inferred", + "kind": "jsonl", + "row_count": row_count, + } + if keys: + schema["keys"] = sorted(keys) + return schema + + +def _infer_tabular_schema(path: Path, delimiter: str) -> dict[str, object]: + with path.open("r", encoding="utf-8", newline="") as handle: + reader = csv.reader(handle, delimiter=delimiter) + rows = list(reader) + + if not rows: + return {"source": "inferred", "kind": "table", "columns": [], "row_count": 0} + + header = [column.strip() for column in rows[0]] + return { + "source": "inferred", + "kind": "table", + "columns": header, + "row_count": max(len(rows) - 1, 0), + } + + +def _schema_summary(schema: dict[str, object]) -> str: + if not schema: + return "" + + kind = str(schema.get("kind") or schema.get("source") or "").strip() + parts: list[str] = [kind] if kind else [] + + if isinstance(schema.get("columns"), list) and schema["columns"]: + columns = ", ".join(str(column) for column in schema["columns"][:6]) + parts.append(f"columns={columns}") + if isinstance(schema.get("keys"), list) and schema["keys"]: + keys = ", ".join(str(key) for key in schema["keys"][:6]) + parts.append(f"keys={keys}") + if isinstance(schema.get("item_keys"), list) and schema["item_keys"]: + keys = ", ".join(str(key) for key in schema["item_keys"][:6]) + parts.append(f"item_keys={keys}") + if "row_count" in schema: + parts.append(f"rows={schema['row_count']}") + if "item_count" in schema: + parts.append(f"items={schema['item_count']}") + if "sidecar_path" in schema: + parts.append(f"schema={schema['sidecar_path']}") + if "error" in schema: + parts.append(f"error={schema['error']}") + + return ", ".join(part for part in parts if part) diff --git a/src/manager.py b/src/manager.py index 36c32fd..fcd4ad9 100644 --- a/src/manager.py +++ b/src/manager.py @@ -6,6 +6,7 @@ from pathlib import Path from typing import TextIO +from .artifact_index import format_artifact_index_for_prompt, write_artifact_index from .manifest import ( ensure_run_manifest, format_manifest_status, @@ -144,6 +145,7 @@ def _create_run(self, user_goal: str, venue: str | None = None) -> RunPaths: initialize_memory(paths, user_goal) config = initialize_run_config(paths, model=self.operator.model, venue=venue) initialize_run_manifest(paths) + write_artifact_index(paths) append_log_entry(paths.logs, "run_start", f"Run root: {paths.run_root}") append_log_entry( paths.logs, @@ -418,10 +420,11 @@ def _run_stage(self, paths: RunPaths, stage: StageSpec) -> bool: self._stage_file_paths(stage_markdown), ) write_stage_handoff(paths, stage, stage_markdown) + write_artifact_index(paths) append_log_entry( paths.logs, f"{stage.slug} approved", - "Stage approved and appended to memory.", + f"Stage approved and appended to memory.\nUpdated artifact index: {paths.artifact_index}", ) self._print(f"Approved {stage.stage_title}.") return True @@ -451,6 +454,14 @@ def _build_stage_prompt( + format_venue_for_prompt(paths) + "\n" ) + artifact_index = write_artifact_index(paths) + stage_template = ( + stage_template.rstrip() + + "\n\n## Structured Artifact Index\n\n" + + f"Run-wide artifact index: `{paths.artifact_index.resolve()}`\n\n" + + format_artifact_index_for_prompt(artifact_index) + + "\n" + ) if stage.slug == "07_writing": manifest = build_writing_manifest(paths) stage_template = ( diff --git a/src/utils.py b/src/utils.py index d51acdb..b310ab8 100644 --- a/src/utils.py +++ b/src/utils.py @@ -34,6 +34,7 @@ class RunPaths: memory: Path run_config: Path run_manifest: Path + artifact_index: Path logs: Path logs_raw: Path prompt_cache_dir: Path @@ -151,6 +152,7 @@ def build_run_paths(run_root: Path) -> RunPaths: memory=run_root / "memory.md", run_config=run_root / "run_config.json", run_manifest=run_root / "run_manifest.json", + artifact_index=run_root / "artifact_index.json", logs=run_root / "logs.txt", logs_raw=run_root / "logs_raw.jsonl", prompt_cache_dir=run_root / "prompt_cache", diff --git a/src/writing_manifest.py b/src/writing_manifest.py index ec64b2e..c2ff2f8 100644 --- a/src/writing_manifest.py +++ b/src/writing_manifest.py @@ -4,6 +4,7 @@ from datetime import datetime from pathlib import Path +from .artifact_index import indexed_artifacts_for_category, write_artifact_index from .utils import RunPaths @@ -12,11 +13,13 @@ def build_writing_manifest(paths: RunPaths) -> dict[str, object]: + artifact_index = write_artifact_index(paths) manifest = { "generated_at": datetime.now().isoformat(timespec="seconds"), - "figures": scan_figures(paths.figures_dir), - "result_files": scan_results(paths.results_dir), - "data_files": _scan_dir(paths.data_dir), + "artifact_index_path": str(paths.artifact_index.relative_to(paths.run_root)), + "figures": indexed_artifacts_for_category(artifact_index, "figures"), + "result_files": indexed_artifacts_for_category(artifact_index, "results"), + "data_files": indexed_artifacts_for_category(artifact_index, "data"), "stage_summaries": _collect_stage_summaries(paths), } @@ -65,20 +68,42 @@ def scan_results(results_dir: Path) -> list[dict[str, object]]: def format_manifest_for_prompt(manifest: dict[str, object]) -> str: parts: list[str] = [] + artifact_index_path = manifest.get("artifact_index_path") + if isinstance(artifact_index_path, str) and artifact_index_path.strip(): + parts.append(f"Artifact index: `{artifact_index_path}`") figures = manifest.get("figures", []) if isinstance(figures, list) and figures: - parts.append("### Available Figures") + parts.append("\n### Available Figures") for fig in figures: if isinstance(fig, dict): - parts.append(f"- `{fig['rel_path']}` ({fig['size_bytes']} bytes)") + line = f"- `{fig['rel_path']}` ({fig['size_bytes']} bytes)" + schema = _format_schema(fig.get("schema")) + if schema: + line += f" | {schema}" + parts.append(line) result_files = manifest.get("result_files", []) if isinstance(result_files, list) and result_files: parts.append("\n### Available Result Files") for result in result_files: if isinstance(result, dict): - parts.append(f"- `{result['rel_path']}` (type: {result['type']})") + line = f"- `{result['rel_path']}` (type: {result.get('suffix', '').lstrip('.') or 'file'})" + schema = _format_schema(result.get("schema")) + if schema: + line += f" | {schema}" + parts.append(line) + + data_files = manifest.get("data_files", []) + if isinstance(data_files, list) and data_files: + parts.append("\n### Available Data Files") + for data_file in data_files: + if isinstance(data_file, dict): + line = f"- `{data_file['rel_path']}`" + schema = _format_schema(data_file.get("schema")) + if schema: + line += f" | {schema}" + parts.append(line) stage_summaries = manifest.get("stage_summaries", {}) if isinstance(stage_summaries, dict) and stage_summaries: @@ -110,3 +135,25 @@ def _collect_stage_summaries(paths: RunPaths) -> dict[str, str]: if not stage_file.name.endswith(".tmp.md"): summaries[stage_file.stem] = str(stage_file.relative_to(paths.run_root)) return summaries + + +def _format_schema(schema: object) -> str: + if not isinstance(schema, dict) or not schema: + return "" + + pieces: list[str] = [] + kind = str(schema.get("kind") or schema.get("source") or "").strip() + if kind: + pieces.append(kind) + if isinstance(schema.get("columns"), list) and schema["columns"]: + pieces.append("columns=" + ", ".join(str(item) for item in schema["columns"][:6])) + if isinstance(schema.get("keys"), list) and schema["keys"]: + pieces.append("keys=" + ", ".join(str(item) for item in schema["keys"][:6])) + if "row_count" in schema: + pieces.append(f"rows={schema['row_count']}") + if "item_count" in schema: + pieces.append(f"items={schema['item_count']}") + if "sidecar_path" in schema: + pieces.append(f"schema={schema['sidecar_path']}") + + return ", ".join(pieces) diff --git a/tests/test_artifact_index.py b/tests/test_artifact_index.py new file mode 100644 index 0000000..3f7b1eb --- /dev/null +++ b/tests/test_artifact_index.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +import json +import tempfile +import unittest +from pathlib import Path + +from src.artifact_index import format_artifact_index_for_prompt, load_artifact_index, write_artifact_index +from src.utils import build_run_paths, ensure_run_layout, write_text +from src.writing_manifest import build_writing_manifest + + +class ArtifactIndexTests(unittest.TestCase): + def _build_paths(self) -> object: + tmp_dir = tempfile.TemporaryDirectory() + self.addCleanup(tmp_dir.cleanup) + run_root = Path(tmp_dir.name) / "run" + paths = build_run_paths(run_root) + ensure_run_layout(paths) + return paths + + def test_write_artifact_index_indexes_structured_workspace_artifacts(self) -> None: + paths = self._build_paths() + write_text(paths.data_dir / "dataset.csv", "id,label\n1,cat\n2,dog\n") + write_text( + paths.data_dir / "dataset.csv.schema.json", + json.dumps({"kind": "table", "columns": ["id", "label"], "primary_key": "id"}), + ) + write_text( + paths.results_dir / "metrics.jsonl", + '{"metric":"accuracy","value":0.9}\n{"metric":"loss","value":0.1}\n', + ) + (paths.figures_dir / "accuracy.png").write_bytes(b"\x89PNG fake image data") + + index = write_artifact_index(paths) + self.assertEqual(index.artifact_count, 3) + self.assertEqual(index.counts_by_category["data"], 1) + self.assertEqual(index.counts_by_category["results"], 1) + self.assertEqual(index.counts_by_category["figures"], 1) + + loaded = load_artifact_index(paths.artifact_index) + self.assertIsNotNone(loaded) + assert loaded is not None + + by_path = {artifact.rel_path: artifact for artifact in loaded.artifacts} + self.assertEqual(by_path["data/dataset.csv"].schema["source"], "declared") + self.assertEqual( + by_path["data/dataset.csv"].schema["sidecar_path"], + "data/dataset.csv.schema.json", + ) + self.assertEqual(by_path["results/metrics.jsonl"].schema["row_count"], 2) + self.assertIn("metric", by_path["results/metrics.jsonl"].schema["keys"]) + self.assertEqual(by_path["figures/accuracy.png"].schema["kind"], "figure") + + prompt_context = format_artifact_index_for_prompt(loaded) + self.assertIn("results/metrics.jsonl", prompt_context) + self.assertIn("rows=2", prompt_context) + + def test_writing_manifest_reuses_artifact_index_metadata(self) -> None: + paths = self._build_paths() + write_text(paths.data_dir / "study_design.json", '{"dataset":"demo"}') + write_text(paths.results_dir / "scores.csv", "step,score\n1,0.5\n2,0.7\n") + (paths.figures_dir / "curve.png").write_bytes(b"\x89PNG fake image data") + + manifest = build_writing_manifest(paths) + self.assertEqual(manifest["artifact_index_path"], "artifact_index.json") + + result_files = manifest["result_files"] + assert isinstance(result_files, list) + self.assertEqual(result_files[0]["rel_path"], "results/scores.csv") + self.assertEqual(result_files[0]["schema"]["row_count"], 2) + + data_files = manifest["data_files"] + assert isinstance(data_files, list) + self.assertEqual(data_files[0]["rel_path"], "data/study_design.json") + self.assertEqual(data_files[0]["schema"]["kind"], "object") + + +if __name__ == "__main__": + unittest.main()