Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions apps/backend/alembic/versions/0033_sbom_conformance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""sbom_conformance — received-SBOM quality verdict (model 3)

Revision ID: 0033
Revises: 0032
Created: 2026-06-14

Phase: sbom-ingest (conformance)
Kind: schema
Forward-only: yes

What:
- Create table ``sbom_conformance``::
id UUID PK DEFAULT gen_random_uuid()
scan_id UUID NOT NULL UNIQUE REFERENCES scans(id) ON DELETE CASCADE
project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE
source_format VARCHAR(16) NOT NULL -- cyclonedx|spdx-json|spdx-tv|unknown
result VARCHAR(8) NOT NULL -- pass|warn|fail
n_fail INTEGER NOT NULL DEFAULT 0
n_warn INTEGER NOT NULL DEFAULT 0
component_count INTEGER NOT NULL DEFAULT 0
purl_coverage_pct INTEGER -- all three coverage columns are nullable:
license_coverage_pct INTEGER -- NULL for SPDX Tag-Value (presence-only
hash_coverage_pct INTEGER -- scoring, no per-package coverage)
checks JSONB NOT NULL DEFAULT '[]'::jsonb
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
- Index ``ix_sbom_conformance_project_id`` (project_id) for the
"Received SBOM" tenant-scoped list. The UNIQUE on ``scan_id`` already
backs the per-scan lookup (the conformance read endpoint) with an index.

Why:
- An uploaded SBOM is scored for quality (services/sbom_conformance.py) before
and regardless of CVE matching, so the portal can show a pass/warn/fail
badge + per-check table and a supplier can be rejected with concrete
reasons. ``result`` / coverage are queryable columns (list filtering /
sorting); the full per-check array lives in ``checks`` (JSONB) for the
detail view. A dedicated table (vs. scan_metadata JSONB) keeps those
columns first-class and avoids the 16 KiB metadata ceiling.

Re-run idempotency:
- ``scan_id`` is UNIQUE; the ingest Celery task deletes any prior row for the
scan before inserting, so a Celery ``acks_late`` re-entry replaces rather
than duplicates. No UPDATE path; a correction is a fresh row.

Cascade policy:
- ``scan_id`` CASCADE — the verdict is meaningless without its scan.
- ``project_id`` CASCADE — mirrors the scan→project lifecycle; a project
delete cleans its verdicts.

Notes:
- Forward-only per CLAUDE.md §6: ``downgrade()`` raises NotImplementedError.
- No native ENUM: ``result`` / ``source_format`` vocabularies are owned by the
scorer (VARCHAR keeps tweaks migration-free; the FE mirrors the check-id
catalogue under a contract test).
- JSONB ``checks`` is read whole per row (detail view), never filtered by its
interior → no GIN index needed.
- No data migration — new empty table.
"""

from __future__ import annotations

from collections.abc import Sequence

import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

from alembic import op

# Alembic revision identifiers: this revision and the one it applies on top of.
revision: str = "0033"
down_revision: str | None = "0032"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None

# Shared column building blocks used by ``upgrade()`` below.
# UUID column type shared by the PK and both FK columns.
UUID_PK = postgresql.UUID(as_uuid=True)
# Server-side default for the ``id`` primary key.
GEN_UUID = sa.text("gen_random_uuid()")
# Server-side default for ``created_at``.
NOW = sa.text("now()")
# Server-side default for ``checks``: an empty JSONB array.
EMPTY_JSONB_ARR = sa.text("'[]'::jsonb")


def upgrade() -> None:
    """Create the ``sbom_conformance`` table and its ``project_id`` index."""
    table_name = "sbom_conformance"

    # NOT NULL tallies that the server defaults to 0.
    tally_columns = [
        sa.Column(name, sa.Integer(), nullable=False, server_default=sa.text("0"))
        for name in ("n_fail", "n_warn", "component_count")
    ]

    # Nullable coverage percentages.
    coverage_columns = [
        sa.Column(name, sa.Integer(), nullable=True)
        for name in (
            "purl_coverage_pct",
            "license_coverage_pct",
            "hash_coverage_pct",
        )
    ]

    # Both parents cascade: a verdict does not outlive its scan or project.
    cascading_fks = [
        sa.ForeignKeyConstraint(
            [local_column], [remote_column], name=fk_name, ondelete="CASCADE"
        )
        for local_column, remote_column, fk_name in (
            ("scan_id", "scans.id", "fk_sbom_conformance_scan_id"),
            ("project_id", "projects.id", "fk_sbom_conformance_project_id"),
        )
    ]

    # One verdict per scan; the UNIQUE constraint also provides the index that
    # backs the per-scan read endpoint.
    one_verdict_per_scan = sa.UniqueConstraint(
        "scan_id", name="uq_sbom_conformance_scan_id"
    )

    op.create_table(
        table_name,
        sa.Column("id", UUID_PK, primary_key=True, server_default=GEN_UUID),
        sa.Column("scan_id", UUID_PK, nullable=False),
        sa.Column("project_id", UUID_PK, nullable=False),
        sa.Column("source_format", sa.String(length=16), nullable=False),
        sa.Column("result", sa.String(length=8), nullable=False),
        *tally_columns,
        *coverage_columns,
        sa.Column(
            "checks",
            postgresql.JSONB(astext_type=sa.Text()),
            nullable=False,
            server_default=EMPTY_JSONB_ARR,
        ),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=NOW,
        ),
        *cascading_fks,
        one_verdict_per_scan,
    )

    # Tenant-scoped "Received SBOM" list ("conformance verdicts for my project").
    op.create_index("ix_sbom_conformance_project_id", table_name, ["project_id"])


def downgrade() -> None:
    """Refuse to run: this migration is forward-only.

    Raises:
        NotImplementedError: always.
    """
    reason = "downgrade is not supported (forward-only policy)"
    raise NotImplementedError(reason)
75 changes: 74 additions & 1 deletion apps/backend/api/v1/sbom.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
UploadFile,
status,
)
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from core.api_key_auth import require_role_or_api_key
Expand All @@ -41,7 +42,8 @@
from core.errors import problem_response
from core.ratelimit import _authenticated_user_key, limiter
from core.security import CurrentUser, require_role
from models import Project
from models import Project, SbomConformance
from schemas.sbom import SbomConformanceRead
from schemas.scan import ScanPublic
from services.project_service import (
ProjectError,
Expand Down Expand Up @@ -820,6 +822,77 @@ async def ingest_sbom_endpoint(
)


@router.get(
    "/projects/{project_id}/scans/{scan_id}/conformance",
    response_model=SbomConformanceRead,
    summary="Get the conformance verdict for an ingested SBOM scan",
    responses={
        200: {
            "description": "The SBOM conformance verdict (pass/warn/fail + per-check detail).",
            "content": {"application/json": {}},
        },
        401: {"description": "Authentication required"},
        404: {
            "description": (
                "Project not accessible (existence-hidden), or no conformance verdict "
                "exists for this scan (not an ingested SBOM scan, or still queued). "
                "RFC 7807."
            ),
            "content": {"application/problem+json": {}},
        },
    },
)
async def get_sbom_conformance_endpoint(
    request: Request,
    project_id: uuid.UUID,
    scan_id: uuid.UUID,
    session: AsyncSession = Depends(get_db),
    actor: CurrentUser = Depends(require_role_or_api_key("developer")),
) -> Response:
    # NOTE: intentionally no docstring; FastAPI would publish it as the
    # operation description in the OpenAPI document.
    #
    # Step 1: shared auth + IDOR guard for the SBOM surface. The helper hands
    # back either the project or a ready-made problem Response (an outsider
    # gets the existence-hiding 404, and the cross-team audit entry fires).
    resolved = await _resolve_project_or_problem(
        request,
        project_id=project_id,
        session=session,
        actor=actor,
        resource="sbom_conformance",
    )
    if isinstance(resolved, Response):
        return resolved

    # Step 2: look the verdict up by scan AND project. The row carries a
    # denormalised project_id, so a scan_id belonging to another project
    # matches nothing and falls through to the same 404 as an unknown scan.
    verdict_query = select(SbomConformance).where(
        SbomConformance.scan_id == scan_id,
        SbomConformance.project_id == project_id,
    )
    verdict = (await session.execute(verdict_query)).scalar_one_or_none()

    # Step 3: serialise a hit through the read schema.
    if verdict is not None:
        payload = SbomConformanceRead.model_validate(verdict)
        return Response(
            content=payload.model_dump_json(),
            status_code=status.HTTP_200_OK,
            media_type="application/json",
        )

    # No row: the scan is not an ingested SBOM scan, or its ingest task has not
    # reached the conformance stage yet.
    return problem_response(
        status_code=status.HTTP_404_NOT_FOUND,
        title="Conformance Verdict Not Found",
        detail=(
            "No SBOM conformance verdict exists for this scan. The scan may not be "
            "an ingested SBOM scan, or its ingest may still be in progress."
        ),
        instance=request.url.path,
    )


# slowapi's ``@limiter.shared_limit`` wraps the endpoint with functools.wraps,
# whose ``__globals__`` is slowapi's module. Under ``from __future__ import
# annotations`` FastAPI's ``get_type_hints()`` on the wrapper cannot resolve our
Expand Down
4 changes: 4 additions & 0 deletions apps/backend/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class Base(DeclarativeBase):
REPORT_TYPE_VALUES,
ReportDownload,
)
from .sbom_conformance import ( # noqa: E402,F401 (imported for metadata side effects)
SbomConformance,
)
from .scan import ( # noqa: E402,F401 (imported for metadata side effects)
Component,
ComponentDependencyEdge,
Expand Down Expand Up @@ -109,6 +112,7 @@ class Base(DeclarativeBase):
"RefreshToken",
"RemediationPullRequest",
"ReportDownload",
"SbomConformance",
"Scan",
"ScanArtifact",
"ScanComponent",
Expand Down
134 changes: 134 additions & 0 deletions apps/backend/models/sbom_conformance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""
Received-SBOM conformance result model — model 3 (supplier-submitted SBOM ingest).

Table:
- ``sbom_conformance`` — exactly one row per ingested ``Scan`` (``kind='sbom'``),
holding the quality-scoring verdict computed by
:mod:`services.sbom_conformance` from the uploaded document's ORIGINAL bytes
(before any CycloneDX normalisation). The portal renders this as a
pass / warn / fail badge plus a per-check table on the scan, and a supplier
can be sent a rejection citing the failed mandatory checks.

Why a dedicated table (vs. ``scan_metadata`` JSONB):
- The "Received SBOM" surface filters / sorts by ``result`` and coverage, and
those are first-class queryable columns here rather than buried in the
polymorphic scan metadata blob (which also has a 16 KiB ceiling the full
per-check detail with capped ``missing[]`` lists can approach). The raw
per-check array still lives in ``checks`` (JSONB) for the detail view.

Lifecycle:
- INSERT (or REPLACE — the ingest task deletes any prior row for the scan
before inserting, so a re-run under Celery ``acks_late`` stays idempotent
against the ``uq_sbom_conformance_scan_id`` unique constraint). No UPDATE.
- Hard delete only via ON DELETE CASCADE when the parent ``scans`` row (or,
transitively, the project) is removed.

Conventions (CLAUDE.md core rules + neighbouring model files):
- PostgreSQL only. UUID PK defaults to ``gen_random_uuid()`` (pgcrypto).
- TIMESTAMPTZ ``created_at``; append-only, no ``updated_at`` (a re-run
replaces the row, it does not mutate it in place).
- ``result`` / ``source_format`` are short closed vocabularies but kept as
VARCHAR (not native ENUM): the scorer in services/sbom_conformance.py owns
the value set, the FE mirrors the check-id catalogue (a contract test keeps
them in lockstep), and an ``ALTER TYPE ADD VALUE`` per tweak is more
friction than value here.
- No environment access at import time (CLAUDE.md core rule #11).

Cross-domain relationships:
- FK columns reference ``scans.id`` / ``projects.id`` but this module adds no
ORM ``relationship()`` edge back into ``scan.py`` (one-way dependency, same
pattern as report_download.py). Callers query explicitly.
"""

