Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ File boundaries:

## 📂 Run Layout

Each run contains `user_input.txt`, `memory.md`, `prompt_cache/`, `operator_state/`, `stages/`, `workspace/`, `logs.txt`, and `logs_raw.jsonl`. The substantive research payload lives in `workspace/`.
Each run contains `user_input.txt`, `memory.md`, `run_manifest.json`, `artifact_index.json`, `prompt_cache/`, `operator_state/`, `stages/`, `workspace/`, `logs.txt`, and `logs_raw.jsonl`. The substantive research payload lives in `workspace/`.

```mermaid
flowchart TD
Expand Down Expand Up @@ -338,6 +338,8 @@ Workspace directories:
Other run state:

- `memory.md`: approved cross-stage memory only.
- `run_manifest.json`: machine-readable run and stage lifecycle state.
- `artifact_index.json`: machine-readable index over `workspace/data`, `workspace/results`, and `workspace/figures`.
- `prompt_cache/`: exact prompts used for stage attempts and repairs.
- `operator_state/`: per-stage Claude session IDs.
- `stages/`: draft and promoted stage summaries.
Expand Down Expand Up @@ -416,7 +418,7 @@ Open work is tracked here so contributors can pick up clear, decoupled improveme
- Cross-stage rollback and invalidation. When a later stage reveals that an earlier design decision is wrong, the workflow should be able to jump back to an earlier stage and mark downstream stages as stale. This is the biggest current control-flow gap.
- Machine-readable run manifest. Add a single source of truth such as `run_manifest.json` to track stage status, approval state, stale dependencies, session IDs, and key artifact pointers. This should make both automation and future UI work much cleaner.
- Continuation handoff compression. Add a short machine-generated stage handoff file that summarizes what is already correct, what is missing, and which files matter most. This should reduce context growth and make continuation more stable over long runs.
- Result schema and artifact indexing. Standardize `workspace/data/`, `workspace/results/`, and `workspace/figures/` around explicit schemas and generate an artifact index automatically. Later stages and the UI should consume structured metadata instead of scanning ad hoc files.
- ~~Result schema and artifact indexing.~~ Standardize `workspace/data/`, `workspace/results/`, and `workspace/figures/` around explicit schemas and generate an artifact index automatically. The workflow now writes `artifact_index.json`, carries basic inferred or declared schema metadata, and feeds the index into later-stage prompt context and the writing manifest.
- Writing pipeline hardening. Turn Stage 07 into a reliable manuscript production pipeline with stable conference and journal-style paper structures, bibliography handling, table and figure inclusion, and reproducible PDF compilation. The goal is a submission-ready paper package, not just writing notes.
- Review and dissemination package. Expand Stage 08 so it produces readiness checklists, threats-to-validity notes, artifact manifests, release notes, and external-facing research bundles. The final stage should feel like packaging a paper for real release, not just wrapping up text.
- Frontend run dashboard. Build a lightweight UI that can browse runs, stage status, summaries, logs, artifacts, and validation failures. It should read from the run directory and manifest rather than introducing a database first.
Expand Down
312 changes: 312 additions & 0 deletions src/artifact_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
from __future__ import annotations

import csv
import json
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path

from .utils import FIGURE_SUFFIXES, MACHINE_DATA_SUFFIXES, RESULT_SUFFIXES, RunPaths


@dataclass(frozen=True)
class ArtifactRecord:
    """One indexed workspace artifact plus its declared or inferred schema."""

    category: str  # one of "data", "results", "figures"
    rel_path: str  # path relative to the workspace root
    filename: str  # basename of the artifact file
    suffix: str  # lowercased file extension, including the leading dot
    size_bytes: int  # file size at scan time
    updated_at: str  # ISO-8601 mtime, seconds precision
    schema: dict[str, object] = field(default_factory=dict)

    def to_dict(self) -> dict[str, object]:
        """Serialize this record to a JSON-compatible mapping."""
        return {
            "category": self.category,
            "rel_path": self.rel_path,
            "filename": self.filename,
            "suffix": self.suffix,
            "size_bytes": self.size_bytes,
            "updated_at": self.updated_at,
            "schema": self.schema,
        }

    @classmethod
    def from_dict(cls, payload: dict[str, object]) -> "ArtifactRecord":
        """Rebuild a record from a mapping produced by ``to_dict``.

        Missing keys fall back to empty/zero values. A malformed
        ``schema`` value (e.g. JSON ``null`` or a list in a hand-edited
        index file) is treated as an empty schema instead of raising
        ``TypeError`` from ``dict(...)``.
        """
        schema_value = payload.get("schema")
        return cls(
            category=str(payload.get("category", "")).strip(),
            rel_path=str(payload.get("rel_path", "")).strip(),
            filename=str(payload.get("filename", "")).strip(),
            suffix=str(payload.get("suffix", "")).strip(),
            size_bytes=int(payload.get("size_bytes", 0)),
            updated_at=str(payload.get("updated_at", "")).strip(),
            schema=dict(schema_value) if isinstance(schema_value, dict) else {},
        )


@dataclass(frozen=True)
class ArtifactIndex:
    """Snapshot of every indexed workspace artifact for one run."""

    generated_at: str  # ISO-8601 timestamp of the scan
    artifact_count: int  # total number of indexed artifacts
    counts_by_category: dict[str, int]  # per-category totals ("data"/"results"/"figures")
    artifacts: list[ArtifactRecord]

    def to_dict(self) -> dict[str, object]:
        """Serialize the index to a JSON-compatible mapping."""
        return {
            "generated_at": self.generated_at,
            "artifact_count": self.artifact_count,
            "counts_by_category": dict(self.counts_by_category),
            "artifacts": [artifact.to_dict() for artifact in self.artifacts],
        }

    @classmethod
    def from_dict(cls, payload: dict[str, object]) -> "ArtifactIndex":
        """Rebuild an index from a mapping produced by ``to_dict``.

        Loading is lenient so a damaged or hand-edited index file fails
        soft: non-dict artifact entries are skipped, a non-list
        ``artifacts`` value is treated as empty, and a malformed
        ``counts_by_category`` value (e.g. JSON ``null``) is treated as
        an empty mapping instead of raising ``TypeError``.
        """
        raw_artifacts = payload.get("artifacts", [])
        artifacts = [
            ArtifactRecord.from_dict(item)
            for item in (raw_artifacts if isinstance(raw_artifacts, list) else [])
            if isinstance(item, dict)
        ]
        raw_counts = payload.get("counts_by_category")
        counts_by_category = (
            {str(key): int(value) for key, value in raw_counts.items()}
            if isinstance(raw_counts, dict)
            else {}
        )
        return cls(
            generated_at=str(payload.get("generated_at", "")).strip(),
            artifact_count=int(payload.get("artifact_count", len(artifacts))),
            counts_by_category=counts_by_category,
            artifacts=artifacts,
        )


def write_artifact_index(paths: RunPaths) -> ArtifactIndex:
    """Scan the workspace, persist the index JSON file, and return the index."""
    records = _scan_artifacts(paths)
    per_category = {
        name: sum(1 for record in records if record.category == name)
        for name in ("data", "results", "figures")
    }
    index = ArtifactIndex(
        # Local-time timestamp, consistent with the updated_at values
        # produced by _scan_artifacts via datetime.fromtimestamp.
        generated_at=datetime.now().isoformat(timespec="seconds"),
        artifact_count=len(records),
        counts_by_category=per_category,
        artifacts=records,
    )
    serialized = json.dumps(index.to_dict(), indent=2, ensure_ascii=True)
    paths.artifact_index.write_text(serialized + "\n", encoding="utf-8")
    return index


def ensure_artifact_index(paths: RunPaths) -> ArtifactIndex:
    """Return the on-disk artifact index, building a fresh one if absent."""
    existing = load_artifact_index(paths.artifact_index)
    if existing is None:
        return write_artifact_index(paths)
    return existing


def load_artifact_index(path: Path) -> ArtifactIndex | None:
    """Parse a previously written index file; return None when it does not exist."""
    if not path.exists():
        return None
    raw = path.read_text(encoding="utf-8")
    return ArtifactIndex.from_dict(json.loads(raw))


def format_artifact_index_for_prompt(index: ArtifactIndex, max_entries_per_category: int = 5) -> str:
    """Render the artifact index as compact markdown for inclusion in stage prompts."""
    if not index.artifacts:
        return "No structured data, result, or figure artifacts have been indexed yet."

    out: list[str] = [
        f"Artifact index generated at: {index.generated_at}",
        f"Indexed artifacts: {index.artifact_count}",
    ]
    for category in ("data", "results", "figures"):
        matching = [record for record in index.artifacts if record.category == category]
        if not matching:
            continue
        out.append(f"\n### {category.title()}")
        for record in matching[:max_entries_per_category]:
            kind_label = record.suffix.lstrip(".") or "file"
            line = f"- `{record.rel_path}` ({kind_label}, {record.size_bytes} bytes)"
            details = _schema_summary(record.schema)
            if details:
                line = f"{line} | {details}"
            out.append(line)
        # Cap per-category output; note how many entries were omitted.
        hidden = len(matching) - max_entries_per_category
        if hidden > 0:
            out.append(f"- ... {hidden} more {category} artifacts indexed.")

    return "\n".join(out)


def indexed_artifacts_for_category(index: ArtifactIndex, category: str) -> list[dict[str, object]]:
    """Return ``to_dict`` payloads for every indexed artifact in *category*."""
    matching = (record for record in index.artifacts if record.category == category)
    return [record.to_dict() for record in matching]


def _scan_artifacts(paths: RunPaths) -> list[ArtifactRecord]:
    """Walk the data/results/figures trees and build one record per artifact file."""
    categories = (
        ("data", paths.data_dir, MACHINE_DATA_SUFFIXES),
        ("results", paths.results_dir, RESULT_SUFFIXES),
        ("figures", paths.figures_dir, FIGURE_SUFFIXES),
    )
    found: list[ArtifactRecord] = []
    for category, root, allowed_suffixes in categories:
        if not root.exists():
            continue
        # Sorted walk keeps the index deterministic across runs.
        for candidate in sorted(root.rglob("*")):
            if not candidate.is_file():
                continue
            suffix = candidate.suffix.lower()
            if suffix not in allowed_suffixes:
                continue
            # Sidecar schema files describe other artifacts; never index them directly.
            if candidate.name.endswith(".schema.json"):
                continue
            info = candidate.stat()
            found.append(
                ArtifactRecord(
                    category=category,
                    rel_path=str(candidate.relative_to(paths.workspace_root)),
                    filename=candidate.name,
                    suffix=suffix,
                    size_bytes=info.st_size,
                    updated_at=datetime.fromtimestamp(info.st_mtime).isoformat(timespec="seconds"),
                    schema=_infer_schema(candidate, category, paths.workspace_root),
                )
            )
    return found


def _infer_schema(path: Path, category: str, workspace_root: Path) -> dict[str, object]:
sidecar_path = path.parent / f"{path.name}.schema.json"
if sidecar_path.exists():
try:
declared = json.loads(sidecar_path.read_text(encoding="utf-8"))
return {
"source": "declared",
"sidecar_path": str(sidecar_path.relative_to(workspace_root)),
"definition": declared,
}
except json.JSONDecodeError:
return {
"source": "declared",
"sidecar_path": str(sidecar_path.relative_to(workspace_root)),
"error": "invalid_json",
}

suffix = path.suffix.lower()
if suffix == ".json":
return _infer_json_schema(path)
if suffix == ".jsonl":
return _infer_jsonl_schema(path)
if suffix in {".csv", ".tsv"}:
return _infer_tabular_schema(path, delimiter="\t" if suffix == ".tsv" else ",")
if suffix in {".yaml", ".yml"}:
return {"source": "inferred", "kind": "yaml_document"}
if suffix == ".parquet":
return {"source": "inferred", "kind": "parquet_table"}
if suffix == ".npz":
return {"source": "inferred", "kind": "numpy_archive"}
if suffix == ".npy":
return {"source": "inferred", "kind": "numpy_array"}
if category == "figures":
return {"source": "inferred", "kind": "figure", "format": suffix.lstrip(".")}
return {"source": "inferred", "kind": "file"}


def _infer_json_schema(path: Path) -> dict[str, object]:
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return {"source": "inferred", "kind": "json", "error": "invalid_json"}

if isinstance(payload, dict):
return {
"source": "inferred",
"kind": "object",
"keys": sorted(str(key) for key in payload.keys())[:20],
}
if isinstance(payload, list):
item_keys: set[str] = set()
for item in payload[:20]:
if isinstance(item, dict):
item_keys.update(str(key) for key in item.keys())
schema: dict[str, object] = {
"source": "inferred",
"kind": "array",
"item_count": len(payload),
}
if item_keys:
schema["item_keys"] = sorted(item_keys)
return schema
return {
"source": "inferred",
"kind": type(payload).__name__,
}


def _infer_jsonl_schema(path: Path) -> dict[str, object]:
row_count = 0
keys: set[str] = set()
with path.open("r", encoding="utf-8") as handle:
for raw_line in handle:
line = raw_line.strip()
if not line:
continue
row_count += 1
try:
payload = json.loads(line)
except json.JSONDecodeError:
return {"source": "inferred", "kind": "jsonl", "error": "invalid_jsonl"}
if isinstance(payload, dict):
keys.update(str(key) for key in payload.keys())

schema: dict[str, object] = {
"source": "inferred",
"kind": "jsonl",
"row_count": row_count,
}
if keys:
schema["keys"] = sorted(keys)
return schema


def _infer_tabular_schema(path: Path, delimiter: str) -> dict[str, object]:
with path.open("r", encoding="utf-8", newline="") as handle:
reader = csv.reader(handle, delimiter=delimiter)
rows = list(reader)

if not rows:
return {"source": "inferred", "kind": "table", "columns": [], "row_count": 0}

header = [column.strip() for column in rows[0]]
return {
"source": "inferred",
"kind": "table",
"columns": header,
"row_count": max(len(rows) - 1, 0),
}


def _schema_summary(schema: dict[str, object]) -> str:
if not schema:
return ""

kind = str(schema.get("kind") or schema.get("source") or "").strip()
parts: list[str] = [kind] if kind else []

if isinstance(schema.get("columns"), list) and schema["columns"]:
columns = ", ".join(str(column) for column in schema["columns"][:6])
parts.append(f"columns={columns}")
if isinstance(schema.get("keys"), list) and schema["keys"]:
keys = ", ".join(str(key) for key in schema["keys"][:6])
parts.append(f"keys={keys}")
if isinstance(schema.get("item_keys"), list) and schema["item_keys"]:
keys = ", ".join(str(key) for key in schema["item_keys"][:6])
parts.append(f"item_keys={keys}")
if "row_count" in schema:
parts.append(f"rows={schema['row_count']}")
if "item_count" in schema:
parts.append(f"items={schema['item_count']}")
if "sidecar_path" in schema:
parts.append(f"schema={schema['sidecar_path']}")
if "error" in schema:
parts.append(f"error={schema['error']}")

return ", ".join(part for part in parts if part)
13 changes: 12 additions & 1 deletion src/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pathlib import Path
from typing import TextIO

from .artifact_index import format_artifact_index_for_prompt, write_artifact_index
from .manifest import (
ensure_run_manifest,
format_manifest_status,
Expand Down Expand Up @@ -144,6 +145,7 @@ def _create_run(self, user_goal: str, venue: str | None = None) -> RunPaths:
initialize_memory(paths, user_goal)
config = initialize_run_config(paths, model=self.operator.model, venue=venue)
initialize_run_manifest(paths)
write_artifact_index(paths)
append_log_entry(paths.logs, "run_start", f"Run root: {paths.run_root}")
append_log_entry(
paths.logs,
Expand Down Expand Up @@ -418,10 +420,11 @@ def _run_stage(self, paths: RunPaths, stage: StageSpec) -> bool:
self._stage_file_paths(stage_markdown),
)
write_stage_handoff(paths, stage, stage_markdown)
write_artifact_index(paths)
append_log_entry(
paths.logs,
f"{stage.slug} approved",
"Stage approved and appended to memory.",
f"Stage approved and appended to memory.\nUpdated artifact index: {paths.artifact_index}",
)
self._print(f"Approved {stage.stage_title}.")
return True
Expand Down Expand Up @@ -451,6 +454,14 @@ def _build_stage_prompt(
+ format_venue_for_prompt(paths)
+ "\n"
)
artifact_index = write_artifact_index(paths)
stage_template = (
stage_template.rstrip()
+ "\n\n## Structured Artifact Index\n\n"
+ f"Run-wide artifact index: `{paths.artifact_index.resolve()}`\n\n"
+ format_artifact_index_for_prompt(artifact_index)
+ "\n"
)
if stage.slug == "07_writing":
manifest = build_writing_manifest(paths)
stage_template = (
Expand Down
2 changes: 2 additions & 0 deletions src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class RunPaths:
memory: Path
run_config: Path
run_manifest: Path
artifact_index: Path
logs: Path
logs_raw: Path
prompt_cache_dir: Path
Expand Down Expand Up @@ -151,6 +152,7 @@ def build_run_paths(run_root: Path) -> RunPaths:
memory=run_root / "memory.md",
run_config=run_root / "run_config.json",
run_manifest=run_root / "run_manifest.json",
artifact_index=run_root / "artifact_index.json",
logs=run_root / "logs.txt",
logs_raw=run_root / "logs_raw.jsonl",
prompt_cache_dir=run_root / "prompt_cache",
Expand Down
Loading