Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions apps/backend/tasks/_scan_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""
Shared scan-pipeline orchestration helpers.

These are the self-contained terminal-state writers and the per-stage
progress writer that the source scan pipeline (:mod:`tasks.scan_source`)
uses to drive a :class:`models.Scan` row through ``running`` →
``succeeded`` / ``failed`` while fanning out WebSocket progress frames.

They were extracted verbatim from ``tasks.scan_source`` so a future
SBOM-ingest Celery task can reuse them through a clean public seam — no
``from tasks.scan_source import _private_name`` cross-module reach into a
sibling task module.

Behaviour is byte-identical to the original ``scan_source`` privates:

- ``mark_failed`` (was ``_mark_failed``)
- ``record_terminal_failure`` (was ``_record_terminal_failure``)
- ``mark_succeeded`` (was ``_mark_succeeded``)
- ``set_stage`` (was ``_set_stage``)

``set_stage`` is the one generalisation: the original ``_set_stage`` pulled
its percent from ``scan_source._STAGE_PROGRESS`` — a mapping that is specific
to the source pipeline and does not belong in a shared module. ``set_stage``
takes ``percent`` as an explicit argument instead. The caller passes
``_STAGE_PROGRESS.get(stage)`` so the original behaviour is preserved exactly:

- a *known* stage → its mapped int percent (DB + log + publish);
- an *unknown* stage → ``percent=None`` → the row keeps its prior
``progress_percent`` (matching the original ``.get(stage,
scan.progress_percent)`` fallback), the log line carries the raw ``None``
(matching the original ``_STAGE_PROGRESS.get(stage)`` log value), and the
published frame carries the committed (prior) percent.

Import-cycle note: this module depends only on ``models``, ``core.db``,
``tasks._progress`` and ``tasks.scan_retention``. None of those import this
module or ``tasks.scan_source`` at module top, so importing this from
``scan_source`` does not create a cycle (verified at extraction time).
"""

from __future__ import annotations

import uuid
from datetime import UTC, datetime

import structlog
from sqlalchemy.orm import Session

from core.db import sync_session_scope
from models import Scan
from tasks._progress import publish_progress
from tasks.scan_retention import supersede_prior_ref_scans

log = structlog.get_logger("tasks.scan_pipeline")


def mark_failed(session: Session, scan: Scan, message: str) -> None:
scan.status = "failed"
scan.error_message = message
scan.completed_at = datetime.now(UTC)
session.commit()
# Snapshot the percent under the row (defaults to 0 when None — protects
# against an early-failure path where progress was never initialised).
last_percent = scan.progress_percent or 0
publish_progress(scan.id, step="failed", percent=last_percent)


def record_terminal_failure(scan_uuid: uuid.UUID, message: str) -> None:
with sync_session_scope() as session:
scan = session.get(Scan, scan_uuid)
if scan is None:
return
mark_failed(session, scan, message)


def mark_succeeded(scan_uuid: uuid.UUID) -> None:
with sync_session_scope() as session:
scan = session.get(Scan, scan_uuid)
if scan is None:
return
scan.status = "succeeded"
scan.progress_percent = 100
scan.current_step = "finalize"
scan.completed_at = datetime.now(UTC)
# scan-retention Layer 1: this scan is now the live snapshot for its
# ref, so prior succeeded same-ref scans (without an explicit release
# label) are superseded in the same transaction. No-op when the scan
# carries no ref — those are reclaimed by the keep-last/max-age sweep.
supersede_prior_ref_scans(
session,
project_id=scan.project_id,
winner_scan_id=scan.id,
ref=scan.ref,
)
session.commit()
publish_progress(scan_uuid, step="succeeded", percent=100)


def set_stage(scan_uuid: uuid.UUID, stage: str, percent: int | None) -> None:
"""Advance a scan to ``stage`` and fan out the progress frame.

