diff --git a/apps/backend/alembic/versions/0033_sbom_conformance.py b/apps/backend/alembic/versions/0033_sbom_conformance.py new file mode 100644 index 00000000..d951a5ff --- /dev/null +++ b/apps/backend/alembic/versions/0033_sbom_conformance.py @@ -0,0 +1,133 @@ +"""sbom_conformance — received-SBOM quality verdict (model 3) + +Revision ID: 0033 +Revises: 0032 +Created: 2026-06-14 + +Phase: sbom-ingest (conformance) +Kind: schema +Forward-only: yes + +What: + - Create table ``sbom_conformance``:: + id UUID PK DEFAULT gen_random_uuid() + scan_id UUID NOT NULL UNIQUE REFERENCES scans(id) ON DELETE CASCADE + project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE + source_format VARCHAR(16) NOT NULL -- cyclonedx|spdx-json|spdx-tv|unknown + result VARCHAR(8) NOT NULL -- pass|warn|fail + n_fail INTEGER NOT NULL DEFAULT 0 + n_warn INTEGER NOT NULL DEFAULT 0 + component_count INTEGER NOT NULL DEFAULT 0 + purl_coverage_pct INTEGER -- NULL for SPDX Tag-Value + license_coverage_pct INTEGER + hash_coverage_pct INTEGER + checks JSONB NOT NULL DEFAULT '[]'::jsonb + created_at TIMESTAMPTZ NOT NULL DEFAULT now() + - Index ``ix_sbom_conformance_project_id`` (project_id) for the + "Received SBOM" tenant-scoped list. The UNIQUE on ``scan_id`` already + backs the per-scan lookup (the conformance read endpoint) with an index. + +Why: + - An uploaded SBOM is scored for quality (services/sbom_conformance.py) before + and regardless of CVE matching, so the portal can show a pass/warn/fail + badge + per-check table and a supplier can be rejected with concrete + reasons. ``result`` / coverage are queryable columns (list filtering / + sorting); the full per-check array lives in ``checks`` (JSONB) for the + detail view. A dedicated table (vs. scan_metadata JSONB) keeps those + columns first-class and avoids the 16 KiB metadata ceiling. 
+ +Re-run idempotency: + - ``scan_id`` is UNIQUE; the ingest Celery task deletes any prior row for the + scan before inserting, so a Celery ``acks_late`` re-entry replaces rather + than duplicates. No UPDATE path; a correction is a fresh row. + +Cascade policy: + - ``scan_id`` CASCADE — the verdict is meaningless without its scan. + - ``project_id`` CASCADE — mirrors the scan→project lifecycle; a project + delete cleans its verdicts. + +Notes: + - Forward-only per CLAUDE.md §6: ``downgrade()`` raises NotImplementedError. + - No native ENUM: ``result`` / ``source_format`` vocabularies are owned by the + scorer (VARCHAR keeps tweaks migration-free; the FE mirrors the check-id + catalogue under a contract test). + - JSONB ``checks`` is read whole per row (detail view), never filtered by its + interior → no GIN index needed. + - No data migration — new empty table. +""" + +from __future__ import annotations + +from collections.abc import Sequence + +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +from alembic import op + +revision: str = "0033" +down_revision: str | None = "0032" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + +UUID_PK = postgresql.UUID(as_uuid=True) +GEN_UUID = sa.text("gen_random_uuid()") +NOW = sa.text("now()") +EMPTY_JSONB_ARR = sa.text("'[]'::jsonb") + + +def upgrade() -> None: + op.create_table( + "sbom_conformance", + sa.Column("id", UUID_PK, primary_key=True, server_default=GEN_UUID), + sa.Column("scan_id", UUID_PK, nullable=False), + sa.Column("project_id", UUID_PK, nullable=False), + sa.Column("source_format", sa.String(length=16), nullable=False), + sa.Column("result", sa.String(length=8), nullable=False), + sa.Column("n_fail", sa.Integer(), nullable=False, server_default=sa.text("0")), + sa.Column("n_warn", sa.Integer(), nullable=False, server_default=sa.text("0")), + sa.Column( + "component_count", sa.Integer(), nullable=False, server_default=sa.text("0") + ), + 
sa.Column("purl_coverage_pct", sa.Integer(), nullable=True), + sa.Column("license_coverage_pct", sa.Integer(), nullable=True), + sa.Column("hash_coverage_pct", sa.Integer(), nullable=True), + sa.Column( + "checks", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default=EMPTY_JSONB_ARR, + ), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=NOW, + ), + sa.ForeignKeyConstraint( + ["scan_id"], + ["scans.id"], + name="fk_sbom_conformance_scan_id", + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["project_id"], + ["projects.id"], + name="fk_sbom_conformance_project_id", + ondelete="CASCADE", + ), + # One verdict per scan; also backs the per-scan read endpoint with an + # index (UNIQUE implies an index). + sa.UniqueConstraint("scan_id", name="uq_sbom_conformance_scan_id"), + ) + + # Tenant-scoped "Received SBOM" list ("conformance verdicts for my project"). + op.create_index( + "ix_sbom_conformance_project_id", + "sbom_conformance", + ["project_id"], + ) + + +def downgrade() -> None: + raise NotImplementedError("downgrade is not supported (forward-only policy)") diff --git a/apps/backend/api/v1/sbom.py b/apps/backend/api/v1/sbom.py index 543559d7..0c6a08cb 100644 --- a/apps/backend/api/v1/sbom.py +++ b/apps/backend/api/v1/sbom.py @@ -32,6 +32,7 @@ UploadFile, status, ) +from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from core.api_key_auth import require_role_or_api_key @@ -41,7 +42,8 @@ from core.errors import problem_response from core.ratelimit import _authenticated_user_key, limiter from core.security import CurrentUser, require_role -from models import Project +from models import Project, SbomConformance +from schemas.sbom import SbomConformanceRead from schemas.scan import ScanPublic from services.project_service import ( ProjectError, @@ -820,6 +822,77 @@ async def ingest_sbom_endpoint( ) +@router.get( + "/projects/{project_id}/scans/{scan_id}/conformance", + 
response_model=SbomConformanceRead, + summary="Get the conformance verdict for an ingested SBOM scan", + responses={ + 200: { + "description": "The SBOM conformance verdict (pass/warn/fail + per-check detail).", + "content": {"application/json": {}}, + }, + 401: {"description": "Authentication required"}, + 404: { + "description": ( + "Project not accessible (existence-hidden), or no conformance verdict " + "exists for this scan (not an ingested SBOM scan, or still queued). " + "RFC 7807." + ), + "content": {"application/problem+json": {}}, + }, + }, +) +async def get_sbom_conformance_endpoint( + request: Request, + project_id: uuid.UUID, + scan_id: uuid.UUID, + session: AsyncSession = Depends(get_db), + actor: CurrentUser = Depends(require_role_or_api_key("developer")), +) -> Response: + # Same auth + IDOR guard as the rest of the SBOM surface: an outsider sees + # 404 (existence-hide), and the cross-team audit entry fires. + project = await _resolve_project_or_problem( + request, + project_id=project_id, + session=session, + actor=actor, + resource="sbom_conformance", + ) + if isinstance(project, Response): + return project + + # The verdict row carries the denormalised project_id, so the scan-belongs- + # to-project check is a single predicate (a cross-project scan_id yields no + # row → the same 404 as an unknown scan). No conformance row also means the + # scan is not an ingested SBOM scan, or its ingest task has not reached the + # conformance stage yet. + row = ( + await session.execute( + select(SbomConformance).where( + SbomConformance.scan_id == scan_id, + SbomConformance.project_id == project_id, + ) + ) + ).scalar_one_or_none() + if row is None: + return problem_response( + status_code=status.HTTP_404_NOT_FOUND, + title="Conformance Verdict Not Found", + detail=( + "No SBOM conformance verdict exists for this scan. The scan may not be " + "an ingested SBOM scan, or its ingest may still be in progress." 
+ ), + instance=request.url.path, + ) + + body = SbomConformanceRead.model_validate(row) + return Response( + content=body.model_dump_json(), + status_code=status.HTTP_200_OK, + media_type="application/json", + ) + + # slowapi's ``@limiter.shared_limit`` wraps the endpoint with functools.wraps, # whose ``__globals__`` is slowapi's module. Under ``from __future__ import # annotations`` FastAPI's ``get_type_hints()`` on the wrapper cannot resolve our diff --git a/apps/backend/models/__init__.py b/apps/backend/models/__init__.py index 34558092..cb909c38 100644 --- a/apps/backend/models/__init__.py +++ b/apps/backend/models/__init__.py @@ -64,6 +64,9 @@ class Base(DeclarativeBase): REPORT_TYPE_VALUES, ReportDownload, ) +from .sbom_conformance import ( # noqa: E402,F401 (imported for metadata side effects) + SbomConformance, +) from .scan import ( # noqa: E402,F401 (imported for metadata side effects) Component, ComponentDependencyEdge, @@ -109,6 +112,7 @@ class Base(DeclarativeBase): "RefreshToken", "RemediationPullRequest", "ReportDownload", + "SbomConformance", "Scan", "ScanArtifact", "ScanComponent", diff --git a/apps/backend/models/sbom_conformance.py b/apps/backend/models/sbom_conformance.py new file mode 100644 index 00000000..bf0fc9c2 --- /dev/null +++ b/apps/backend/models/sbom_conformance.py @@ -0,0 +1,134 @@ +""" +Received-SBOM conformance result model — model 3 (supplier-submitted SBOM ingest). + +Table: + - ``sbom_conformance`` — exactly one row per ingested ``Scan`` (``kind='sbom'``), + holding the quality-scoring verdict computed by + :mod:`services.sbom_conformance` from the uploaded document's ORIGINAL bytes + (before any CycloneDX normalisation). The portal renders this as a + pass / warn / fail badge plus a per-check table on the scan, and a supplier + can be sent a rejection citing the failed mandatory checks. + +Why a dedicated table (vs. 
``scan_metadata`` JSONB): + - The "Received SBOM" surface filters / sorts by ``result`` and coverage, and + those are first-class queryable columns here rather than buried in the + polymorphic scan metadata blob (which also has a 16 KiB ceiling the full + per-check detail with capped ``missing[]`` lists can approach). The raw + per-check array still lives in ``checks`` (JSONB) for the detail view. + +Lifecycle: + - INSERT (or REPLACE — the ingest task deletes any prior row for the scan + before inserting, so a re-run under Celery ``acks_late`` stays idempotent + against the ``uq_sbom_conformance_scan_id`` unique constraint). No UPDATE. + - Hard delete only via ON DELETE CASCADE when the parent ``scans`` row (or, + transitively, the project) is removed. + +Conventions (CLAUDE.md core rules + neighbouring model files): + - PostgreSQL only. UUID PK defaults to ``gen_random_uuid()`` (pgcrypto). + - TIMESTAMPTZ ``created_at``; append-only, no ``updated_at`` (a re-run + replaces the row, it does not mutate it in place). + - ``result`` / ``source_format`` are short closed vocabularies but kept as + VARCHAR (not native ENUM): the scorer in services/sbom_conformance.py owns + the value set, the FE mirrors the check-id catalogue (a contract test keeps + them in lockstep), and an ``ALTER TYPE ADD VALUE`` per tweak is more + friction than value here. + - No environment access at import time (CLAUDE.md core rule #11). + +Cross-domain relationships: + - FK columns reference ``scans.id`` / ``projects.id`` but this module adds no + ORM ``relationship()`` edge back into ``scan.py`` (one-way dependency, same + pattern as report_download.py). Callers query explicitly. 
+""" + +from __future__ import annotations + +import uuid +from datetime import datetime +from typing import Any + +from sqlalchemy import ( + DateTime, + ForeignKey, + Integer, + String, + text, +) +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.dialects.postgresql import UUID as PG_UUID +from sqlalchemy.orm import Mapped, mapped_column + +from . import Base + +UUID_PK = PG_UUID(as_uuid=True) +GEN_UUID = text("gen_random_uuid()") +NOW = text("now()") +EMPTY_JSONB_ARR = text("'[]'::jsonb") + +# Closed vocabularies owned by services.sbom_conformance (kept as VARCHAR, see +# module docstring). Mirrored here only for documentation / test reference. +RESULT_VALUES = ("pass", "warn", "fail") +SOURCE_FORMAT_VALUES = ("cyclonedx", "spdx-json", "spdx-tv", "unknown") + + +class SbomConformance(Base): + """One conformance verdict for an ingested SBOM scan.""" + + __tablename__ = "sbom_conformance" + + id: Mapped[uuid.UUID] = mapped_column( + UUID_PK, primary_key=True, server_default=GEN_UUID + ) + + # One verdict per scan. UNIQUE so the ingest task's delete-then-insert + # re-run path can rely on at-most-one row, and a stray double-insert is a + # DB-level error rather than a silent duplicate. + scan_id: Mapped[uuid.UUID] = mapped_column( + UUID_PK, + ForeignKey("scans.id", ondelete="CASCADE"), + nullable=False, + unique=True, + ) + + # Denormalised copy of the scan's project; indexed (cf. migration 0033) for list filtering. + project_id: Mapped[uuid.UUID] = mapped_column( + UUID_PK, + ForeignKey("projects.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + + # Detected serialisation: cyclonedx | spdx-json | spdx-tv | unknown. + source_format: Mapped[str] = mapped_column(String(16), nullable=False) + + # Overall verdict: pass | warn | fail. ``warn`` = all mandatory checks pass + # but a recommended (license / hash coverage) check fell short. 
+ result: Mapped[str] = mapped_column(String(8), nullable=False) + + n_fail: Mapped[int] = mapped_column(Integer, nullable=False, server_default=text("0")) + n_warn: Mapped[int] = mapped_column(Integer, nullable=False, server_default=text("0")) + component_count: Mapped[int] = mapped_column( + Integer, nullable=False, server_default=text("0") + ) + + # Coverage percentages (0-100). NULL for SPDX Tag-Value, which is scored on + # presence only (per-package coverage is not computed for Tag-Value). + purl_coverage_pct: Mapped[int | None] = mapped_column(Integer, nullable=True) + license_coverage_pct: Mapped[int | None] = mapped_column(Integer, nullable=True) + hash_coverage_pct: Mapped[int | None] = mapped_column(Integer, nullable=True) + + # Per-check detail array (each: id/label/required/status/detail/missing[]), + # ``missing[]`` capped at 50 by the scorer. Drives the detail table. + checks: Mapped[list[dict[str, Any]]] = mapped_column( + JSONB, nullable=False, server_default=EMPTY_JSONB_ARR + ) + + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=NOW + ) + + +__all__ = [ + "RESULT_VALUES", + "SOURCE_FORMAT_VALUES", + "SbomConformance", +] diff --git a/apps/backend/schemas/sbom.py b/apps/backend/schemas/sbom.py new file mode 100644 index 00000000..25139dd6 --- /dev/null +++ b/apps/backend/schemas/sbom.py @@ -0,0 +1,60 @@ +""" +Pydantic schemas for the received-SBOM surface — model 3. + +``SbomConformanceRead`` is the wire shape of a :class:`models.SbomConformance` +row: the quality verdict the ingest pipeline computed for an uploaded SBOM. The +portal renders ``result`` as a pass / warn / fail badge and ``checks`` as the +per-requirement detail table. The check-id set mirrors +``services.sbom_conformance.CHECK_IDS`` (a contract test keeps the FE mirror in +lockstep). 
+""" + +from __future__ import annotations + +import uuid +from datetime import datetime +from typing import Literal + +from pydantic import BaseModel, ConfigDict, Field + + +class SbomConformanceCheck(BaseModel): + """One requirement's verdict within a conformance result.""" + + id: str = Field(description="Stable check id (see sbom_conformance.CHECK_IDS).") + label: str = Field(description="Human-readable requirement label.") + required: bool = Field( + description="True for a mandatory check, False for a recommended (warn-only) one." + ) + status: Literal["pass", "fail", "warn"] + detail: str = Field(description="Short evidence string (e.g. '96% (32/33)').") + missing: list[str] = Field( + default_factory=list, + description="Offending item names for a failed check, capped at 50.", + ) + + +class SbomConformanceRead(BaseModel): + """The conformance verdict for an ingested SBOM scan.""" + + model_config = ConfigDict(from_attributes=True) + + scan_id: uuid.UUID + project_id: uuid.UUID + source_format: Literal["cyclonedx", "spdx-json", "spdx-tv", "unknown"] + result: Literal["pass", "warn", "fail"] + n_fail: int + n_warn: int + component_count: int + # NULL for SPDX Tag-Value (scored on presence; per-package coverage absent). 
+ purl_coverage_pct: int | None = None + license_coverage_pct: int | None = None + hash_coverage_pct: int | None = None + checks: list[SbomConformanceCheck] = Field(default_factory=list) + created_at: datetime + + +__all__ = [ + "SbomConformanceCheck", + "SbomConformanceRead", +] diff --git a/apps/backend/tasks/ingest_sbom.py b/apps/backend/tasks/ingest_sbom.py index ac4cc8d8..00af16ba 100644 --- a/apps/backend/tasks/ingest_sbom.py +++ b/apps/backend/tasks/ingest_sbom.py @@ -49,6 +49,8 @@ import structlog from celery.exceptions import SoftTimeLimitExceeded +from sqlalchemy import delete +from sqlalchemy.orm import Session from core.config import scan_soft_time_limit_seconds, workspace_root from core.db import sync_session_scope @@ -59,7 +61,8 @@ TrivyTimeout, run_trivy_sbom, ) -from models import Project, Scan +from models import Project, SbomConformance, Scan +from services import sbom_conformance from services.vulnerability_matching import persist_trivy_findings from tasks._progress import ( close_log_file, @@ -93,6 +96,7 @@ # render source scans. _STAGE_PROGRESS: dict[str, int] = { "bootstrap": 0, + "conformance": 20, "components": 40, "trivy": 80, "finalize": 100, @@ -257,6 +261,23 @@ def _run_pipeline( # absent file, a path that escaped the workspace root, or non-JSON content. sbom_path, sbom_dict = _load_uploaded_sbom(scan_metadata) + # Stage 1.5 — conformance scoring. Scored on the ORIGINAL uploaded bytes + # (before any normalisation) so SPDX-specific metadata is judged + # accurately. Quality-gating is advisory: a "fail" verdict is recorded and + # surfaced as a badge but does NOT abort the match (the supplier may still + # want the partial result + the rejection reasons). The scorer never raises; + # the persist is delete-then-insert so an acks_late re-entry replaces the + # prior row (uq_sbom_conformance_scan_id). 
+ _set_stage(scan_uuid, "conformance") + with sync_session_scope() as session: + _persist_conformance( + session, + scan_uuid=scan_uuid, + project_id=project_id, + raw=sbom_path.read_bytes(), + ) + session.commit() + # Stage 2 — persist components + declared licenses. MUST run before Trivy: # ``persist_trivy_findings`` matches each finding to a ``ComponentVersion`` # by PURL, so the component graph has to exist first. ``source_dir=None`` @@ -318,6 +339,50 @@ def _run_pipeline( mark_succeeded(scan_uuid) +def _persist_conformance( + session: Session, + *, + scan_uuid: uuid.UUID, + project_id: uuid.UUID, + raw: bytes, +) -> None: + """Score the uploaded bytes and (re)write the scan's conformance verdict. + + Deletes any prior ``sbom_conformance`` row for ``scan_uuid`` and adds a fresh + one, so a Celery ``acks_late`` re-entry replaces rather than duplicates + (``_reset_scan_for_rerun`` does not touch this table). Does not commit; the + caller does. ``evaluate`` never raises: an unparseable upload is stored as "fail". + """ + result = sbom_conformance.evaluate(raw) + session.execute( + delete(SbomConformance).where(SbomConformance.scan_id == scan_uuid) + ) + session.add( + SbomConformance( + scan_id=scan_uuid, + project_id=project_id, + source_format=result.source_format, + result=result.result, + n_fail=result.n_fail, + n_warn=result.n_warn, + component_count=result.component_count, + purl_coverage_pct=result.purl_coverage_pct, + license_coverage_pct=result.license_coverage_pct, + hash_coverage_pct=result.hash_coverage_pct, + checks=[c.as_dict() for c in result.checks], + ) + ) + log.info( + "ingest_sbom_conformance", + scan_id=str(scan_uuid), + source_format=result.source_format, + result=result.result, + n_fail=result.n_fail, + n_warn=result.n_warn, + component_count=result.component_count, + ) + + def _load_uploaded_sbom(scan_metadata: dict[str, Any]) -> tuple[Path, dict[str, Any]]: """Resolve, containment-check, and parse the uploaded CycloneDX SBOM. 
diff --git a/apps/backend/tests/integration/scan/test_ingest_sbom_pipeline.py b/apps/backend/tests/integration/scan/test_ingest_sbom_pipeline.py index 28f57b9d..47786b07 100644 --- a/apps/backend/tests/integration/scan/test_ingest_sbom_pipeline.py +++ b/apps/backend/tests/integration/scan/test_ingest_sbom_pipeline.py @@ -43,6 +43,7 @@ Component, ComponentVersion, LicenseFinding, + SbomConformance, Scan, ScanArtifact, ScanComponent, @@ -265,6 +266,27 @@ def test_ingest_pipeline_persists_components_and_dense_findings( } assert "sbom_cyclonedx" in kinds + # Conformance: scored on the ORIGINAL uploaded bytes. The realistic fixture + # is a well-formed CycloneDX (full PURLs + graph + licenses) but carries no + # component hashes → all mandatory checks pass, the recommended hash check + # warns → overall verdict 'warn'. Exactly one verdict row per scan. + verdicts = list( + sync_session.execute( + select(SbomConformance).where(SbomConformance.scan_id == scan_id) + ).scalars() + ) + assert len(verdicts) == 1, "exactly one conformance verdict per ingested scan" + verdict = verdicts[0] + assert verdict.source_format == "cyclonedx" + assert verdict.result == "warn" + assert verdict.n_fail == 0 + assert verdict.purl_coverage_pct == 100 + assert verdict.component_count == 4 + assert verdict.checks, "the per-check detail array is persisted" + # The denormalised project pointer matches the scan's project (used by the + # tenant-scoped read endpoint's belongs-to-project predicate). 
+ assert verdict.project_id == _project_id + # --------------------------------------------------------------------------- # Idempotency — re-running a succeeded ingest is a no-op @@ -355,3 +377,52 @@ async def _seed_prior() -> uuid.UUID: assert new_scan.status == "succeeded" assert new_scan.superseded_at is None, "the newest succeeded scan is live" assert prior.superseded_at is not None, "the prior same-ref scan was superseded" + + +# --------------------------------------------------------------------------- +# Lifecycle — a forced re-entry REPLACES the conformance verdict (no dupe) +# --------------------------------------------------------------------------- + + +def test_ingest_rerun_replaces_conformance_verdict( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path, sync_session: Session +) -> None: + """``_reset_scan_for_rerun`` does not touch ``sbom_conformance``; the verdict + persist is delete-then-insert, so a Celery acks_late re-entry on a scan that + is NOT yet succeeded replaces the row rather than tripping the + ``uq_sbom_conformance_scan_id`` unique constraint.""" + monkeypatch.setenv("WORKSPACE_HOST_PATH", str(tmp_path)) + _stub_trivy_from_fixture(monkeypatch) + + scan_id, _ = _seed_queued_sbom_scan(tmp_path) + + from tasks.ingest_sbom import ingest_sbom_task + + # First run → one verdict, scan succeeded. + ingest_sbom_task.apply(args=[str(scan_id)]) + sync_session.expire_all() + first = sync_session.execute( + select(SbomConformance).where(SbomConformance.scan_id == scan_id) + ).scalar_one() + first_id = first.id + + # Force a genuine re-entry: flip the scan back to queued so the task re-runs + # the pipeline (instead of the succeeded-skip) and re-scores conformance. 
+ scan = sync_session.execute(select(Scan).where(Scan.id == scan_id)).scalar_one() + scan.status = "queued" + scan.completed_at = None + scan.superseded_at = None + sync_session.commit() + + ingest_sbom_task.apply(args=[str(scan_id)]) + sync_session.expire_all() + + # Still exactly one verdict (replaced, not duplicated) — and a fresh row. + rows = list( + sync_session.execute( + select(SbomConformance).where(SbomConformance.scan_id == scan_id) + ).scalars() + ) + assert len(rows) == 1, "re-entry must REPLACE the verdict, not duplicate it" + assert rows[0].id != first_id, "the verdict row was re-created (delete-then-insert)" + assert rows[0].result == "warn" diff --git a/apps/backend/tests/integration/test_sbom_ingest_api.py b/apps/backend/tests/integration/test_sbom_ingest_api.py index 4024d0d6..5134618b 100644 --- a/apps/backend/tests/integration/test_sbom_ingest_api.py +++ b/apps/backend/tests/integration/test_sbom_ingest_api.py @@ -660,3 +660,138 @@ def _boom(scan): # type: ignore[no-untyped-def] assert scan.status == "failed" assert scan.error_message is not None assert scan.error_message.startswith("enqueue_failed:") + + +# --------------------------------------------------------------------------- +# GET /v1/projects/{project_id}/scans/{scan_id}/conformance +# +# Conformance read surface. Permission×state ordering (CLAUDE.md §2 rule 1): a +# non-member sees 404 (existence-hide) regardless of whether a verdict exists — +# the project authz gate fires before any per-scan lookup. A member with no +# verdict row (or a cross-project scan_id) also sees 404, never a leak. 
+# --------------------------------------------------------------------------- + + +async def _seed_sbom_scan_with_verdict( + client: AsyncClient, + *, + project_id: uuid.UUID, + result: str = "warn", + source_format: str = "cyclonedx", +): + """Insert a succeeded sbom scan + its conformance verdict; return scan_id.""" + factory = await _factory(client) + async with factory() as session: + from sqlalchemy import select + + from models import Project, SbomConformance + + project = ( + await session.execute(select(Project).where(Project.id == project_id)) + ).scalar_one() + scan = await make_scan(session, project=project, kind="sbom", status="succeeded") + session.add( + SbomConformance( + scan_id=scan.id, + project_id=project_id, + source_format=source_format, + result=result, + n_fail=0, + n_warn=1, + component_count=4, + purl_coverage_pct=100, + license_coverage_pct=100, + hash_coverage_pct=0, + checks=[ + { + "id": "purl", + "label": "PURL coverage (>= 90%)", + "required": True, + "status": "pass", + "detail": "100% (4/4)", + "missing": [], + }, + { + "id": "hash", + "label": "Hash coverage (>= 50%, recommended)", + "required": False, + "status": "warn", + "detail": "0% (0/4)", + "missing": [], + }, + ], + ) + ) + await session.commit() + return scan.id + + +async def test_get_conformance_returns_verdict(client) -> None: + team, user, project = await _seed(client, role="developer") + scan_id = await _seed_sbom_scan_with_verdict(client, project_id=project.id) + + resp = await client.get( + f"/v1/projects/{project.id}/scans/{scan_id}/conformance", + headers=_bearer_for(user), + ) + assert resp.status_code == 200, resp.text + body = resp.json() + assert body["result"] == "warn" + assert body["source_format"] == "cyclonedx" + assert body["purl_coverage_pct"] == 100 + assert body["component_count"] == 4 + assert {c["id"] for c in body["checks"]} == {"purl", "hash"} + + +async def test_get_conformance_cross_team_is_404(client) -> None: + # Project + verdict owned by 
team A. + _team_a, _user_a, project = await _seed(client, role="developer") + scan_id = await _seed_sbom_scan_with_verdict(client, project_id=project.id) + + # A developer on an unrelated team B must see 404 (existence-hide): the + # project authz gate fires before the per-scan verdict lookup. + _team_b, user_b, _project_b = await _seed(client, role="developer") + resp = await client.get( + f"/v1/projects/{project.id}/scans/{scan_id}/conformance", + headers=_bearer_for(user_b), + ) + assert resp.status_code == 404, resp.text + assert resp.headers["content-type"].startswith(PROBLEM_JSON) + + +async def test_get_conformance_missing_verdict_is_404(client) -> None: + # A scan that exists in the project but has no verdict row (e.g. ingest + # still queued, or a non-sbom scan) → 404, never a 500/empty 200. + team, user, project = await _seed(client, role="developer") + factory = await _factory(client) + async with factory() as session: + from sqlalchemy import select + + from models import Project + + p = ( + await session.execute(select(Project).where(Project.id == project.id)) + ).scalar_one() + scan = await make_scan(session, project=p, kind="sbom", status="queued") + await session.commit() + scan_id = scan.id + + resp = await client.get( + f"/v1/projects/{project.id}/scans/{scan_id}/conformance", + headers=_bearer_for(user), + ) + assert resp.status_code == 404, resp.text + + +async def test_get_conformance_scan_in_other_project_is_404(client) -> None: + # The (scan_id, project_id) predicate must reject a verdict whose scan lives + # in a DIFFERENT project of the same team — no cross-project read. 
+ team, user, project = await _seed(client, role="developer") + scan_id = await _seed_sbom_scan_with_verdict(client, project_id=project.id) + other = await _seed_extra_project(client, team_id=team.id) + + resp = await client.get( + f"/v1/projects/{other.id}/scans/{scan_id}/conformance", + headers=_bearer_for(user), + ) + assert resp.status_code == 404, resp.text diff --git a/apps/backend/tests/unit/openapi_endpoints.json b/apps/backend/tests/unit/openapi_endpoints.json index 5093530d..78a05965 100644 --- a/apps/backend/tests/unit/openapi_endpoints.json +++ b/apps/backend/tests/unit/openapi_endpoints.json @@ -297,6 +297,10 @@ "project_id", "size" ], + "GET /v1/projects/{project_id}/scans/{scan_id}/conformance": [ + "project_id", + "scan_id" + ], "GET /v1/projects/{project_id}/source-file": [ "path", "project_id", diff --git a/docs-site/docs/ci-integration/sbom-upload.md b/docs-site/docs/ci-integration/sbom-upload.md index 712a15a8..332e31d5 100644 --- a/docs-site/docs/ci-integration/sbom-upload.md +++ b/docs-site/docs/ci-integration/sbom-upload.md @@ -77,6 +77,44 @@ curl https://trustedoss.example.com/v1/scans/ \ `status` moves `queued → running → succeeded`. A reasonable cadence is one poll every 30 seconds. Once `status` is `succeeded`, open the project in the portal to read components, vulnerabilities, and licenses. +## Read the conformance verdict + +When you upload an SBOM, TRUSCA scores its **quality** against a fixed bar before (and regardless of) matching — so a "shell" SBOM with no versions, no package URLs, or no dependency graph is flagged rather than silently producing an empty result. 
Read the verdict with: + +```bash +curl -H "Authorization: Bearer $TRUSTEDOSS_API_KEY" \ + https://trustedoss.example.com/v1/projects/<project_id>/scans/<scan_id>/conformance +``` + +The response is the verdict for that scan (abbreviated here — the real response also carries a `created_at` timestamp and the full `checks` array): + +```json +{ + "scan_id": "<scan_id>", + "project_id": "<project_id>", + "source_format": "cyclonedx", + "result": "warn", + "n_fail": 0, + "n_warn": 1, + "component_count": 42, + "purl_coverage_pct": 100, + "license_coverage_pct": 96, + "hash_coverage_pct": 0, + "checks": [ + { "id": "purl", "label": "PURL coverage (>= 90%)", "required": true, "status": "pass", "detail": "100% (42/42)", "missing": [] }, + { "id": "hash", "label": "Hash coverage (>= 50%, recommended)", "required": false, "status": "warn", "detail": "0% (0/42)", "missing": [] } + ] +} +``` + +- **`result`** is `pass`, `warn`, or `fail`. `fail` means a **mandatory** check failed; `warn` means every mandatory check passed but a **recommended** one (license or hash coverage) fell short; `pass` means all checks passed. +- **Mandatory checks**: a timestamp, tool info, a top-level component with name and version, 100% component name+version, PURL coverage at or above `SBOM_CONFORMANCE_PURL_MIN_PCT` (default `90`), no `pkg:generic` placeholders, and a transitive dependency graph. +- **Recommended checks** (warn only): license coverage at or above `SBOM_CONFORMANCE_LICENSE_MIN_PCT` (default `80`) and hash coverage at or above `SBOM_CONFORMANCE_HASH_MIN_PCT` (default `50`). +- A `fail` verdict does **not** abort the ingest — TRUSCA still matches CVEs and classifies licenses so you get the partial result alongside the concrete reasons. Use the verdict to decide whether to accept a supplier's SBOM or send it back. +- `purl_coverage_pct`, `license_coverage_pct`, and `hash_coverage_pct` are `null` for SPDX Tag-Value documents, which are scored on presence rather than per-package coverage. 
+ +A `404` here means the project is not accessible to you, or the scan has no verdict yet (it is not an ingested SBOM scan, or its ingest has not reached the conformance stage). + ## Verify it worked After the scan reaches `succeeded`: diff --git a/docs-site/i18n/ko/docusaurus-plugin-content-docs/current/ci-integration/sbom-upload.md b/docs-site/i18n/ko/docusaurus-plugin-content-docs/current/ci-integration/sbom-upload.md index 532c69fe..e4d8d3db 100644 --- a/docs-site/i18n/ko/docusaurus-plugin-content-docs/current/ci-integration/sbom-upload.md +++ b/docs-site/i18n/ko/docusaurus-plugin-content-docs/current/ci-integration/sbom-upload.md @@ -77,6 +77,44 @@ curl https://trustedoss.example.com/v1/scans/ \ `status`는 `queued → running → succeeded`로 이동합니다. 30초에 한 번 폴링하는 주기가 적당합니다. `status`가 `succeeded`가 되면 포털에서 프로젝트를 열어 컴포넌트·취약점·라이선스를 확인합니다. +## 적합성(conformance) 결과 읽기 + +SBOM을 업로드하면 TRUSCA는 매칭 이전에(그리고 매칭 여부와 무관하게) SBOM의 **품질**을 정해진 기준으로 채점합니다. 버전·패키지 URL·의존성 그래프가 없는 "껍데기" SBOM이 조용히 빈 결과를 내는 대신 드러나게 하기 위해서입니다. 결과는 다음으로 읽습니다. + +```bash +curl -H "Authorization: Bearer $TRUSTEDOSS_API_KEY" \ + https://trustedoss.example.com/v1/projects/<project_id>/scans/<scan_id>/conformance +``` + +응답은 해당 스캔의 채점 결과입니다(아래는 축약한 예시이며, 실제 응답에는 `created_at` 타임스탬프와 전체 `checks` 배열도 포함됩니다). + +```json +{ + "scan_id": "<scan_id>", + "project_id": "<project_id>", + "source_format": "cyclonedx", + "result": "warn", + "n_fail": 0, + "n_warn": 1, + "component_count": 42, + "purl_coverage_pct": 100, + "license_coverage_pct": 96, + "hash_coverage_pct": 0, + "checks": [ + { "id": "purl", "label": "PURL coverage (>= 90%)", "required": true, "status": "pass", "detail": "100% (42/42)", "missing": [] }, + { "id": "hash", "label": "Hash coverage (>= 50%, recommended)", "required": false, "status": "warn", "detail": "0% (0/42)", "missing": [] } + ] +} +``` + +- **`result`**는 `pass`·`warn`·`fail` 중 하나입니다. `fail`은 **필수** 검사가 실패했다는 뜻이고, `warn`은 필수 검사는 모두 통과했으나 **권장** 검사(라이선스 또는 해시 커버리지)가 기준에 못 미친 경우이며, `pass`는 모든 검사를 통과한 경우입니다. 
+- **필수 검사**: 타임스탬프, 도구 정보, name·version을 가진 최상위 컴포넌트, 컴포넌트 name+version 100%, PURL 커버리지가 `SBOM_CONFORMANCE_PURL_MIN_PCT`(기본 `90`) 이상, `pkg:generic` 자리표시자 없음, 전이 의존성 그래프 존재. +- **권장 검사**(warn만): 라이선스 커버리지가 `SBOM_CONFORMANCE_LICENSE_MIN_PCT`(기본 `80`) 이상, 해시 커버리지가 `SBOM_CONFORMANCE_HASH_MIN_PCT`(기본 `50`) 이상. +- `fail` 결과여도 인제스트를 **중단하지 않습니다** — TRUSCA는 CVE 매칭과 라이선스 분류를 그대로 수행하므로 구체적 사유와 함께 부분 결과를 얻습니다. 공급사의 SBOM을 받아들일지 반려할지 판단하는 근거로 씁니다. +- `purl_coverage_pct`·`license_coverage_pct`·`hash_coverage_pct`는 SPDX Tag-Value 문서에서는 `null`입니다. Tag-Value는 패키지별 커버리지가 아니라 존재 여부로 채점하기 때문입니다. + +여기서 `404`는 프로젝트에 접근할 수 없거나, 해당 스캔에 아직 결과가 없다는 뜻입니다(인제스트된 SBOM 스캔이 아니거나, 인제스트가 적합성 단계에 도달하지 않음). + ## 동작 확인 스캔이 `succeeded`에 도달한 다음: