From fc4d8dd091c3d606a1009bb56b59b99f7cf18c11 Mon Sep 17 00:00:00 2001
From: John Erickson
Date: Wed, 24 Jan 2024 14:39:44 -0800
Subject: [PATCH 1/2] WIP

---
 .../PipelineCachingCacheClient.cs | 160 ++++++++++++++----
 1 file changed, 128 insertions(+), 32 deletions(-)

diff --git a/src/AzurePipelines/PipelineCachingCacheClient.cs b/src/AzurePipelines/PipelineCachingCacheClient.cs
index afe816c..5d401d9 100644
--- a/src/AzurePipelines/PipelineCachingCacheClient.cs
+++ b/src/AzurePipelines/PipelineCachingCacheClient.cs
@@ -26,6 +26,7 @@
 using Microsoft.VisualStudio.Services.BlobStore.Common.Telemetry;
 using Microsoft.VisualStudio.Services.BlobStore.WebApi;
 using Microsoft.VisualStudio.Services.BlobStore.WebApi.Cache;
+using Microsoft.VisualStudio.Services.CircuitBreaker;
 using Microsoft.VisualStudio.Services.Common;
 using Microsoft.VisualStudio.Services.Content.Common;
 using Microsoft.VisualStudio.Services.Content.Common.Tracing;
@@ -86,6 +87,8 @@ internal sealed class PipelineCachingCacheClient : CacheClient
     private readonly DedupStoreClientWithDataport _dedupClient;
     private readonly DedupManifestArtifactClient _manifestClient;
     private readonly Task _startupTask;
