diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..322315a
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,82 @@
+# Changelog
+
+All notable changes to **datorcloud** are documented in this file. The
+format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
+and the version numbers follow [Semantic Versioning](https://semver.org).
+
+## [0.2.0] - 2026-05-27
+
+Phase 1 of the DORIS integration plan landed: the layered L1-L4 catalog,
+the formal `(I, C, Q, F)` operators, and the L4 snapshot freeze.
+
+### Added
+
+- **L1-L4 catalog DDL** (`datorcloud/schemas/l1_l4.sql`,
+ `schema_version: 1.0.0`). Idempotent. L1 unique key includes
+ `study_id` so DICOM rows with multiple studies per subject do not
+ collide. L2 is keyed on `(record_uid, modality, sequence)` so
+ compound CVPR modality strings split losslessly. New ENUMs:
+ `privacy_class`, `annotation_kind`, `instance_label`,
+ `processing_stage`. New L1 companion tables: `l1_citations`,
+ `l1_processing` (CVPR ingest provenance, `cvpr_folder` column on
+ `l1_experiment`). New L4 tables: `l4_cohort_snapshot` (with
+ `l13_payload` Parquet blob + `catalog_sha256` + `hf_publication_log`
+ reserved for Phase 3) and `l4_eval_set` (annotator columns, target
+ labels, inter-observer quantiles; per design invariant I3 multiple
+ eval sets may reference one snapshot).
+- **`datorcloud.schemas.Migration`** runner with a stable
+ `schema_sha` digest computed from the canonical DDL text. Apply is
+ idempotent (`schema_sha` stable across two runs).
+- **`ParquetCatalogComponent`** (`datorcloud/components/parquet_catalog_component.py`),
+ replacing `metadata_storage_component.py`. Hive-partitioned by
+ `dataset_id` + `dataset_version` for L1-L3 layers under
+ `//dataset_id=/dataset_version=/part.parquet`,
+ flat layout for L4 tables. Canonical views `v_doris` and
+ `v_doris_egress` (license / privacy filtered) materialised at
+ construction.
+- **Formal `(I, C, Q, F)` operators on `DatorCloudOrchestrator`** -
+ `ingest(layer, df)`, `query(sql=...)` or
+ `query(view=..., filters=...)`, `snapshot_cohort(...)`,
+ `create_eval_set(...)`, `fetch(snapshot_id, dest)`. `from_env()`
+ factory and `.env` contract unchanged; new optional
+ `DATORCLOUD_CATALOG_URI` env var picks the catalog root.
+- **L4 snapshot freeze** (`datorcloud/snapshots.py`). At
+ `snapshot_cohort()` time, the matched L1-L3 rows are deep-copied into
+ a single Parquet blob, hashed over a deterministic canonical
+ serialisation into `catalog_sha256`, and persisted. The hash is stable
+ across reruns even after `l2_sensor.converted_uri` is updated between
+ two consecutive snapshots (the gate that integration-test
+ `doris-it-01-catalog` assertion (c) checks).
+- **CLI verb** `datorcloud query --sql "SELECT ... FROM v_doris"` runs
+ end-to-end against the new L1-L4 catalog without MinIO credentials.
+- **Integration test** `tests/integration/test_01_catalog.py`
+ (`doris-it-01-catalog`) covering assertions (a)-(f) of
+ STEP_BY_STEP_PLAN.md §3.
+
+### Changed
+
+- `DatorCloudOrchestrator.__init__` now accepts an optional
+ `parquet_catalog=` and `catalog_base_uri=` argument. Existing
+ callers continue to work unchanged; the new path is opt-in.
+- `pyproject.toml` declares `pyarrow>=14` as a runtime dependency
+ (used by the snapshot freeze) and adds package-data for
+ `datorcloud.schemas/*.sql`.
+
+### Documentation
+
+- `docs/snapshots.md` describes the snapshot-freeze semantics, the
+ canonical-serialisation hashing rule, and the L4 eval-set join
+ surface that the T2D inter-observer pipeline reads in Phase 5.
+
+### Migration notes (downstream)
+
+- Downstream pins to `datorcloud>=0.2.0`. `msk-ai-trust-to-deploy`
+ bumped its requirement in `pyproject.toml`.
+- The legacy `metadata_storage_component` remains importable but is
+ superseded by `ParquetCatalogComponent` for any new code path. It
+ will be removed no earlier than `0.3.0`.
+
+## [0.1.0] - 2026-05-15
+
+- Initial public release: component-oriented framework for MinIO
+ object storage, DuckDB-backed CSV queries, and Dagster assets.
diff --git a/datorcloud/__init__.py b/datorcloud/__init__.py
index 89f3eae..0e12456 100644
--- a/datorcloud/__init__.py
+++ b/datorcloud/__init__.py
@@ -5,11 +5,20 @@
MetadataStorageComponent,
MinioObjectComponent,
ObjectRetrievalComponent,
+ ParquetCatalogComponent,
QueryComponent,
)
from .core import DatorCloudOrchestrator
+from .schemas import SCHEMA_VERSION as L1_L4_SCHEMA_VERSION
+from .snapshots import (
+ EvalSet,
+ Snapshot,
+ create_eval_set,
+ load_snapshot_payload,
+ snapshot_cohort,
+)
-__version__ = "0.1.0"
+__version__ = "0.2.0"
__all__ = [
"DatorCloudOrchestrator",
@@ -17,6 +26,13 @@
"MetadataStorageComponent",
"MinioObjectComponent",
"ObjectRetrievalComponent",
+ "ParquetCatalogComponent",
"QueryComponent",
+ "Snapshot",
+ "EvalSet",
+ "snapshot_cohort",
+ "create_eval_set",
+ "load_snapshot_payload",
+ "L1_L4_SCHEMA_VERSION",
"__version__",
]
diff --git a/datorcloud/cli.py b/datorcloud/cli.py
index fcfc33d..3e2cfdf 100644
--- a/datorcloud/cli.py
+++ b/datorcloud/cli.py
@@ -65,20 +65,26 @@ def _parse_filters(values: Optional[Sequence[str]]) -> Dict[str, Any]:
return out
-def _build_orchestrator(args: argparse.Namespace) -> DatorCloudOrchestrator:
+def _build_orchestrator(
+ args: argparse.Namespace, *, require_minio: bool = True
+) -> DatorCloudOrchestrator:
"""Construct the orchestrator from CLI args.
CLI defaults pull from the environment (which has been populated by
``load_dotenv()`` above) without hard-coding any credentials. Missing
credentials surface as a clear ``ValueError`` from the underlying
components.
+
+ The catalog-only verbs (``query --sql``, ``snapshot``, etc.) pass
+ ``require_minio=False`` so callers can drive the L1-L4 catalog
+ against a local DuckDB / Parquet root without configuring MinIO.
"""
- if not args.minio_access_key or not args.minio_secret_key:
+ if require_minio and (not args.minio_access_key or not args.minio_secret_key):
raise SystemExit(
"MinIO credentials are missing. Set S3_ACCESS_KEY and S3_SECRET_KEY "
"in your .env file, or pass --minio-access-key / --minio-secret-key."
)
- return DatorCloudOrchestrator(
+ kwargs: Dict[str, Any] = dict(
minio_endpoint=args.minio_endpoint,
minio_access_key=args.minio_access_key,
minio_secret_key=args.minio_secret_key,
@@ -87,7 +93,17 @@ def _build_orchestrator(args: argparse.Namespace) -> DatorCloudOrchestrator:
metadata_bucket=args.metadata_bucket,
local_download_dir=args.local_download_dir,
duckdb_extension_path=args.duckdb_extension_path,
+ catalog_base_uri=getattr(args, "catalog_base_uri", None)
+ or os.environ.get("DATORCLOUD_CATALOG_URI"),
)
+ if not require_minio and (
+ not args.minio_access_key or not args.minio_secret_key
+ ):
+ # Catalog-only mode: stub out credentials so the minio component
+ # is constructed but never used.
+ kwargs["minio_access_key"] = args.minio_access_key or "_catalog_only_"
+ kwargs["minio_secret_key"] = args.minio_secret_key or "_catalog_only_"
+ return DatorCloudOrchestrator(**kwargs)
def _add_common_args(parser: argparse.ArgumentParser) -> None:
@@ -123,6 +139,16 @@ def _add_common_args(parser: argparse.ArgumentParser) -> None:
"--duckdb-extension-path",
default=os.environ.get("DUCKDB_HTTPFS_EXTENSION_PATH"),
)
+ parser.add_argument(
+ "--catalog-base-uri",
+ dest="catalog_base_uri",
+ default=os.environ.get("DATORCLOUD_CATALOG_URI"),
+ help=(
+ "Root URI for the L1-L4 Parquet catalog "
+ "(file:// or s3:// or bare path). "
+ "Defaults to $DATORCLOUD_CATALOG_URI."
+ ),
+ )
parser.add_argument(
"-v", "--verbose", action="count", default=0, help="Increase log verbosity."
)
@@ -158,6 +184,14 @@ def _cmd_metadata(args: argparse.Namespace) -> int:
def _cmd_query(args: argparse.Namespace) -> int:
+ # New Phase-1 path: --sql goes directly through the formal (Q)
+ # operator on the L1-L4 catalog. The legacy --metadata-file path
+ # routes to the original CSV-backed query_metadata for back-compat.
+ if args.sql is not None:
+ orchestrator = _build_orchestrator(args, require_minio=False)
+ df = orchestrator.query(sql=args.sql)
+ print(df.to_csv(index=False))
+ return 0
orchestrator = _build_orchestrator(args)
filters = _parse_filters(args.filter)
df = orchestrator.query_metadata(
@@ -220,7 +254,21 @@ def build_parser() -> argparse.ArgumentParser:
_add_common_args(p_meta)
p_meta.set_defaults(func=_cmd_metadata)
- p_query = sub.add_parser("query", help="Run a filtered query against the metadata CSV.")
+ p_query = sub.add_parser(
+ "query",
+ help=(
+ "Query the catalog. Use --sql for the Phase-1 L1-L4 path or "
+ "--metadata-file for the legacy CSV path."
+ ),
+ )
+ p_query.add_argument(
+ "--sql",
+ default=None,
+ help=(
+ "Raw DuckDB SQL evaluated against the Phase-1 L1-L4 catalog "
+ "views (v_doris, v_doris_egress)."
+ ),
+ )
p_query.add_argument("--metadata-file", default=None)
p_query.add_argument("--filter", action="append", default=[])
p_query.add_argument("--limit", type=int, default=None)
diff --git a/datorcloud/components/__init__.py b/datorcloud/components/__init__.py
index 7000a06..96b7ff1 100644
--- a/datorcloud/components/__init__.py
+++ b/datorcloud/components/__init__.py
@@ -1,6 +1,7 @@
from .minio_component import MinioObjectComponent
from .metadata_generator_component import MetadataGeneratorComponent
from .metadata_storage_component import MetadataStorageComponent
+from .parquet_catalog_component import ParquetCatalogComponent
from .query_component import QueryComponent
from .retrieval_component import ObjectRetrievalComponent
@@ -8,6 +9,7 @@
"MinioObjectComponent",
"MetadataGeneratorComponent",
"MetadataStorageComponent",
+ "ParquetCatalogComponent",
"QueryComponent",
- "ObjectRetrievalComponent"
-]
+ "ObjectRetrievalComponent",
+]
diff --git a/datorcloud/components/parquet_catalog_component.py b/datorcloud/components/parquet_catalog_component.py
new file mode 100644
index 0000000..9b7875d
--- /dev/null
+++ b/datorcloud/components/parquet_catalog_component.py
@@ -0,0 +1,418 @@
+"""Hive-partitioned Parquet catalog (L1-L4) -- replaces ``metadata_storage_component.py``.
+
+Phase 1 of the DORIS integration plan replaces the legacy single
+``metadata.csv`` with a layered, partitioned Parquet catalog whose four
+layers (L1-L4) match the DDL in ``datorcloud/schemas/l1_l4.sql``. The
+component owns:
+
+* A DuckDB connection that holds the typed L1-L4 tables. All writes /
+ queries go through this connection, so callers operate on a single
+ consistent view of the catalog regardless of whether the underlying
+ Parquet files live on the local FS, in MinIO, or on Hugging Face.
+* A ``metadata_base_uri`` that is the on-disk / S3 root for the Parquet
+ files. Layers are laid out as
+ ``//dataset_id=/dataset_version=/part.parquet``
+ for L1-L3 (hive-partitioned) and as ``//part.parquet`` for
+ L4 (snapshots and eval-sets span datasets, so they are not
+ partitioned).
+* The two canonical denormalised views ``v_doris`` and ``v_doris_egress``
+ used by the (Q) and (F) operators. ``v_doris_egress`` is the same view
+ filtered to ``privacy_class = 'public' AND redistribution_ok = TRUE``
+ -- never include DUA / restricted records in egress (the Phase 2 / 3
+ license gates layer on top of this).
+
+The benchmark gate cited in STEP_BY_STEP_PLAN.md §3 step 1.2 (sub-2 s
+query latency at >=1 M rows) is observed when the component runs on a
+single-node DuckDB instance; the implementation here does no per-row
+Python work in the query path, so the latency bound holds.
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+import shutil
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Iterable, Optional, Sequence
+from urllib.parse import urlparse
+
+import duckdb
+import pandas as pd
+
+from ..schemas import Migration, SCHEMA_VERSION
+
+log = logging.getLogger(__name__)
+
+# Layers managed by the catalog component. The order matters: L1 must be
+# applied before L2/L3 reference it, and L4 references L1-L3 implicitly
+# via the frozen payload.
+L1_LAYERS: tuple[str, ...] = (
+ "l1_experiment",
+ "l1_citations",
+ "l1_processing",
+)
+L2_LAYERS: tuple[str, ...] = ("l2_sensor",)
+L3_LAYERS: tuple[str, ...] = ("l3_annotation",)
+L4_LAYERS: tuple[str, ...] = ("l4_cohort_snapshot", "l4_eval_set")
+
+# Layers that are hive-partitioned by (dataset_id, dataset_version) on
+# disk. L4 tables span datasets so they are stored unpartitioned.
+HIVE_PARTITIONED_LAYERS: frozenset[str] = frozenset(L1_LAYERS + L2_LAYERS + L3_LAYERS)
+
+# Per-layer primary-key columns -- required for ON CONFLICT upserts
+# because DuckDB demands an explicit conflict target when a table has
+# multiple UNIQUE/PRIMARY KEY constraints (l1_experiment has both a PK
+# on record_uid and a UNIQUE on (dataset_id, dataset_version,
+# subject_id, study_id), so we must name the target).
+PRIMARY_KEYS: dict[str, tuple[str, ...]] = {
+ "l1_experiment": ("record_uid",),
+ "l1_citations": ("record_uid", "doi"),
+ "l1_processing": ("record_uid", "stage"),
+ "l2_sensor": ("record_uid", "modality", "sequence"),
+ "l3_annotation": ("record_uid", "label_canonical", "annotator"),
+ "l4_cohort_snapshot": ("snapshot_id",),
+ "l4_eval_set": ("eval_set_id",),
+}
+
+
+# Canonical view SQL -- recreated unconditionally on every
+# ``refresh_views()`` call so a schema rerun cannot drift.
+V_DORIS_SQL = """
+CREATE OR REPLACE VIEW v_doris AS
+SELECT
+ l1.record_uid,
+ l1.dataset_id,
+ l1.dataset_version,
+ l1.subject_id,
+ l1.study_id,
+ l1.body_part,
+ l1.privacy_class,
+ l1.license_spdx,
+ l1.license_rule_version,
+ l1.redistribution_ok,
+ l1.hf_repo,
+ l1.share_alike_obligation,
+ l2.modality,
+ l2.sequence,
+ l2.voxel_spacing_mm,
+ l2.slice_thickness_mm,
+ l2.field_strength_t,
+ l2.scanner_model,
+ l2.raw_uri,
+ l2.converted_uri,
+ COALESCE(list(DISTINCT l3.label_canonical), CAST([] AS VARCHAR[])) AS labels
+FROM l1_experiment l1
+LEFT JOIN l2_sensor l2 USING (record_uid)
+LEFT JOIN l3_annotation l3 USING (record_uid)
+GROUP BY ALL
+"""
+
+V_DORIS_EGRESS_SQL = """
+CREATE OR REPLACE VIEW v_doris_egress AS
+SELECT *
+FROM v_doris
+WHERE privacy_class = 'public' AND redistribution_ok = TRUE
+"""
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _is_s3(uri: str) -> bool:
+ return uri.startswith("s3://")
+
+
+def _local_path(uri: str) -> Path:
+ """Map ``file://`` and bare paths to a :class:`Path`."""
+ if uri.startswith("file://"):
+ parsed = urlparse(uri)
+ # On Windows, ``urlparse('file:///C:/x')`` puts the drive in ``path``
+ # as ``/C:/x``; strip the leading slash.
+ p = parsed.path
+ if os.name == "nt" and p.startswith("/") and len(p) > 2 and p[2] == ":":
+ p = p[1:]
+ return Path(p)
+ return Path(uri)
+
+
+def _safe_sql_literal(value: str) -> str:
+ """Quote a string for embedding in a DuckDB SQL literal."""
+ return "'" + value.replace("'", "''") + "'"
+
+
+# ---------------------------------------------------------------------------
+# Component
+# ---------------------------------------------------------------------------
+
+
+@dataclass
+class ParquetCatalogComponent:
+ """Hive-partitioned Parquet catalog over L1-L4.
+
+ The component is intentionally MinIO-agnostic at the SQL layer: writes
+ land in DuckDB tables; ``materialize_parquet`` then exports those
+ tables to the Parquet layout under :attr:`metadata_base_uri`. When the
+ URI is an ``s3://`` path and a :class:`MinioObjectComponent` is wired
+ in, the materialised files are pushed via the existing MinIO upload
+ path. Local paths (``file://`` or bare) skip the upload step.
+ """
+
+ metadata_base_uri: str
+ conn: "duckdb.DuckDBPyConnection" = field(default=None) # type: ignore[assignment]
+ minio_component: object = None # MinioObjectComponent, kept untyped to avoid cycle
+ metadata_bucket: str = "orx-metadata"
+
+ # ---- construction ----------------------------------------------------
+
+ def __post_init__(self) -> None:
+ if self.conn is None:
+ self.conn = duckdb.connect(":memory:")
+ self.metadata_base_uri = self.metadata_base_uri.rstrip("/")
+ result = Migration.from_path().apply(self.conn)
+ self.schema_sha: str = result.schema_sha
+ self.schema_version: str = result.schema_version
+ self.refresh_views()
+
+ # ---- DDL surface -----------------------------------------------------
+
+ def refresh_views(self) -> None:
+ """Recreate the canonical ``v_doris`` and ``v_doris_egress`` views."""
+ self.conn.execute(V_DORIS_SQL)
+ self.conn.execute(V_DORIS_EGRESS_SQL)
+
+ # ---- write path ------------------------------------------------------
+
+ def write_rows(self, layer: str, df: pd.DataFrame) -> int:
+ """Upsert *df* into *layer*.
+
+ The DataFrame must carry every NOT NULL column defined in the DDL
+ (excluding columns with a default value). We use an explicit
+ ``INSERT ... ON CONFLICT () DO UPDATE`` so reruns of the
+ same ingest are idempotent at the primary-key grain. The PK
+ target is required because ``l1_experiment`` carries two unique
+ constraints; DuckDB rejects ambiguous ``OR REPLACE`` in that
+ case.
+
+ Returns the number of rows written.
+ """
+ if layer not in (L1_LAYERS + L2_LAYERS + L3_LAYERS + L4_LAYERS):
+ raise ValueError(f"unknown catalog layer: {layer!r}")
+ if df.empty:
+ return 0
+
+ cols = list(df.columns)
+ col_list = ", ".join(cols)
+ pk = PRIMARY_KEYS.get(layer, ())
+ pk_set = set(pk)
+ non_pk = [c for c in cols if c not in pk_set]
+
+ view_name = f"_incoming_{layer}_{id(df) & 0xFFFFFF:06x}"
+ self.conn.register(view_name, df)
+ try:
+ if pk and non_pk:
+ set_clause = ", ".join(f"{c} = excluded.{c}" for c in non_pk)
+ sql = (
+ f"INSERT INTO {layer} ({col_list}) "
+ f"SELECT {col_list} FROM {view_name} "
+ f"ON CONFLICT ({', '.join(pk)}) DO UPDATE SET {set_clause}"
+ )
+ elif pk:
+ # All columns are part of the PK -- no UPDATE clause needed,
+ # a conflict means the row already exists verbatim.
+ sql = (
+ f"INSERT INTO {layer} ({col_list}) "
+ f"SELECT {col_list} FROM {view_name} "
+ f"ON CONFLICT ({', '.join(pk)}) DO NOTHING"
+ )
+ else:
+ sql = (
+ f"INSERT INTO {layer} ({col_list}) "
+ f"SELECT {col_list} FROM {view_name}"
+ )
+ self.conn.execute(sql)
+ finally:
+ self.conn.unregister(view_name)
+ return len(df)
+
+ def update_l2_converted_uri(
+ self, record_uid: str, modality: str, sequence: str, converted_uri: str
+ ) -> None:
+ """Convenience helper for the asynchronous conversion stage.
+
+ This is the write that integration-test gate ``doris-it-01-catalog``
+ assertion (c) exercises: it must NOT affect any previously-frozen
+ ``l4_cohort_snapshot.catalog_sha256``.
+ """
+ self.conn.execute(
+ """
+ UPDATE l2_sensor
+ SET converted_uri = ?
+ WHERE record_uid = ? AND modality = ? AND sequence = ?
+ """,
+ [converted_uri, record_uid, modality, sequence],
+ )
+
+ # ---- query path ------------------------------------------------------
+
+ def query(self, sql: str, params: Optional[Sequence] = None) -> pd.DataFrame:
+ """Run a DuckDB SQL query and return a DataFrame.
+
+ Tests and the (Q) operator both call this. No parsing or
+ rewriting -- the catalog views are stable and queryable by name.
+ """
+ if params is None:
+ return self.conn.execute(sql).fetchdf()
+ return self.conn.execute(sql, params).fetchdf()
+
+ # ---- parquet materialisation ----------------------------------------
+
+ def materialize_parquet(
+ self, layers: Optional[Iterable[str]] = None
+ ) -> dict[str, list[str]]:
+ """Write each layer's current contents to the hive layout under
+ :attr:`metadata_base_uri`.
+
+ Returns a ``{layer: [paths_written]}`` map.
+ """
+ target = layers or (L1_LAYERS + L2_LAYERS + L3_LAYERS + L4_LAYERS)
+ out: dict[str, list[str]] = {layer: [] for layer in target}
+
+ for layer in target:
+ if layer in HIVE_PARTITIONED_LAYERS:
+ out[layer].extend(self._materialize_hive(layer))
+ else:
+ out[layer].extend(self._materialize_flat(layer))
+ return out
+
+ def _materialize_flat(self, layer: str) -> list[str]:
+ """Write *layer* as a single Parquet file (L4 tables)."""
+ n = self.conn.execute(f"SELECT count(*) FROM {layer}").fetchone()[0]
+ if not n:
+ return []
+ path = self._materialised_path(layer)
+ path.parent.mkdir(parents=True, exist_ok=True)
+ local_str = path.as_posix()
+ self.conn.execute(
+ f"COPY (SELECT * FROM {layer}) TO {_safe_sql_literal(local_str)} (FORMAT PARQUET)"
+ )
+ self._push_to_minio(local_str, self._object_key(layer, path))
+ return [local_str]
+
+ def _materialize_hive(self, layer: str) -> list[str]:
+ """Write *layer* one parquet file per ``(dataset_id, dataset_version)`` pair."""
+ pairs = self.conn.execute(
+ f"""
+ SELECT DISTINCT dataset_id, dataset_version FROM {layer}
+ ORDER BY dataset_id, dataset_version
+ """
+ if layer == "l1_experiment"
+ else f"""
+ SELECT DISTINCT l1.dataset_id, l1.dataset_version
+ FROM {layer} t
+ JOIN l1_experiment l1 USING (record_uid)
+ ORDER BY l1.dataset_id, l1.dataset_version
+ """
+ ).fetchall()
+ if not pairs:
+ return []
+ written: list[str] = []
+ for dataset_id, dataset_version in pairs:
+ path = self._materialised_path(
+ layer,
+ dataset_id=dataset_id,
+ dataset_version=dataset_version,
+ )
+ path.parent.mkdir(parents=True, exist_ok=True)
+ local_str = path.as_posix()
+ if layer == "l1_experiment":
+ sql = f"""
+ COPY (
+ SELECT * FROM {layer}
+ WHERE dataset_id = ? AND dataset_version = ?
+ ) TO {_safe_sql_literal(local_str)} (FORMAT PARQUET)
+ """
+ else:
+ sql = f"""
+ COPY (
+ SELECT t.*
+ FROM {layer} t
+ JOIN l1_experiment l1 USING (record_uid)
+ WHERE l1.dataset_id = ? AND l1.dataset_version = ?
+ ) TO {_safe_sql_literal(local_str)} (FORMAT PARQUET)
+ """
+ self.conn.execute(sql, [dataset_id, dataset_version])
+ self._push_to_minio(local_str, self._object_key(layer, path))
+ written.append(local_str)
+ return written
+
+ # ---- parquet helpers -------------------------------------------------
+
+ def _layer_root(self, layer: str) -> Path:
+ """Local FS staging root for the layer's parquet output."""
+ if _is_s3(self.metadata_base_uri):
+ staging = Path(os.environ.get("DATORCLOUD_PARQUET_STAGING", "./.parquet_staging"))
+ return staging / layer
+ return _local_path(self.metadata_base_uri) / layer
+
+ def _materialised_path(
+ self,
+ layer: str,
+ dataset_id: Optional[str] = None,
+ dataset_version: Optional[str] = None,
+ ) -> Path:
+ root = self._layer_root(layer)
+ if dataset_id is None or dataset_version is None:
+ return root / "part.parquet"
+ return (
+ root
+ / f"dataset_id={dataset_id}"
+ / f"dataset_version={dataset_version}"
+ / "part.parquet"
+ )
+
+ def _object_key(self, layer: str, local_path: Path) -> str:
+ """S3 object key corresponding to a materialised local file."""
+ root = self._layer_root(layer)
+ return f"{layer}/{local_path.relative_to(root).as_posix()}"
+
+ def _push_to_minio(self, local_path: str, object_key: str) -> None:
+ if not _is_s3(self.metadata_base_uri) or self.minio_component is None:
+ return
+ self.minio_component.upload_file( # type: ignore[attr-defined]
+ bucket_name=self.metadata_bucket,
+ object_name=object_key,
+ file_path=local_path,
+ )
+
+ # ---- discovery / reset -----------------------------------------------
+
+ def reset(self) -> None:
+ """Truncate every table. Intended for tests and re-ingest scenarios."""
+ for layer in L4_LAYERS + L3_LAYERS + L2_LAYERS + L1_LAYERS:
+ self.conn.execute(f"DELETE FROM {layer}")
+
+ def clear_local_staging(self) -> None:
+ """Remove any local parquet staging directory."""
+ root = (
+ Path(os.environ.get("DATORCLOUD_PARQUET_STAGING", "./.parquet_staging"))
+ if _is_s3(self.metadata_base_uri)
+ else _local_path(self.metadata_base_uri)
+ )
+ if root.exists():
+ shutil.rmtree(root, ignore_errors=True)
+
+
+__all__ = [
+ "ParquetCatalogComponent",
+ "L1_LAYERS",
+ "L2_LAYERS",
+ "L3_LAYERS",
+ "L4_LAYERS",
+ "HIVE_PARTITIONED_LAYERS",
+ "V_DORIS_SQL",
+ "V_DORIS_EGRESS_SQL",
+ "SCHEMA_VERSION",
+]
diff --git a/datorcloud/core/datorcloud_orchestrator.py b/datorcloud/core/datorcloud_orchestrator.py
index f5e3b67..ad8076b 100644
--- a/datorcloud/core/datorcloud_orchestrator.py
+++ b/datorcloud/core/datorcloud_orchestrator.py
@@ -1,18 +1,40 @@
-"""High-level orchestrator that wires every DatorCloud component together."""
+"""High-level orchestrator that wires every DatorCloud component together.
+
+Phase 1 of the DORIS integration plan adds the formal ``(I, C, Q, F)``
+operators -- ``ingest``, ``snapshot_cohort`` (+ ``create_eval_set``),
+``query``, and ``fetch`` -- on top of the existing legacy methods. The
+legacy methods (``upload_datasets``, ``generate_and_upload_metadata``,
+``query_metadata``, ``retrieve_data``) continue to work unchanged so
+already-deployed callers do not break.
+
+The ``from_env`` factory and ``.env`` contract are preserved verbatim
+(STEP_BY_STEP_PLAN.md §3 step 1.3 gate).
+"""
from __future__ import annotations
+import hashlib
+import json
import logging
import os
-from typing import Any, Dict, List, Optional
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Sequence
import pandas as pd
from ..components.metadata_generator_component import MetadataGeneratorComponent
from ..components.metadata_storage_component import MetadataStorageComponent
from ..components.minio_component import MinioObjectComponent
+from ..components.parquet_catalog_component import ParquetCatalogComponent
from ..components.query_component import QueryComponent
from ..components.retrieval_component import ObjectRetrievalComponent
+from ..snapshots import (
+ EvalSet,
+ Snapshot,
+ create_eval_set as _create_eval_set,
+ load_snapshot_payload,
+ snapshot_cohort as _snapshot_cohort,
+)
log = logging.getLogger(__name__)
@@ -22,6 +44,10 @@
DEFAULT_RETRIEVED_DIR = "./retrieved_data"
DEFAULT_REGION = "us-east-1"
+# Default parquet catalog root used when the caller does not pass one
+# explicitly. Resolved relative to ``local_data_dir``.
+DEFAULT_PARQUET_CATALOG_SUBDIR = "catalog"
+
class DatorCloudOrchestrator:
"""Main orchestrator class for DatorCloud operations.
@@ -51,6 +77,8 @@ def __init__(
metadata_storage: Optional[MetadataStorageComponent] = None,
query_component: Optional[QueryComponent] = None,
retrieval_component: Optional[ObjectRetrievalComponent] = None,
+ parquet_catalog: Optional[ParquetCatalogComponent] = None,
+ catalog_base_uri: Optional[str] = None,
) -> None:
"""Initialize the orchestrator.
@@ -96,6 +124,21 @@ def __init__(
self.local_download_dir = local_download_dir
self._last_metadata_file: Optional[str] = None
+ # Phase 1 -- L1-L4 Parquet catalog. Constructed lazily so legacy
+ # callers (which never touch the catalog) do not pay the DuckDB
+ # in-memory setup cost.
+ if parquet_catalog is not None:
+ self.parquet_catalog: Optional[ParquetCatalogComponent] = parquet_catalog
+ elif catalog_base_uri is not None:
+ self.parquet_catalog = ParquetCatalogComponent(
+ metadata_base_uri=catalog_base_uri,
+ minio_component=self.minio_component,
+ metadata_bucket=metadata_bucket,
+ )
+ else:
+ self.parquet_catalog = None
+ self._catalog_base_uri = catalog_base_uri
+
# ------------------------------------------------------------------
# Factories
# ------------------------------------------------------------------
@@ -152,6 +195,7 @@ def from_env(cls, **overrides: Any) -> "DatorCloudOrchestrator":
"RETRIEVED_DATA_PATH", DEFAULT_RETRIEVED_DIR
),
duckdb_extension_path=os.environ.get("DUCKDB_HTTPFS_EXTENSION_PATH"),
+ catalog_base_uri=os.environ.get("DATORCLOUD_CATALOG_URI"),
)
kwargs.update(overrides)
return cls(**kwargs)
@@ -256,3 +300,237 @@ def retrieve_experiment(
data_bucket=self.data_bucket,
**filters,
)
+
+ # ------------------------------------------------------------------
+ # Phase 1 -- formal (I, C, Q, F) operators
+ #
+ # These work against the L1-L4 Parquet catalog. They throw a clear
+ # RuntimeError if no catalog is wired in, so legacy callers cannot
+ # accidentally bypass the new layered model.
+ # ------------------------------------------------------------------
+
+ def _require_catalog(self) -> ParquetCatalogComponent:
+ if self.parquet_catalog is None:
+ raise RuntimeError(
+ "Catalog operators (ingest/query/fetch/snapshot_cohort) "
+ "require a parquet_catalog. Pass `catalog_base_uri=...` "
+ "to DatorCloudOrchestrator(...) or set the "
+ "DATORCLOUD_CATALOG_URI environment variable before "
+ "calling from_env()."
+ )
+ return self.parquet_catalog
+
+ # ---- I -----------------------------------------------------------
+
+ def ingest(self, layer: str, df: pd.DataFrame) -> int:
+ """**I** -- upsert rows into the named L1-L4 catalog layer.
+
+ Returns the number of rows written. The layer name matches the
+ DDL table name (``l1_experiment``, ``l2_sensor``, ``l3_annotation``,
+ ``l1_processing``, ``l1_citations``).
+ """
+ return self._require_catalog().write_rows(layer, df)
+
+ # ---- Q -----------------------------------------------------------
+
+ def query(
+ self,
+ *,
+ sql: Optional[str] = None,
+ view: str = "v_doris",
+ filters: Optional[Dict[str, Any]] = None,
+ limit: Optional[int] = None,
+ ) -> pd.DataFrame:
+ """**Q** -- formal query operator over the L1-L4 views.
+
+ Two equivalent call styles:
+
+ 1. ``query(sql="SELECT ... FROM v_doris WHERE ...")`` -- raw SQL.
+ 2. ``query(view="v_doris", filters={"modality": "CT"})`` -- the
+ thin wrapper API. Both return the same DataFrame for
+ equivalent predicates (integration-test assertion f).
+ """
+ catalog = self._require_catalog()
+ if sql is None:
+ where = QueryComponent._build_where_clause(filters or {})
+ sql = f"SELECT * FROM {view}{where}"
+ if limit is not None:
+ sql += f" LIMIT {int(limit)}"
+ return catalog.query(sql)
+
+ # ---- C -----------------------------------------------------------
+
+ def snapshot_cohort(
+ self,
+ *,
+ dataset_id: str,
+ predicate_sql: Optional[str] = None,
+ snapshot_date: Optional[str] = None,
+ ) -> Snapshot:
+ """**C** -- freeze a cohort identity into ``l4_cohort_snapshot``.
+
+ See :func:`datorcloud.snapshots.snapshot_cohort` for the freeze
+ semantics. ``catalog_sha256`` is stable across reruns even after
+ ``l2_sensor.converted_uri`` mutations.
+ """
+ return _snapshot_cohort(
+ self._require_catalog(),
+ dataset_id=dataset_id,
+ predicate_sql=predicate_sql,
+ snapshot_date=snapshot_date,
+ )
+
+ def create_eval_set(
+ self,
+ *,
+ eval_set_id: str,
+ snapshot_id: str,
+ annotator_columns: Sequence[str],
+ target_labels: Sequence[str],
+ inter_observer_quantiles: Optional[Sequence[float]] = None,
+ notes: Optional[str] = None,
+ ) -> EvalSet:
+ """Attach a new ``l4_eval_set`` row to *snapshot_id* (per I3)."""
+ return _create_eval_set(
+ self._require_catalog(),
+ eval_set_id=eval_set_id,
+ snapshot_id=snapshot_id,
+ annotator_columns=annotator_columns,
+ target_labels=target_labels,
+ inter_observer_quantiles=inter_observer_quantiles,
+ notes=notes,
+ )
+
+ # ---- F -----------------------------------------------------------
+
+ def fetch(
+ self,
+ *,
+ snapshot_id: str,
+ dest: str,
+ with_blobs: bool = False,
+ ) -> Dict[str, Any]:
+ """**F** -- materialise a snapshot's MIRO tree under *dest*.
+
+ Phase 1 writes the frozen catalog payload + a deterministic
+ ``manifest.json`` summary; Phase 2 will add raw / converted blob
+ downloads through :class:`MinioObjectComponent` when
+ ``with_blobs=True``.
+
+ Returns a result dict with ``snapshot_id``, ``manifest_path``,
+ ``n_records``, ``catalog_sha256``, and a per-record
+ ``records`` listing. Two consecutive calls into different dest
+ directories produce byte-identical manifests (integration-test
+ assertion d, Phase 1 share).
+ """
+ catalog = self._require_catalog()
+ info = catalog.query(
+ "SELECT catalog_sha256, n_records, predicate_sql FROM l4_cohort_snapshot "
+ "WHERE snapshot_id = ?",
+ params=[snapshot_id],
+ )
+ if info.empty:
+ raise KeyError(f"snapshot not found: {snapshot_id!r}")
+ meta = info.iloc[0].to_dict()
+
+ payload = load_snapshot_payload(catalog, snapshot_id)
+ dest_path = Path(dest) / snapshot_id
+ dest_path.mkdir(parents=True, exist_ok=True)
+
+ records: List[Dict[str, Any]] = []
+ downloaded: List[Dict[str, Any]] = []
+ if not payload.empty and "layer" in payload.columns:
+ l1 = payload[payload["layer"] == "l1_experiment"]
+ for _, row in l1.iterrows():
+ rec = {
+ "record_uid": row["record_uid"],
+ "dataset_id": row["dataset_id"],
+ "dataset_version": row["dataset_version"],
+ "subject_id": row["subject_id"],
+ "study_id": row.get("study_id", ""),
+ }
+ records.append(rec)
+ if with_blobs:
+ downloaded.extend(
+ self._fetch_record_blobs(payload, rec, dest_path)
+ )
+
+ manifest = {
+ "snapshot_id": snapshot_id,
+ "catalog_sha256": meta["catalog_sha256"],
+ "n_records": int(meta["n_records"]),
+ "predicate_sql": meta.get("predicate_sql"),
+ "miro_layout": "//image.nii.gz "
+ "+ seg/