from __future__ import annotations

import uuid
from datetime import datetime
from typing import Any

from sqlalchemy import (
    DateTime,
    ForeignKey,
    Index,
    Integer,
    String,
    UniqueConstraint,
    text,
)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
from sqlalchemy.orm import Mapped, mapped_column

from . import Base

# Shared column building blocks for the model below.
# UUID column type shared by the PK and both FK columns.
UUID_PK = PG_UUID(as_uuid=True)
# Server-side default for the ``id`` primary key.
GEN_UUID = text("gen_random_uuid()")
# Server-side default for ``created_at``.
NOW = text("now()")
# Server-side default for ``checks``: an empty JSONB array.
EMPTY_JSONB_ARR = text("'[]'::jsonb")

# Closed vocabularies owned by services.sbom_conformance (kept as VARCHAR, see
# module docstring). Mirrored here only for documentation / test reference.
RESULT_VALUES = ("pass", "warn", "fail")
SOURCE_FORMAT_VALUES = ("cyclonedx", "spdx-json", "spdx-tv", "unknown")


class SbomConformance(Base):
    """One conformance verdict for an ingested SBOM scan.

    Rows are inserted (or deleted and re-inserted) by the ingest task and are
    never updated in place; they disappear only through ON DELETE CASCADE from
    the parent scan or project.
    """

    __tablename__ = "sbom_conformance"

    # Constraint and index names are spelled out so the ORM metadata describes
    # the same schema objects that Alembic revision 0033 creates. Without them
    # the model carries no ``project_id`` index at all (autogenerate would
    # propose dropping it, and a metadata-built database would lack it), and
    # the unique constraint would get a backend-generated name.
    __table_args__ = (
        # One verdict per scan. Lets the ingest task's delete-then-insert
        # re-run path rely on at-most-one row, and turns a stray double-insert
        # into a DB-level error rather than a silent duplicate. The implied
        # index also backs the per-scan lookup.
        UniqueConstraint("scan_id", name="uq_sbom_conformance_scan_id"),
        # Backs the project-scoped "Received SBOM" list.
        Index("ix_sbom_conformance_project_id", "project_id"),
    )

    id: Mapped[uuid.UUID] = mapped_column(
        UUID_PK, primary_key=True, server_default=GEN_UUID
    )

    # Owning scan; uniqueness is declared in ``__table_args__`` above.
    scan_id: Mapped[uuid.UUID] = mapped_column(
        UUID_PK,
        ForeignKey(
            "scans.id",
            ondelete="CASCADE",
            name="fk_sbom_conformance_scan_id",
        ),
        nullable=False,
    )

    # Denormalised tenant/owner pointer (mirrors the scan's project) so the
    # "Received SBOM" list can filter without joining scans.
    project_id: Mapped[uuid.UUID] = mapped_column(
        UUID_PK,
        ForeignKey(
            "projects.id",
            ondelete="CASCADE",
            name="fk_sbom_conformance_project_id",
        ),
        nullable=False,
    )

    # Detected serialisation: cyclonedx | spdx-json | spdx-tv | unknown.
    source_format: Mapped[str] = mapped_column(String(16), nullable=False)

    # Overall verdict: pass | warn | fail. ``warn`` = all mandatory checks pass
    # but a recommended (license / hash coverage) check fell short.
    result: Mapped[str] = mapped_column(String(8), nullable=False)

    # Tallies; the server defaults each to 0.
    n_fail: Mapped[int] = mapped_column(
        Integer, nullable=False, server_default=text("0")
    )
    n_warn: Mapped[int] = mapped_column(
        Integer, nullable=False, server_default=text("0")
    )
    component_count: Mapped[int] = mapped_column(
        Integer, nullable=False, server_default=text("0")
    )

    # Coverage percentages (0-100). NULL for SPDX Tag-Value, which is scored on
    # presence only (per-package coverage is not computed for Tag-Value).
    purl_coverage_pct: Mapped[int | None] = mapped_column(Integer, nullable=True)
    license_coverage_pct: Mapped[int | None] = mapped_column(Integer, nullable=True)
    hash_coverage_pct: Mapped[int | None] = mapped_column(Integer, nullable=True)

    # Per-check detail array (each: id/label/required/status/detail/missing[]),
    # ``missing[]`` capped at 50 by the scorer. Drives the detail table.
    checks: Mapped[list[dict[str, Any]]] = mapped_column(
        JSONB, nullable=False, server_default=EMPTY_JSONB_ARR
    )

    # Insert timestamp; there is no ``updated_at`` because rows are replaced,
    # not mutated.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, server_default=NOW
    )


# Public names exported by this module: the two vocabulary tuples and the model.
__all__ = [
"RESULT_VALUES",
"SOURCE_FORMAT_VALUES",
"SbomConformance",
]
Loading
Loading