From 0b5a7428ed6288a43f103f76c6ec7cace068a4c8 Mon Sep 17 00:00:00 2001 From: David Poblador i Garcia Date: Thu, 2 Apr 2026 14:06:07 +0200 Subject: [PATCH] fix: push child manifests before image index during upstream sync When syncing an OCI image index (multi-arch manifest), the upstream registry rejects the index with 400 if referenced child manifests don't exist yet. Now sync_one detects image indexes and pushes each child manifest (and its blobs) before the index itself. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/switchyard/sync_worker.py | 46 ++++++++++++++++ tests/test_sync_worker.py | 99 ++++++++++++++++++++++++++++++++++- 2 files changed, 144 insertions(+), 1 deletion(-) diff --git a/src/switchyard/sync_worker.py b/src/switchyard/sync_worker.py index 6fa1636..47e0a7d 100644 --- a/src/switchyard/sync_worker.py +++ b/src/switchyard/sync_worker.py @@ -49,6 +49,28 @@ async def sync_one( else: log.warning("Blob {} referenced by manifest but missing locally", digest[:19]) + # For image indexes, push child manifests before the index itself + for child_digest in _extract_child_manifests(body): + child = await storage.get_manifest(name, child_digest) + if child is None: + log.warning( + "Child manifest {} referenced by index but missing locally", + child_digest[:19], + ) + continue + child_body, child_ct = child + + # Push blobs referenced by the child manifest + for blob_digest in _extract_blob_digests(child_body): + if await storage.has_blob(blob_digest): + await upstream.push_blob_streaming( + name, blob_digest, storage.stream_blob(blob_digest) + ) + else: + log.warning("Blob {} referenced by child manifest but missing locally", blob_digest[:19]) + + await upstream.push_manifest(name, child_digest, child_body, child_ct) + # Push the manifest await upstream.push_manifest(name, reference, body, content_type) @@ -79,6 +101,30 @@ def _extract_blob_digests(manifest_body: bytes) -> list[str]: return digests +_INDEX_MEDIA_TYPES = { + "application/vnd.oci.image.index.v1+json", + "application/vnd.docker.distribution.manifest.list.v2+json", +} + + +def _extract_child_manifests(manifest_body: bytes) -> list[str]: + """Extract child manifest digests from an image index / manifest list.""" + try: + manifest = json.loads(manifest_body) + except json.JSONDecodeError: + return [] + + media_type = manifest.get("mediaType", "") + if media_type not in _INDEX_MEDIA_TYPES: + return [] + + return [ + m["digest"] + for m in manifest.get("manifests", []) + if isinstance(m, dict) and "digest" in m + ] + + async def run_sync_loop( storage: Storage, queue: SyncQueue, diff --git a/tests/test_sync_worker.py b/tests/test_sync_worker.py index eb42426..2fa7ce6 100644 --- a/tests/test_sync_worker.py +++ b/tests/test_sync_worker.py @@ -11,12 +11,29 @@ from switchyard.storage import Storage from switchyard.sync_queue import SyncQueue -from switchyard.sync_worker import _extract_blob_digests, sync_one +from switchyard.sync_worker import _extract_blob_digests, _extract_child_manifests, sync_one from switchyard.upstream import UpstreamClient BASE = "https://central:5000" +def _make_index(child_digests: list[str]) -> bytes: + """Build a minimal OCI image index referencing child manifests.""" + return json.dumps({ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.index.v1+json", + "manifests": [ + { + "mediaType": "application/vnd.oci.image.manifest.v1+json", + "digest": d, + "size": 100, + "platform": {"architecture": "amd64", "os": "linux"}, + } + for d in child_digests + ], + }).encode() + + def _make_manifest(layer_digests: list[str], config_digest: str = "") -> bytes: manifest: dict[str, object] = {"schemaVersion": 2} if config_digest: @@ -146,3 +163,83 @@ async def test_sync_one_missing_manifest(tmp_path: Path) -> None: # Marker should still be cleared (manifest gone, nothing to sync) remaining = await queue.list_pending() assert len(remaining) == 0 + + +def test_extract_child_manifests_from_index() -> None: + child1 = "sha256:aaaa" + child2 = "sha256:bbbb" + body = _make_index([child1, child2]) + assert _extract_child_manifests(body) == [child1, child2] + + +def test_extract_child_manifests_from_regular_manifest() -> None: + body = _make_manifest(["sha256:layer1"], "sha256:config1") + assert _extract_child_manifests(body) == [] + + +def test_extract_child_manifests_invalid_json() -> None: + assert _extract_child_manifests(b"not json") == [] + + +@respx.mock +async def test_sync_one_pushes_child_manifests_before_index(tmp_path: Path) -> None: + """Image index sync must push child manifests to upstream before the index itself.""" + storage = Storage(str(tmp_path)) + await storage.init() + queue = SyncQueue(str(tmp_path)) + await queue.init() + + # Store a blob referenced by the child manifest + blob_data = b"layer content" + blob_digest = f"sha256:{hashlib.sha256(blob_data).hexdigest()}" + upload_id = await storage.create_upload() + await storage.append_upload(upload_id, blob_data) + await storage.store_blob_from_upload(upload_id, blob_digest) + + # Store a child platform manifest + child_body = _make_manifest([blob_digest]) + child_ct = "application/vnd.oci.image.manifest.v1+json" + child_digest = f"sha256:{hashlib.sha256(child_body).hexdigest()}" + await storage.store_manifest("myapp", child_digest, child_body, child_ct) + + # Store an image index referencing the child + index_body = _make_index([child_digest]) + index_ct = "application/vnd.oci.image.index.v1+json" + await storage.store_manifest("myapp", "latest", index_body, index_ct) + + await queue.enqueue("myapp", "latest") + pending = await queue.list_pending() + + # Mock upstream + respx.head(f"{BASE}/v2/myapp/blobs/{blob_digest}").mock( + return_value=Response(404) + ) + respx.post(f"{BASE}/v2/myapp/blobs/uploads/").mock( + return_value=Response(202, headers={"Location": f"{BASE}/v2/myapp/blobs/uploads/u1"}) + ) + respx.put(f"{BASE}/v2/myapp/blobs/uploads/u1").mock( + return_value=Response(201) + ) + manifest_puts: list[str] = [] + + def _record_manifest_put(request: respx.MockRequest) -> Response: + # Extract the reference from the URL path + ref = request.url.path.rsplit("/", 1)[-1] + manifest_puts.append(ref) + return Response(201) + + respx.put(url__regex=rf"{BASE}/v2/myapp/manifests/.+").mock( + side_effect=_record_manifest_put + ) + + upstream = UpstreamClient(BASE) + await sync_one(pending[0], storage, queue, upstream) + await upstream.close() + + # Child manifest must be pushed before the index + assert len(manifest_puts) == 2 + assert manifest_puts[0] == child_digest + assert manifest_puts[1] == "latest" + + remaining = await queue.list_pending() + assert len(remaining) == 0