Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 30 additions & 20 deletions src/switchyard/sync_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
log = logger.bind(component="sync")


class SyncMissingBlobsError(Exception):
    """Raised when blobs referenced by a manifest are not available locally."""

    def __init__(self, missing: list[str]) -> None:
        # Keep the full list of missing digests so callers can act on it
        # programmatically (retry, re-fetch, report).
        self.missing = missing
        # Truncate each digest to 19 chars ("sha256:" + 12 hex) to keep the
        # human-readable message short.
        shortened = ", ".join(digest[:19] for digest in missing)
        super().__init__(f"Missing {len(missing)} blob(s): {shortened}")


async def sync_one(
marker: SyncMarker,
storage: Storage,
Expand All @@ -39,17 +47,11 @@ async def sync_one(

body, content_type = result

# Parse manifest to extract layer digests
digests = _extract_blob_digests(body)
# Collect all blobs and child manifests, checking for missing blobs before
# pushing anything upstream.
all_blobs: list[str] = _extract_blob_digests(body)
children: list[tuple[str, bytes, str]] = []

# Push each blob that doesn't exist upstream
for digest in digests:
if await storage.has_blob(digest):
await upstream.push_blob_streaming(name, digest, storage.stream_blob(digest))
else:
log.warning("Blob {} referenced by manifest but missing locally", digest[:19])

# For image indexes, push child manifests before the index itself
for child_digest in _extract_child_manifests(body):
child = await storage.get_manifest(name, child_digest)
if child is None:
Expand All @@ -59,23 +61,31 @@ async def sync_one(
)
continue
child_body, child_ct = child
children.append((child_digest, child_body, child_ct))
all_blobs.extend(_extract_blob_digests(child_body))

missing = [d for d in all_blobs if not await storage.has_blob(d)]
if missing:
for digest in missing:
log.warning("Blob {} referenced by manifest but missing locally", digest[:19])
raise SyncMissingBlobsError(missing)

# Push blobs referenced by the child manifest
for blob_digest in _extract_blob_digests(child_body):
if await storage.has_blob(blob_digest):
await upstream.push_blob_streaming(
name, blob_digest, storage.stream_blob(blob_digest)
)
else:
log.warning("Blob {} referenced by child manifest but missing locally", blob_digest[:19])
# Push blobs
pushed_blobs: set[str] = set()
for digest in all_blobs:
if digest not in pushed_blobs:
await upstream.push_blob_streaming(name, digest, storage.stream_blob(digest))
pushed_blobs.add(digest)

# Push child manifests before the index
for child_digest, child_body, child_ct in children:
await upstream.push_manifest(name, child_digest, child_body, child_ct)

# Push the manifest
# Push the top-level manifest
await upstream.push_manifest(name, reference, body, content_type)

await queue.mark_done(marker)
log.info("Synced {name}:{ref} ({n} blobs)", name=name, ref=reference, n=len(digests))
log.info("Synced {name}:{ref} ({n} blobs)", name=name, ref=reference, n=len(pushed_blobs))


def _extract_blob_digests(manifest_body: bytes) -> list[str]:
Expand Down
41 changes: 40 additions & 1 deletion tests/test_sync_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@
import json
from pathlib import Path

import pytest
import respx
from httpx import Response

from switchyard.storage import Storage
from switchyard.sync_queue import SyncQueue
from switchyard.sync_worker import _extract_blob_digests, _extract_child_manifests, sync_one
from switchyard.sync_worker import (
SyncMissingBlobsError,
_extract_blob_digests,
_extract_child_manifests,
sync_one,
)
from switchyard.upstream import UpstreamClient

BASE = "https://central:5000"
Expand Down Expand Up @@ -243,3 +249,36 @@ def _record_manifest_put(request: respx.MockRequest) -> Response:

remaining = await queue.list_pending()
assert len(remaining) == 0


@respx.mock
async def test_sync_one_fails_when_child_manifest_blobs_missing(tmp_path: Path) -> None:
    """Sync must fail when a child manifest references blobs not stored locally."""
    storage = Storage(str(tmp_path))
    await storage.init()
    queue = SyncQueue(str(tmp_path))
    await queue.init()

    # Child manifest referencing a blob that is deliberately never written
    # to local storage.
    absent_blob = "sha256:deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"
    manifest_bytes = _make_manifest([absent_blob])
    manifest_type = "application/vnd.oci.image.manifest.v1+json"
    manifest_digest = f"sha256:{hashlib.sha256(manifest_bytes).hexdigest()}"
    await storage.store_manifest("myapp", manifest_digest, manifest_bytes, manifest_type)

    # Image index that points at the child manifest above.
    index_bytes = _make_index([manifest_digest])
    await storage.store_manifest(
        "myapp", "latest", index_bytes, "application/vnd.oci.image.index.v1+json"
    )

    await queue.enqueue("myapp", "latest")
    pending = await queue.list_pending()

    upstream = UpstreamClient(BASE)
    with pytest.raises(SyncMissingBlobsError):
        await sync_one(pending[0], storage, queue, upstream)
    await upstream.close()

    # A failed sync must leave its marker queued for a later retry.
    remaining = await queue.list_pending()
    assert len(remaining) == 1
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading