Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/switchyard/sync_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,28 @@ async def sync_one(
else:
log.warning("Blob {} referenced by manifest but missing locally", digest[:19])

# For image indexes, push child manifests before the index itself
for child_digest in _extract_child_manifests(body):
child = await storage.get_manifest(name, child_digest)
if child is None:
log.warning(
"Child manifest {} referenced by index but missing locally",
child_digest[:19],
)
continue
child_body, child_ct = child

# Push blobs referenced by the child manifest
for blob_digest in _extract_blob_digests(child_body):
if await storage.has_blob(blob_digest):
await upstream.push_blob_streaming(
name, blob_digest, storage.stream_blob(blob_digest)
)
else:
log.warning("Blob {} referenced by child manifest but missing locally", blob_digest[:19])

await upstream.push_manifest(name, child_digest, child_body, child_ct)

# Push the manifest
await upstream.push_manifest(name, reference, body, content_type)

Expand Down Expand Up @@ -79,6 +101,30 @@ def _extract_blob_digests(manifest_body: bytes) -> list[str]:
return digests


_INDEX_MEDIA_TYPES = {
"application/vnd.oci.image.index.v1+json",
"application/vnd.docker.distribution.manifest.list.v2+json",
}


def _extract_child_manifests(manifest_body: bytes) -> list[str]:
"""Extract child manifest digests from an image index / manifest list."""
try:
manifest = json.loads(manifest_body)
except json.JSONDecodeError:
return []

media_type = manifest.get("mediaType", "")
if media_type not in _INDEX_MEDIA_TYPES:
return []

return [
m["digest"]
for m in manifest.get("manifests", [])
if isinstance(m, dict) and "digest" in m
]


async def run_sync_loop(
storage: Storage,
queue: SyncQueue,
Expand Down
99 changes: 98 additions & 1 deletion tests/test_sync_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,29 @@

from switchyard.storage import Storage
from switchyard.sync_queue import SyncQueue
from switchyard.sync_worker import _extract_blob_digests, sync_one
from switchyard.sync_worker import _extract_blob_digests, _extract_child_manifests, sync_one
from switchyard.upstream import UpstreamClient

BASE = "https://central:5000"


def _make_index(child_digests: list[str]) -> bytes:
"""Build a minimal OCI image index referencing child manifests."""
return json.dumps({
"schemaVersion": 2,
"mediaType": "application/vnd.oci.image.index.v1+json",
"manifests": [
{
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"digest": d,
"size": 100,
"platform": {"architecture": "amd64", "os": "linux"},
}
for d in child_digests
],
}).encode()


def _make_manifest(layer_digests: list[str], config_digest: str = "") -> bytes:
manifest: dict[str, object] = {"schemaVersion": 2}
if config_digest:
Expand Down Expand Up @@ -146,3 +163,83 @@ async def test_sync_one_missing_manifest(tmp_path: Path) -> None:
# Marker should still be cleared (manifest gone, nothing to sync)
remaining = await queue.list_pending()
assert len(remaining) == 0


def test_extract_child_manifests_from_index() -> None:
child1 = "sha256:aaaa"
child2 = "sha256:bbbb"
body = _make_index([child1, child2])
assert _extract_child_manifests(body) == [child1, child2]


def test_extract_child_manifests_from_regular_manifest() -> None:
body = _make_manifest(["sha256:layer1"], "sha256:config1")
assert _extract_child_manifests(body) == []


def test_extract_child_manifests_invalid_json() -> None:
assert _extract_child_manifests(b"not json") == []


@respx.mock
async def test_sync_one_pushes_child_manifests_before_index(tmp_path: Path) -> None:
"""Image index sync must push child manifests to upstream before the index itself."""
storage = Storage(str(tmp_path))
await storage.init()
queue = SyncQueue(str(tmp_path))
await queue.init()

# Store a blob referenced by the child manifest
blob_data = b"layer content"
blob_digest = f"sha256:{hashlib.sha256(blob_data).hexdigest()}"
upload_id = await storage.create_upload()
await storage.append_upload(upload_id, blob_data)
await storage.store_blob_from_upload(upload_id, blob_digest)

# Store a child platform manifest
child_body = _make_manifest([blob_digest])
child_ct = "application/vnd.oci.image.manifest.v1+json"
child_digest = f"sha256:{hashlib.sha256(child_body).hexdigest()}"
await storage.store_manifest("myapp", child_digest, child_body, child_ct)

# Store an image index referencing the child
index_body = _make_index([child_digest])
index_ct = "application/vnd.oci.image.index.v1+json"
await storage.store_manifest("myapp", "latest", index_body, index_ct)

await queue.enqueue("myapp", "latest")
pending = await queue.list_pending()

# Mock upstream
respx.head(f"{BASE}/v2/myapp/blobs/{blob_digest}").mock(
return_value=Response(404)
)
respx.post(f"{BASE}/v2/myapp/blobs/uploads/").mock(
return_value=Response(202, headers={"Location": f"{BASE}/v2/myapp/blobs/uploads/u1"})
)
respx.put(f"{BASE}/v2/myapp/blobs/uploads/u1").mock(
return_value=Response(201)
)
manifest_puts: list[str] = []

def _record_manifest_put(request: respx.MockRequest) -> Response:
# Extract the reference from the URL path
ref = request.url.path.rsplit("/", 1)[-1]
manifest_puts.append(ref)
return Response(201)

respx.put(url__regex=rf"{BASE}/v2/myapp/manifests/.+").mock(
side_effect=_record_manifest_put
)

upstream = UpstreamClient(BASE)
await sync_one(pending[0], storage, queue, upstream)
await upstream.close()

# Child manifest must be pushed before the index
assert len(manifest_puts) == 2
assert manifest_puts[0] == child_digest
assert manifest_puts[1] == "latest"

remaining = await queue.list_pending()
assert len(remaining) == 0
Loading