From 034d531ae24ce7ea6cb465e5681b331eee0d44c5 Mon Sep 17 00:00:00 2001 From: David Poblador i Garcia Date: Thu, 2 Apr 2026 16:42:03 +0200 Subject: [PATCH] fix: abort sync when child manifest blobs are missing locally Previously, the sync worker would silently skip missing blobs and push child manifests upstream without them, resulting in broken images. Now it validates all blobs exist locally before pushing anything, and raises SyncMissingBlobsError so the sync retries with backoff. Also fixes the blob count log to include child manifest blobs. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/switchyard/sync_worker.py | 50 +++++++++++++++++++++-------------- tests/test_sync_worker.py | 41 +++++++++++++++++++++++++++- uv.lock | 2 +- 3 files changed, 71 insertions(+), 22 deletions(-) diff --git a/src/switchyard/sync_worker.py b/src/switchyard/sync_worker.py index 47e0a7d..3731007 100644 --- a/src/switchyard/sync_worker.py +++ b/src/switchyard/sync_worker.py @@ -14,6 +14,14 @@ log = logger.bind(component="sync") +class SyncMissingBlobsError(Exception): + """Raised when blobs referenced by a manifest are not available locally.""" + + def __init__(self, missing: list[str]) -> None: + self.missing = missing + super().__init__(f"Missing {len(missing)} blob(s): {', '.join(d[:19] for d in missing)}") + + async def sync_one( marker: SyncMarker, storage: Storage, @@ -39,17 +47,11 @@ async def sync_one( body, content_type = result - # Parse manifest to extract layer digests - digests = _extract_blob_digests(body) + # Collect all blobs and child manifests, checking for missing blobs before + # pushing anything upstream. + all_blobs: list[str] = _extract_blob_digests(body) + children: list[tuple[str, bytes, str]] = [] - # Push each blob that doesn't exist upstream - for digest in digests: - if await storage.has_blob(digest): - await upstream.push_blob_streaming(name, digest, storage.stream_blob(digest)) - else: - log.warning("Blob {} referenced by manifest but missing locally", digest[:19]) - - # For image indexes, push child manifests before the index itself for child_digest in _extract_child_manifests(body): child = await storage.get_manifest(name, child_digest) if child is None: @@ -59,23 +61,31 @@ async def sync_one( ) continue child_body, child_ct = child + children.append((child_digest, child_body, child_ct)) + all_blobs.extend(_extract_blob_digests(child_body)) + + missing = [d for d in all_blobs if not await storage.has_blob(d)] + if missing: + for digest in missing: + log.warning("Blob {} referenced by manifest but missing locally", digest[:19]) + raise SyncMissingBlobsError(missing) - # Push blobs referenced by the child manifest - for blob_digest in _extract_blob_digests(child_body): - if await storage.has_blob(blob_digest): - await upstream.push_blob_streaming( - name, blob_digest, storage.stream_blob(blob_digest) - ) - else: - log.warning("Blob {} referenced by child manifest but missing locally", blob_digest[:19]) + # Push blobs + pushed_blobs: set[str] = set() + for digest in all_blobs: + if digest not in pushed_blobs: + await upstream.push_blob_streaming(name, digest, storage.stream_blob(digest)) + pushed_blobs.add(digest) + # Push child manifests before the index + for child_digest, child_body, child_ct in children: await upstream.push_manifest(name, child_digest, child_body, child_ct) - # Push the manifest + # Push the top-level manifest await upstream.push_manifest(name, reference, body, content_type) await queue.mark_done(marker) - log.info("Synced {name}:{ref} ({n} blobs)", name=name, ref=reference, n=len(digests)) + log.info("Synced {name}:{ref} ({n} blobs)", name=name, ref=reference, n=len(pushed_blobs)) def _extract_blob_digests(manifest_body: bytes) -> list[str]: diff --git a/tests/test_sync_worker.py b/tests/test_sync_worker.py index 2fa7ce6..246cd2d 100644 --- a/tests/test_sync_worker.py +++ b/tests/test_sync_worker.py @@ -6,12 +6,18 @@ import json from pathlib import Path +import pytest import respx from httpx import Response from switchyard.storage import Storage from switchyard.sync_queue import SyncQueue -from switchyard.sync_worker import _extract_blob_digests, _extract_child_manifests, sync_one +from switchyard.sync_worker import ( + SyncMissingBlobsError, + _extract_blob_digests, + _extract_child_manifests, + sync_one, +) from switchyard.upstream import UpstreamClient BASE = "https://central:5000" @@ -243,3 +249,36 @@ def _record_manifest_put(request: respx.MockRequest) -> Response: remaining = await queue.list_pending() assert len(remaining) == 0 + + +@respx.mock +async def test_sync_one_fails_when_child_manifest_blobs_missing(tmp_path: Path) -> None: + """Sync must fail when a child manifest references blobs not stored locally.""" + storage = Storage(str(tmp_path)) + await storage.init() + queue = SyncQueue(str(tmp_path)) + await queue.init() + + # Store a child manifest referencing a blob we DON'T store locally + missing_blob = "sha256:deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef" + child_body = _make_manifest([missing_blob]) + child_ct = "application/vnd.oci.image.manifest.v1+json" + child_digest = f"sha256:{hashlib.sha256(child_body).hexdigest()}" + await storage.store_manifest("myapp", child_digest, child_body, child_ct) + + # Store an image index referencing the child + index_body = _make_index([child_digest]) + index_ct = "application/vnd.oci.image.index.v1+json" + await storage.store_manifest("myapp", "latest", index_body, index_ct) + + await queue.enqueue("myapp", "latest") + pending = await queue.list_pending() + + upstream = UpstreamClient(BASE) + with pytest.raises(SyncMissingBlobsError): + await sync_one(pending[0], storage, queue, upstream) + await upstream.close() + + # Marker should NOT be cleared (sync failed) + remaining = await queue.list_pending() + assert len(remaining) == 1 diff --git a/uv.lock b/uv.lock index 9539af3..ec800bb 100644 --- a/uv.lock +++ b/uv.lock @@ -340,7 +340,7 @@ wheels = [ [[package]] name = "switchyard" -version = "0.1.7" +version = "0.1.8" source = { editable = "." } dependencies = [ { name = "granian", extra = ["uvloop"] },