Add Rapid bucket template cache#2876
Conversation
Add a feature-gated bucket-backed read-through cache for template artifacts as an alternative to the shared NFS cache.
PR SummaryMedium Risk Overview Reviewed by Cursor Bugbot for commit b61e373. Bugbot is set up for automated code reviews on this repo. Configure here. |
There was a problem hiding this comment.
Code Review
In packages/shared/pkg/storage/storage_cache_rapid.go, calling c.wg.Go will cause a compilation error because sync.WaitGroup does not have a Go method. The background cache write-back goroutines do not need to be synchronized or waited on, so the function should be run directly in a goroutine and the unused wg field removed from the rapidCachedSeekable struct.
❌ 3 Tests Failed:
View the full list of 5 ❄️ flaky test(s)
To view more test analytics, go to the Test Analytics Dashboard |
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for all 3 issues found in the latest run.
- ✅ Fixed: Rapid cache not invalidated on delete
- Added async deletion of rapid-cache/ prefixed objects in DeleteObjectsWithPrefix to match NFS cache cleanup behavior.
- ✅ Resolved by another fix: Rapid cache orphaned on template delete
- This bug is resolved by the same fix applied to bug 1c5b6c3a which adds cache invalidation to DeleteObjectsWithPrefix.
- ✅ Fixed: Compressed rapid hits skip size check
- Added size validation in openCompressed to check cached object size matches expected frame length, with automatic invalidation of mismatched entries.
Or push these changes by commenting:
@cursor push ee088a1d9b
Preview (ee088a1d9b)
diff --git a/packages/shared/pkg/storage/storage_cache_rapid.go b/packages/shared/pkg/storage/storage_cache_rapid.go
--- a/packages/shared/pkg/storage/storage_cache_rapid.go
+++ b/packages/shared/pkg/storage/storage_cache_rapid.go
@@ -22,6 +22,12 @@
}
func (p *rapidCacheProvider) DeleteObjectsWithPrefix(ctx context.Context, prefix string) error {
+ // Delete from cache bucket asynchronously (cached objects are under "rapid-cache/" prefix)
+ go func(ctx context.Context) {
+ cachePrefix := "rapid-cache/" + prefix
+ _ = p.cache.DeleteObjectsWithPrefix(ctx, cachePrefix)
+ }(context.WithoutCancel(ctx))
+
return p.inner.DeleteObjectsWithPrefix(ctx, prefix)
}
@@ -88,14 +94,26 @@
cachePath := makeFrameFilename(c.path, r)
if raw, err := c.openCache(ctx, cachePath, int64(r.Length)); err == nil {
- dec, err := newDecompressingReadCloser(raw, frameTable.CompressionType())
- if err != nil {
- raw.Close()
+ // Validate cached object size matches expected frame length
+ size, sizeErr := c.getCacheSize(ctx, cachePath)
+ if sizeErr == nil && size == int64(r.Length) {
+ dec, err := newDecompressingReadCloser(raw, frameTable.CompressionType())
+ if err != nil {
+ raw.Close()
- return nil, err
+ return nil, err
+ }
+
+ return dec, nil
}
-
- return dec, nil
+ // Size mismatch or size check failed: close reader and delete bad cache entry
+ raw.Close()
+ if sizeErr == nil {
+ // Confirmed size mismatch: delete asynchronously to allow refetch
+ go func(ctx context.Context) {
+ _ = c.cache.DeleteObjectsWithPrefix(ctx, cachePath)
+ }(context.WithoutCancel(ctx))
+ }
}
raw, err := c.inner.OpenRangeReader(ctx, r.Offset, int64(r.Length), nil)
@@ -130,6 +148,15 @@
return obj.OpenRangeReader(ctx, 0, length, nil)
}
+func (c *rapidCachedSeekable) getCacheSize(ctx context.Context, path string) (int64, error) {
+ obj, err := c.cache.OpenSeekable(ctx, path, UnknownSeekableObjectType)
+ if err != nil {
+ return 0, err
+ }
+
+ return obj.Size(ctx)
+}
+
func (c *rapidCachedSeekable) writeCache(ctx context.Context, path string, data []byte) error {
blob, err := c.cache.OpenBlob(ctx, path, UnknownObjectType)
if err != nil {You can send follow-ups to the cloud agent here.
Validate cached object sizes, clean cache prefixes with canonical deletes, and use plain goroutines for best-effort writeback.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: Rapid cache skips read validation
- Added validateReadParams method to rapidCachedSeekable to ensure offset alignment and prevent cache key collisions from unaligned reads.
- ✅ Fixed: Cache delete blocks primary delete
- Changed DeleteObjectsWithPrefix to ignore cache deletion failures by discarding the error and always proceeding to delete from primary storage.
Or push these changes by commenting:
@cursor push 65a2bd76f7
Preview (65a2bd76f7)
diff --git a/packages/shared/pkg/storage/storage_cache_rapid.go b/packages/shared/pkg/storage/storage_cache_rapid.go
--- a/packages/shared/pkg/storage/storage_cache_rapid.go
+++ b/packages/shared/pkg/storage/storage_cache_rapid.go
@@ -21,9 +21,7 @@
}
func (p *rapidCacheProvider) DeleteObjectsWithPrefix(ctx context.Context, prefix string) error {
- if err := p.cache.DeleteObjectsWithPrefix(ctx, "rapid-cache/"+prefix); err != nil {
- return err
- }
+ _ = p.cache.DeleteObjectsWithPrefix(ctx, "rapid-cache/"+prefix)
return p.inner.DeleteObjectsWithPrefix(ctx, prefix)
}
@@ -64,6 +62,10 @@
return c.openCompressed(ctx, off, frameTable)
}
+ if err := c.validateReadParams(length, off); err != nil {
+ return nil, err
+ }
+
cachePath := fmt.Sprintf("%s/%012d-%d.bin", c.path, off/MemoryChunkSize, length)
if rc, err := c.openCache(ctx, cachePath, length); err == nil {
return rc, nil
@@ -114,6 +116,23 @@
return dec, nil
}
+func (c *rapidCachedSeekable) validateReadParams(buffSize, offset int64) error {
+ if buffSize == 0 {
+ return ErrBufferTooSmall
+ }
+ if buffSize > MemoryChunkSize {
+ return ErrBufferTooLarge
+ }
+ if offset%MemoryChunkSize != 0 {
+ return ErrOffsetUnaligned
+ }
+ if (offset%MemoryChunkSize)+buffSize > MemoryChunkSize {
+ return ErrMultipleChunks
+ }
+
+ return nil
+}
+
func (c *rapidCachedSeekable) Size(ctx context.Context) (int64, error) {
return c.inner.Size(ctx)
}You can send follow-ups to the cloud agent here.
Validate uncompressed rapid cache reads and avoid blocking canonical deletes on cache cleanup failures.
Add a disabled-by-default periodic GCS cleaner for rapid-cache objects so the cache can be bounded without relying on bucket lifecycle rules.
Make the rapid cache cleaner explicit about ignoring client close errors.
Satisfy orchestrator lint formatting for the dry-run branch.
Record Rapid cache chunk recency asynchronously so cleanup can evict cold chunks while reads still use deterministic bucket paths.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for all 3 issues found in the latest run.
- ✅ Fixed: Cleanup deletes recently used cache
- Added IsRecent check during bucket scan to skip objects that exist in Redis index, preventing deletion of hot cache entries whose GCS Updated timestamp is old but Redis score is recent.
- ✅ Fixed: Successful index pass skips bucket scan
- Modified clean function to always run both cleanFromIndex and cleanFromBucket passes, ensuring orphaned objects (in GCS but not in Redis) are discovered and removed even when the index pass deletes objects.
- ✅ Fixed: Build delete leaves Redis index
- Added EvictPrefix method to RapidCacheIndex interface and implemented it using Redis ZSCAN to bulk-remove matching entries from chunks ZSET and decrement build counters when DeleteObjectsWithPrefix is called.
Or push these changes by commenting:
@cursor push 25445707df
Preview (25445707df)
diff --git a/packages/orchestrator/cmd/clean-rapid-cache/main.go b/packages/orchestrator/cmd/clean-rapid-cache/main.go
--- a/packages/orchestrator/cmd/clean-rapid-cache/main.go
+++ b/packages/orchestrator/cmd/clean-rapid-cache/main.go
@@ -71,15 +71,21 @@
_ = client.Close()
}()
- if deleted, err := cleanFromIndex(ctx, client, bucket, cutoff, maxDeletions, dryRun, index); err != nil {
+ deletedIndex, err := cleanFromIndex(ctx, client, bucket, cutoff, maxDeletions, dryRun, index)
+ if err != nil {
return err
- } else if deleted > 0 {
- log.Printf("summary dry_run=%t deleted=%d source=redis", dryRun, deleted)
+ }
+ if deletedIndex > 0 {
+ log.Printf("summary dry_run=%t deleted=%d source=redis", dryRun, deletedIndex)
+ }
+
+ remaining := maxDeletions - deletedIndex
+ if remaining <= 0 {
return nil
}
- return cleanFromBucket(ctx, client, bucket, prefix, cutoff, maxDeletions, dryRun)
+ return cleanFromBucket(ctx, client, bucket, prefix, cutoff, remaining, dryRun, index)
}
func cleanFromIndex(ctx context.Context, client *gcs.Client, bucket string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) (int, error) {
@@ -117,7 +123,7 @@
return deleted, nil
}
-func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, prefix string, cutoff time.Time, maxDeletions int, dryRun bool) error {
+func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, prefix string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) error {
var scanned, matched, deleted int
objects := client.Bucket(bucket).Objects(ctx, &gcs.Query{Prefix: prefix})
for {
@@ -133,6 +139,13 @@
if !attrs.Updated.Before(cutoff) {
continue
}
+
+ if !dryRun {
+ if recent, err := index.IsRecent(ctx, attrs.Name); err == nil && recent {
+ continue
+ }
+ }
+
matched++
if deleted >= maxDeletions {
break
@@ -145,6 +158,7 @@
if err := client.Bucket(bucket).Object(attrs.Name).Delete(ctx); err != nil {
return fmt.Errorf("delete cache object: %w", err)
}
+ _ = index.Evict(ctx, attrs.Name, attrs.Size)
deleted++
}
diff --git a/packages/shared/pkg/storage/storage_cache_rapid.go b/packages/shared/pkg/storage/storage_cache_rapid.go
--- a/packages/shared/pkg/storage/storage_cache_rapid.go
+++ b/packages/shared/pkg/storage/storage_cache_rapid.go
@@ -27,6 +27,7 @@
}
func (p *rapidCacheProvider) DeleteObjectsWithPrefix(ctx context.Context, prefix string) error {
+ _ = p.index.EvictPrefix(ctx, prefix)
_ = p.cache.DeleteObjectsWithPrefix(ctx, "rapid-cache/"+prefix)
return p.inner.DeleteObjectsWithPrefix(ctx, prefix)
diff --git a/packages/shared/pkg/storage/storage_cache_rapid_index.go b/packages/shared/pkg/storage/storage_cache_rapid_index.go
--- a/packages/shared/pkg/storage/storage_cache_rapid_index.go
+++ b/packages/shared/pkg/storage/storage_cache_rapid_index.go
@@ -16,6 +16,8 @@
Admit(ctx context.Context, path string, size int64) error
Evict(ctx context.Context, path string, size int64) error
Candidates(ctx context.Context, before time.Time, limit int64) ([]string, error)
+ IsRecent(ctx context.Context, path string) (bool, error)
+ EvictPrefix(ctx context.Context, prefix string) error
}
type noopRapidCacheIndex struct{}
@@ -28,6 +30,8 @@
func (noopRapidCacheIndex) Candidates(context.Context, time.Time, int64) ([]string, error) {
return nil, nil
}
+func (noopRapidCacheIndex) IsRecent(context.Context, string) (bool, error) { return false, nil }
+func (noopRapidCacheIndex) EvictPrefix(context.Context, string) error { return nil }
type redisRapidCacheIndex struct {
redis redis.UniversalClient
@@ -110,6 +114,61 @@
}).Result()
}
+func (i *redisRapidCacheIndex) IsRecent(ctx context.Context, path string) (bool, error) {
+ score := i.redis.ZScore(ctx, i.keys.chunks, path)
+ if score.Err() == redis.Nil {
+ return false, nil
+ }
+ if score.Err() != nil {
+ return false, score.Err()
+ }
+
+ return true, nil
+}
+
+func (i *redisRapidCacheIndex) EvictPrefix(ctx context.Context, prefix string) error {
+ script := redis.NewScript(`
+local cursor = "0"
+local prefix = ARGV[1]
+local chunks_key = KEYS[1]
+local build_bytes_key = KEYS[2]
+local build_chunks_key = KEYS[3]
+
+repeat
+ local scan_result = redis.call('ZSCAN', chunks_key, cursor, 'MATCH', prefix .. '*', 'COUNT', 100)
+ cursor = scan_result[1]
+ local members = scan_result[2]
+
+ for i = 1, #members, 2 do
+ local path = members[i]
+ redis.call('ZREM', chunks_key, path)
+
+ local path_without_prefix = string.gsub(path, '^rapid%-cache/', '')
+ local build_id = string.match(path_without_prefix, '^([^/]+)')
+ if build_id then
+ local obj_result = redis.call('HGET', build_bytes_key, build_id)
+ if obj_result then
+ redis.call('HINCRBY', build_chunks_key, build_id, -1)
+ local chunks_left = redis.call('HGET', build_chunks_key, build_id)
+ if tonumber(chunks_left) <= 0 then
+ redis.call('HDEL', build_bytes_key, build_id)
+ redis.call('HDEL', build_chunks_key, build_id)
+ end
+ end
+ end
+ end
+until cursor == "0"
+
+return 0
+`)
+
+ return script.Run(ctx, i.redis, []string{
+ i.keys.chunks,
+ i.keys.buildBytes,
+ i.keys.buildChunks,
+ }, "rapid-cache/"+prefix).Err()
+}
+
func rapidCacheBuildID(path string) string {
path = strings.TrimPrefix(path, rapidCachePrefix)
buildID, _ := SplitPath(path)You can send follow-ups to the cloud agent here.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
There are 4 total unresolved issues (including 2 from previous reviews).
Autofix Details
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: Evict zero size skews Redis stats
- Added size > 0 check in Evict to prevent decrementing build counters when evicting ghost entries with zero size.
- ✅ Fixed: Bucket deletes skip Redis eviction
- Added index.Evict call in cleanFromBucket after GCS deletion to synchronize Redis index with bucket state.
Or push these changes by commenting:
@cursor push cffa73b102
Preview (cffa73b102)
diff --git a/packages/orchestrator/cmd/clean-rapid-cache/main.go b/packages/orchestrator/cmd/clean-rapid-cache/main.go
--- a/packages/orchestrator/cmd/clean-rapid-cache/main.go
+++ b/packages/orchestrator/cmd/clean-rapid-cache/main.go
@@ -79,7 +79,7 @@
return nil
}
- return cleanFromBucket(ctx, client, bucket, prefix, cutoff, maxDeletions, dryRun)
+ return cleanFromBucket(ctx, client, bucket, prefix, cutoff, maxDeletions, dryRun, index)
}
func cleanFromIndex(ctx context.Context, client *gcs.Client, bucket string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) (int, error) {
@@ -117,7 +117,7 @@
return deleted, nil
}
-func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, prefix string, cutoff time.Time, maxDeletions int, dryRun bool) error {
+func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, prefix string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) error {
var scanned, matched, deleted int
objects := client.Bucket(bucket).Objects(ctx, &gcs.Query{Prefix: prefix})
for {
@@ -145,6 +145,7 @@
if err := client.Bucket(bucket).Object(attrs.Name).Delete(ctx); err != nil {
return fmt.Errorf("delete cache object: %w", err)
}
+ _ = index.Evict(ctx, attrs.Name, attrs.Size)
deleted++
}
diff --git a/packages/shared/pkg/storage/storage_cache_rapid_index.go b/packages/shared/pkg/storage/storage_cache_rapid_index.go
--- a/packages/shared/pkg/storage/storage_cache_rapid_index.go
+++ b/packages/shared/pkg/storage/storage_cache_rapid_index.go
@@ -92,7 +92,7 @@
buildID := rapidCacheBuildID(path)
pipe := i.redis.Pipeline()
pipe.ZRem(ctx, i.keys.chunks, path)
- if buildID != "" {
+ if buildID != "" && size > 0 {
pipe.HIncrBy(ctx, i.keys.buildBytes, buildID, -size)
pipe.HIncrBy(ctx, i.keys.buildChunks, buildID, -1)
}You can send follow-ups to the cloud agent here.
Avoid deferred cleanup before fatal exit and keep Redis cleanup fallback explicit.
Use Redis touch scores to avoid deleting recently used chunks during bucket fallback cleanup and clean stale Redis entries after deletes.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: Rapid cache delete errors ignored
- Changed DeleteObjectsWithPrefix to return cache deletion errors instead of discarding them, ensuring both cache and inner deletion failures are properly propagated.
- ✅ Fixed: LastAccess Redis errors ignored
- Modified cleanFromBucket to check and return errors from LastAccess, preventing Redis failures from being treated as missing index entries that could lead to incorrect deletions.
Or push these changes by commenting:
@cursor push d8236c266b
You can send follow-ups to the cloud agent here.
Apply gofumpt formatting to the rapid cache index helper.
Surface rapid prefix delete failures and avoid bucket fallback deletes when Redis recency cannot be checked.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Bugbot Autofix prepared fixes for all 3 issues found in the latest run.
- ✅ Fixed: Redis error skips bucket cleanup
- Changed cleanFromBucket to fall back to GCS timestamp when Redis LastAccess fails, treating errors the same as missing keys.
- ✅ Fixed: Index deletes skip access recheck
- Added LastAccess recheck in cleanFromIndex before deletion to prevent removing objects that were touched after Candidates snapshot.
- ✅ Fixed: Index cleanup error skips bucket scan
- Changed clean function to log cleanFromIndex errors and continue to cleanFromBucket instead of returning immediately.
Or push these changes by commenting:
@cursor push b58d31345b
Preview (b58d31345b)
diff --git a/packages/orchestrator/cmd/clean-rapid-cache/main.go b/packages/orchestrator/cmd/clean-rapid-cache/main.go
--- a/packages/orchestrator/cmd/clean-rapid-cache/main.go
+++ b/packages/orchestrator/cmd/clean-rapid-cache/main.go
@@ -73,7 +73,7 @@
deleted, err := cleanFromIndex(ctx, client, bucket, cutoff, maxDeletions, dryRun, index)
if err != nil {
- return err
+ log.Printf("cleanFromIndex failed (continuing with bucket scan): %v", err)
}
return cleanFromBucket(ctx, client, bucket, prefix, cutoff, maxDeletions-deleted, dryRun, index)
@@ -99,6 +99,10 @@
if err != nil {
return deleted, fmt.Errorf("read cache object metadata: %w", err)
}
+ lastAccess, ok, err := index.LastAccess(ctx, path)
+ if err == nil && ok && lastAccess >= cutoff.Unix() {
+ continue
+ }
if dryRun {
deleted++
@@ -135,12 +139,9 @@
continue
}
lastAccess, ok, err := index.LastAccess(ctx, attrs.Name)
- if err != nil {
+ if err == nil && ok && lastAccess >= cutoff.Unix() {
continue
}
- if ok && lastAccess >= cutoff.Unix() {
- continue
- }
matched++
if deleted >= maxDeletions {
breakYou can send follow-ups to the cloud agent here.
Reviewed by Cursor Bugbot for commit 1043f73. Configure here.
Make Redis index cleanup best-effort, recheck recency before index deletes, and let bucket fallback proceed when Redis is unavailable.


Adds a disabled-by-default Rapid bucket read-through cache for template artifacts. Rapid cache hits/writes update Redis asynchronously for chunk-level cleanup hints, while reads still use deterministic bucket paths and fall back to canonical storage.
Tested: go test ./packages/shared/pkg/storage ./packages/orchestrator/cmd/clean-rapid-cache