diff --git a/apps/backend/tasks/_scan_pipeline.py b/apps/backend/tasks/_scan_pipeline.py new file mode 100644 index 00000000..82f98489 --- /dev/null +++ b/apps/backend/tasks/_scan_pipeline.py @@ -0,0 +1,129 @@ +""" +Shared scan-pipeline orchestration helpers. + +These are the self-contained terminal-state writers and the per-stage +progress writer that the source scan pipeline (:mod:`tasks.scan_source`) +uses to drive a :class:`models.Scan` row through ``running`` → +``succeeded`` / ``failed`` while fanning out WebSocket progress frames. + +They were extracted verbatim from ``tasks.scan_source`` so a future +SBOM-ingest Celery task can reuse them through a clean public seam — no +``from tasks.scan_source import _private_name`` cross-module reach into a +sibling task module. + +Behaviour is byte-identical to the original ``scan_source`` privates: + + - ``mark_failed`` (was ``_mark_failed``) + - ``record_terminal_failure`` (was ``_record_terminal_failure``) + - ``mark_succeeded`` (was ``_mark_succeeded``) + - ``set_stage`` (was ``_set_stage``) + +``set_stage`` is the one generalisation: the original ``_set_stage`` pulled +its percent from ``scan_source._STAGE_PROGRESS`` — a mapping that is specific +to the source pipeline and does not belong in a shared module. ``set_stage`` +takes ``percent`` as an explicit argument instead. The caller passes +``_STAGE_PROGRESS.get(stage)`` so the original behaviour is preserved exactly: + + - a *known* stage → its mapped int percent (DB + log + publish); + - an *unknown* stage → ``percent=None`` → the row keeps its prior + ``progress_percent`` (matching the original ``.get(stage, + scan.progress_percent)`` fallback), the log line carries the raw ``None`` + (matching the original ``_STAGE_PROGRESS.get(stage)`` log value), and the + published frame carries the committed (prior) percent. + +Import-cycle note: this module depends only on ``models``, ``core.db``, +``tasks._progress`` and ``tasks.scan_retention``. None of those import this +module or ``tasks.scan_source`` at module top, so importing this from +``scan_source`` does not create a cycle (verified at extraction time). +""" + +from __future__ import annotations + +import uuid +from datetime import UTC, datetime + +import structlog +from sqlalchemy.orm import Session + +from core.db import sync_session_scope +from models import Scan +from tasks._progress import publish_progress +from tasks.scan_retention import supersede_prior_ref_scans + +log = structlog.get_logger("tasks.scan_pipeline") + + +def mark_failed(session: Session, scan: Scan, message: str) -> None: + scan.status = "failed" + scan.error_message = message + scan.completed_at = datetime.now(UTC) + session.commit() + # Snapshot the percent under the row (defaults to 0 when None — protects + # against an early-failure path where progress was never initialised). + last_percent = scan.progress_percent or 0 + publish_progress(scan.id, step="failed", percent=last_percent) + + +def record_terminal_failure(scan_uuid: uuid.UUID, message: str) -> None: + with sync_session_scope() as session: + scan = session.get(Scan, scan_uuid) + if scan is None: + return + mark_failed(session, scan, message) + + +def mark_succeeded(scan_uuid: uuid.UUID) -> None: + with sync_session_scope() as session: + scan = session.get(Scan, scan_uuid) + if scan is None: + return + scan.status = "succeeded" + scan.progress_percent = 100 + scan.current_step = "finalize" + scan.completed_at = datetime.now(UTC) + # scan-retention Layer 1: this scan is now the live snapshot for its + # ref, so prior succeeded same-ref scans (without an explicit release + # label) are superseded in the same transaction. No-op when the scan + # carries no ref — those are reclaimed by the keep-last/max-age sweep. + supersede_prior_ref_scans( + session, + project_id=scan.project_id, + winner_scan_id=scan.id, + ref=scan.ref, + ) + session.commit() + publish_progress(scan_uuid, step="succeeded", percent=100) + + +def set_stage(scan_uuid: uuid.UUID, stage: str, percent: int | None) -> None: + """Advance a scan to ``stage`` and fan out the progress frame. + + ``percent`` is the stage's progress percent, supplied explicitly by the + caller (the source pipeline derives it from ``_STAGE_PROGRESS.get(stage)``). + When ``percent`` is ``None`` the row keeps its existing ``progress_percent`` + — this preserves the original ``_set_stage`` fallback for an unmapped + stage. The log line carries the raw ``percent`` value (``None`` for an + unmapped stage, mirroring the original ``_STAGE_PROGRESS.get(stage)`` log + value). The publish happens AFTER the DB commit so a subscriber that reads + the row on receipt sees the same state as the published payload. + """ + with sync_session_scope() as session: + scan = session.get(Scan, scan_uuid) + if scan is None: + return + scan.current_step = stage + scan.progress_percent = percent if percent is not None else scan.progress_percent + session.commit() + committed_percent = scan.progress_percent or 0 + log.info("scan_stage", stage=stage, percent=percent) + # Publish AFTER the DB commit so a subscriber that reads the row on + # receipt sees the same state as the published payload. + publish_progress(scan_uuid, step=stage, percent=committed_percent) + + +__all__ = [ + "mark_failed", + "mark_succeeded", + "record_terminal_failure", + "set_stage", +] diff --git a/apps/backend/tasks/scan_source.py b/apps/backend/tasks/scan_source.py index 8df40b7d..cdf08434 100644 --- a/apps/backend/tasks/scan_source.py +++ b/apps/backend/tasks/scan_source.py @@ -13,7 +13,7 @@ CycloneDX SBOM to ``ort evaluate --ort-file``, which expects an OrtResult JSON, and aborted every scan with a KotlinInvalidNullException; we had been swallowing that with a try/except). License classification for *third-party* dependencies -remains *declared* (cdxgen package metadata, persisted in ``_persist_components`` +remains *declared* (cdxgen package metadata, persisted in ``persist_sbom_components`` → ``_persist_component_licenses``). PR-A2 adds *detected* license findings for *first-party* source via scancode — third-party dependency sources are NOT downloaded (that deep-scan path is out of scope; it would blow the budget). @@ -121,14 +121,18 @@ from services.vulnerability_matching import persist_trivy_findings from tasks._progress import ( close_log_file, - publish_progress, reset_log_counter, ) from tasks._progress import ( make_line_callback as _make_line_callback, ) +from tasks._scan_pipeline import ( + mark_failed, + mark_succeeded, + record_terminal_failure, + set_stage, +) from tasks.celery_app import celery_app -from tasks.scan_retention import supersede_prior_ref_scans log = structlog.get_logger("tasks.scan_source") @@ -403,7 +407,7 @@ def _run_pipeline( # never raise from this stage onto the terminal-failure path — a missing # detected-license set is a degraded-output scenario, not a fatal one # (same philosophy as the prep stage). Third-party dependency sources are - # NOT downloaded; their licenses stay declared via _persist_components. + # NOT downloaded; their licenses stay declared via persist_sbom_components. _set_stage(scan_uuid, "scancode") scancode_detections: list[scancode_adapter.DetectedLicense] = [] # G3.1: capture the scancode result JSON path so the preservation stage can @@ -445,7 +449,7 @@ def _run_pipeline( # findings — the declared findings and component graph still commit. A # detected-license failure is degraded, never fatal. with sync_session_scope() as session: - _persist_components( + persist_sbom_components( session, scan_uuid=scan_uuid, sbom=cdxgen_result.sbom, @@ -1008,61 +1012,28 @@ def _mark_running(session: Session, scan: Scan) -> None: session.commit() -def _mark_failed(session: Session, scan: Scan, message: str) -> None: - scan.status = "failed" - scan.error_message = message - scan.completed_at = datetime.now(UTC) - session.commit() - # Snapshot the percent under the row (defaults to 0 when None — protects - # against an early-failure path where progress was never initialised). - last_percent = scan.progress_percent or 0 - publish_progress(scan.id, step="failed", percent=last_percent) - - -def _record_terminal_failure(scan_uuid: uuid.UUID, message: str) -> None: - with sync_session_scope() as session: - scan = session.get(Scan, scan_uuid) - if scan is None: - return - _mark_failed(session, scan, message) - - -def _mark_succeeded(scan_uuid: uuid.UUID) -> None: - with sync_session_scope() as session: - scan = session.get(Scan, scan_uuid) - if scan is None: - return - scan.status = "succeeded" - scan.progress_percent = 100 - scan.current_step = "finalize" - scan.completed_at = datetime.now(UTC) - # scan-retention Layer 1: this scan is now the live snapshot for its - # ref, so prior succeeded same-ref scans (without an explicit release - # label) are superseded in the same transaction. No-op when the scan - # carries no ref — those are reclaimed by the keep-last/max-age sweep. - supersede_prior_ref_scans( - session, - project_id=scan.project_id, - winner_scan_id=scan.id, - ref=scan.ref, - ) - session.commit() - publish_progress(scan_uuid, step="succeeded", percent=100) +# The terminal-state writers and the per-stage progress writer were extracted +# verbatim to ``tasks._scan_pipeline`` so a future SBOM-ingest task can reuse +# them through a public seam (no cross-module private import). We keep these +# thin module-level aliases so this module's own call sites — and the existing +# tests that ``monkeypatch.setattr(scan_source, "_record_terminal_failure", …)`` +# — keep working unchanged. ``_set_stage`` stays a wrapper because it owns the +# source-pipeline-specific ``_STAGE_PROGRESS`` mapping that the shared +# ``set_stage`` deliberately does not know about. +_mark_failed = mark_failed +_record_terminal_failure = record_terminal_failure +_mark_succeeded = mark_succeeded def _set_stage(scan_uuid: uuid.UUID, stage: str) -> None: - with sync_session_scope() as session: - scan = session.get(Scan, scan_uuid) - if scan is None: - return - scan.current_step = stage - scan.progress_percent = _STAGE_PROGRESS.get(stage, scan.progress_percent) - session.commit() - committed_percent = scan.progress_percent or 0 - log.info("scan_stage", stage=stage, percent=_STAGE_PROGRESS.get(stage)) - # Publish AFTER the DB commit so a subscriber that reads the row on - # receipt sees the same state as the published payload. - publish_progress(scan_uuid, step=stage, percent=committed_percent) + """Advance a scan to ``stage`` using this pipeline's percent mapping. + + Delegates to :func:`tasks._scan_pipeline.set_stage` with the percent + resolved from ``_STAGE_PROGRESS`` — ``None`` for an unmapped stage, which + the shared writer treats as "keep the row's prior percent" (preserving the + original ``_set_stage`` fallback exactly). + """ + set_stage(scan_uuid, stage, _STAGE_PROGRESS.get(stage)) def _persist_artifact( @@ -1872,7 +1843,7 @@ def _run_prep( ) -def _persist_components( +def persist_sbom_components( session: Session, *, scan_uuid: uuid.UUID, diff --git a/apps/backend/tests/integration/scan/test_jsonb_size_guard.py b/apps/backend/tests/integration/scan/test_jsonb_size_guard.py index 831f2fb3..1ddba771 100644 --- a/apps/backend/tests/integration/scan/test_jsonb_size_guard.py +++ b/apps/backend/tests/integration/scan/test_jsonb_size_guard.py @@ -4,7 +4,7 @@ `tasks/scan_source.py` and `tasks/scan_container.py` route every JSONB write through `enforce_jsonb_row_size_limit` before the row reaches the ORM. We don't drive the full Celery pipeline here — we drive the -`_persist_components` helper directly with a fabricated cdxgen SBOM whose +`persist_sbom_components` helper directly with a fabricated cdxgen SBOM whose single component carries a large free-form blob. The guard must replace the blob with the truncation marker before persistence. @@ -142,9 +142,9 @@ def test_oversized_component_raw_data_is_truncated_at_persist( ] } - from tasks.scan_source import _persist_components + from tasks.scan_source import persist_sbom_components - _persist_components(sync_session, scan_uuid=scan_id, sbom=sbom) + persist_sbom_components(sync_session, scan_uuid=scan_id, sbom=sbom) sync_session.commit() rows = ( @@ -183,9 +183,9 @@ def test_under_limit_component_is_persisted_intact( ] } - from tasks.scan_source import _persist_components + from tasks.scan_source import persist_sbom_components - _persist_components(sync_session, scan_uuid=scan_id, sbom=sbom) + persist_sbom_components(sync_session, scan_uuid=scan_id, sbom=sbom) sync_session.commit() rows = ( diff --git a/apps/backend/tests/unit/tasks/test_scan_source_progress_hooks.py b/apps/backend/tests/unit/tasks/test_scan_source_progress_hooks.py index 00adec5d..7eec097a 100644 --- a/apps/backend/tests/unit/tasks/test_scan_source_progress_hooks.py +++ b/apps/backend/tests/unit/tasks/test_scan_source_progress_hooks.py @@ -1,11 +1,19 @@ """ -Unit tests for the four publish_progress hook points in `tasks.scan_source`. +Unit tests for the four publish_progress hook points in the scan pipeline. The integration tests (which require Postgres) cover the full pipeline end-to-end. These unit tests use lightweight mocks to cover the publish hooks specifically, so a regression in the order ("commit then publish") or in the percent value passed to the publisher is caught without a DB. +The hook implementations were extracted to ``tasks._scan_pipeline`` (public +names ``set_stage`` / ``mark_succeeded`` / ``mark_failed`` / +``record_terminal_failure``) so an SBOM-ingest task can reuse them. The +``scan_source`` privates (``_set_stage`` etc.) remain thin aliases / wrappers +over the shared implementations, so the tests still exercise them through the +``scan_source`` seam while monkeypatching the dependencies on the module that +actually owns the implementation (``tasks._scan_pipeline``). + Pinned hook points: - ``_set_stage`` — emits step=, percent=_STAGE_PROGRESS[stage] - ``_mark_succeeded`` — emits step="succeeded", percent=100 @@ -63,7 +71,7 @@ def patch_session(monkeypatch: pytest.MonkeyPatch) -> Any: def _scope() -> Any: yield _FakeSession(scan) - monkeypatch.setattr("tasks.scan_source.sync_session_scope", _scope) + monkeypatch.setattr("tasks._scan_pipeline.sync_session_scope", _scope) return scan @@ -75,7 +83,7 @@ def captured_publishes(monkeypatch: pytest.MonkeyPatch) -> list[dict[str, Any]]: def _capture(scan_id: Any, *, step: str, percent: int) -> None: captured.append({"scan_id": scan_id, "step": step, "percent": percent}) - monkeypatch.setattr("tasks.scan_source.publish_progress", _capture) + monkeypatch.setattr("tasks._scan_pipeline.publish_progress", _capture) return captured diff --git a/apps/backend/tests/unit/tasks/test_scan_source_scope_enrichment.py b/apps/backend/tests/unit/tasks/test_scan_source_scope_enrichment.py index 4c4b34b0..99701dbd 100644 --- a/apps/backend/tests/unit/tasks/test_scan_source_scope_enrichment.py +++ b/apps/backend/tests/unit/tasks/test_scan_source_scope_enrichment.py @@ -1,4 +1,4 @@ -"""Unit tests — npm scope enrichment in ``_persist_components`` (W4-D, 2026-05-27). +"""Unit tests — npm scope enrichment in ``persist_sbom_components`` (W4-D, 2026-05-27). cdxgen 12.3.3 does not emit ``scope`` for npm components, so without intervention the Components tab's USAGE column is dash for every npm row. The @@ -52,7 +52,7 @@ def _scan_components(session: _FakeSession) -> list[ScanComponent]: @pytest.fixture def patched_helpers(monkeypatch: pytest.MonkeyPatch) -> None: - """Stub component / license helpers so ``_persist_components`` runs + """Stub component / license helpers so ``persist_sbom_components`` runs in-memory against the fake session.""" monkeypatch.setattr( "tasks.scan_source._get_or_create_component", @@ -92,7 +92,7 @@ def test_cdxgen_scope_wins_over_lockfile( ) -> None: """A cdxgen-supplied scope (e.g. Maven POM ````) is NOT overridden by the lockfile. Maven emits scope reliably; the lockfile is npm-only.""" - from tasks.scan_source import _persist_components + from tasks.scan_source import persist_sbom_components # cdxgen says compile (Maven); lockfile would (wrongly) say dev. cdxgen wins. monkeypatch.setattr( @@ -114,7 +114,7 @@ def test_cdxgen_scope_wins_over_lockfile( } ] } - _persist_components(session, scan_uuid=uuid.uuid4(), sbom=sbom, source_dir=tmp_path) + persist_sbom_components(session, scan_uuid=uuid.uuid4(), sbom=sbom, source_dir=tmp_path) scs = _scan_components(session) assert len(scs) == 1 assert scs[0].dependency_scope == "compile" @@ -130,7 +130,7 @@ def test_npm_component_uses_lockfile_scope_when_cdxgen_missing( This is the dominant 2026-05-26 P3 #12 gap: cdxgen 12.3.3 emits npm components with no ``scope`` field, leaving the UI USAGE column at dash for every npm row. The lockfile-derived scope closes that gap.""" - from tasks.scan_source import _persist_components + from tasks.scan_source import persist_sbom_components monkeypatch.setattr( "tasks.scan_source.read_lockfile", @@ -160,7 +160,7 @@ def test_npm_component_uses_lockfile_scope_when_cdxgen_missing( }, ] } - _persist_components(session, scan_uuid=uuid.uuid4(), sbom=sbom, source_dir=tmp_path) + persist_sbom_components(session, scan_uuid=uuid.uuid4(), sbom=sbom, source_dir=tmp_path) scs = _scan_components(session) scopes = {sc.dependency_scope for sc in scs} assert scopes == {"required", "dev"} @@ -177,7 +177,7 @@ def test_non_npm_component_never_consults_lockfile( This protects the trust boundary: the lockfile is attacker-controllable but its enrichment scope is bounded to npm purls.""" - from tasks.scan_source import _persist_components + from tasks.scan_source import persist_sbom_components monkeypatch.setattr( "tasks.scan_source.read_lockfile", @@ -200,7 +200,7 @@ def test_non_npm_component_never_consults_lockfile( }, ] } - _persist_components(session, scan_uuid=uuid.uuid4(), sbom=sbom, source_dir=tmp_path) + persist_sbom_components(session, scan_uuid=uuid.uuid4(), sbom=sbom, source_dir=tmp_path) scs = _scan_components(session) assert len(scs) == 1 assert scs[0].dependency_scope is None # NOT "dev" — type-bound @@ -213,7 +213,7 @@ def test_no_lockfile_means_scope_stays_null( ) -> None: """No package-lock.json on disk → ``read_lockfile`` returns None → the behaviour is identical to pre-W4-D: cdxgen scope or NULL.""" - from tasks.scan_source import _persist_components + from tasks.scan_source import persist_sbom_components monkeypatch.setattr("tasks.scan_source.read_lockfile", lambda src: None) session = _FakeSession() @@ -227,7 +227,7 @@ def test_no_lockfile_means_scope_stays_null( }, ] } - _persist_components(session, scan_uuid=uuid.uuid4(), sbom=sbom, source_dir=tmp_path) + persist_sbom_components(session, scan_uuid=uuid.uuid4(), sbom=sbom, source_dir=tmp_path) scs = _scan_components(session) assert scs[0].dependency_scope is None @@ -238,7 +238,7 @@ def test_source_dir_none_skips_lockfile_load( ) -> None: """When the caller does not pass ``source_dir`` (legacy call sites / tests), the lockfile loader is NOT called. Behaviour is identical to pre-W4-D.""" - from tasks.scan_source import _persist_components + from tasks.scan_source import persist_sbom_components calls: list[Any] = [] @@ -258,7 +258,7 @@ def _spy(source_dir: Path) -> None: } ] } - _persist_components(session, scan_uuid=uuid.uuid4(), sbom=sbom) + persist_sbom_components(session, scan_uuid=uuid.uuid4(), sbom=sbom) assert calls == [] # loader never invoked when source_dir is None @@ -267,10 +267,10 @@ def test_lockfile_loaded_exactly_once_per_persist_call( patched_helpers: None, monkeypatch: pytest.MonkeyPatch, ) -> None: - """The lockfile is loaded once at the top of ``_persist_components`` — NOT + """The lockfile is loaded once at the top of ``persist_sbom_components`` — NOT re-loaded per component. A 1000-component scan must not re-parse the lockfile 1000 times.""" - from tasks.scan_source import _persist_components + from tasks.scan_source import persist_sbom_components load_count = 0 @@ -291,7 +291,7 @@ def _counting_loader(source_dir: Path) -> NpmLockfileData | None: } for i in range(50) ] - _persist_components( + persist_sbom_components( session, scan_uuid=uuid.uuid4(), sbom={"components": components}, source_dir=tmp_path ) assert load_count == 1 # parsed once, used for every lookup @@ -306,7 +306,7 @@ def test_empty_string_cdxgen_scope_falls_back_to_lockfile( Some cdxgen versions emit ``scope: ""`` for unresolved entries; an empty string is not a useful classification and the lockfile should fill in.""" - from tasks.scan_source import _persist_components + from tasks.scan_source import persist_sbom_components monkeypatch.setattr( "tasks.scan_source.read_lockfile", @@ -327,6 +327,6 @@ def test_empty_string_cdxgen_scope_falls_back_to_lockfile( } ] } - _persist_components(session, scan_uuid=uuid.uuid4(), sbom=sbom, source_dir=tmp_path) + persist_sbom_components(session, scan_uuid=uuid.uuid4(), sbom=sbom, source_dir=tmp_path) scs = _scan_components(session) assert scs[0].dependency_scope == "required"