diff --git a/src/switchyard/sync_worker.py b/src/switchyard/sync_worker.py index 47e0a7d..3731007 100644 --- a/src/switchyard/sync_worker.py +++ b/src/switchyard/sync_worker.py @@ -14,6 +14,14 @@ log = logger.bind(component="sync") +class SyncMissingBlobsError(Exception): + """Raised when blobs referenced by a manifest are not available locally.""" + + def __init__(self, missing: list[str]) -> None: + self.missing = missing + super().__init__(f"Missing {len(missing)} blob(s): {', '.join(d[:19] for d in missing)}") + + async def sync_one( marker: SyncMarker, storage: Storage, @@ -39,17 +47,11 @@ async def sync_one( body, content_type = result - # Parse manifest to extract layer digests - digests = _extract_blob_digests(body) + # Collect all blobs and child manifests, checking for missing blobs before + # pushing anything upstream. + all_blobs: list[str] = _extract_blob_digests(body) + children: list[tuple[str, bytes, str]] = [] - # Push each blob that doesn't exist upstream - for digest in digests: - if await storage.has_blob(digest): - await upstream.push_blob_streaming(name, digest, storage.stream_blob(digest)) - else: - log.warning("Blob {} referenced by manifest but missing locally", digest[:19]) - - # For image indexes, push child manifests before the index itself for child_digest in _extract_child_manifests(body): child = await storage.get_manifest(name, child_digest) if child is None: @@ -59,23 +61,31 @@ async def sync_one( ) continue child_body, child_ct = child + children.append((child_digest, child_body, child_ct)) + all_blobs.extend(_extract_blob_digests(child_body)) + + missing = [d for d in all_blobs if not await storage.has_blob(d)] + if missing: + for digest in missing: + log.warning("Blob {} referenced by manifest but missing locally", digest[:19]) + raise SyncMissingBlobsError(missing) - # Push blobs referenced by the child manifest - for blob_digest in _extract_blob_digests(child_body): - if await storage.has_blob(blob_digest): - await upstream.push_blob_streaming( - name, blob_digest, storage.stream_blob(blob_digest) - ) - else: - log.warning("Blob {} referenced by child manifest but missing locally", blob_digest[:19]) + # Push blobs + pushed_blobs: set[str] = set() + for digest in all_blobs: + if digest not in pushed_blobs: + await upstream.push_blob_streaming(name, digest, storage.stream_blob(digest)) + pushed_blobs.add(digest) + # Push child manifests before the index + for child_digest, child_body, child_ct in children: await upstream.push_manifest(name, child_digest, child_body, child_ct) - # Push the manifest + # Push the top-level manifest await upstream.push_manifest(name, reference, body, content_type) await queue.mark_done(marker) - log.info("Synced {name}:{ref} ({n} blobs)", name=name, ref=reference, n=len(digests)) + log.info("Synced {name}:{ref} ({n} blobs)", name=name, ref=reference, n=len(pushed_blobs)) def _extract_blob_digests(manifest_body: bytes) -> list[str]: diff --git a/tests/test_sync_worker.py b/tests/test_sync_worker.py index 2fa7ce6..246cd2d 100644 --- a/tests/test_sync_worker.py +++ b/tests/test_sync_worker.py @@ -6,12 +6,18 @@ import json from pathlib import Path +import pytest import respx from httpx import Response from switchyard.storage import Storage from switchyard.sync_queue import SyncQueue -from switchyard.sync_worker import _extract_blob_digests, _extract_child_manifests, sync_one +from switchyard.sync_worker import ( + SyncMissingBlobsError, + _extract_blob_digests, + _extract_child_manifests, + sync_one, +) from switchyard.upstream import UpstreamClient BASE = "https://central:5000" @@ -243,3 +249,36 @@ def _record_manifest_put(request: respx.MockRequest) -> Response: remaining = await queue.list_pending() assert len(remaining) == 0 + + +@respx.mock +async def test_sync_one_fails_when_child_manifest_blobs_missing(tmp_path: Path) -> None: + """Sync must fail when a child manifest references blobs not stored locally.""" + storage = Storage(str(tmp_path)) + await storage.init() + queue = SyncQueue(str(tmp_path)) + await queue.init() + + # Store a child manifest referencing a blob we DON'T store locally + missing_blob = "sha256:deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef" + child_body = _make_manifest([missing_blob]) + child_ct = "application/vnd.oci.image.manifest.v1+json" + child_digest = f"sha256:{hashlib.sha256(child_body).hexdigest()}" + await storage.store_manifest("myapp", child_digest, child_body, child_ct) + + # Store an image index referencing the child + index_body = _make_index([child_digest]) + index_ct = "application/vnd.oci.image.index.v1+json" + await storage.store_manifest("myapp", "latest", index_body, index_ct) + + await queue.enqueue("myapp", "latest") + pending = await queue.list_pending() + + upstream = UpstreamClient(BASE) + with pytest.raises(SyncMissingBlobsError): + await sync_one(pending[0], storage, queue, upstream) + await upstream.close() + + # Marker should NOT be cleared (sync failed) + remaining = await queue.list_pending() + assert len(remaining) == 1 diff --git a/uv.lock b/uv.lock index 9539af3..ec800bb 100644 --- a/uv.lock +++ b/uv.lock @@ -340,7 +340,7 @@ wheels = [ [[package]] name = "switchyard" -version = "0.1.7" +version = "0.1.8" source = { editable = "." } dependencies = [ { name = "granian", extra = ["uvloop"] },