From be410c7bf80f58211360fc19929a81eca1161a0a Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 16:34:13 -0700 Subject: [PATCH 01/13] feat(storage): add rapid bucket template cache Add a feature-gated bucket-backed read-through cache for template artifacts as an alternative to the shared NFS cache. --- .../pkg/sandbox/template/cache.go | 35 ++- packages/shared/pkg/featureflags/flags.go | 1 + packages/shared/pkg/storage/storage.go | 9 + .../shared/pkg/storage/storage_cache_rapid.go | 246 ++++++++++++++++++ packages/shared/pkg/storage/storage_google.go | 17 ++ 5 files changed, 305 insertions(+), 3 deletions(-) create mode 100644 packages/shared/pkg/storage/storage_cache_rapid.go diff --git a/packages/orchestrator/pkg/sandbox/template/cache.go b/packages/orchestrator/pkg/sandbox/template/cache.go index 2296542fb1..bbbead0770 100644 --- a/packages/orchestrator/pkg/sandbox/template/cache.go +++ b/packages/orchestrator/pkg/sandbox/template/cache.go @@ -53,6 +53,8 @@ type Cache struct { flags *featureflags.Client cache *ttlcache.Cache[string, Template] persistence storage.StorageProvider + rapidCache storage.StorageProvider + rapidCacheMu sync.Mutex buildStore *build.DiffStore blockMetrics blockmetrics.Metrics rootCachePath string @@ -169,9 +171,10 @@ func (c *Cache) GetTemplate( defer span.End() persistence := c.persistence - // Because of the template caching, if we enable the NFS cache feature flag, - // it will start working only for new orchestrators or new builds. - if path, enabled := c.useNFSCache(ctx, isBuilding, isSnapshot); enabled { + if cache, enabled := c.useRapidBucketCache(ctx, isBuilding); enabled { + persistence = storage.WrapInRapidBucketCache(ctx, cache, persistence) + span.SetAttributes(attribute.Bool("use_cache", true)) + } else if path, enabled := c.useNFSCache(ctx, isBuilding, isSnapshot); enabled { logger.L().Info(ctx, "using local template cache", zap.String("path", c.rootCachePath)) persistence = storage.WrapInNFSCache(ctx, path, persistence, c.flags) span.SetAttributes(attribute.Bool("use_cache", true)) @@ -305,6 +308,32 @@ func (c *Cache) useNFSCache(ctx context.Context, isBuilding bool, isSnapshot boo return c.rootCachePath, useNFSCache } +func (c *Cache) useRapidBucketCache(ctx context.Context, isBuilding bool) (storage.StorageProvider, bool) { + if isBuilding || !c.flags.BoolFlag(ctx, featureflags.RapidBucketCacheFlag) { + return nil, false + } + c.rapidCacheMu.Lock() + defer c.rapidCacheMu.Unlock() + if c.rapidCache != nil { + return c.rapidCache, true + } + bucketName := storage.RapidBucketCacheStorageConfig.GetBucketName() + if bucketName == "" { + logger.L().Warn(ctx, "rapid bucket cache feature flag is enabled but bucket is not set") + + return nil, false + } + p, err := storage.NewGCPRapid(ctx, bucketName, nil) + if err != nil { + logger.L().Warn(ctx, "failed to initialize rapid bucket cache", zap.Error(err)) + + return nil, false + } + c.rapidCache = p + + return c.rapidCache, true +} + func cleanDir(path string) error { entries, err := os.ReadDir(path) if err != nil { diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index 28b6f8e370..d60d3305ec 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -120,6 +120,7 @@ var ( MetricsReadFlag = NewBoolFlag("sandbox-metrics-read", true) SnapshotFeatureFlag = NewBoolFlag("use-nfs-for-snapshots", env.IsDevelopment()) TemplateFeatureFlag = NewBoolFlag("use-nfs-for-templates", env.IsDevelopment()) + RapidBucketCacheFlag = NewBoolFlag("use-rapid-bucket-cache", false) EnableWriteThroughCacheFlag = NewBoolFlag("write-to-cache-on-writes", false) UseNFSCacheForBuildingTemplatesFlag = NewBoolFlag("use-nfs-for-building-templates", env.IsDevelopment()) BestOfKCanFitFlag = NewBoolFlag("best-of-k-can-fit", true) diff --git a/packages/shared/pkg/storage/storage.go b/packages/shared/pkg/storage/storage.go index 668ca49fb4..6a06bca44a 100644 --- a/packages/shared/pkg/storage/storage.go +++ b/packages/shared/pkg/storage/storage.go @@ -243,6 +243,15 @@ var BuildCacheStorageConfig = StorageConfig{ }, } +var RapidBucketCacheStorageConfig = StorageConfig{ + GetLocalBasePath: func() string { + return "" + }, + GetBucketName: func() string { + return env.GetEnv("RAPID_BUCKET_CACHE_BUCKET_NAME", "") + }, +} + func GetStorageProvider(ctx context.Context, cfg StorageConfig) (StorageProvider, error) { provider := GetProviderType() diff --git a/packages/shared/pkg/storage/storage_cache_rapid.go b/packages/shared/pkg/storage/storage_cache_rapid.go new file mode 100644 index 0000000000..f5ce4d04e5 --- /dev/null +++ b/packages/shared/pkg/storage/storage_cache_rapid.go @@ -0,0 +1,246 @@ +package storage + +import ( + "bytes" + "context" + "fmt" + "io" + "sync" + "time" +) + +type rapidCacheProvider struct { + cache StorageProvider + inner StorageProvider +} + +func WrapInRapidBucketCache(_ context.Context, cache StorageProvider, inner StorageProvider) StorageProvider { + return &rapidCacheProvider{ + cache: cache, + inner: inner, + } +} + +func (p *rapidCacheProvider) DeleteObjectsWithPrefix(ctx context.Context, prefix string) error { + return p.inner.DeleteObjectsWithPrefix(ctx, prefix) +} + +func (p *rapidCacheProvider) UploadSignedURL(ctx context.Context, path string, ttl time.Duration) (string, error) { + return p.inner.UploadSignedURL(ctx, path, ttl) +} + +func (p *rapidCacheProvider) OpenBlob(ctx context.Context, path string, objectType ObjectType) (Blob, error) { + return p.inner.OpenBlob(ctx, path, objectType) +} + +func (p *rapidCacheProvider) OpenSeekable(ctx context.Context, path string, objectType SeekableObjectType) (Seekable, error) { + inner, err := p.inner.OpenSeekable(ctx, path, objectType) + if err != nil { + return nil, err + } + + return &rapidCachedSeekable{ + path: "rapid-cache/" + path, + cache: p.cache, + inner: inner, + }, nil +} + +func (p *rapidCacheProvider) GetDetails() string { + return fmt.Sprintf("[Rapid bucket cache, inner=%s]", p.inner.GetDetails()) +} + +type rapidCachedSeekable struct { + path string + cache StorageProvider + inner Seekable + + wg sync.WaitGroup +} + +func (c *rapidCachedSeekable) OpenRangeReader(ctx context.Context, off int64, length int64, frameTable *FrameTable) (io.ReadCloser, error) { + if frameTable != nil && frameTable.IsCompressed() { + return c.openCompressed(ctx, off, frameTable) + } + + cachePath := fmt.Sprintf("%s/%012d-%d.bin", c.path, off/MemoryChunkSize, length) + if rc, err := c.openCache(ctx, cachePath, length); err == nil { + return rc, nil + } + + rc, err := c.inner.OpenRangeReader(ctx, off, length, nil) + if err != nil { + return nil, err + } + + if !skipCacheWriteback(ctx) { + return newRapidCacheWriteThroughReader(rc, c, ctx, cachePath, length), nil + } + + return rc, nil +} + +func (c *rapidCachedSeekable) openCompressed(ctx context.Context, off int64, frameTable *FrameTable) (io.ReadCloser, error) { + r, err := frameTable.LocateCompressed(off) + if err != nil { + return nil, fmt.Errorf("frame lookup for offset %d: %w", off, err) + } + + cachePath := makeFrameFilename(c.path, r) + if raw, err := c.openCache(ctx, cachePath, int64(r.Length)); err == nil { + dec, err := newDecompressingReadCloser(raw, frameTable.CompressionType()) + if err != nil { + raw.Close() + + return nil, err + } + + return dec, nil + } + + raw, err := c.inner.OpenRangeReader(ctx, r.Offset, int64(r.Length), nil) + if err != nil { + return nil, err + } + + dec, err := newRapidDecompressingCacheReader(raw, frameTable.CompressionType(), c, ctx, cachePath, r.Length) + if err != nil { + raw.Close() + + return nil, err + } + + return dec, nil +} + +func (c *rapidCachedSeekable) Size(ctx context.Context) (int64, error) { + return c.inner.Size(ctx) +} + +func (c *rapidCachedSeekable) StoreFile(ctx context.Context, path string, opts ...PutOption) (*FrameTable, [32]byte, error) { + return c.inner.StoreFile(ctx, path, opts...) +} + +func (c *rapidCachedSeekable) openCache(ctx context.Context, path string, length int64) (io.ReadCloser, error) { + obj, err := c.cache.OpenSeekable(ctx, path, UnknownSeekableObjectType) + if err != nil { + return nil, err + } + + return obj.OpenRangeReader(ctx, 0, length, nil) +} + +func (c *rapidCachedSeekable) writeCache(ctx context.Context, path string, data []byte) error { + blob, err := c.cache.OpenBlob(ctx, path, UnknownObjectType) + if err != nil { + return err + } + + return blob.Put(ctx, data) +} + +func (c *rapidCachedSeekable) goCtx(ctx context.Context, fn func(context.Context)) { + c.wg.Go(func() { + fn(context.WithoutCancel(ctx)) + }) +} + +type rapidCacheWriteThroughReader struct { + inner io.ReadCloser + buf *bytes.Buffer + cache *rapidCachedSeekable + ctx context.Context //nolint:containedctx // needed for async cache write-back in Close + path string + expectedLen int64 +} + +func newRapidCacheWriteThroughReader(inner io.ReadCloser, cache *rapidCachedSeekable, ctx context.Context, path string, expectedLen int64) io.ReadCloser { + return &rapidCacheWriteThroughReader{ + inner: inner, + buf: bytes.NewBuffer(make([]byte, 0, expectedLen)), + cache: cache, + ctx: ctx, + path: path, + expectedLen: expectedLen, + } +} + +func (r *rapidCacheWriteThroughReader) Read(p []byte) (int, error) { + n, err := r.inner.Read(p) + if n > 0 { + r.buf.Write(p[:n]) + } + + return n, err +} + +func (r *rapidCacheWriteThroughReader) Close() error { + closeErr := r.inner.Close() + if isCompleteRead(r.buf.Len(), int(r.expectedLen), nil) { + data := make([]byte, r.buf.Len()) + copy(data, r.buf.Bytes()) + r.cache.goCtx(r.ctx, func(ctx context.Context) { + _ = r.cache.writeCache(ctx, r.path, data) + }) + } + + return closeErr +} + +type rapidDecompressingCacheReader struct { + decompressor io.ReadCloser + raw io.ReadCloser + compressedBuf *bytes.Buffer + cache *rapidCachedSeekable + ctx context.Context //nolint:containedctx // needed for async cache write-back in Close + path string + expectedSize int +} + +func newRapidDecompressingCacheReader(raw io.ReadCloser, ct CompressionType, cache *rapidCachedSeekable, ctx context.Context, path string, expectedSize int) (io.ReadCloser, error) { + var compressedBuf bytes.Buffer + compressedBuf.Grow(expectedSize) + + dec, err := NewDecompressingReader(io.TeeReader(raw, &compressedBuf), ct) + if err != nil { + return nil, err + } + + return &rapidDecompressingCacheReader{ + decompressor: dec, + raw: raw, + compressedBuf: &compressedBuf, + cache: cache, + ctx: ctx, + path: path, + expectedSize: expectedSize, + }, nil +} + +func (r *rapidDecompressingCacheReader) Read(p []byte) (int, error) { + return r.decompressor.Read(p) +} + +func (r *rapidDecompressingCacheReader) Close() error { + _, _ = io.Copy(io.Discard, r.decompressor) + + decErr := r.decompressor.Close() + rawErr := r.raw.Close() + if decErr != nil { + return decErr + } + if rawErr != nil { + return rawErr + } + if skipCacheWriteback(r.ctx) || !isCompleteRead(r.compressedBuf.Len(), r.expectedSize, nil) { + return nil + } + + data := make([]byte, r.compressedBuf.Len()) + copy(data, r.compressedBuf.Bytes()) + r.cache.goCtx(r.ctx, func(ctx context.Context) { + _ = r.cache.writeCache(ctx, r.path, data) + }) + + return nil +} diff --git a/packages/shared/pkg/storage/storage_google.go b/packages/shared/pkg/storage/storage_google.go index 0556cecb73..c040af0355 100644 --- a/packages/shared/pkg/storage/storage_google.go +++ b/packages/shared/pkg/storage/storage_google.go @@ -17,6 +17,7 @@ import ( "time" "cloud.google.com/go/storage" + "cloud.google.com/go/storage/experimental" "github.com/googleapis/gax-go/v2" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -83,6 +84,7 @@ type gcpStorage struct { bucket *storage.BucketHandle limiter *limit.Limiter + zonal bool } var _ StorageProvider = (*gcpStorage)(nil) @@ -102,6 +104,14 @@ var ( ) func NewGCP(ctx context.Context, bucketName string, limiter *limit.Limiter) (StorageProvider, error) { + return newGCP(ctx, bucketName, limiter, false) +} + +func NewGCPRapid(ctx context.Context, bucketName string, limiter *limit.Limiter) (StorageProvider, error) { + return newGCP(ctx, bucketName, limiter, true) +} + +func newGCP(ctx context.Context, bucketName string, limiter *limit.Limiter, zonal bool) (StorageProvider, error) { grpcPoolSize, err := env.GetEnvAsInt("GCS_GRPC_CONNECTION_POOL_SIZE", defaultGRPCConnectionPoolSize) if err != nil { return nil, fmt.Errorf("failed to parse GCS_GRPC_CONNECTION_POOL_SIZE: %w", err) @@ -113,6 +123,9 @@ func NewGCP(ctx context.Context, bucketName string, limiter *limit.Limiter) (Sto option.WithGRPCDialOption(grpc.WithInitialWindowSize(4 * megabyte)), internaloption.EnableDirectPath(defaultGCSEnableDirectPath), } + if zonal { + opts = append(opts, experimental.WithZonalBucketAPIs()) + } client, err := storage.NewGRPCClient(ctx, opts...) if err != nil { @@ -123,6 +136,7 @@ func NewGCP(ctx context.Context, bucketName string, limiter *limit.Limiter) (Sto client: client, bucket: client.Bucket(bucketName), limiter: limiter, + zonal: zonal, }, nil } @@ -325,6 +339,9 @@ func (o *gcpObject) Put(ctx context.Context, data []byte, opts ...PutOption) err timer := googleWriteTimerFactory.Begin(attribute.String(gcsOperationAttr, gcsOperationAttrWrite)) w := o.handle.NewWriter(ctx) + if o.storage.zonal { + w.FinalizeOnClose = true + } if putOpts := ApplyPutOptions(opts); len(putOpts.Metadata) > 0 { w.Metadata = putOpts.Metadata } From 79f733cabf816f5dda13bd4317791833312db85e Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 16:40:26 -0700 Subject: [PATCH 02/13] fix(storage): harden rapid cache hits Validate cached object sizes, clean cache prefixes with canonical deletes, and use plain goroutines for best-effort writeback. --- .../shared/pkg/storage/storage_cache_rapid.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/packages/shared/pkg/storage/storage_cache_rapid.go b/packages/shared/pkg/storage/storage_cache_rapid.go index f5ce4d04e5..4078863ba7 100644 --- a/packages/shared/pkg/storage/storage_cache_rapid.go +++ b/packages/shared/pkg/storage/storage_cache_rapid.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "io" - "sync" "time" ) @@ -22,6 +21,10 @@ func WrapInRapidBucketCache(_ context.Context, cache StorageProvider, inner Stor } func (p *rapidCacheProvider) DeleteObjectsWithPrefix(ctx context.Context, prefix string) error { + if err := p.cache.DeleteObjectsWithPrefix(ctx, "rapid-cache/"+prefix); err != nil { + return err + } + return p.inner.DeleteObjectsWithPrefix(ctx, prefix) } @@ -54,8 +57,6 @@ type rapidCachedSeekable struct { path string cache StorageProvider inner Seekable - - wg sync.WaitGroup } func (c *rapidCachedSeekable) OpenRangeReader(ctx context.Context, off int64, length int64, frameTable *FrameTable) (io.ReadCloser, error) { @@ -126,6 +127,13 @@ func (c *rapidCachedSeekable) openCache(ctx context.Context, path string, length if err != nil { return nil, err } + size, err := obj.Size(ctx) + if err != nil { + return nil, err + } + if size != length { + return nil, fmt.Errorf("rapid cache object %s size %d != expected %d", path, size, length) + } return obj.OpenRangeReader(ctx, 0, length, nil) } @@ -140,9 +148,9 @@ func (c *rapidCachedSeekable) writeCache(ctx context.Context, path string, data } func (c *rapidCachedSeekable) goCtx(ctx context.Context, fn func(context.Context)) { - c.wg.Go(func() { + go func() { fn(context.WithoutCancel(ctx)) - }) + }() } type rapidCacheWriteThroughReader struct { From de1bc4d5c3bcc52cb54a5cc20f624c22e783fdd8 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 19:45:49 -0700 Subject: [PATCH 03/13] fix(storage): keep rapid cache best effort Validate uncompressed rapid cache reads and avoid blocking canonical deletes on cache cleanup failures. --- .../shared/pkg/storage/storage_cache_rapid.go | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/packages/shared/pkg/storage/storage_cache_rapid.go b/packages/shared/pkg/storage/storage_cache_rapid.go index 4078863ba7..d9fb72e339 100644 --- a/packages/shared/pkg/storage/storage_cache_rapid.go +++ b/packages/shared/pkg/storage/storage_cache_rapid.go @@ -21,9 +21,7 @@ func WrapInRapidBucketCache(_ context.Context, cache StorageProvider, inner Stor } func (p *rapidCacheProvider) DeleteObjectsWithPrefix(ctx context.Context, prefix string) error { - if err := p.cache.DeleteObjectsWithPrefix(ctx, "rapid-cache/"+prefix); err != nil { - return err - } + _ = p.cache.DeleteObjectsWithPrefix(ctx, "rapid-cache/"+prefix) return p.inner.DeleteObjectsWithPrefix(ctx, prefix) } @@ -63,6 +61,9 @@ func (c *rapidCachedSeekable) OpenRangeReader(ctx context.Context, off int64, le if frameTable != nil && frameTable.IsCompressed() { return c.openCompressed(ctx, off, frameTable) } + if err := validateRapidReadParams(length, off); err != nil { + return nil, err + } cachePath := fmt.Sprintf("%s/%012d-%d.bin", c.path, off/MemoryChunkSize, length) if rc, err := c.openCache(ctx, cachePath, length); err == nil { @@ -122,6 +123,20 @@ func (c *rapidCachedSeekable) StoreFile(ctx context.Context, path string, opts . return c.inner.StoreFile(ctx, path, opts...) } +func validateRapidReadParams(length, off int64) error { + if length == 0 { + return ErrBufferTooSmall + } + if length > MemoryChunkSize { + return ErrBufferTooLarge + } + if off%MemoryChunkSize != 0 { + return ErrOffsetUnaligned + } + + return nil +} + func (c *rapidCachedSeekable) openCache(ctx context.Context, path string, length int64) (io.ReadCloser, error) { obj, err := c.cache.OpenSeekable(ctx, path, UnknownSeekableObjectType) if err != nil { From 7418f8192d3541991a4bd06d67246efbd2953750 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 20:15:52 -0700 Subject: [PATCH 04/13] fix(storage): add rapid cache cleanup job Add a disabled-by-default periodic GCS cleaner for rapid-cache objects so the cache can be bounded without relying on bucket lifecycle rules. --- Makefile | 4 + iac/provider-gcp/main.tf | 11 ++- .../nomad/jobs/clean-rapid-cache.hcl | 45 +++++++++ iac/provider-gcp/nomad/main.tf | 22 ++++- iac/provider-gcp/nomad/variables.tf | 20 ++++ iac/provider-gcp/variables.tf | 20 ++++ packages/orchestrator/Dockerfile | 1 + packages/orchestrator/Makefile | 13 +++ .../cmd/clean-rapid-cache/main.go | 98 +++++++++++++++++++ 9 files changed, 231 insertions(+), 3 deletions(-) create mode 100644 iac/provider-gcp/nomad/jobs/clean-rapid-cache.hcl create mode 100644 packages/orchestrator/cmd/clean-rapid-cache/main.go diff --git a/Makefile b/Makefile index 267c03fc0b..c6a0a43360 100644 --- a/Makefile +++ b/Makefile @@ -82,6 +82,7 @@ build-and-upload:build-and-upload/client-proxy build-and-upload:build-and-upload/dashboard-api build-and-upload:build-and-upload/docker-reverse-proxy build-and-upload:build-and-upload/clean-nfs-cache +build-and-upload:build-and-upload/clean-rapid-cache build-and-upload:build-and-upload/orchestrator build-and-upload:build-and-upload/template-manager build-and-upload:build-and-upload/envd @@ -90,6 +91,9 @@ build-and-upload:build-and-upload/nomad-nodepool-apm build-and-upload/clean-nfs-cache: ./scripts/confirm.sh $(TERRAFORM_ENVIRONMENT) GCP_PROJECT_ID=$(GCP_PROJECT_ID) $(MAKE) -C packages/orchestrator build-and-upload/clean-nfs-cache +build-and-upload/clean-rapid-cache: + ./scripts/confirm.sh $(TERRAFORM_ENVIRONMENT) + GCP_PROJECT_ID=$(GCP_PROJECT_ID) $(MAKE) -C packages/orchestrator build-and-upload/clean-rapid-cache build-and-upload/template-manager: ./scripts/confirm.sh $(TERRAFORM_ENVIRONMENT) GCP_PROJECT_ID=$(GCP_PROJECT_ID) $(MAKE) -C packages/orchestrator build-and-upload/template-manager diff --git a/iac/provider-gcp/main.tf b/iac/provider-gcp/main.tf index 747fa95db3..277b182dc5 100644 --- a/iac/provider-gcp/main.tf +++ b/iac/provider-gcp/main.tf @@ -315,8 +315,11 @@ module "nomad" { envd_timeout = var.envd_timeout persistent_volume_mounts = { for key, config in local.persistent_volume_types : key => config["local_mount_path"] } default_persistent_volume_type = var.default_persistent_volume_type - orchestrator_env_vars = var.orchestrator_env_vars - orchestrator_enabled = var.orchestrator_enabled + orchestrator_env_vars = merge( + var.orchestrator_env_vars, + var.rapid_bucket_cache_bucket_name != "" ? { RAPID_BUCKET_CACHE_BUCKET_NAME = var.rapid_bucket_cache_bucket_name } : {}, + ) + orchestrator_enabled = var.orchestrator_enabled # Template manager builder_node_pool = var.build_node_pool @@ -334,6 +337,10 @@ module "nomad" { # Filestore shared_chunk_cache_path = module.cluster.shared_chunk_cache_path + rapid_bucket_cache_bucket_name = var.rapid_bucket_cache_bucket_name + rapid_bucket_cache_cleanup_dry_run = var.rapid_bucket_cache_cleanup_dry_run + rapid_bucket_cache_cleanup_max_age = var.rapid_bucket_cache_cleanup_max_age + rapid_bucket_cache_cleanup_max_deletions = var.rapid_bucket_cache_cleanup_max_deletions filestore_cache_cleanup_disk_usage_target = var.filestore_cache_cleanup_disk_usage_target filestore_cache_cleanup_dry_run = var.filestore_cache_cleanup_dry_run filestore_cache_cleanup_deletions_per_loop = var.filestore_cache_cleanup_deletions_per_loop diff --git a/iac/provider-gcp/nomad/jobs/clean-rapid-cache.hcl b/iac/provider-gcp/nomad/jobs/clean-rapid-cache.hcl new file mode 100644 index 0000000000..cea4ebdb16 --- /dev/null +++ b/iac/provider-gcp/nomad/jobs/clean-rapid-cache.hcl @@ -0,0 +1,45 @@ +job "rapid-cache-cleanup" { + type = "batch" + node_pool = "${node_pool}" + + periodic { + cron = "0 * * * *" + prohibit_overlap = true + time_zone = "America/Los_Angeles" + } + + group "rapid-cache-cleanup" { + restart { + attempts = 0 + mode = "fail" + } + + task "rapid-cache-cleanup" { + driver = "raw_exec" + + resources { + memory = 512 + } + + env { + RAPID_BUCKET_CACHE_BUCKET_NAME = "${bucket_name}" + } + + config { + command = "local/clean-rapid-cache" + args = [ + "--dry-run=${dry_run}", + "--max-age=${max_age}", + "--max-deletions=${max_deletions}", + "${bucket_name}", + ] + } + + artifact { + source = "${artifact_source}" + destination = "local/clean-rapid-cache" + mode = "file" + } + } + } +} diff --git a/iac/provider-gcp/nomad/main.tf b/iac/provider-gcp/nomad/main.tf index c73105f47c..de08e546df 100644 --- a/iac/provider-gcp/nomad/main.tf +++ b/iac/provider-gcp/nomad/main.tf @@ -673,8 +673,15 @@ data "google_storage_bucket_object" "filestore_cleanup" { bucket = var.fc_env_pipeline_bucket_name } +data "google_storage_bucket_object" "rapid_cache_cleanup" { + count = var.rapid_bucket_cache_bucket_name != "" ? 1 : 0 + name = "clean-rapid-cache" + bucket = var.fc_env_pipeline_bucket_name +} + locals { - clean_nfs_cache_artifact_source = "gcs::https://www.googleapis.com/storage/v1/${var.fc_env_pipeline_bucket_name}/clean-nfs-cache?version=${data.google_storage_bucket_object.filestore_cleanup.generation}" + clean_nfs_cache_artifact_source = "gcs::https://www.googleapis.com/storage/v1/${var.fc_env_pipeline_bucket_name}/clean-nfs-cache?version=${data.google_storage_bucket_object.filestore_cleanup.generation}" + clean_rapid_cache_artifact_source = var.rapid_bucket_cache_bucket_name != "" ? "gcs::https://www.googleapis.com/storage/v1/${var.fc_env_pipeline_bucket_name}/clean-rapid-cache?version=${data.google_storage_bucket_object.rapid_cache_cleanup[0].generation}" : "" } resource "nomad_job" "clean_nfs_cache" { @@ -696,3 +703,16 @@ resource "nomad_job" "clean_nfs_cache" { launch_darkly_api_key = trimspace(data.google_secret_manager_secret_version.launch_darkly_api_key.secret_data) }) } + +resource "nomad_job" "clean_rapid_cache" { + count = var.rapid_bucket_cache_bucket_name != "" ? 1 : 0 + + jobspec = templatefile("${path.module}/jobs/clean-rapid-cache.hcl", { + node_pool = var.builder_node_pool + artifact_source = local.clean_rapid_cache_artifact_source + bucket_name = var.rapid_bucket_cache_bucket_name + dry_run = var.rapid_bucket_cache_cleanup_dry_run + max_age = var.rapid_bucket_cache_cleanup_max_age + max_deletions = var.rapid_bucket_cache_cleanup_max_deletions + }) +} diff --git a/iac/provider-gcp/nomad/variables.tf b/iac/provider-gcp/nomad/variables.tf index a1924892ec..185582963a 100644 --- a/iac/provider-gcp/nomad/variables.tf +++ b/iac/provider-gcp/nomad/variables.tf @@ -454,6 +454,26 @@ variable "shared_chunk_cache_path" { default = "" } +variable "rapid_bucket_cache_bucket_name" { + type = string + default = "" +} + +variable "rapid_bucket_cache_cleanup_dry_run" { + type = bool + default = true +} + +variable "rapid_bucket_cache_cleanup_max_age" { + type = string + default = "168h" +} + +variable "rapid_bucket_cache_cleanup_max_deletions" { + type = number + default = 10000 +} + variable "filestore_cache_cleanup_disk_usage_target" { type = number description = "The disk usage target for the Filestore cache in percent" diff --git a/iac/provider-gcp/variables.tf b/iac/provider-gcp/variables.tf index 0245a48aa7..d71f255538 100644 --- a/iac/provider-gcp/variables.tf +++ b/iac/provider-gcp/variables.tf @@ -835,6 +835,26 @@ variable "anywhere_cache_ttl" { default = null } +variable "rapid_bucket_cache_bucket_name" { + type = string + default = "" +} + +variable "rapid_bucket_cache_cleanup_dry_run" { + type = bool + default = true +} + +variable "rapid_bucket_cache_cleanup_max_age" { + type = string + default = "168h" +} + +variable "rapid_bucket_cache_cleanup_max_deletions" { + type = number + default = 10000 +} + variable "orchestrator_env_vars" { type = map(string) default = {} diff --git a/packages/orchestrator/Dockerfile b/packages/orchestrator/Dockerfile index 5639a4d7e6..e7bcd90487 100644 --- a/packages/orchestrator/Dockerfile +++ b/packages/orchestrator/Dockerfile @@ -44,4 +44,5 @@ RUN --mount=type=cache,target=/root/.cache/go-build make build-local COMMIT_SHA= FROM scratch COPY --from=builder /build/orchestrator/bin/clean-nfs-cache . +COPY --from=builder /build/orchestrator/bin/clean-rapid-cache . COPY --from=builder /build/orchestrator/bin/orchestrator . diff --git a/packages/orchestrator/Makefile b/packages/orchestrator/Makefile index f3d3a1dbf2..583efbd51c 100644 --- a/packages/orchestrator/Makefile +++ b/packages/orchestrator/Makefile @@ -43,6 +43,7 @@ build-local: $(eval COMMIT_SHA ?= $(shell git rev-parse --short HEAD)) CGO_ENABLED=1 GOOS=linux GOARCH=$(BUILD_ARCH) go build -o bin/orchestrator -ldflags "-X=main.commitSHA=$(COMMIT_SHA)" . CGO_ENABLED=1 GOOS=linux GOARCH=$(BUILD_ARCH) go build -o bin/clean-nfs-cache -ldflags "-X=main.commitSHA=$(COMMIT_SHA)" ./cmd/clean-nfs-cache + CGO_ENABLED=0 GOOS=linux GOARCH=$(BUILD_ARCH) go build -o bin/clean-rapid-cache ./cmd/clean-rapid-cache .PHONY: build-debug build-debug: @@ -96,6 +97,15 @@ else gsutil -h "Cache-Control:no-cache, max-age=0" cp ./bin/clean-nfs-cache "gs://${GCP_BUCKET_PREFIX}fc-env-pipeline/clean-nfs-cache" endif +.PHONY: upload/clean-rapid-cache +upload/clean-rapid-cache: + chmod +x ./bin/clean-rapid-cache +ifeq ($(PROVIDER),aws) + @echo "clean-rapid-cache is GCP-only" +else + gsutil -h "Cache-Control:no-cache, max-age=0" cp ./bin/clean-rapid-cache "gs://${GCP_BUCKET_PREFIX}fc-env-pipeline/clean-rapid-cache" +endif + .PHONY: upload/orchestrator upload/orchestrator: chmod +x ./bin/orchestrator @@ -117,6 +127,9 @@ endif .PHONY: build-and-upload/clean-nfs-cache build-and-upload/clean-nfs-cache: build upload/clean-nfs-cache +.PHONY: build-and-upload/clean-rapid-cache +build-and-upload/clean-rapid-cache: build upload/clean-rapid-cache + .PHONY: build-and-upload/orchestrator build-and-upload/orchestrator: build upload/orchestrator diff --git a/packages/orchestrator/cmd/clean-rapid-cache/main.go b/packages/orchestrator/cmd/clean-rapid-cache/main.go new file mode 100644 index 0000000000..65a1302b88 --- /dev/null +++ b/packages/orchestrator/cmd/clean-rapid-cache/main.go @@ -0,0 +1,98 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "log" + "os" + "time" + + "cloud.google.com/go/storage" + "cloud.google.com/go/storage/experimental" + "google.golang.org/api/iterator" +) + +const defaultPrefix = "rapid-cache/" + +func main() { + var ( + prefix string + maxAge time.Duration + maxDeletions int + dryRun bool + ) + + flags := flag.NewFlagSet("clean-rapid-cache", flag.ExitOnError) + flags.StringVar(&prefix, "prefix", defaultPrefix, "cache object prefix") + flags.DurationVar(&maxAge, "max-age", 7*24*time.Hour, "delete objects older than this") + flags.IntVar(&maxDeletions, "max-deletions", 10000, "maximum objects to delete") + flags.BoolVar(&dryRun, "dry-run", true, "dry run") + if err := flags.Parse(os.Args[1:]); err != nil { + log.Fatal(err) + } + + bucket := os.Getenv("RAPID_BUCKET_CACHE_BUCKET_NAME") + if flags.NArg() > 0 { + bucket = flags.Arg(0) + } + if bucket == "" { + log.Fatal("missing bucket") + } + if prefix == "" { + log.Fatal("missing prefix") + } + if maxAge <= 0 { + log.Fatal("max-age must be positive") + } + if maxDeletions <= 0 { + log.Fatal("max-deletions must be positive") + } + + ctx := context.Background() + if err := clean(ctx, bucket, prefix, time.Now().Add(-maxAge), maxDeletions, dryRun); err != nil { + log.Fatal(err) + } +} + +func clean(ctx context.Context, bucket string, prefix string, cutoff time.Time, maxDeletions int, dryRun bool) error { + client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs()) + if err != nil { + return fmt.Errorf("create storage client: %w", err) + } + defer client.Close() + + var scanned, matched, deleted int + objects := client.Bucket(bucket).Objects(ctx, &storage.Query{Prefix: prefix}) + for { + attrs, err := objects.Next() + if errors.Is(err, iterator.Done) { + break + } + if err != nil { + return fmt.Errorf("list cache objects: %w", err) + } + + scanned++ + if !attrs.Updated.Before(cutoff) { + continue + } + matched++ + if deleted >= maxDeletions { + break + } + if dryRun { + deleted++ + continue + } + if err := client.Bucket(bucket).Object(attrs.Name).Delete(ctx); err != nil { + return fmt.Errorf("delete cache object: %w", err) + } + deleted++ + } + + log.Printf("summary dry_run=%t scanned=%d matched=%d deleted=%d", dryRun, scanned, matched, deleted) + + return nil +} From f379ea33a94c179070ce0967d9393c05e6c394a8 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 20:22:17 -0700 Subject: [PATCH 05/13] fix(storage): ignore rapid cleaner close error Make the rapid cache cleaner explicit about ignoring client close errors. --- packages/orchestrator/cmd/clean-rapid-cache/main.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/orchestrator/cmd/clean-rapid-cache/main.go b/packages/orchestrator/cmd/clean-rapid-cache/main.go index 65a1302b88..35a10f665a 100644 --- a/packages/orchestrator/cmd/clean-rapid-cache/main.go +++ b/packages/orchestrator/cmd/clean-rapid-cache/main.go @@ -61,7 +61,9 @@ func clean(ctx context.Context, bucket string, prefix string, cutoff time.Time, if err != nil { return fmt.Errorf("create storage client: %w", err) } - defer client.Close() + defer func() { + _ = client.Close() + }() var scanned, matched, deleted int objects := client.Bucket(bucket).Objects(ctx, &storage.Query{Prefix: prefix}) From ce6016433dfb69025b8358db32cf8c7dd471f0bb Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 20:26:00 -0700 Subject: [PATCH 06/13] fix(storage): format rapid cleaner loop Satisfy orchestrator lint formatting for the dry-run branch. --- packages/orchestrator/cmd/clean-rapid-cache/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/orchestrator/cmd/clean-rapid-cache/main.go b/packages/orchestrator/cmd/clean-rapid-cache/main.go index 35a10f665a..b545f18208 100644 --- a/packages/orchestrator/cmd/clean-rapid-cache/main.go +++ b/packages/orchestrator/cmd/clean-rapid-cache/main.go @@ -86,6 +86,7 @@ func clean(ctx context.Context, bucket string, prefix string, cutoff time.Time, } if dryRun { deleted++ + continue } if err := client.Bucket(bucket).Object(attrs.Name).Delete(ctx); err != nil { From 7d161c42668bc9f58076078016edb4c6ad123868 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 21:21:18 -0700 Subject: [PATCH 07/13] fix(storage): track rapid cache chunks in redis Record Rapid cache chunk recency asynchronously so cleanup can evict cold chunks while reads still use deterministic bucket paths. --- .../nomad/jobs/clean-rapid-cache.hcl | 3 + iac/provider-gcp/nomad/main.tf | 15 +- .../cmd/clean-rapid-cache/main.go | 78 ++++++++++- packages/orchestrator/pkg/factories/run.go | 1 + .../pkg/sandbox/template/cache.go | 10 +- .../shared/pkg/storage/storage_cache_rapid.go | 27 +++- .../pkg/storage/storage_cache_rapid_index.go | 129 ++++++++++++++++++ 7 files changed, 249 insertions(+), 14 deletions(-) create mode 100644 packages/shared/pkg/storage/storage_cache_rapid_index.go diff --git a/iac/provider-gcp/nomad/jobs/clean-rapid-cache.hcl b/iac/provider-gcp/nomad/jobs/clean-rapid-cache.hcl index cea4ebdb16..b94b454603 100644 --- a/iac/provider-gcp/nomad/jobs/clean-rapid-cache.hcl +++ b/iac/provider-gcp/nomad/jobs/clean-rapid-cache.hcl @@ -23,6 +23,9 @@ job "rapid-cache-cleanup" { env { RAPID_BUCKET_CACHE_BUCKET_NAME = "${bucket_name}" + REDIS_URL = "${redis_url}" + REDIS_CLUSTER_URL = "${redis_cluster_url}" + REDIS_TLS_CA_BASE64 = "${redis_tls_ca_base64}" } config { diff --git a/iac/provider-gcp/nomad/main.tf b/iac/provider-gcp/nomad/main.tf index de08e546df..cde84badaa 100644 --- a/iac/provider-gcp/nomad/main.tf +++ b/iac/provider-gcp/nomad/main.tf @@ -708,11 +708,14 @@ resource "nomad_job" "clean_rapid_cache" { count = var.rapid_bucket_cache_bucket_name != "" ? 1 : 0 jobspec = templatefile("${path.module}/jobs/clean-rapid-cache.hcl", { - node_pool = var.builder_node_pool - artifact_source = local.clean_rapid_cache_artifact_source - bucket_name = var.rapid_bucket_cache_bucket_name - dry_run = var.rapid_bucket_cache_cleanup_dry_run - max_age = var.rapid_bucket_cache_cleanup_max_age - max_deletions = var.rapid_bucket_cache_cleanup_max_deletions + node_pool = var.builder_node_pool + artifact_source = local.clean_rapid_cache_artifact_source + bucket_name = var.rapid_bucket_cache_bucket_name + dry_run = var.rapid_bucket_cache_cleanup_dry_run + max_age = var.rapid_bucket_cache_cleanup_max_age + max_deletions = var.rapid_bucket_cache_cleanup_max_deletions + redis_url = local.redis_url + redis_cluster_url = local.redis_cluster_url + redis_tls_ca_base64 = trimspace(data.google_secret_manager_secret_version.redis_tls_ca_base64.secret_data) }) } diff --git a/packages/orchestrator/cmd/clean-rapid-cache/main.go b/packages/orchestrator/cmd/clean-rapid-cache/main.go index b545f18208..bbeedbc5f8 100644 --- a/packages/orchestrator/cmd/clean-rapid-cache/main.go +++ b/packages/orchestrator/cmd/clean-rapid-cache/main.go @@ -9,9 +9,12 @@ import ( "os" "time" - "cloud.google.com/go/storage" + gcs "cloud.google.com/go/storage" "cloud.google.com/go/storage/experimental" "google.golang.org/api/iterator" + + "github.com/e2b-dev/infra/packages/shared/pkg/factories" + "github.com/e2b-dev/infra/packages/shared/pkg/storage" ) const defaultPrefix = "rapid-cache/" @@ -51,13 +54,16 @@ func main() { } ctx := context.Background() - if err := clean(ctx, bucket, prefix, time.Now().Add(-maxAge), maxDeletions, dryRun); err != nil { + index, closeIndex := newRapidIndex(ctx, bucket) + defer closeIndex() + + if err := clean(ctx, bucket, prefix, time.Now().Add(-maxAge), maxDeletions, dryRun, index); err != nil { log.Fatal(err) } } -func clean(ctx context.Context, bucket string, prefix string, cutoff time.Time, maxDeletions int, dryRun bool) error { - client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs()) +func clean(ctx context.Context, bucket string, prefix string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) error { + client, err := gcs.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs()) if err != nil { return fmt.Errorf("create storage client: %w", err) } @@ -65,8 +71,55 @@ func clean(ctx context.Context, bucket string, prefix string, cutoff time.Time, _ = client.Close() }() + if deleted, err := cleanFromIndex(ctx, client, bucket, cutoff, maxDeletions, dryRun, index); err != nil { + return err + } else if deleted > 0 { + log.Printf("summary dry_run=%t deleted=%d source=redis", dryRun, deleted) + + return nil + } + + return cleanFromBucket(ctx, client, bucket, prefix, cutoff, maxDeletions, dryRun) +} + +func cleanFromIndex(ctx context.Context, client *gcs.Client, bucket string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) (int, error) { + candidates, err := index.Candidates(ctx, cutoff, int64(maxDeletions)) + if err != nil || len(candidates) == 0 { + return 0, nil + } + + deleted := 0 + for _, path := range candidates { + obj := client.Bucket(bucket).Object(path) + attrs, err := obj.Attrs(ctx) + if errors.Is(err, gcs.ErrObjectNotExist) { + if !dryRun { + _ = index.Evict(ctx, path, 0) + } + + continue + } + if err != nil { + return deleted, fmt.Errorf("read cache object metadata: %w", err) + } + if dryRun { + deleted++ + + continue + } + if err := obj.Delete(ctx); err != nil { + return deleted, fmt.Errorf("delete cache object: %w", err) + } + _ = index.Evict(ctx, path, attrs.Size) + deleted++ + } + + return deleted, nil +} + +func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, prefix string, cutoff time.Time, maxDeletions int, dryRun bool) error { var scanned, matched, deleted int - objects := client.Bucket(bucket).Objects(ctx, &storage.Query{Prefix: prefix}) + objects := client.Bucket(bucket).Objects(ctx, &gcs.Query{Prefix: prefix}) for { attrs, err := objects.Next() if errors.Is(err, iterator.Done) { @@ -99,3 +152,18 @@ func clean(ctx context.Context, bucket string, prefix string, cutoff time.Time, return nil } + +func newRapidIndex(ctx context.Context, bucket string) (storage.RapidCacheIndex, func()) { + redisClient, err := factories.NewRedisClient(ctx, factories.RedisConfig{ + RedisURL: os.Getenv("REDIS_URL"), + RedisClusterURL: os.Getenv("REDIS_CLUSTER_URL"), + RedisTLSCABase64: os.Getenv("REDIS_TLS_CA_BASE64"), + }) + if err != nil { + return storage.NoopRapidCacheIndex(), func() {} + } + + return storage.NewRedisRapidCacheIndex(redisClient, bucket), func() { + _ = factories.CloseCleanly(redisClient) + } +} diff --git a/packages/orchestrator/pkg/factories/run.go b/packages/orchestrator/pkg/factories/run.go index 0a963000df..1fa81f8b1c 100644 --- a/packages/orchestrator/pkg/factories/run.go +++ b/packages/orchestrator/pkg/factories/run.go @@ -405,6 +405,7 @@ func run(config cfg.Config, opts Options) (success bool) { if err != nil { logger.L().Fatal(ctx, "failed to create template cache", zap.Error(err)) } + templateCache.SetRapidCacheIndex(storage.NewRedisRapidCacheIndex(redisClient, storage.RapidBucketCacheStorageConfig.GetBucketName())) templateCache.Start(ctx) closers = append(closers, closer{"template cache", func(context.Context) error { templateCache.Stop() diff --git a/packages/orchestrator/pkg/sandbox/template/cache.go b/packages/orchestrator/pkg/sandbox/template/cache.go index bbbead0770..3d8fef2656 100644 --- a/packages/orchestrator/pkg/sandbox/template/cache.go +++ b/packages/orchestrator/pkg/sandbox/template/cache.go @@ -54,6 +54,7 @@ type Cache struct { cache *ttlcache.Cache[string, Template] persistence storage.StorageProvider rapidCache storage.StorageProvider + rapidIndex storage.RapidCacheIndex rapidCacheMu sync.Mutex buildStore *build.DiffStore blockMetrics blockmetrics.Metrics @@ -108,6 +109,7 @@ func NewCache( blockMetrics: metrics, config: config, persistence: persistence, + rapidIndex: storage.NoopRapidCacheIndex(), buildStore: buildStore, cache: cache, flags: flags, @@ -116,6 +118,12 @@ func NewCache( }, nil } +func (c *Cache) SetRapidCacheIndex(index storage.RapidCacheIndex) { + if index != nil { + c.rapidIndex = index + } +} + func (c *Cache) Start(ctx context.Context) { c.buildStore.Start(ctx) @@ -172,7 +180,7 @@ func (c *Cache) GetTemplate( persistence := c.persistence if cache, enabled := c.useRapidBucketCache(ctx, isBuilding); enabled { - persistence = storage.WrapInRapidBucketCache(ctx, cache, persistence) + persistence = storage.WrapInRapidBucketCache(ctx, cache, persistence, c.rapidIndex) span.SetAttributes(attribute.Bool("use_cache", true)) } else if path, enabled := c.useNFSCache(ctx, isBuilding, isSnapshot); enabled { logger.L().Info(ctx, "using local template cache", zap.String("path", c.rootCachePath)) diff --git a/packages/shared/pkg/storage/storage_cache_rapid.go b/packages/shared/pkg/storage/storage_cache_rapid.go index d9fb72e339..4c7ac38895 100644 --- a/packages/shared/pkg/storage/storage_cache_rapid.go +++ b/packages/shared/pkg/storage/storage_cache_rapid.go @@ -11,12 +11,18 @@ import ( type rapidCacheProvider struct { cache StorageProvider inner StorageProvider + index RapidCacheIndex } -func WrapInRapidBucketCache(_ context.Context, cache StorageProvider, inner StorageProvider) StorageProvider { +func WrapInRapidBucketCache(_ context.Context, cache StorageProvider, inner StorageProvider, index RapidCacheIndex) StorageProvider { + if index == nil { + index = NoopRapidCacheIndex() + } + return &rapidCacheProvider{ cache: cache, inner: inner, + index: index, } } @@ -44,6 +50,7 @@ func (p *rapidCacheProvider) OpenSeekable(ctx context.Context, path string, obje path: "rapid-cache/" + path, cache: p.cache, inner: inner, + index: p.index, }, nil } @@ -55,6 +62,7 @@ type rapidCachedSeekable struct { path string cache StorageProvider inner Seekable + index RapidCacheIndex } func (c *rapidCachedSeekable) OpenRangeReader(ctx context.Context, off int64, length int64, frameTable *FrameTable) (io.ReadCloser, error) { @@ -67,6 +75,8 @@ func (c *rapidCachedSeekable) OpenRangeReader(ctx context.Context, off int64, le cachePath := fmt.Sprintf("%s/%012d-%d.bin", c.path, off/MemoryChunkSize, length) if rc, err := c.openCache(ctx, cachePath, length); err == nil { + c.touch(ctx, cachePath) + return rc, nil } @@ -90,6 +100,7 @@ func (c *rapidCachedSeekable) openCompressed(ctx context.Context, off int64, fra cachePath := makeFrameFilename(c.path, r) if raw, err := c.openCache(ctx, cachePath, int64(r.Length)); err == nil { + c.touch(ctx, cachePath) dec, err := newDecompressingReadCloser(raw, frameTable.CompressionType()) if err != nil { raw.Close() @@ -159,7 +170,13 @@ func (c *rapidCachedSeekable) writeCache(ctx context.Context, path string, data return err } - return blob.Put(ctx, data) + if err := blob.Put(ctx, data); err != nil { + return err + } + + _ = c.index.Admit(ctx, path, int64(len(data))) + + return nil } func (c *rapidCachedSeekable) goCtx(ctx context.Context, fn func(context.Context)) { @@ -168,6 +185,12 @@ func (c *rapidCachedSeekable) goCtx(ctx context.Context, fn func(context.Context }() } +func (c *rapidCachedSeekable) touch(ctx context.Context, path string) { + c.goCtx(ctx, func(ctx context.Context) { + _ = c.index.Touch(ctx, path) + }) +} + type rapidCacheWriteThroughReader struct { inner io.ReadCloser buf *bytes.Buffer diff --git a/packages/shared/pkg/storage/storage_cache_rapid_index.go b/packages/shared/pkg/storage/storage_cache_rapid_index.go new file mode 100644 index 0000000000..69bec548bb --- /dev/null +++ b/packages/shared/pkg/storage/storage_cache_rapid_index.go @@ -0,0 +1,129 @@ +package storage + +import ( + "context" + "strconv" + "strings" + "time" + + "github.com/redis/go-redis/v9" +) + +const rapidCachePrefix = "rapid-cache/" + +type RapidCacheIndex interface { + Touch(ctx context.Context, path string) error + Admit(ctx context.Context, path string, size int64) error + Evict(ctx context.Context, path string, size int64) error + Candidates(ctx context.Context, before time.Time, limit int64) ([]string, error) +} + +type noopRapidCacheIndex struct{} + +func NoopRapidCacheIndex() RapidCacheIndex { return noopRapidCacheIndex{} } + +func (noopRapidCacheIndex) Touch(context.Context, string) error { return nil } +func (noopRapidCacheIndex) Admit(context.Context, string, int64) error { return nil } +func (noopRapidCacheIndex) Evict(context.Context, string, int64) error { return nil } +func (noopRapidCacheIndex) Candidates(context.Context, time.Time, int64) ([]string, error) { + return nil, nil +} + +type redisRapidCacheIndex struct { + redis redis.UniversalClient + keys rapidCacheIndexKeys +} + +type rapidCacheIndexKeys struct { + chunks string + builds string + buildBytes string + buildChunks string +} + +func NewRedisRapidCacheIndex(client redis.UniversalClient, bucket string) RapidCacheIndex { + if client == nil || bucket == "" { + return NoopRapidCacheIndex() + } + + prefix := "rapid-cache:{" + bucket + "}" + + return &redisRapidCacheIndex{ + redis: client, + keys: rapidCacheIndexKeys{ + chunks: prefix + ":chunks", + builds: prefix + ":builds", + buildBytes: prefix + ":build_bytes", + buildChunks: prefix + ":build_chunks", + }, + } +} + +func (i *redisRapidCacheIndex) Touch(ctx context.Context, path string) error { + buildID := rapidCacheBuildID(path) + if buildID == "" { + return nil + } + + now := float64(time.Now().Unix()) + pipe := i.redis.Pipeline() + pipe.ZAdd(ctx, i.keys.chunks, redis.Z{Score: now, Member: path}) + pipe.ZAdd(ctx, i.keys.builds, redis.Z{Score: now, Member: buildID}) + _, err := pipe.Exec(ctx) + + return err +} + +func (i *redisRapidCacheIndex) Admit(ctx context.Context, path string, size int64) error { + buildID := rapidCacheBuildID(path) + if buildID == "" { + return nil + } + + return rapidCacheAdmitScript.Run(ctx, i.redis, []string{ + i.keys.chunks, + i.keys.builds, + i.keys.buildBytes, + i.keys.buildChunks, + }, time.Now().Unix(), path, buildID, size).Err() +} + +func (i *redisRapidCacheIndex) Evict(ctx context.Context, path string, size int64) error { + buildID := rapidCacheBuildID(path) + pipe := i.redis.Pipeline() + pipe.ZRem(ctx, i.keys.chunks, path) + if buildID != "" { + pipe.HIncrBy(ctx, i.keys.buildBytes, buildID, -size) + pipe.HIncrBy(ctx, i.keys.buildChunks, buildID, -1) + } + _, err := pipe.Exec(ctx) + + return err +} + +func (i *redisRapidCacheIndex) Candidates(ctx context.Context, before time.Time, limit int64) ([]string, error) { + return i.redis.ZRangeByScore(ctx, i.keys.chunks, &redis.ZRangeBy{ + Min: "-inf", + Max: strconv.FormatInt(before.Unix(), 10), + Offset: 0, + Count: limit, + }).Result() +} + +func rapidCacheBuildID(path string) string { + path = strings.TrimPrefix(path, rapidCachePrefix) + buildID, _ := SplitPath(path) + + return buildID +} + +var rapidCacheAdmitScript = redis.NewScript(` +local added = redis.call('ZADD', KEYS[1], 'NX', ARGV[1], ARGV[2]) +redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2]) +redis.call('ZADD', KEYS[2], ARGV[1], ARGV[3]) +if added == 1 then + redis.call('HINCRBY', KEYS[3], ARGV[3], ARGV[4]) + redis.call('HINCRBY', KEYS[4], ARGV[3], 1) +end +return added +`) From 585d7e8af23387c057f51905165fdd5240e687dd Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sun, 31 May 2026 04:28:01 +0000 Subject: [PATCH 08/13] chore: auto-commit generated changes --- tests/integration/go.mod | 2 ++ tests/integration/go.sum | 8 ++++++++ 2 files changed, 10 insertions(+) diff --git a/tests/integration/go.mod b/tests/integration/go.mod index 290788022d..45c22628a8 100644 --- a/tests/integration/go.mod +++ b/tests/integration/go.mod @@ -79,6 +79,7 @@ require ( github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dchest/uniuri v1.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dnephin/pflag v1.0.7 // indirect github.com/dprotaso/go-yit v0.0.0-20220510233725-9ba8df137936 // indirect github.com/envoyproxy/go-control-plane/envoy v1.37.0 // indirect @@ -143,6 +144,7 @@ require ( github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/pressly/goose/v3 v3.26.0 // indirect + github.com/redis/go-redis/v9 v9.17.3 // indirect github.com/rs/zerolog v1.34.0 // indirect github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 // indirect github.com/segmentio/asm v1.2.0 // indirect diff --git a/tests/integration/go.sum b/tests/integration/go.sum index f9c5bc7e0e..f4031472df 100644 --- a/tests/integration/go.sum +++ b/tests/integration/go.sum @@ -91,6 +91,10 @@ github.com/bitfield/gotestdox v0.2.2/go.mod h1:D+gwtS0urjBrzguAkTM2wodsTQYFHdpx8 github.com/bits-and-blooms/bitset v1.24.4 h1:95H15Og1clikBrKr/DuzMXkQzECs1M6hhoGXLwLQOZE= github.com/bits-and-blooms/bitset v1.24.4/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -107,6 +111,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dchest/uniuri v1.2.0 h1:koIcOUdrTIivZgSLhHQvKgqdWZq5d7KdMEWF1Ud6+5g= github.com/dchest/uniuri v1.2.0/go.mod h1:fSzm4SLHzNZvWLvWJew423PhAzkpNQYq+uNLq4kxhkY= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dlclark/regexp2 v1.11.5 h1:Q/sSnsKerHeCkc/jSTNq1oCm7KiVgUMZRDUoRu0JQZQ= github.com/dlclark/regexp2 v1.11.5/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= github.com/dnephin/pflag v1.0.7 h1:oxONGlWxhmUct0YzKTgrpQv9AUA1wtPBn7zuSjJqptk= @@ -326,6 +332,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pressly/goose/v3 v3.26.0 h1:KJakav68jdH0WDvoAcj8+n61WqOIaPGgH0bJWS6jpmM= github.com/pressly/goose/v3 v3.26.0/go.mod h1:4hC1KrritdCxtuFsqgs1R4AU5bWtTAf+cnWvfhf2DNY= +github.com/redis/go-redis/v9 v9.17.3 h1:fN29NdNrE17KttK5Ndf20buqfDZwGNgoUr9qjl1DQx4= +github.com/redis/go-redis/v9 v9.17.3/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= From 3e5ade6eecbb278e99d9450c5dc3ab7dc2dc08a8 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 21:51:07 -0700 Subject: [PATCH 09/13] fix(storage): satisfy rapid cleaner lint Avoid deferred cleanup before fatal exit and keep Redis cleanup fallback explicit. --- packages/orchestrator/cmd/clean-rapid-cache/main.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/orchestrator/cmd/clean-rapid-cache/main.go b/packages/orchestrator/cmd/clean-rapid-cache/main.go index bbeedbc5f8..8149e199a8 100644 --- a/packages/orchestrator/cmd/clean-rapid-cache/main.go +++ b/packages/orchestrator/cmd/clean-rapid-cache/main.go @@ -55,9 +55,9 @@ func main() { ctx := context.Background() index, closeIndex := newRapidIndex(ctx, bucket) - defer closeIndex() - - if err := clean(ctx, bucket, prefix, time.Now().Add(-maxAge), maxDeletions, dryRun, index); err != nil { + err := clean(ctx, bucket, prefix, time.Now().Add(-maxAge), maxDeletions, dryRun, index) + closeIndex() + if err != nil { log.Fatal(err) } } @@ -83,8 +83,8 @@ func clean(ctx context.Context, bucket string, prefix string, cutoff time.Time, } func cleanFromIndex(ctx context.Context, client *gcs.Client, bucket string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) (int, error) { - candidates, err := index.Candidates(ctx, cutoff, int64(maxDeletions)) - if err != nil || len(candidates) == 0 { + candidates, _ := index.Candidates(ctx, cutoff, int64(maxDeletions)) + if len(candidates) == 0 { return 0, nil } From eec262ba4c1ecbfc44cdafbe2082477f1ff455e8 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 21:52:17 -0700 Subject: [PATCH 10/13] fix(storage): guard rapid cleanup with redis recency Use Redis touch scores to avoid deleting recently used chunks during bucket fallback cleanup and clean stale Redis entries after deletes. --- .../cmd/clean-rapid-cache/main.go | 20 ++++++++++++------- .../pkg/storage/storage_cache_rapid_index.go | 19 +++++++++++++++++- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/packages/orchestrator/cmd/clean-rapid-cache/main.go b/packages/orchestrator/cmd/clean-rapid-cache/main.go index 8149e199a8..7ebe01af62 100644 --- a/packages/orchestrator/cmd/clean-rapid-cache/main.go +++ b/packages/orchestrator/cmd/clean-rapid-cache/main.go @@ -71,15 +71,12 @@ func clean(ctx context.Context, bucket string, prefix string, cutoff time.Time, _ = client.Close() }() - if deleted, err := cleanFromIndex(ctx, client, bucket, cutoff, maxDeletions, dryRun, index); err != nil { + deleted, err := cleanFromIndex(ctx, client, bucket, cutoff, maxDeletions, dryRun, index) + if err != nil { return err - } else if deleted > 0 { - log.Printf("summary dry_run=%t deleted=%d source=redis", dryRun, deleted) - - return nil } - return cleanFromBucket(ctx, client, bucket, prefix, cutoff, maxDeletions, dryRun) + return cleanFromBucket(ctx, client, bucket, prefix, cutoff, maxDeletions-deleted, dryRun, index) } func cleanFromIndex(ctx context.Context, client *gcs.Client, bucket string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) (int, error) { @@ -117,7 +114,11 @@ func cleanFromIndex(ctx context.Context, client *gcs.Client, bucket string, cuto return deleted, nil } -func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, prefix string, cutoff time.Time, maxDeletions int, dryRun bool) error { +func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, prefix string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) error { + if maxDeletions <= 0 { + return nil + } + var scanned, matched, deleted int objects := client.Bucket(bucket).Objects(ctx, &gcs.Query{Prefix: prefix}) for { @@ -133,6 +134,10 @@ func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, pre if !attrs.Updated.Before(cutoff) { continue } + lastAccess, ok, _ := index.LastAccess(ctx, attrs.Name) + if ok && lastAccess >= cutoff.Unix() { + continue + } matched++ if deleted >= maxDeletions { break @@ -145,6 +150,7 @@ func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, pre if err := client.Bucket(bucket).Object(attrs.Name).Delete(ctx); err != nil { return fmt.Errorf("delete cache object: %w", err) } + _ = index.Evict(ctx, attrs.Name, attrs.Size) deleted++ } diff --git a/packages/shared/pkg/storage/storage_cache_rapid_index.go b/packages/shared/pkg/storage/storage_cache_rapid_index.go index 69bec548bb..9b81abb0cd 100644 --- a/packages/shared/pkg/storage/storage_cache_rapid_index.go +++ b/packages/shared/pkg/storage/storage_cache_rapid_index.go @@ -2,6 +2,7 @@ package storage import ( "context" + "errors" "strconv" "strings" "time" @@ -16,6 +17,7 @@ type RapidCacheIndex interface { Admit(ctx context.Context, path string, size int64) error Evict(ctx context.Context, path string, size int64) error Candidates(ctx context.Context, before time.Time, limit int64) ([]string, error) + LastAccess(ctx context.Context, path string) (int64, bool, error) } type noopRapidCacheIndex struct{} @@ -28,6 +30,9 @@ func (noopRapidCacheIndex) Evict(context.Context, string, int64) error { return func (noopRapidCacheIndex) Candidates(context.Context, time.Time, int64) ([]string, error) { return nil, nil } +func (noopRapidCacheIndex) LastAccess(context.Context, string) (int64, bool, error) { + return 0, false, nil +} type redisRapidCacheIndex struct { redis redis.UniversalClient @@ -92,7 +97,7 @@ func (i *redisRapidCacheIndex) Evict(ctx context.Context, path string, size int6 buildID := rapidCacheBuildID(path) pipe := i.redis.Pipeline() pipe.ZRem(ctx, i.keys.chunks, path) - if buildID != "" { + if buildID != "" && size > 0 { pipe.HIncrBy(ctx, i.keys.buildBytes, buildID, -size) pipe.HIncrBy(ctx, i.keys.buildChunks, buildID, -1) } @@ -110,6 +115,18 @@ func (i *redisRapidCacheIndex) Candidates(ctx context.Context, before time.Time, }).Result() } +func (i *redisRapidCacheIndex) LastAccess(ctx context.Context, path string) (int64, bool, error) { + score, err := i.redis.ZScore(ctx, i.keys.chunks, path).Result() + if errors.Is(err, redis.Nil) { + return 0, false, nil + } + if err != nil { + return 0, false, err + } + + return int64(score), true, nil +} + func rapidCacheBuildID(path string) string { path = strings.TrimPrefix(path, rapidCachePrefix) buildID, _ := SplitPath(path) From 57494004639d034b515be0f0f8ac6fb13f60cadc Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 21:58:34 -0700 Subject: [PATCH 11/13] fix(storage): gofumpt rapid cache index Apply gofumpt formatting to the rapid cache index helper. --- packages/shared/pkg/storage/storage_cache_rapid_index.go | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/shared/pkg/storage/storage_cache_rapid_index.go b/packages/shared/pkg/storage/storage_cache_rapid_index.go index 9b81abb0cd..108ff85c90 100644 --- a/packages/shared/pkg/storage/storage_cache_rapid_index.go +++ b/packages/shared/pkg/storage/storage_cache_rapid_index.go @@ -30,6 +30,7 @@ func (noopRapidCacheIndex) Evict(context.Context, string, int64) error { return func (noopRapidCacheIndex) Candidates(context.Context, time.Time, int64) ([]string, error) { return nil, nil } + func (noopRapidCacheIndex) LastAccess(context.Context, string) (int64, bool, error) { return 0, false, nil } From 1043f735800939b32c27133d0ccc4d8f381f6b10 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 21:59:17 -0700 Subject: [PATCH 12/13] fix(storage): tighten rapid cleanup errors Surface rapid prefix delete failures and avoid bucket fallback deletes when Redis recency cannot be checked. --- packages/orchestrator/cmd/clean-rapid-cache/main.go | 5 ++++- packages/shared/pkg/storage/storage_cache_rapid.go | 6 ++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/packages/orchestrator/cmd/clean-rapid-cache/main.go b/packages/orchestrator/cmd/clean-rapid-cache/main.go index 7ebe01af62..69768e0ee3 100644 --- a/packages/orchestrator/cmd/clean-rapid-cache/main.go +++ b/packages/orchestrator/cmd/clean-rapid-cache/main.go @@ -134,7 +134,10 @@ func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, pre if !attrs.Updated.Before(cutoff) { continue } - lastAccess, ok, _ := index.LastAccess(ctx, attrs.Name) + lastAccess, ok, err := index.LastAccess(ctx, attrs.Name) + if err != nil { + continue + } if ok && lastAccess >= cutoff.Unix() { continue } diff --git a/packages/shared/pkg/storage/storage_cache_rapid.go b/packages/shared/pkg/storage/storage_cache_rapid.go index 4c7ac38895..f28338aa0f 100644 --- a/packages/shared/pkg/storage/storage_cache_rapid.go +++ b/packages/shared/pkg/storage/storage_cache_rapid.go @@ -3,6 +3,7 @@ package storage import ( "bytes" "context" + "errors" "fmt" "io" "time" @@ -27,9 +28,10 @@ func WrapInRapidBucketCache(_ context.Context, cache StorageProvider, inner Stor } func (p *rapidCacheProvider) DeleteObjectsWithPrefix(ctx context.Context, prefix string) error { - _ = p.cache.DeleteObjectsWithPrefix(ctx, "rapid-cache/"+prefix) + cacheErr := p.cache.DeleteObjectsWithPrefix(ctx, "rapid-cache/"+prefix) + innerErr := p.inner.DeleteObjectsWithPrefix(ctx, prefix) - return p.inner.DeleteObjectsWithPrefix(ctx, prefix) + return errors.Join(cacheErr, innerErr) } func (p *rapidCacheProvider) UploadSignedURL(ctx context.Context, path string, ttl time.Duration) (string, error) { From b61e373a6df6a75d6bc450a72b43eff20efa8aaf Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 22:07:31 -0700 Subject: [PATCH 13/13] fix(storage): keep rapid cleanup fallback running Make Redis index cleanup best-effort, recheck recency before index deletes, and let bucket fallback proceed when Redis is unavailable. --- .../cmd/clean-rapid-cache/main.go | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/packages/orchestrator/cmd/clean-rapid-cache/main.go b/packages/orchestrator/cmd/clean-rapid-cache/main.go index 69768e0ee3..7fe6ca0f89 100644 --- a/packages/orchestrator/cmd/clean-rapid-cache/main.go +++ b/packages/orchestrator/cmd/clean-rapid-cache/main.go @@ -71,22 +71,23 @@ func clean(ctx context.Context, bucket string, prefix string, cutoff time.Time, _ = client.Close() }() - deleted, err := cleanFromIndex(ctx, client, bucket, cutoff, maxDeletions, dryRun, index) - if err != nil { - return err - } + deleted := cleanFromIndex(ctx, client, bucket, cutoff, maxDeletions, dryRun, index) return cleanFromBucket(ctx, client, bucket, prefix, cutoff, maxDeletions-deleted, dryRun, index) } -func cleanFromIndex(ctx context.Context, client *gcs.Client, bucket string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) (int, error) { - candidates, _ := index.Candidates(ctx, cutoff, int64(maxDeletions)) - if len(candidates) == 0 { - return 0, nil +func cleanFromIndex(ctx context.Context, client *gcs.Client, bucket string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) int { + candidates, err := index.Candidates(ctx, cutoff, int64(maxDeletions)) + if err != nil || len(candidates) == 0 { + return 0 } deleted := 0 for _, path := range candidates { + lastAccess, ok, err := index.LastAccess(ctx, path) + if err != nil || (ok && lastAccess >= cutoff.Unix()) { + continue + } obj := client.Bucket(bucket).Object(path) attrs, err := obj.Attrs(ctx) if errors.Is(err, gcs.ErrObjectNotExist) { @@ -97,7 +98,7 @@ func cleanFromIndex(ctx context.Context, client *gcs.Client, bucket string, cuto continue } if err != nil { - return deleted, fmt.Errorf("read cache object metadata: %w", err) + continue } if dryRun { deleted++ @@ -105,13 +106,13 @@ func cleanFromIndex(ctx context.Context, client *gcs.Client, bucket string, cuto continue } if err := obj.Delete(ctx); err != nil { - return deleted, fmt.Errorf("delete cache object: %w", err) + continue } _ = index.Evict(ctx, path, attrs.Size) deleted++ } - return deleted, nil + return deleted } func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, prefix string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) error { @@ -135,10 +136,7 @@ func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, pre continue } lastAccess, ok, err := index.LastAccess(ctx, attrs.Name) - if err != nil { - continue - } - if ok && lastAccess >= cutoff.Unix() { + if err == nil && ok && lastAccess >= cutoff.Unix() { continue } matched++