diff --git a/Makefile b/Makefile index 267c03fc0b..c6a0a43360 100644 --- a/Makefile +++ b/Makefile @@ -82,6 +82,7 @@ build-and-upload:build-and-upload/client-proxy build-and-upload:build-and-upload/dashboard-api build-and-upload:build-and-upload/docker-reverse-proxy build-and-upload:build-and-upload/clean-nfs-cache +build-and-upload:build-and-upload/clean-rapid-cache build-and-upload:build-and-upload/orchestrator build-and-upload:build-and-upload/template-manager build-and-upload:build-and-upload/envd @@ -90,6 +91,9 @@ build-and-upload:build-and-upload/nomad-nodepool-apm build-and-upload/clean-nfs-cache: ./scripts/confirm.sh $(TERRAFORM_ENVIRONMENT) GCP_PROJECT_ID=$(GCP_PROJECT_ID) $(MAKE) -C packages/orchestrator build-and-upload/clean-nfs-cache +build-and-upload/clean-rapid-cache: + ./scripts/confirm.sh $(TERRAFORM_ENVIRONMENT) + GCP_PROJECT_ID=$(GCP_PROJECT_ID) $(MAKE) -C packages/orchestrator build-and-upload/clean-rapid-cache build-and-upload/template-manager: ./scripts/confirm.sh $(TERRAFORM_ENVIRONMENT) GCP_PROJECT_ID=$(GCP_PROJECT_ID) $(MAKE) -C packages/orchestrator build-and-upload/template-manager diff --git a/iac/provider-gcp/main.tf b/iac/provider-gcp/main.tf index 747fa95db3..277b182dc5 100644 --- a/iac/provider-gcp/main.tf +++ b/iac/provider-gcp/main.tf @@ -315,8 +315,11 @@ module "nomad" { envd_timeout = var.envd_timeout persistent_volume_mounts = { for key, config in local.persistent_volume_types : key => config["local_mount_path"] } default_persistent_volume_type = var.default_persistent_volume_type - orchestrator_env_vars = var.orchestrator_env_vars - orchestrator_enabled = var.orchestrator_enabled + orchestrator_env_vars = merge( + var.orchestrator_env_vars, + var.rapid_bucket_cache_bucket_name != "" ? 
{ RAPID_BUCKET_CACHE_BUCKET_NAME = var.rapid_bucket_cache_bucket_name } : {}, + ) + orchestrator_enabled = var.orchestrator_enabled # Template manager builder_node_pool = var.build_node_pool @@ -334,6 +337,10 @@ module "nomad" { # Filestore shared_chunk_cache_path = module.cluster.shared_chunk_cache_path + rapid_bucket_cache_bucket_name = var.rapid_bucket_cache_bucket_name + rapid_bucket_cache_cleanup_dry_run = var.rapid_bucket_cache_cleanup_dry_run + rapid_bucket_cache_cleanup_max_age = var.rapid_bucket_cache_cleanup_max_age + rapid_bucket_cache_cleanup_max_deletions = var.rapid_bucket_cache_cleanup_max_deletions filestore_cache_cleanup_disk_usage_target = var.filestore_cache_cleanup_disk_usage_target filestore_cache_cleanup_dry_run = var.filestore_cache_cleanup_dry_run filestore_cache_cleanup_deletions_per_loop = var.filestore_cache_cleanup_deletions_per_loop diff --git a/iac/provider-gcp/nomad/jobs/clean-rapid-cache.hcl b/iac/provider-gcp/nomad/jobs/clean-rapid-cache.hcl new file mode 100644 index 0000000000..b94b454603 --- /dev/null +++ b/iac/provider-gcp/nomad/jobs/clean-rapid-cache.hcl @@ -0,0 +1,48 @@ +job "rapid-cache-cleanup" { + type = "batch" + node_pool = "${node_pool}" + + periodic { + cron = "0 * * * *" + prohibit_overlap = true + time_zone = "America/Los_Angeles" + } + + group "rapid-cache-cleanup" { + restart { + attempts = 0 + mode = "fail" + } + + task "rapid-cache-cleanup" { + driver = "raw_exec" + + resources { + memory = 512 + } + + env { + RAPID_BUCKET_CACHE_BUCKET_NAME = "${bucket_name}" + REDIS_URL = "${redis_url}" + REDIS_CLUSTER_URL = "${redis_cluster_url}" + REDIS_TLS_CA_BASE64 = "${redis_tls_ca_base64}" + } + + config { + command = "local/clean-rapid-cache" + args = [ + "--dry-run=${dry_run}", + "--max-age=${max_age}", + "--max-deletions=${max_deletions}", + "${bucket_name}", + ] + } + + artifact { + source = "${artifact_source}" + destination = "local/clean-rapid-cache" + mode = "file" + } + } + } +} diff --git 
a/iac/provider-gcp/nomad/main.tf b/iac/provider-gcp/nomad/main.tf index c73105f47c..cde84badaa 100644 --- a/iac/provider-gcp/nomad/main.tf +++ b/iac/provider-gcp/nomad/main.tf @@ -673,8 +673,15 @@ data "google_storage_bucket_object" "filestore_cleanup" { bucket = var.fc_env_pipeline_bucket_name } +data "google_storage_bucket_object" "rapid_cache_cleanup" { + count = var.rapid_bucket_cache_bucket_name != "" ? 1 : 0 + name = "clean-rapid-cache" + bucket = var.fc_env_pipeline_bucket_name +} + locals { - clean_nfs_cache_artifact_source = "gcs::https://www.googleapis.com/storage/v1/${var.fc_env_pipeline_bucket_name}/clean-nfs-cache?version=${data.google_storage_bucket_object.filestore_cleanup.generation}" + clean_nfs_cache_artifact_source = "gcs::https://www.googleapis.com/storage/v1/${var.fc_env_pipeline_bucket_name}/clean-nfs-cache?version=${data.google_storage_bucket_object.filestore_cleanup.generation}" + clean_rapid_cache_artifact_source = var.rapid_bucket_cache_bucket_name != "" ? "gcs::https://www.googleapis.com/storage/v1/${var.fc_env_pipeline_bucket_name}/clean-rapid-cache?version=${data.google_storage_bucket_object.rapid_cache_cleanup[0].generation}" : "" } resource "nomad_job" "clean_nfs_cache" { @@ -696,3 +703,19 @@ resource "nomad_job" "clean_nfs_cache" { launch_darkly_api_key = trimspace(data.google_secret_manager_secret_version.launch_darkly_api_key.secret_data) }) } + +resource "nomad_job" "clean_rapid_cache" { + count = var.rapid_bucket_cache_bucket_name != "" ? 
1 : 0 + + jobspec = templatefile("${path.module}/jobs/clean-rapid-cache.hcl", { + node_pool = var.builder_node_pool + artifact_source = local.clean_rapid_cache_artifact_source + bucket_name = var.rapid_bucket_cache_bucket_name + dry_run = var.rapid_bucket_cache_cleanup_dry_run + max_age = var.rapid_bucket_cache_cleanup_max_age + max_deletions = var.rapid_bucket_cache_cleanup_max_deletions + redis_url = local.redis_url + redis_cluster_url = local.redis_cluster_url + redis_tls_ca_base64 = trimspace(data.google_secret_manager_secret_version.redis_tls_ca_base64.secret_data) + }) +} diff --git a/iac/provider-gcp/nomad/variables.tf b/iac/provider-gcp/nomad/variables.tf index a1924892ec..185582963a 100644 --- a/iac/provider-gcp/nomad/variables.tf +++ b/iac/provider-gcp/nomad/variables.tf @@ -454,6 +454,26 @@ variable "shared_chunk_cache_path" { default = "" } +variable "rapid_bucket_cache_bucket_name" { + type = string + default = "" +} + +variable "rapid_bucket_cache_cleanup_dry_run" { + type = bool + default = true +} + +variable "rapid_bucket_cache_cleanup_max_age" { + type = string + default = "168h" +} + +variable "rapid_bucket_cache_cleanup_max_deletions" { + type = number + default = 10000 +} + variable "filestore_cache_cleanup_disk_usage_target" { type = number description = "The disk usage target for the Filestore cache in percent" diff --git a/iac/provider-gcp/variables.tf b/iac/provider-gcp/variables.tf index 0245a48aa7..d71f255538 100644 --- a/iac/provider-gcp/variables.tf +++ b/iac/provider-gcp/variables.tf @@ -835,6 +835,26 @@ variable "anywhere_cache_ttl" { default = null } +variable "rapid_bucket_cache_bucket_name" { + type = string + default = "" +} + +variable "rapid_bucket_cache_cleanup_dry_run" { + type = bool + default = true +} + +variable "rapid_bucket_cache_cleanup_max_age" { + type = string + default = "168h" +} + +variable "rapid_bucket_cache_cleanup_max_deletions" { + type = number + default = 10000 +} + variable "orchestrator_env_vars" { type = 
map(string) default = {} diff --git a/packages/orchestrator/Dockerfile b/packages/orchestrator/Dockerfile index 5639a4d7e6..e7bcd90487 100644 --- a/packages/orchestrator/Dockerfile +++ b/packages/orchestrator/Dockerfile @@ -44,4 +44,5 @@ RUN --mount=type=cache,target=/root/.cache/go-build make build-local COMMIT_SHA= FROM scratch COPY --from=builder /build/orchestrator/bin/clean-nfs-cache . +COPY --from=builder /build/orchestrator/bin/clean-rapid-cache . COPY --from=builder /build/orchestrator/bin/orchestrator . diff --git a/packages/orchestrator/Makefile b/packages/orchestrator/Makefile index f3d3a1dbf2..583efbd51c 100644 --- a/packages/orchestrator/Makefile +++ b/packages/orchestrator/Makefile @@ -43,6 +43,7 @@ build-local: $(eval COMMIT_SHA ?= $(shell git rev-parse --short HEAD)) CGO_ENABLED=1 GOOS=linux GOARCH=$(BUILD_ARCH) go build -o bin/orchestrator -ldflags "-X=main.commitSHA=$(COMMIT_SHA)" . CGO_ENABLED=1 GOOS=linux GOARCH=$(BUILD_ARCH) go build -o bin/clean-nfs-cache -ldflags "-X=main.commitSHA=$(COMMIT_SHA)" ./cmd/clean-nfs-cache + CGO_ENABLED=0 GOOS=linux GOARCH=$(BUILD_ARCH) go build -o bin/clean-rapid-cache ./cmd/clean-rapid-cache .PHONY: build-debug build-debug: @@ -96,6 +97,15 @@ else gsutil -h "Cache-Control:no-cache, max-age=0" cp ./bin/clean-nfs-cache "gs://${GCP_BUCKET_PREFIX}fc-env-pipeline/clean-nfs-cache" endif +.PHONY: upload/clean-rapid-cache +upload/clean-rapid-cache: + chmod +x ./bin/clean-rapid-cache +ifeq ($(PROVIDER),aws) + @echo "clean-rapid-cache is GCP-only" +else + gsutil -h "Cache-Control:no-cache, max-age=0" cp ./bin/clean-rapid-cache "gs://${GCP_BUCKET_PREFIX}fc-env-pipeline/clean-rapid-cache" +endif + .PHONY: upload/orchestrator upload/orchestrator: chmod +x ./bin/orchestrator @@ -117,6 +127,9 @@ endif .PHONY: build-and-upload/clean-nfs-cache build-and-upload/clean-nfs-cache: build upload/clean-nfs-cache +.PHONY: build-and-upload/clean-rapid-cache +build-and-upload/clean-rapid-cache: build upload/clean-rapid-cache + .PHONY: 
build-and-upload/orchestrator
 build-and-upload/orchestrator: build upload/orchestrator
 
diff --git a/packages/orchestrator/cmd/clean-rapid-cache/main.go b/packages/orchestrator/cmd/clean-rapid-cache/main.go
new file mode 100644
index 0000000000..7fe6ca0f89
--- /dev/null
+++ b/packages/orchestrator/cmd/clean-rapid-cache/main.go
@@ -0,0 +1,234 @@
+package main
+
+import (
+	"context"
+	"errors"
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"time"
+
+	gcs "cloud.google.com/go/storage"
+	"cloud.google.com/go/storage/experimental"
+	"google.golang.org/api/iterator"
+
+	"github.com/e2b-dev/infra/packages/shared/pkg/factories"
+	"github.com/e2b-dev/infra/packages/shared/pkg/storage"
+)
+
+// defaultPrefix is the object-name prefix scanned when --prefix is not given.
+const defaultPrefix = "rapid-cache/"
+
+// main parses the flags, resolves the bucket (first positional argument,
+// falling back to RAPID_BUCKET_CACHE_BUCKET_NAME) and runs one cleanup pass.
+func main() {
+	var (
+		prefix       string
+		maxAge       time.Duration
+		maxDeletions int
+		dryRun       bool
+	)
+
+	flags := flag.NewFlagSet("clean-rapid-cache", flag.ExitOnError)
+	flags.StringVar(&prefix, "prefix", defaultPrefix, "cache object prefix")
+	flags.DurationVar(&maxAge, "max-age", 7*24*time.Hour, "delete objects older than this")
+	flags.IntVar(&maxDeletions, "max-deletions", 10000, "maximum objects to delete")
+	flags.BoolVar(&dryRun, "dry-run", true, "dry run")
+	if err := flags.Parse(os.Args[1:]); err != nil {
+		log.Fatal(err)
+	}
+
+	bucket := os.Getenv("RAPID_BUCKET_CACHE_BUCKET_NAME")
+	if flags.NArg() > 0 {
+		bucket = flags.Arg(0)
+	}
+	if bucket == "" {
+		log.Fatal("missing bucket")
+	}
+	if prefix == "" {
+		log.Fatal("missing prefix")
+	}
+	if maxAge <= 0 {
+		log.Fatal("max-age must be positive")
+	}
+	if maxDeletions <= 0 {
+		log.Fatal("max-deletions must be positive")
+	}
+
+	ctx := context.Background()
+	index, closeIndex := newRapidIndex(ctx, bucket)
+	err := clean(ctx, bucket, prefix, time.Now().Add(-maxAge), maxDeletions, dryRun, index)
+	// Release the index before the error check: log.Fatal exits without running defers.
+	closeIndex()
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+
+// clean runs the index-driven pass, then spends the remaining deletion budget
+// on a listing of everything under prefix.
+func clean(ctx context.Context, bucket string, prefix string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) error {
+	client, err := gcs.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
+	if err != nil {
+		return fmt.Errorf("create storage client: %w", err)
+	}
+	defer func() {
+		_ = client.Close()
+	}()
+
+	deleted := cleanFromIndex(ctx, client, bucket, cutoff, maxDeletions, dryRun, index)
+
+	return cleanFromBucket(ctx, client, bucket, prefix, cutoff, maxDeletions-deleted, dryRun, index)
+}
+
+// cleanFromIndex deletes objects that the access index reports as not touched
+// since cutoff and returns how many it deleted (or would delete in a dry run).
+//
+// The pass is best effort: per-object failures are counted and skipped, and
+// cleanFromBucket later re-checks everything under the prefix.
+func cleanFromIndex(ctx context.Context, client *gcs.Client, bucket string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) int {
+	candidates, err := index.Candidates(ctx, cutoff, int64(maxDeletions))
+	if err != nil {
+		log.Printf("index candidates lookup failed, relying on bucket scan: %v", err)
+
+		return 0
+	}
+
+	var (
+		deleted, failed int
+		firstErr        error
+	)
+	// Remember only the first failure so a broken backend does not flood the log.
+	fail := func(err error) {
+		failed++
+		if firstErr == nil {
+			firstErr = err
+		}
+	}
+
+	for _, path := range candidates {
+		lastAccess, ok, err := index.LastAccess(ctx, path)
+		if err != nil {
+			fail(err)
+
+			continue
+		}
+		// The entry may have been touched after Candidates returned.
+		if ok && lastAccess >= cutoff.Unix() {
+			continue
+		}
+
+		obj := client.Bucket(bucket).Object(path)
+		attrs, err := obj.Attrs(ctx)
+		if errors.Is(err, gcs.ErrObjectNotExist) {
+			// Stale index entry: drop it. No size is available here, so pass 0.
+			if !dryRun {
+				_ = index.Evict(ctx, path, 0)
+			}
+
+			continue
+		}
+		if err != nil {
+			fail(err)
+
+			continue
+		}
+		if dryRun {
+			deleted++
+
+			continue
+		}
+		if err := obj.Delete(ctx); err != nil {
+			fail(err)
+
+			continue
+		}
+		_ = index.Evict(ctx, path, attrs.Size)
+		deleted++
+	}
+
+	log.Printf("index summary dry_run=%t candidates=%d deleted=%d failed=%d", dryRun, len(candidates), deleted, failed)
+	if firstErr != nil {
+		log.Printf("index first failure: %v", firstErr)
+	}
+
+	return deleted
+}
+
+// cleanFromBucket lists every object under prefix and deletes those whose
+// Updated time is before cutoff, unless the index records a newer access.
+// It stops after maxDeletions deletions; a non-positive budget is a no-op.
+func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, prefix string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) error {
+	if maxDeletions <= 0 {
+		return nil
+	}
+
+	var scanned, matched, deleted int
+	objects := client.Bucket(bucket).Objects(ctx, &gcs.Query{Prefix: prefix})
+	for {
+		attrs, err := objects.Next()
+		if errors.Is(err, iterator.Done) {
+			break
+		}
+		if err != nil {
+			return fmt.Errorf("list cache objects: %w", err)
+		}
+
+		scanned++
+		if !attrs.Updated.Before(cutoff) {
+			continue
+		}
+		// An index error is treated like a missing entry: the object age decides.
+		lastAccess, ok, err := index.LastAccess(ctx, attrs.Name)
+		if err == nil && ok && lastAccess >= cutoff.Unix() {
+			continue
+		}
+		matched++
+		if deleted >= maxDeletions {
+			break
+		}
+		if dryRun {
+			deleted++
+
+			continue
+		}
+		err = client.Bucket(bucket).Object(attrs.Name).Delete(ctx)
+		if errors.Is(err, gcs.ErrObjectNotExist) {
+			// Removed concurrently since it was listed: nothing left to delete.
+			_ = index.Evict(ctx, attrs.Name, 0)
+
+			continue
+		}
+		if err != nil {
+			return fmt.Errorf("delete cache object: %w", err)
+		}
+		_ = index.Evict(ctx, attrs.Name, attrs.Size)
+		deleted++
+	}
+
+	log.Printf("summary dry_run=%t scanned=%d matched=%d deleted=%d", dryRun, scanned, matched, deleted)
+
+	return nil
+}
+
+// newRapidIndex connects to Redis using REDIS_URL, REDIS_CLUSTER_URL and
+// REDIS_TLS_CA_BASE64 and returns the access index plus a function that
+// closes the connection. When the client cannot be created it logs the reason
+// and returns a no-op index, leaving cleanFromBucket as the only effective pass.
+func newRapidIndex(ctx context.Context, bucket string) (storage.RapidCacheIndex, func()) {
+	redisClient, err := factories.NewRedisClient(ctx, factories.RedisConfig{
+		RedisURL:         os.Getenv("REDIS_URL"),
+		RedisClusterURL:  os.Getenv("REDIS_CLUSTER_URL"),
+		RedisTLSCABase64: os.Getenv("REDIS_TLS_CA_BASE64"),
+	})
+	if err != nil {
+		log.Printf("redis unavailable, running without access index: %v", err)
+
+		return storage.NoopRapidCacheIndex(), func() {}
+	}
+
+	return storage.NewRedisRapidCacheIndex(redisClient, bucket), func() {
+		_ = factories.CloseCleanly(redisClient)
+	}
+}
diff --git a/packages/orchestrator/pkg/factories/run.go b/packages/orchestrator/pkg/factories/run.go
index 0a963000df..1fa81f8b1c 100644
--- a/packages/orchestrator/pkg/factories/run.go
+++ b/packages/orchestrator/pkg/factories/run.go
@@ -405,6 +405,7 @@ func run(config cfg.Config, opts Options) (success bool) {
 	if err != nil {
 		logger.L().Fatal(ctx, "failed to create template cache", zap.Error(err))
 	}
+	templateCache.SetRapidCacheIndex(storage.NewRedisRapidCacheIndex(redisClient, storage.RapidBucketCacheStorageConfig.GetBucketName()))
 	templateCache.Start(ctx)
 	closers = append(closers, closer{"template cache", func(context.Context) error {
 		templateCache.Stop()
diff --git a/packages/orchestrator/pkg/sandbox/template/cache.go b/packages/orchestrator/pkg/sandbox/template/cache.go
index 2296542fb1..3d8fef2656 100644
--- a/packages/orchestrator/pkg/sandbox/template/cache.go
+++ b/packages/orchestrator/pkg/sandbox/template/cache.go
@@ -53,6 +53,9 @@ type Cache struct {
 	flags         *featureflags.Client
 	cache         *ttlcache.Cache[string, Template]
 	persistence   storage.StorageProvider
+	rapidCache
storage.StorageProvider + rapidIndex storage.RapidCacheIndex + rapidCacheMu sync.Mutex buildStore *build.DiffStore blockMetrics blockmetrics.Metrics rootCachePath string @@ -106,6 +109,7 @@ func NewCache( blockMetrics: metrics, config: config, persistence: persistence, + rapidIndex: storage.NoopRapidCacheIndex(), buildStore: buildStore, cache: cache, flags: flags, @@ -114,6 +118,12 @@ func NewCache( }, nil } +func (c *Cache) SetRapidCacheIndex(index storage.RapidCacheIndex) { + if index != nil { + c.rapidIndex = index + } +} + func (c *Cache) Start(ctx context.Context) { c.buildStore.Start(ctx) @@ -169,9 +179,10 @@ func (c *Cache) GetTemplate( defer span.End() persistence := c.persistence - // Because of the template caching, if we enable the NFS cache feature flag, - // it will start working only for new orchestrators or new builds. - if path, enabled := c.useNFSCache(ctx, isBuilding, isSnapshot); enabled { + if cache, enabled := c.useRapidBucketCache(ctx, isBuilding); enabled { + persistence = storage.WrapInRapidBucketCache(ctx, cache, persistence, c.rapidIndex) + span.SetAttributes(attribute.Bool("use_cache", true)) + } else if path, enabled := c.useNFSCache(ctx, isBuilding, isSnapshot); enabled { logger.L().Info(ctx, "using local template cache", zap.String("path", c.rootCachePath)) persistence = storage.WrapInNFSCache(ctx, path, persistence, c.flags) span.SetAttributes(attribute.Bool("use_cache", true)) @@ -305,6 +316,32 @@ func (c *Cache) useNFSCache(ctx context.Context, isBuilding bool, isSnapshot boo return c.rootCachePath, useNFSCache } +func (c *Cache) useRapidBucketCache(ctx context.Context, isBuilding bool) (storage.StorageProvider, bool) { + if isBuilding || !c.flags.BoolFlag(ctx, featureflags.RapidBucketCacheFlag) { + return nil, false + } + c.rapidCacheMu.Lock() + defer c.rapidCacheMu.Unlock() + if c.rapidCache != nil { + return c.rapidCache, true + } + bucketName := storage.RapidBucketCacheStorageConfig.GetBucketName() + if bucketName == "" { + 
logger.L().Warn(ctx, "rapid bucket cache feature flag is enabled but bucket is not set") + + return nil, false + } + p, err := storage.NewGCPRapid(ctx, bucketName, nil) + if err != nil { + logger.L().Warn(ctx, "failed to initialize rapid bucket cache", zap.Error(err)) + + return nil, false + } + c.rapidCache = p + + return c.rapidCache, true +} + func cleanDir(path string) error { entries, err := os.ReadDir(path) if err != nil { diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index 28b6f8e370..d60d3305ec 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -120,6 +120,7 @@ var ( MetricsReadFlag = NewBoolFlag("sandbox-metrics-read", true) SnapshotFeatureFlag = NewBoolFlag("use-nfs-for-snapshots", env.IsDevelopment()) TemplateFeatureFlag = NewBoolFlag("use-nfs-for-templates", env.IsDevelopment()) + RapidBucketCacheFlag = NewBoolFlag("use-rapid-bucket-cache", false) EnableWriteThroughCacheFlag = NewBoolFlag("write-to-cache-on-writes", false) UseNFSCacheForBuildingTemplatesFlag = NewBoolFlag("use-nfs-for-building-templates", env.IsDevelopment()) BestOfKCanFitFlag = NewBoolFlag("best-of-k-can-fit", true) diff --git a/packages/shared/pkg/storage/storage.go b/packages/shared/pkg/storage/storage.go index 668ca49fb4..6a06bca44a 100644 --- a/packages/shared/pkg/storage/storage.go +++ b/packages/shared/pkg/storage/storage.go @@ -243,6 +243,15 @@ var BuildCacheStorageConfig = StorageConfig{ }, } +var RapidBucketCacheStorageConfig = StorageConfig{ + GetLocalBasePath: func() string { + return "" + }, + GetBucketName: func() string { + return env.GetEnv("RAPID_BUCKET_CACHE_BUCKET_NAME", "") + }, +} + func GetStorageProvider(ctx context.Context, cfg StorageConfig) (StorageProvider, error) { provider := GetProviderType() diff --git a/packages/shared/pkg/storage/storage_cache_rapid.go b/packages/shared/pkg/storage/storage_cache_rapid.go new file mode 100644 index 
0000000000..f28338aa0f
--- /dev/null
+++ b/packages/shared/pkg/storage/storage_cache_rapid.go
@@ -0,0 +1,340 @@
+package storage
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"time"
+)
+
+// rapidCacheProvider serves range reads from a cache provider and falls back
+// to inner on a miss. All other operations go to inner.
+type rapidCacheProvider struct {
+	cache StorageProvider
+	inner StorageProvider
+	index RapidCacheIndex
+}
+
+// WrapInRapidBucketCache wraps inner so that seekable reads go through cache.
+// A nil index is replaced by the no-op index.
+func WrapInRapidBucketCache(_ context.Context, cache StorageProvider, inner StorageProvider, index RapidCacheIndex) StorageProvider {
+	if index == nil {
+		index = NoopRapidCacheIndex()
+	}
+
+	return &rapidCacheProvider{
+		cache: cache,
+		inner: inner,
+		index: index,
+	}
+}
+
+// DeleteObjectsWithPrefix deletes from the cache and from inner, attempting
+// both even if the first fails, and returns the joined errors.
+func (p *rapidCacheProvider) DeleteObjectsWithPrefix(ctx context.Context, prefix string) error {
+	cacheErr := p.cache.DeleteObjectsWithPrefix(ctx, rapidCachePrefix+prefix)
+	innerErr := p.inner.DeleteObjectsWithPrefix(ctx, prefix)
+
+	return errors.Join(cacheErr, innerErr)
+}
+
+// UploadSignedURL is delegated to inner.
+func (p *rapidCacheProvider) UploadSignedURL(ctx context.Context, path string, ttl time.Duration) (string, error) {
+	return p.inner.UploadSignedURL(ctx, path, ttl)
+}
+
+// OpenBlob is delegated to inner; blobs are not cached.
+func (p *rapidCacheProvider) OpenBlob(ctx context.Context, path string, objectType ObjectType) (Blob, error) {
+	return p.inner.OpenBlob(ctx, path, objectType)
+}
+
+// OpenSeekable opens path on inner and returns a handle whose range reads are
+// cached under rapidCachePrefix+path.
+func (p *rapidCacheProvider) OpenSeekable(ctx context.Context, path string, objectType SeekableObjectType) (Seekable, error) {
+	inner, err := p.inner.OpenSeekable(ctx, path, objectType)
+	if err != nil {
+		return nil, err
+	}
+
+	return &rapidCachedSeekable{
+		path:  rapidCachePrefix + path,
+		cache: p.cache,
+		inner: inner,
+		index: p.index,
+	}, nil
+}
+
+// GetDetails describes the provider and the inner provider it wraps.
+func (p *rapidCacheProvider) GetDetails() string {
+	return fmt.Sprintf("[Rapid bucket cache, inner=%s]", p.inner.GetDetails())
+}
+
+// rapidCachedSeekable pairs one object of inner with the cache location of its chunks.
+type rapidCachedSeekable struct {
+	path  string // cache object-name prefix for this object's chunks
+	cache StorageProvider
+	inner Seekable
+	index RapidCacheIndex
+}
+
+// OpenRangeReader reads length bytes at off, preferring the cache. On a miss it
+// reads from inner and, unless write-back is skipped for ctx, stores the chunk
+// in the cache when the returned reader is closed after a complete read.
+// Compressed frame tables are handled by openCompressed.
+func (c *rapidCachedSeekable) OpenRangeReader(ctx context.Context, off int64, length int64, frameTable *FrameTable) (io.ReadCloser, error) {
+	if frameTable != nil && frameTable.IsCompressed() {
+		return c.openCompressed(ctx, off, frameTable)
+	}
+	if err := validateRapidReadParams(length, off); err != nil {
+		return nil, err
+	}
+
+	cachePath := fmt.Sprintf("%s/%012d-%d.bin", c.path, off/MemoryChunkSize, length)
+	if rc, err := c.openCache(ctx, cachePath, length); err == nil {
+		c.touch(ctx, cachePath)
+
+		return rc, nil
+	}
+
+	rc, err := c.inner.OpenRangeReader(ctx, off, length, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	if !skipCacheWriteback(ctx) {
+		return newRapidCacheWriteThroughReader(rc, c, ctx, cachePath, length), nil
+	}
+
+	return rc, nil
+}
+
+// openCompressed serves the frame containing off. The cache stores the frame
+// in compressed form; the returned reader yields decompressed bytes.
+func (c *rapidCachedSeekable) openCompressed(ctx context.Context, off int64, frameTable *FrameTable) (io.ReadCloser, error) {
+	r, err := frameTable.LocateCompressed(off)
+	if err != nil {
+		return nil, fmt.Errorf("frame lookup for offset %d: %w", off, err)
+	}
+
+	cachePath := makeFrameFilename(c.path, r)
+	if raw, err := c.openCache(ctx, cachePath, int64(r.Length)); err == nil {
+		c.touch(ctx, cachePath)
+		dec, err := newDecompressingReadCloser(raw, frameTable.CompressionType())
+		if err != nil {
+			raw.Close()
+
+			return nil, err
+		}
+
+		return dec, nil
+	}
+
+	raw, err := c.inner.OpenRangeReader(ctx, r.Offset, int64(r.Length), nil)
+	if err != nil {
+		return nil, err
+	}
+
+	dec, err := newRapidDecompressingCacheReader(raw, frameTable.CompressionType(), c, ctx, cachePath, r.Length)
+	if err != nil {
+		raw.Close()
+
+		return nil, err
+	}
+
+	return dec, nil
+}
+
+// Size returns the size reported by inner.
+func (c *rapidCachedSeekable) Size(ctx context.Context) (int64, error) {
+	return c.inner.Size(ctx)
+}
+
+// StoreFile is delegated to inner; writes do not populate the cache.
+func (c *rapidCachedSeekable) StoreFile(ctx context.Context, path string, opts ...PutOption) (*FrameTable, [32]byte, error) {
+	return c.inner.StoreFile(ctx, path, opts...)
+}
+
+// validateRapidReadParams accepts only chunk-aligned reads of 1..MemoryChunkSize
+// bytes. Non-positive lengths are rejected up front: the length is later used
+// as a buffer capacity, and make panics on a negative capacity.
+func validateRapidReadParams(length, off int64) error {
+	if length <= 0 {
+		return ErrBufferTooSmall
+	}
+	if length > MemoryChunkSize {
+		return ErrBufferTooLarge
+	}
+	if off%MemoryChunkSize != 0 {
+		return ErrOffsetUnaligned
+	}
+
+	return nil
+}
+
+// openCache opens a cached chunk, treating an object whose size differs from
+// length as a miss.
+func (c *rapidCachedSeekable) openCache(ctx context.Context, path string, length int64) (io.ReadCloser, error) {
+	obj, err := c.cache.OpenSeekable(ctx, path, UnknownSeekableObjectType)
+	if err != nil {
+		return nil, err
+	}
+	size, err := obj.Size(ctx)
+	if err != nil {
+		return nil, err
+	}
+	if size != length {
+		return nil, fmt.Errorf("rapid cache object %s size %d != expected %d", path, size, length)
+	}
+
+	return obj.OpenRangeReader(ctx, 0, length, nil)
+}
+
+// writeCache stores data at path and then registers it in the index. An index
+// failure is ignored because the object itself was written.
+func (c *rapidCachedSeekable) writeCache(ctx context.Context, path string, data []byte) error {
+	blob, err := c.cache.OpenBlob(ctx, path, UnknownObjectType)
+	if err != nil {
+		return err
+	}
+
+	if err := blob.Put(ctx, data); err != nil {
+		return err
+	}
+
+	_ = c.index.Admit(ctx, path, int64(len(data)))
+
+	return nil
+}
+
+// goCtx runs fn in a new goroutine with a context that keeps ctx's values but
+// is not cancelled when ctx is.
+func (c *rapidCachedSeekable) goCtx(ctx context.Context, fn func(context.Context)) {
+	go func() {
+		fn(context.WithoutCancel(ctx))
+	}()
+}
+
+// touch records a cache hit in the index without blocking the caller.
+func (c *rapidCachedSeekable) touch(ctx context.Context, path string) {
+	c.goCtx(ctx, func(ctx context.Context) {
+		_ = c.index.Touch(ctx, path)
+	})
+}
+
+// rapidCacheWriteThroughReader copies everything read from inner into buf and
+// writes it to the cache on Close if the read was complete.
+type rapidCacheWriteThroughReader struct {
+	inner       io.ReadCloser
+	buf         *bytes.Buffer
+	cache       *rapidCachedSeekable
+	ctx         context.Context //nolint:containedctx // needed for async cache write-back in Close
+	path        string
+	expectedLen int64
+}
+
+// newRapidCacheWriteThroughReader wraps inner; expectedLen must be positive
+// because it sizes the buffer.
+func newRapidCacheWriteThroughReader(inner io.ReadCloser, cache *rapidCachedSeekable, ctx context.Context, path string, expectedLen int64) io.ReadCloser {
+	return &rapidCacheWriteThroughReader{
+		inner:       inner,
+		buf:         bytes.NewBuffer(make([]byte, 0, expectedLen)),
+		cache:       cache,
+		ctx:         ctx,
+		path:        path,
+		expectedLen: expectedLen,
+	}
+}
+
+// Read forwards to inner and keeps a copy of the bytes returned.
+func (r *rapidCacheWriteThroughReader) Read(p []byte) (int, error) {
+	n, err := r.inner.Read(p)
+	if n > 0 {
+		r.buf.Write(p[:n])
+	}
+
+	return n, err
+}
+
+// Close closes inner and, after a complete read, writes the buffered chunk to
+// the cache in the background.
+func (r *rapidCacheWriteThroughReader) Close() error {
+	closeErr := r.inner.Close()
+	if isCompleteRead(r.buf.Len(), int(r.expectedLen), nil) {
+		// Copy before handing off to the goroutine.
+		data := make([]byte, r.buf.Len())
+		copy(data, r.buf.Bytes())
+		r.cache.goCtx(r.ctx, func(ctx context.Context) {
+			_ = r.cache.writeCache(ctx, r.path, data)
+		})
+	}
+
+	return closeErr
+}
+
+// rapidDecompressingCacheReader decompresses raw while teeing the compressed
+// bytes into compressedBuf for write-back on Close.
+type rapidDecompressingCacheReader struct {
+	decompressor  io.ReadCloser
+	raw           io.ReadCloser
+	compressedBuf *bytes.Buffer
+	cache         *rapidCachedSeekable
+	ctx           context.Context //nolint:containedctx // needed for async cache write-back in Close
+	path          string
+	expectedSize  int
+}
+
+// newRapidDecompressingCacheReader builds the tee + decompressor chain over raw.
+// On error the caller still owns raw.
+func newRapidDecompressingCacheReader(raw io.ReadCloser, ct CompressionType, cache *rapidCachedSeekable, ctx context.Context, path string, expectedSize int) (io.ReadCloser, error) {
+	var compressedBuf bytes.Buffer
+	compressedBuf.Grow(expectedSize)
+
+	dec, err := NewDecompressingReader(io.TeeReader(raw, &compressedBuf), ct)
+	if err != nil {
+		return nil, err
+	}
+
+	return &rapidDecompressingCacheReader{
+		decompressor:  dec,
+		raw:           raw,
+		compressedBuf: &compressedBuf,
+		cache:         cache,
+		ctx:           ctx,
+		path:          path,
+		expectedSize:  expectedSize,
+	}, nil
+}
+
+// Read returns decompressed bytes.
+func (r *rapidDecompressingCacheReader) Read(p []byte) (int, error) {
+	return r.decompressor.Read(p)
+}
+
+// Close drains what the caller did not read so the whole compressed frame
+// passes through the tee, closes both readers, and then writes the frame to
+// the cache in the background if it is complete and write-back is allowed.
+func (r *rapidDecompressingCacheReader) Close() error {
+	_, _ = io.Copy(io.Discard, r.decompressor)
+
+	decErr := r.decompressor.Close()
+	rawErr := r.raw.Close()
+	if decErr != nil {
+		return decErr
+	}
+	if rawErr != nil {
+		return rawErr
+	}
+	if skipCacheWriteback(r.ctx) || !isCompleteRead(r.compressedBuf.Len(), r.expectedSize, nil) {
+		return nil
+	}
+
+	data := make([]byte, r.compressedBuf.Len())
+	copy(data, r.compressedBuf.Bytes())
+	r.cache.goCtx(r.ctx, func(ctx context.Context) {
+		_ = r.cache.writeCache(ctx, r.path, data)
+	})
+
+	return nil
+}
diff --git
a/packages/shared/pkg/storage/storage_cache_rapid_index.go b/packages/shared/pkg/storage/storage_cache_rapid_index.go
new file mode 100644
index 0000000000..108ff85c90
--- /dev/null
+++ b/packages/shared/pkg/storage/storage_cache_rapid_index.go
@@ -0,0 +1,183 @@
+package storage
+
+import (
+	"context"
+	"errors"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/redis/go-redis/v9"
+)
+
+// rapidCachePrefix is stripped from object paths before the build ID is derived.
+const rapidCachePrefix = "rapid-cache/"
+
+// RapidCacheIndex tracks when cached chunks were last used so that a cleaner
+// can find eviction candidates without listing the bucket.
+type RapidCacheIndex interface {
+	// Touch records an access to path at the current time.
+	Touch(ctx context.Context, path string) error
+	// Admit registers a newly written chunk of the given size.
+	Admit(ctx context.Context, path string, size int64) error
+	// Evict forgets path; size is the number of bytes to release, or 0 if unknown.
+	Evict(ctx context.Context, path string, size int64) error
+	// Candidates returns up to limit paths last accessed at or before the given time.
+	Candidates(ctx context.Context, before time.Time, limit int64) ([]string, error)
+	// LastAccess returns the Unix time of the last access and whether path is indexed.
+	LastAccess(ctx context.Context, path string) (int64, bool, error)
+}
+
+// noopRapidCacheIndex is an index that records nothing.
+type noopRapidCacheIndex struct{}
+
+// NoopRapidCacheIndex returns an index that records nothing and has no candidates.
+func NoopRapidCacheIndex() RapidCacheIndex { return noopRapidCacheIndex{} }
+
+func (noopRapidCacheIndex) Touch(context.Context, string) error        { return nil }
+func (noopRapidCacheIndex) Admit(context.Context, string, int64) error { return nil }
+func (noopRapidCacheIndex) Evict(context.Context, string, int64) error { return nil }
+func (noopRapidCacheIndex) Candidates(context.Context, time.Time, int64) ([]string, error) {
+	return nil, nil
+}
+
+func (noopRapidCacheIndex) LastAccess(context.Context, string) (int64, bool, error) {
+	return 0, false, nil
+}
+
+// redisRapidCacheIndex keeps the index in Redis sorted sets and hashes.
+type redisRapidCacheIndex struct {
+	redis redis.UniversalClient
+	keys  rapidCacheIndexKeys
+}
+
+// rapidCacheIndexKeys holds the Redis key names used for one bucket.
+type rapidCacheIndexKeys struct {
+	chunks      string // sorted set: chunk path -> last access (Unix seconds)
+	builds      string // sorted set: build ID -> last access (Unix seconds)
+	buildBytes  string // hash: build ID -> admitted bytes
+	buildChunks string // hash: build ID -> admitted chunk count
+}
+
+// NewRedisRapidCacheIndex returns a Redis-backed index for bucket, or the
+// no-op index when client is nil or bucket is empty.
+func NewRedisRapidCacheIndex(client redis.UniversalClient, bucket string) RapidCacheIndex {
+	if client == nil || bucket == "" {
+		return NoopRapidCacheIndex()
+	}
+
+	// The braces form a Redis Cluster hash tag, keeping all keys of a bucket in
+	// one slot so the Lua scripts below can touch them together.
+	prefix := "rapid-cache:{" + bucket + "}"
+
+	return &redisRapidCacheIndex{
+		redis: client,
+		keys: rapidCacheIndexKeys{
+			chunks:      prefix + ":chunks",
+			builds:      prefix + ":builds",
+			buildBytes:  prefix + ":build_bytes",
+			buildChunks: prefix + ":build_chunks",
+		},
+	}
+}
+
+// Touch refreshes the access time of path and of the build it belongs to.
+func (i *redisRapidCacheIndex) Touch(ctx context.Context, path string) error {
+	buildID := rapidCacheBuildID(path)
+	if buildID == "" {
+		return nil
+	}
+
+	now := float64(time.Now().Unix())
+	pipe := i.redis.Pipeline()
+	pipe.ZAdd(ctx, i.keys.chunks, redis.Z{Score: now, Member: path})
+	pipe.ZAdd(ctx, i.keys.builds, redis.Z{Score: now, Member: buildID})
+	_, err := pipe.Exec(ctx)
+
+	return err
+}
+
+// Admit adds path to the index and, the first time it is seen, adds size and
+// one chunk to the per-build counters.
+func (i *redisRapidCacheIndex) Admit(ctx context.Context, path string, size int64) error {
+	buildID := rapidCacheBuildID(path)
+	if buildID == "" {
+		return nil
+	}
+
+	return rapidCacheAdmitScript.Run(ctx, i.redis, []string{
+		i.keys.chunks,
+		i.keys.builds,
+		i.keys.buildBytes,
+		i.keys.buildChunks,
+	}, time.Now().Unix(), path, buildID, size).Err()
+}
+
+// Evict removes path from the index. The per-build counters are reduced only
+// when the path was actually indexed and size is positive, so evicting an
+// unknown path, or evicting twice, cannot drive them below what Admit added.
+func (i *redisRapidCacheIndex) Evict(ctx context.Context, path string, size int64) error {
+	return rapidCacheEvictScript.Run(ctx, i.redis, []string{
+		i.keys.chunks,
+		i.keys.buildBytes,
+		i.keys.buildChunks,
+	}, path, rapidCacheBuildID(path), -size).Err()
+}
+
+// Candidates returns up to limit chunk paths whose last access is at or before
+// the given time, oldest first.
+func (i *redisRapidCacheIndex) Candidates(ctx context.Context, before time.Time, limit int64) ([]string, error) {
+	return i.redis.ZRangeByScore(ctx, i.keys.chunks, &redis.ZRangeBy{
+		Min:    "-inf",
+		Max:    strconv.FormatInt(before.Unix(), 10),
+		Offset: 0,
+		Count:  limit,
+	}).Result()
+}
+
+// LastAccess returns the recorded access time of path; ok is false when the
+// path is not indexed.
+func (i *redisRapidCacheIndex) LastAccess(ctx context.Context, path string) (int64, bool, error) {
+	score, err := i.redis.ZScore(ctx, i.keys.chunks, path).Result()
+	if errors.Is(err, redis.Nil) {
+		return 0, false, nil
+	}
+	if err != nil {
+		return 0, false, err
+	}
+
+	return int64(score), true, nil
+}
+
+// rapidCacheBuildID strips the cache prefix from path and returns the first
+// value SplitPath yields for the remainder.
+func rapidCacheBuildID(path string) string {
+	path = strings.TrimPrefix(path, rapidCachePrefix)
+	buildID, _ := SplitPath(path)
+
+	return buildID
+}
+
+// rapidCacheAdmitScript: ZADD NX reports whether the path is new; the second
+// ZADD refreshes the score either way. KEYS: chunks, builds, buildBytes,
+// buildChunks. ARGV: now, path, buildID, size.
+var rapidCacheAdmitScript = redis.NewScript(`
+local added = redis.call('ZADD', KEYS[1], 'NX', ARGV[1], ARGV[2])
+redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2])
+redis.call('ZADD', KEYS[2], ARGV[1], ARGV[3])
+if added == 1 then
+  redis.call('HINCRBY', KEYS[3], ARGV[3], ARGV[4])
+  redis.call('HINCRBY', KEYS[4], ARGV[3], 1)
+end
+return added
+`)
+
+// rapidCacheEvictScript removes a chunk and releases its bytes in one step.
+// KEYS: chunks, buildBytes, buildChunks. ARGV: path, buildID, negated size.
+var rapidCacheEvictScript = redis.NewScript(`
+local removed = redis.call('ZREM', KEYS[1], ARGV[1])
+if removed == 1 and ARGV[2] ~= '' and tonumber(ARGV[3]) < 0 then
+  redis.call('HINCRBY', KEYS[2], ARGV[2], ARGV[3])
+  redis.call('HINCRBY', KEYS[3], ARGV[2], -1)
+end
+return removed
+`)
diff --git a/packages/shared/pkg/storage/storage_google.go b/packages/shared/pkg/storage/storage_google.go
index 0556cecb73..c040af0355 100644
--- a/packages/shared/pkg/storage/storage_google.go
+++ b/packages/shared/pkg/storage/storage_google.go
@@ -17,6 +17,7 @@ import (
 	"time"
 
 	"cloud.google.com/go/storage"
+	"cloud.google.com/go/storage/experimental"
 	"github.com/googleapis/gax-go/v2"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
@@ -83,6 +84,7 @@ type gcpStorage struct {
 	bucket *storage.BucketHandle
 
 	limiter *limit.Limiter
+	zonal   bool
 }
 
 var _ StorageProvider = (*gcpStorage)(nil)
@@ -102,6 +104,14 @@ var (
 )
 
 func NewGCP(ctx context.Context, bucketName string, limiter *limit.Limiter) (StorageProvider, error) {
+	return newGCP(ctx, bucketName, limiter, false)
+}
+
+func NewGCPRapid(ctx context.Context, bucketName string, limiter *limit.Limiter) (StorageProvider, error) {
+	return newGCP(ctx, bucketName, limiter, true)
+}
+
+func newGCP(ctx context.Context, bucketName string, limiter *limit.Limiter, zonal bool) (StorageProvider, error) {
 	grpcPoolSize, err := env.GetEnvAsInt("GCS_GRPC_CONNECTION_POOL_SIZE", defaultGRPCConnectionPoolSize)
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse GCS_GRPC_CONNECTION_POOL_SIZE: %w", err)
@@ -113,6 +123,9 @@ func NewGCP(ctx context.Context, bucketName string, limiter *limit.Limiter) (Sto
 		option.WithGRPCDialOption(grpc.WithInitialWindowSize(4 * megabyte)),
 		internaloption.EnableDirectPath(defaultGCSEnableDirectPath),
 	}
+	if zonal {
+		opts = append(opts, experimental.WithZonalBucketAPIs())
+	}
 
 	client, err := storage.NewGRPCClient(ctx, opts...)
if err != nil { @@ -123,6 +136,7 @@ func NewGCP(ctx context.Context, bucketName string, limiter *limit.Limiter) (Sto client: client, bucket: client.Bucket(bucketName), limiter: limiter, + zonal: zonal, }, nil } @@ -325,6 +339,9 @@ func (o *gcpObject) Put(ctx context.Context, data []byte, opts ...PutOption) err timer := googleWriteTimerFactory.Begin(attribute.String(gcsOperationAttr, gcsOperationAttrWrite)) w := o.handle.NewWriter(ctx) + if o.storage.zonal { + w.FinalizeOnClose = true + } if putOpts := ApplyPutOptions(opts); len(putOpts.Metadata) > 0 { w.Metadata = putOpts.Metadata } diff --git a/tests/integration/go.mod b/tests/integration/go.mod index 290788022d..45c22628a8 100644 --- a/tests/integration/go.mod +++ b/tests/integration/go.mod @@ -79,6 +79,7 @@ require ( github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dchest/uniuri v1.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dnephin/pflag v1.0.7 // indirect github.com/dprotaso/go-yit v0.0.0-20220510233725-9ba8df137936 // indirect github.com/envoyproxy/go-control-plane/envoy v1.37.0 // indirect @@ -143,6 +144,7 @@ require ( github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/pressly/goose/v3 v3.26.0 // indirect + github.com/redis/go-redis/v9 v9.17.3 // indirect github.com/rs/zerolog v1.34.0 // indirect github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 // indirect github.com/segmentio/asm v1.2.0 // indirect diff --git a/tests/integration/go.sum b/tests/integration/go.sum index f9c5bc7e0e..f4031472df 100644 --- a/tests/integration/go.sum +++ b/tests/integration/go.sum @@ -91,6 +91,10 @@ github.com/bitfield/gotestdox v0.2.2/go.mod h1:D+gwtS0urjBrzguAkTM2wodsTQYFHdpx8 github.com/bits-and-blooms/bitset v1.24.4 
h1:95H15Og1clikBrKr/DuzMXkQzECs1M6hhoGXLwLQOZE= github.com/bits-and-blooms/bitset v1.24.4/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -107,6 +111,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dchest/uniuri v1.2.0 h1:koIcOUdrTIivZgSLhHQvKgqdWZq5d7KdMEWF1Ud6+5g= github.com/dchest/uniuri v1.2.0/go.mod h1:fSzm4SLHzNZvWLvWJew423PhAzkpNQYq+uNLq4kxhkY= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dlclark/regexp2 v1.11.5 h1:Q/sSnsKerHeCkc/jSTNq1oCm7KiVgUMZRDUoRu0JQZQ= github.com/dlclark/regexp2 v1.11.5/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= github.com/dnephin/pflag v1.0.7 h1:oxONGlWxhmUct0YzKTgrpQv9AUA1wtPBn7zuSjJqptk= @@ -326,6 +332,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pressly/goose/v3 v3.26.0 
h1:KJakav68jdH0WDvoAcj8+n61WqOIaPGgH0bJWS6jpmM= github.com/pressly/goose/v3 v3.26.0/go.mod h1:4hC1KrritdCxtuFsqgs1R4AU5bWtTAf+cnWvfhf2DNY= +github.com/redis/go-redis/v9 v9.17.3 h1:fN29NdNrE17KttK5Ndf20buqfDZwGNgoUr9qjl1DQx4= +github.com/redis/go-redis/v9 v9.17.3/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=