+    private readonly KeepUntilBlobReference _keepUntil = new KeepUntilBlobReference(DateTimeOffset.Now.AddHours(4));
+    private readonly HashType _hashType;
 
     public PipelineCachingCacheClient(
         Context rootContext,
@@ -153,6 +156,7 @@ string s when int.TryParse(s, out int i) => i,
         _dedupHttpClient.SetRedirectTimeout(timeoutSeconds);
 
         // https://dev.azure.com/mseng/1ES/_workitems/edit/2060777
+        _hashType = hasher.Info.HashType;
         if (hasher.Info.HashType == HashType.Dedup1024K)
         {
             _dedupHttpClient.RecommendedChunkCountPerCall = 8;
@@ -219,64 +223,153 @@ protected override async Task AddNodeAsync(
         // If we are async publishing, then we need to grab content from the L1 and remap it.
         // If we are sync publishing, then we can point directly to it.
-        FileInfo[] infos;
+        PublishResult publishResult;
         if (EnableAsyncPublishing)
         {
-            infos = Array.Empty<FileInfo>();
+            // map the hash types
+            Dictionary<DedupIdentifier, ContentHash> dedupToHash = outputs.Values.ToDictionaryFirstKeyWins(
+                hash => hash.ToBlobIdentifier().ToDedupIdentifier(),
+                hash => hash);
+
+            // open a stream to get the length of all content
+            Dictionary<DedupIdentifier, long> dedupToSize = new();
+            foreach (ContentHash hash in dedupToHash.Values)
+            {
+                StreamWithLength? streamWithLength = await LocalCacheSession
+                    .OpenStreamAsync(context, hash, cancellationToken)
+                    .ThrowIfFailureAsync(r => r.StreamWithLength)!;
+                DedupIdentifier dedupId = hash.ToBlobIdentifier().ToDedupIdentifier();
+                dedupToSize.Add(dedupId, streamWithLength.Value.Length);
+                dedupToHash.Add(dedupId, hash);
+            }
-            // 2. Link out unique content to the temp folder
+            // create the manifest and add extras to local cache
+            Manifest manifest;
+            {
+                var items = new List<ManifestItem>(outputs.Count + extras.Count);
-            Dictionary<ContentHash, string> tempFilesPerHash = outputs.Values.Distinct().ToDictionary(
-                hash => hash,
-                hash =>
+                // put extras in local cache to simplify the code below
+                foreach (KeyValuePair<string, FileInfo> extra in extras)
                 {
-                    string tempFilePath = Path.Combine(TempFolder, Guid.NewGuid().ToString("N") + ".tmp");
-                    tempFilePaths.Add(tempFilePath);
-                    return tempFilePath;
-                });
+                    DedupNode node = await ChunkerHelper.CreateFromFileAsync(FileSystem.Instance, extra.Key, cancellationToken, configureAwait: false);
+                    DedupIdentifier dedupId = node.GetDedupIdentifier();
+                    dedupToSize[dedupId] = extra.Value.Length;
+                    dedupToHash[dedupId] = node.ToContentHash(_hashType);
+                    await LocalCacheSession.PutFileAsync(context, node.ToContentHash(_hashType), new AbsolutePath(extra.Value.FullName), FileRealizationMode.Any, cancellationToken);
+                    items.Add(new ManifestItem(extra.Key, new DedupInfo(dedupId.ValueString, node.TransitiveContentBytes)));
+                }
 
-            List<ContentHashWithPath> tempFiles = tempFilesPerHash
-                .Select(kvp => new ContentHashWithPath(kvp.Key, new AbsolutePath(kvp.Value)))
-                .ToList();
+                foreach (KeyValuePair<string, ContentHash> output in outputs)
+                {
+                    string relativePath = output.Key.MakePathRelativeTo(RepoRoot)!.Replace("\\", "/", StringComparison.Ordinal);
+                    DedupIdentifier dedupId = output.Value.ToBlobIdentifier().ToDedupIdentifier();
+                    items.Add(new ManifestItem(
+                        relativePath,
+                        new DedupInfo(dedupId.ValueString, (ulong)dedupToSize[dedupId])));
+                }
+
+                items.Sort((i1, i2) => StringComparer.Ordinal.Compare(i1.Path, i2.Path));
 
-            Dictionary<ContentHash, PlaceFileResult> placeResults = await TryPlaceFilesFromCacheAsync(context, tempFiles, cancellationToken);
-            foreach (PlaceFileResult placeResult in placeResults.Values)
-            {
-                placeResult.ThrowIfFailure();
+                manifest = new Manifest(items);
             }
-            // 3. map all the relative paths to the temp files
-            foreach (KeyValuePair<string, ContentHash> output in outputs)
+            // Store the manifest in local cache to simplify the code below
+            using MemoryStream manifestStream = new(JsonSerializer.Serialize(manifest).GetUTF8Bytes());
+            PutResult manifestResult = await LocalCacheSession.PutStreamAsync(context, _hashType, manifestStream, cancellationToken);
+            manifestResult.ThrowIfFailure();
+            ContentHash manifestHash = manifestResult.ContentHash;
+            DedupIdentifier manifestId = manifestHash.ToBlobIdentifier().ToDedupIdentifier();
+            dedupToSize[manifestId] = manifestStream.Length;
+            dedupToHash[manifestId] = manifestHash;
+
+            // now that we have the hashes and sizes, we can efficiently ask the service what it already has
+            IDedupUploadSession uploadSession = _dedupClient.CreateUploadSession(
+                _keepUntil,
+                tracer: _azureDevopsTracer,
+                FileSystem.Instance);
+
+            // upload whatever is needed (outputs, extras, and manifest)
             {
-                string relativePath = output.Key.MakePathRelativeTo(RepoRoot)!;
-                extras.Add(relativePath.Replace("\\", "/", StringComparison.Ordinal), new FileInfo(tempFilesPerHash[output.Value]));
+                Dictionary<DedupIdentifier, CheckIfUploadNeededResult> uploadCheckResults =
+                    await uploadSession.CheckIfUploadIsNeededAsync(dedupToSize, cancellationToken);
+
+                IEnumerable<DedupIdentifier> hashesToupload = uploadCheckResults
+                    .Where(kvp => kvp.Value == CheckIfUploadNeededResult.UploadNeeded)
+                    .Select(kvp => kvp.Key);
+
+                // what it doesn't have, we'll need to materialize to upload
+                Dictionary<ContentHash, string> tempFilesPerHash = hashesToupload.ToDictionary(
+                    hash => dedupToHash[hash],
+                    hash =>
+                    {
+                        string tempFilePath = Path.Combine(TempFolder, Guid.NewGuid().ToString("N") + ".tmp");
+                        tempFilePaths.Add(tempFilePath);
+                        return tempFilePath;
+                    });
+
+                List<ContentHashWithPath> tempFiles = tempFilesPerHash
+                    .Select(kvp => new ContentHashWithPath(kvp.Key, new AbsolutePath(kvp.Value)))
+                    .ToList();
+
+                Dictionary<ContentHash, PlaceFileResult> placeResults = await TryPlaceFilesFromCacheAsync(context, tempFiles, cancellationToken);
+                foreach (KeyValuePair<ContentHash, PlaceFileResult> placeResult in placeResults)
+                {
+                    // Everything should already be in the L1
+                    placeResult.Value.ThrowIfFailure();
+                }
+
+                // upload the files in batches of DedupNode.MaxDirectChildrenPerNode == 512
+                foreach (List<KeyValuePair<ContentHash, string>> page in tempFilesPerHash.GetPages(DedupNode.MaxDirectChildrenPerNode))
+                {
+                    Dictionary<DedupIdentifier, string> paths = page.ToDictionary(kvp => kvp.Key.ToBlobIdentifier().ToDedupIdentifier(), kvp => kvp.Value);
+                    var files = new List<DedupNode>(page.Count);
+                    foreach (KeyValuePair<ContentHash, string> kvp in page)
+                    {
+                        // UploadAsync requires "filled" nodes.
+                        // For single-chunk files, they are already filled as they have no children nodes.
+                        // For multi-chunk files, we need to re-chunk them here as the LocalCAS
+                        // only stores the hash of the top node and not the inner node tree that upload needs.
+                        DedupIdentifier dedupId = kvp.Key.ToBlobIdentifier().ToDedupIdentifier();
+                        if (dedupId.AlgorithmId == ChunkDedupIdentifier.ChunkAlgorithmId)
+                        {
+                            files.Add(new DedupNode(new ChunkInfo(0, (uint)dedupToSize[dedupId], dedupId.AlgorithmResult)));
+                        }
+                        else
+                        {
+                            DedupNode node = await ChunkFileAsync(kvp.Value, cancellationToken);
+                            files.Add(node);
+                        }
+                    }
+
+                    var rootNode = new DedupNode(files);
+                    await uploadSession.UploadAsync(rootNode, paths, cancellationToken);
+                }
+            }
+
+            publishResult
         }
         else
         {
-            infos = outputs.Keys.Select(f => new FileInfo(f)).ToArray();
+            FileInfo[] infos = outputs.Keys.Select(f => new FileInfo(f)).ToArray();
+            publishResult = await WithHttpRetries(
+                () => _manifestClient.PublishAsync(RepoRoot, infos, extras, new ArtifactPublishOptions(), manifestFileOutputPath: null, cancellationToken),
+                context: $"Publishing content for {fingerprint}",
+                cancellationToken);
         }
 
-        var result = await WithHttpRetries(
-            () => _manifestClient.PublishAsync(RepoRoot, infos, extras, new ArtifactPublishOptions(), manifestFileOutputPath: null, cancellationToken),
-            context: $"Publishing content for {fingerprint}",
-            cancellationToken);
-
         // double check
         {
-            using var manifestStream = new MemoryStream(await GetBytes(context, result.ManifestId, cancellationToken));
+            using var manifestStream = new MemoryStream(await GetBytes(context, publishResult.ManifestId, cancellationToken));
             Manifest manifest = JsonSerializer.Deserialize<Manifest>(manifestStream)!;
             var manifestFiles = CreateNormalizedManifest(manifest);
             var outputFiles = CreateNormalizedManifest(outputs);
-            ThrowIfDifferent(manifestFiles, outputFiles, $"With {nameof(EnableAsyncPublishing)}:{EnableAsyncPublishing}, Manifest `{result.ManifestId}` and Outputs don't match:");
+            ThrowIfDifferent(manifestFiles, outputFiles, $"With {nameof(EnableAsyncPublishing)}:{EnableAsyncPublishing}, Manifest `{publishResult.ManifestId}` and Outputs don't match:");
         }
 
         var key = ComputeKey(fingerprint, forWrite: true);
         var entry = new CreatePipelineCacheArtifactContract(
             new VisualStudio.Services.PipelineCache.WebApi.Fingerprint(key.Split(KeySegmentSeperator)),
-            result.ManifestId,
-            result.RootId,
-            result.ProofNodes,
+            publishResult.ManifestId,
+            publishResult.RootId,
+            publishResult.ProofNodes,
             ContentFormatConstants.Files);
 
         CreateResult createResult = await WithHttpRetries(
@@ -377,6 +470,9 @@ protected override async Task AddNodeAsync(
         }
     }
 
+    private static Task<DedupNode> ChunkFileAsync(string path, CancellationToken cancellationToken) =>
+        ChunkerHelper.CreateFromFileAsync(FileSystem.Instance, path, cancellationToken, configureAwait: false);
+
     private static byte GetAlgorithmId(ContentHash hash)
     {
         switch (hash._hashType)

From ff794acf6cf52eadab68de6feae8f181f4be1755 Mon Sep 17 00:00:00 2001
From: John Erickson
Date: Thu, 25 Jan 2024 17:05:05 -0800
Subject: [PATCH 2/2] compiles

---
 .../PipelineCachingCacheClient.cs | 91 ++++++++++++-------
 1 file changed, 57 insertions(+), 34 deletions(-)

diff --git a/src/AzurePipelines/PipelineCachingCacheClient.cs b/src/AzurePipelines/PipelineCachingCacheClient.cs
index 5d401d9..66f5b81 100644
--- a/src/AzurePipelines/PipelineCachingCacheClient.cs
+++ b/src/AzurePipelines/PipelineCachingCacheClient.cs
@@ -26,7 +26,6 @@
 using Microsoft.VisualStudio.Services.BlobStore.Common.Telemetry;
 using Microsoft.VisualStudio.Services.BlobStore.WebApi;
 using Microsoft.VisualStudio.Services.BlobStore.WebApi.Cache;
-using Microsoft.VisualStudio.Services.CircuitBreaker;
 using Microsoft.VisualStudio.Services.Common;
 using Microsoft.VisualStudio.Services.Content.Common;
 using Microsoft.VisualStudio.Services.Content.Common.Tracing;
@@ -240,7 +239,6 @@ protected override async Task AddNodeAsync(
                     .ThrowIfFailureAsync(r => r.StreamWithLength)!;
                 DedupIdentifier dedupId = hash.ToBlobIdentifier().ToDedupIdentifier();
                 dedupToSize.Add(dedupId, streamWithLength.Value.Length);
-                dedupToHash.Add(dedupId, hash);
             }
 
             // create the manifest and add extras to local cache
@@ -281,23 +279,26 @@ protected override async Task AddNodeAsync(
             dedupToSize[manifestId] = manifestStream.Length;
             dedupToHash[manifestId] = manifestHash;
 
-            // now that we have the hashes and sizes, we can efficiently ask the service what it already has
+            // now that we have everything in the L1, we can efficiently ask the service what it already has
             IDedupUploadSession uploadSession = _dedupClient.CreateUploadSession(
                 _keepUntil,
                 tracer: _azureDevopsTracer,
                 FileSystem.Instance);
 
-            // upload whatever is needed (outputs, extras, and manifest)
-            {
-                Dictionary<DedupIdentifier, CheckIfUploadNeededResult> uploadCheckResults =
-                    await uploadSession.CheckIfUploadIsNeededAsync(dedupToSize, cancellationToken);
+            // upload whatever (outputs, extras, and manifest) is needed
+            Dictionary<DedupIdentifier, CheckIfUploadNeededResult> uploadCheckResults =
+                await uploadSession.CheckIfUploadIsNeededAsync(dedupToSize, cancellationToken);
 
-                IEnumerable<DedupIdentifier> hashesToupload = uploadCheckResults
-                    .Where(kvp => kvp.Value == CheckIfUploadNeededResult.UploadNeeded)
-                    .Select(kvp => kvp.Key);
+            IEnumerable<DedupIdentifier> hashesToupload = uploadCheckResults
+                .Where(kvp => kvp.Value == CheckIfUploadNeededResult.UploadNeeded)
+                .Select(kvp => kvp.Key);
 
-                // what it doesn't have, we'll need to materialize to upload
-                Dictionary<ContentHash, string> tempFilesPerHash = hashesToupload.ToDictionary(
+            var pageRoots = new List<DedupNode>();
+            // upload the files in batches of DedupNode.MaxDirectChildrenPerNode == 512
+            foreach (List<DedupIdentifier> hashPage in hashesToupload.GetPages(DedupNode.MaxDirectChildrenPerNode))
+            {
+                // we'll need to materialize to upload because the cache won't give us its path to the content
+                Dictionary<ContentHash, string> tempFilesPerHash = hashPage.ToDictionary(
                     hash => dedupToHash[hash],
                     hash =>
                     {
@@ -306,10 +307,12 @@ protected override async Task AddNodeAsync(
                         return tempFilePath;
                     });
 
+                // munge to a different format
                 List<ContentHashWithPath> tempFiles = tempFilesPerHash
                     .Select(kvp => new ContentHashWithPath(kvp.Key, new AbsolutePath(kvp.Value)))
                     .ToList();
 
+                // materialize the files
                 Dictionary<ContentHash, PlaceFileResult> placeResults = await TryPlaceFilesFromCacheAsync(context, tempFiles, cancellationToken);
                 foreach (KeyValuePair<ContentHash, PlaceFileResult> placeResult in placeResults)
                 {
@@ -317,34 +320,54 @@ protected override async Task AddNodeAsync(
                     placeResult.Value.ThrowIfFailure();
                 }
 
-                // upload the files in batches of DedupNode.MaxDirectChildrenPerNode == 512
-                foreach (List<KeyValuePair<ContentHash, string>> page in tempFilesPerHash.GetPages(DedupNode.MaxDirectChildrenPerNode))
+                // compute the merkle tree
+                Dictionary<DedupIdentifier, string> paths = tempFilesPerHash.ToDictionary(kvp => kvp.Key.ToBlobIdentifier().ToDedupIdentifier(), kvp => kvp.Value);
+                var files = new List<DedupNode>(tempFilesPerHash.Count);
+                foreach (KeyValuePair<ContentHash, string> kvp in tempFilesPerHash)
                 {
-                    Dictionary<DedupIdentifier, string> paths = page.ToDictionary(kvp => kvp.Key.ToBlobIdentifier().ToDedupIdentifier(), kvp => kvp.Value);
-                    var files = new List<DedupNode>(page.Count);
-                    foreach (KeyValuePair<ContentHash, string> kvp in page)
+                    // UploadAsync requires "filled" nodes.
+                    // For single-chunk files, they are already filled as they have no children nodes.
+                    // For multi-chunk files, we need to re-chunk them here as the LocalCAS
+                    // only stores the hash of the top node and not the inner node tree that upload needs.
+                    DedupIdentifier dedupId = kvp.Key.ToBlobIdentifier().ToDedupIdentifier();
+                    if (dedupId.AlgorithmId == ChunkDedupIdentifier.ChunkAlgorithmId)
+                    {
+                        files.Add(new DedupNode(new ChunkInfo(0, (uint)dedupToSize[dedupId], dedupId.AlgorithmResult)));
+                    }
+                    else
                     {
-                        // UploadAsync requires "filled" nodes.
-                        // For single-chunk files, they are already filled as they have no children nodes.
-                        // For multi-chunk files, we need to re-chunk them here as the LocalCAS
-                        // only stores the hash of the top node and not the inner node tree that upload needs.
-                        DedupIdentifier dedupId = kvp.Key.ToBlobIdentifier().ToDedupIdentifier();
-                        if (dedupId.AlgorithmId == ChunkDedupIdentifier.ChunkAlgorithmId)
-                        {
-                            files.Add(new DedupNode(new ChunkInfo(0, (uint)dedupToSize[dedupId], dedupId.AlgorithmResult)));
-                        }
-                        else
-                        {
                         DedupNode node = await ChunkFileAsync(kvp.Value, cancellationToken);
                         files.Add(node);
                     }
-                    var rootNode = new DedupNode(files);
-                    await uploadSession.UploadAsync(rootNode, paths, cancellationToken);
                 }
+
+                // create the root node and upload
+                var pageRootNode = new DedupNode(files);
+                await uploadSession.UploadAsync(pageRootNode, paths, cancellationToken);
+            }
+
+
+            while (pageRoots.Count > 1)
+            {
+                var newPageRoots = new List<DedupNode>();
+                foreach (List<DedupNode> page in pageRoots.GetPages(DedupNode.MaxDirectChildrenPerNode))
+                {
+                    var pageRootNode = new DedupNode(page);
+                    newPageRoots.Add(pageRootNode);
+                }
+                pageRoots = newPageRoots;
+            }
 
-            publishResult
+            DedupNode root = pageRoots.Single();
+
+            HashSet<DedupNode> proofNodes = ProofHelper.CreateProofNodes(
+                uploadSession.AllNodes,
+                uploadSession.ParentLookup,
+                dedupToSize.Keys);
+
+            string[] proofNodesSerialized = proofNodes.Select(n => Convert.ToBase64String(n.Serialize())).ToArray();
+
+            publishResult = new PublishResult(manifestId, root.GetDedupIdentifier(), proofNodesSerialized, manifest.Items.Count, (long)root.TransitiveContentBytes);
         }
         else
         {