``percent`` is the stage's progress percent, supplied explicitly by the
caller (the source pipeline derives it from ``_STAGE_PROGRESS.get(stage)``).
When ``percent`` is ``None`` the row keeps its existing ``progress_percent``
— this preserves the original ``_set_stage`` fallback for an unmapped
stage. The log line carries the raw ``percent`` value (``None`` for an
unmapped stage, mirroring the original ``_STAGE_PROGRESS.get(stage)`` log
value). The publish happens AFTER the DB commit so a subscriber that reads
the row on receipt sees the same state as the published payload.
"""
with sync_session_scope() as session:
scan = session.get(Scan, scan_uuid)
if scan is None:
return
scan.current_step = stage
scan.progress_percent = percent if percent is not None else scan.progress_percent
session.commit()
committed_percent = scan.progress_percent or 0
log.info("scan_stage", stage=stage, percent=percent)
# Publish AFTER the DB commit so a subscriber that reads the row on
# receipt sees the same state as the published payload.
publish_progress(scan_uuid, step=stage, percent=committed_percent)


__all__ = [
"mark_failed",
"mark_succeeded",
"record_terminal_failure",
"set_stage",
]
87 changes: 29 additions & 58 deletions apps/backend/tasks/scan_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
CycloneDX SBOM to ``ort evaluate --ort-file``, which expects an OrtResult JSON,
and aborted every scan with a KotlinInvalidNullException; we had been swallowing
that with a try/except). License classification for *third-party* dependencies
remains *declared* (cdxgen package metadata, persisted in ``_persist_components``
remains *declared* (cdxgen package metadata, persisted in ``persist_sbom_components``
→ ``_persist_component_licenses``). PR-A2 adds *detected* license findings for
*first-party* source via scancode — third-party dependency sources are NOT
downloaded (that deep-scan path is out of scope; it would blow the budget).
Expand Down Expand Up @@ -121,14 +121,18 @@
from services.vulnerability_matching import persist_trivy_findings
from tasks._progress import (
close_log_file,
publish_progress,
reset_log_counter,
)
from tasks._progress import (
make_line_callback as _make_line_callback,
)
from tasks._scan_pipeline import (
mark_failed,
mark_succeeded,
record_terminal_failure,
set_stage,
)
from tasks.celery_app import celery_app
from tasks.scan_retention import supersede_prior_ref_scans

log = structlog.get_logger("tasks.scan_source")

Expand Down Expand Up @@ -403,7 +407,7 @@ def _run_pipeline(
# never raise from this stage onto the terminal-failure path — a missing
# detected-license set is a degraded-output scenario, not a fatal one
# (same philosophy as the prep stage). Third-party dependency sources are
# NOT downloaded; their licenses stay declared via _persist_components.
# NOT downloaded; their licenses stay declared via persist_sbom_components.
_set_stage(scan_uuid, "scancode")
scancode_detections: list[scancode_adapter.DetectedLicense] = []
# G3.1: capture the scancode result JSON path so the preservation stage can
Expand Down Expand Up @@ -445,7 +449,7 @@ def _run_pipeline(
# findings — the declared findings and component graph still commit. A
# detected-license failure is degraded, never fatal.
with sync_session_scope() as session:
_persist_components(
persist_sbom_components(
session,
scan_uuid=scan_uuid,
sbom=cdxgen_result.sbom,
Expand Down Expand Up @@ -1008,61 +1012,28 @@ def _mark_running(session: Session, scan: Scan) -> None:
session.commit()


def _mark_failed(session: Session, scan: Scan, message: str) -> None:
scan.status = "failed"
scan.error_message = message
scan.completed_at = datetime.now(UTC)
session.commit()
# Snapshot the percent under the row (defaults to 0 when None — protects
# against an early-failure path where progress was never initialised).
last_percent = scan.progress_percent or 0
publish_progress(scan.id, step="failed", percent=last_percent)


def _record_terminal_failure(scan_uuid: uuid.UUID, message: str) -> None:
with sync_session_scope() as session:
scan = session.get(Scan, scan_uuid)
if scan is None:
return
_mark_failed(session, scan, message)


def _mark_succeeded(scan_uuid: uuid.UUID) -> None:
with sync_session_scope() as session:
scan = session.get(Scan, scan_uuid)
if scan is None:
return
scan.status = "succeeded"
scan.progress_percent = 100
scan.current_step = "finalize"
scan.completed_at = datetime.now(UTC)
# scan-retention Layer 1: this scan is now the live snapshot for its
# ref, so prior succeeded same-ref scans (without an explicit release
# label) are superseded in the same transaction. No-op when the scan
# carries no ref — those are reclaimed by the keep-last/max-age sweep.
supersede_prior_ref_scans(
session,
project_id=scan.project_id,
winner_scan_id=scan.id,
ref=scan.ref,
)
session.commit()
publish_progress(scan_uuid, step="succeeded", percent=100)
# The terminal-state writers and the per-stage progress writer were extracted
# verbatim to ``tasks._scan_pipeline`` so a future SBOM-ingest task can reuse
# them through a public seam (no cross-module private import). We keep these
# thin module-level aliases so this module's own call sites — and the existing
# tests that ``monkeypatch.setattr(scan_source, "_record_terminal_failure", …)``
# — keep working unchanged. ``_set_stage`` stays a wrapper because it owns the
# source-pipeline-specific ``_STAGE_PROGRESS`` mapping that the shared
# ``set_stage`` deliberately does not know about.
_mark_failed = mark_failed
_record_terminal_failure = record_terminal_failure
_mark_succeeded = mark_succeeded


def _set_stage(scan_uuid: uuid.UUID, stage: str) -> None:
with sync_session_scope() as session:
scan = session.get(Scan, scan_uuid)
if scan is None:
return
scan.current_step = stage
scan.progress_percent = _STAGE_PROGRESS.get(stage, scan.progress_percent)
session.commit()
committed_percent = scan.progress_percent or 0
log.info("scan_stage", stage=stage, percent=_STAGE_PROGRESS.get(stage))
# Publish AFTER the DB commit so a subscriber that reads the row on
# receipt sees the same state as the published payload.
publish_progress(scan_uuid, step=stage, percent=committed_percent)
"""Advance a scan to ``stage`` using this pipeline's percent mapping.

Delegates to :func:`tasks._scan_pipeline.set_stage` with the percent
resolved from ``_STAGE_PROGRESS`` — ``None`` for an unmapped stage, which
the shared writer treats as "keep the row's prior percent" (preserving the
original ``_set_stage`` fallback exactly).
"""
set_stage(scan_uuid, stage, _STAGE_PROGRESS.get(stage))


def _persist_artifact(
Expand Down Expand Up @@ -1872,7 +1843,7 @@ def _run_prep(
)


def _persist_components(
def persist_sbom_components(
session: Session,
*,
scan_uuid: uuid.UUID,
Expand Down
10 changes: 5 additions & 5 deletions apps/backend/tests/integration/scan/test_jsonb_size_guard.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
`tasks/scan_source.py` and `tasks/scan_container.py` route every JSONB
write through `enforce_jsonb_row_size_limit` before the row reaches the
ORM. We don't drive the full Celery pipeline here — we drive the
`_persist_components` helper directly with a fabricated cdxgen SBOM whose
`persist_sbom_components` helper directly with a fabricated cdxgen SBOM whose
single component carries a large free-form blob. The guard must replace
the blob with the truncation marker before persistence.

Expand Down Expand Up @@ -142,9 +142,9 @@ def test_oversized_component_raw_data_is_truncated_at_persist(
]
}

from tasks.scan_source import _persist_components
from tasks.scan_source import persist_sbom_components

_persist_components(sync_session, scan_uuid=scan_id, sbom=sbom)
persist_sbom_components(sync_session, scan_uuid=scan_id, sbom=sbom)
sync_session.commit()

rows = (
Expand Down Expand Up @@ -183,9 +183,9 @@ def test_under_limit_component_is_persisted_intact(
]
}

from tasks.scan_source import _persist_components
from tasks.scan_source import persist_sbom_components

_persist_components(sync_session, scan_uuid=scan_id, sbom=sbom)
persist_sbom_components(sync_session, scan_uuid=scan_id, sbom=sbom)
sync_session.commit()

rows = (
Expand Down
14 changes: 11 additions & 3 deletions apps/backend/tests/unit/tasks/test_scan_source_progress_hooks.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
"""
Unit tests for the four publish_progress hook points in `tasks.scan_source`.
Unit tests for the four publish_progress hook points in the scan pipeline.

The integration tests (which require Postgres) cover the full pipeline
end-to-end. These unit tests use lightweight mocks to cover the publish
hooks specifically, so a regression in the order ("commit then publish")
or in the percent value passed to the publisher is caught without a DB.

The hook implementations were extracted to ``tasks._scan_pipeline`` (public
names ``set_stage`` / ``mark_succeeded`` / ``mark_failed`` /
``record_terminal_failure``) so an SBOM-ingest task can reuse them. The
``scan_source`` privates (``_set_stage`` etc.) remain thin aliases / wrappers
over the shared implementations, so the tests still exercise them through the
``scan_source`` seam while monkeypatching the dependencies on the module that
actually owns the implementation (``tasks._scan_pipeline``).

Pinned hook points:
- ``_set_stage`` — emits step=<stage>, percent=_STAGE_PROGRESS[stage]
- ``_mark_succeeded`` — emits step="succeeded", percent=100
Expand Down Expand Up @@ -63,7 +71,7 @@ def patch_session(monkeypatch: pytest.MonkeyPatch) -> Any:
def _scope() -> Any:
yield _FakeSession(scan)

monkeypatch.setattr("tasks.scan_source.sync_session_scope", _scope)
monkeypatch.setattr("tasks._scan_pipeline.sync_session_scope", _scope)
return scan


Expand All @@ -75,7 +83,7 @@ def captured_publishes(monkeypatch: pytest.MonkeyPatch) -> list[dict[str, Any]]:
def _capture(scan_id: Any, *, step: str, percent: int) -> None:
captured.append({"scan_id": scan_id, "step": step, "percent": percent})

monkeypatch.setattr("tasks.scan_source.publish_progress", _capture)
monkeypatch.setattr("tasks._scan_pipeline.publish_progress", _capture)
return captured


Expand Down
Loading
Loading