From e7a75c0eb9eef8f8659596adac6cf0fa5c2455f4 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Fri, 29 May 2026 18:54:05 -0700 Subject: [PATCH 01/36] feat(api): add cache-aware placement affinity Record recent successful placements and use them as a small, gated best-of-K bias so warm nodes are preferred without bypassing placement filters. --- .../internal/orchestrator/create_instance.go | 10 +- .../api/internal/orchestrator/orchestrator.go | 2 + .../orchestrator/placement/placement.go | 6 +- .../placement/placement_best_of_K.go | 92 ++++++++------ .../placement/placement_best_of_K_test.go | 40 ++++++ .../orchestrator/placement/placement_test.go | 2 +- .../orchestrator/placement_affinity.go | 119 ++++++++++++++++++ packages/shared/pkg/featureflags/flags.go | 1 + 8 files changed, 230 insertions(+), 42 deletions(-) create mode 100644 packages/api/internal/orchestrator/placement_affinity.go diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index c4f2239ee7..642c6c1574 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -300,8 +300,13 @@ func (o *Orchestrator) CreateSandbox( clusterNodes := o.GetClusterNodes(nodeClusterID) labelFilteringEnabled := o.featureFlagsClient.BoolFlag(ctx, featureflags.SandboxLabelBasedSchedulingFlag, featureflags.TeamContext(team.ID.String()), featureflags.SandboxContext(sandboxID)) + placementCacheAffinityEnabled := o.featureFlagsClient.BoolFlag(ctx, featureflags.SandboxPlacementCacheAffinityFlag, featureflags.TeamContext(team.ID.String()), featureflags.TemplateContext(sbxData.TemplateID), featureflags.SandboxContext(sandboxID)) + var affinityScores map[string]float64 + if placementCacheAffinityEnabled { + affinityScores = o.placementAffinity.scores(ctx, nodeClusterID, sbxData.Build.ID.String(), sbxData.TemplateID, sbxData.BaseTemplateID) + } - node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels) + node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels, affinityScores) if err != nil { return sandbox.Sandbox{}, &api.APIError{ Code: http.StatusInternalServerError, @@ -317,6 +322,9 @@ func (o *Orchestrator) CreateSandbox( attribute.Bool("node_affinity_success", sbxData.NodeID != nil && node.ID == *sbxData.NodeID), } o.createdSandboxesCounter.Add(ctx, 1, metric.WithAttributes(attributes...)) + if placementCacheAffinityEnabled { + o.placementAffinity.record(ctx, nodeClusterID, node.ID, sbxData.Build.ID.String(), sbxData.TemplateID, sbxData.BaseTemplateID) + } telemetry.SetAttributes(ctx, attribute.String("node.id", node.ID)) telemetry.ReportEvent(ctx, "Created sandbox") diff --git a/packages/api/internal/orchestrator/orchestrator.go b/packages/api/internal/orchestrator/orchestrator.go index 472fb34128..9034d74381 100644 --- a/packages/api/internal/orchestrator/orchestrator.go +++ b/packages/api/internal/orchestrator/orchestrator.go @@ -65,6 +65,7 @@ type Orchestrator struct { snapshotUpsertSem *utils.AdjustableSemaphore redisStorage *redisbackend.Storage + placementAffinity *placementAffinity // connectGroup deduplicates concurrent dial+register attempts for the same // physical node. It is keyed by NomadNodeShortID (Nomad-managed nodes) or @@ -147,6 +148,7 @@ func New( tel: tel, clusters: clusters, redisStorage: redisStorage, + placementAffinity: newPlacementAffinity(redisClient), createdCounter: createdCounter, diff --git a/packages/api/internal/orchestrator/placement/placement.go b/packages/api/internal/orchestrator/placement/placement.go index 54a3c78029..2c5bd19876 100644 --- a/packages/api/internal/orchestrator/placement/placement.go +++ b/packages/api/internal/orchestrator/placement/placement.go @@ -26,10 +26,10 @@ var errSandboxCreateFailed = errors.New("failed to create a new sandbox, if the // Implementations should choose an optimal node based on available resources // and current load distribution. type Algorithm interface { - chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) (*nodemanager.Node, error) + chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string, affinityScores ...map[string]float64) (*nodemanager.Node, error) } -func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*nodemanager.Node, preferredNode *nodemanager.Node, sbxRequest *orchestrator.SandboxCreateRequest, buildMachineInfo machineinfo.MachineInfo, labelFilteringEnabled bool, requiredLabels []string) (*nodemanager.Node, error) { +func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*nodemanager.Node, preferredNode *nodemanager.Node, sbxRequest *orchestrator.SandboxCreateRequest, buildMachineInfo machineinfo.MachineInfo, labelFilteringEnabled bool, requiredLabels []string, affinityScores ...map[string]float64) (*nodemanager.Node, error) { ctx, span := tracer.Start(ctx, "place-sandbox") defer span.End() @@ -57,7 +57,7 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node return nil, errors.New("no nodes available") } - node, err = algorithm.chooseNode(ctx, clusterNodes, nodesExcluded, nodemanager.SandboxResources{CPUs: sbxRequest.GetSandbox().GetVcpu(), MiBMemory: sbxRequest.GetSandbox().GetRamMb()}, buildMachineInfo, labelFilteringEnabled, requiredLabels) + node, err = algorithm.chooseNode(ctx, clusterNodes, nodesExcluded, nodemanager.SandboxResources{CPUs: sbxRequest.GetSandbox().GetVcpu(), MiBMemory: sbxRequest.GetSandbox().GetRamMb()}, buildMachineInfo, labelFilteringEnabled, requiredLabels, affinityScores...) if err != nil { return nil, err } diff --git a/packages/api/internal/orchestrator/placement/placement_best_of_K.go b/packages/api/internal/orchestrator/placement/placement_best_of_K.go index d06f2fb926..8fe2e0a6d8 100644 --- a/packages/api/internal/orchestrator/placement/placement_best_of_K.go +++ b/packages/api/internal/orchestrator/placement/placement_best_of_K.go @@ -36,7 +36,7 @@ func DefaultBestOfKConfig() BestOfKConfig { } // Score calculates the placement score for this node -func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxResources, config BestOfKConfig) float64 { +func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxResources, config BestOfKConfig, affinityScores ...map[string]float64) float64 { metrics := node.Metrics() // Get locally recorded resources that haven't been reported yet. @@ -61,7 +61,12 @@ func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxRes cpuRequested := float64(resources.CPUs) - return (cpuRequested + float64(reserved) + config.Alpha*usageAvg) / totalCapacity + score := (cpuRequested + float64(reserved) + config.Alpha*usageAvg) / totalCapacity + if len(affinityScores) > 0 { + score -= affinityScores[0][node.ID] + } + + return score } // CanFit checks if the node can fit a new VM with the given quota @@ -111,19 +116,40 @@ func (b *BestOfK) UpdateConfig(config BestOfKConfig) { } // chooseNode selects the best node for placing a VM with the given quota -func (b *BestOfK) chooseNode(_ context.Context, nodes []*nodemanager.Node, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) (bestNode *nodemanager.Node, err error) { +func (b *BestOfK) chooseNode(_ context.Context, nodes []*nodemanager.Node, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string, affinityScores ...map[string]float64) (bestNode *nodemanager.Node, err error) { // Fix the config, we want to dynamically update it config := b.getConfig() + var affinity map[string]float64 + if len(affinityScores) > 0 { + affinity = affinityScores[0] + } // Filter eligible nodes candidates := b.sample(nodes, config, excludedNodes, resources, buildMachineInfo, filterByLabels, requiredLabels) + if len(affinity) > 0 { + seen := make(map[string]struct{}, len(candidates)) + for _, n := range candidates { + seen[n.ID] = struct{}{} + } + for _, n := range nodes { + if affinity[n.ID] <= 0 { + continue + } + if _, ok := seen[n.ID]; ok { + continue + } + if b.isCandidate(n, config, excludedNodes, resources, buildMachineInfo, filterByLabels, requiredLabels) { + candidates = append(candidates, n) + } + } + } // Find the best node among candidates bestScore := math.MaxFloat64 for _, node := range candidates { // Calculate score - score := b.Score(node, resources, config) + score := b.Score(node, resources, config, affinity) if score < bestScore { bestNode = node @@ -138,6 +164,29 @@ func (b *BestOfK) chooseNode(_ context.Context, nodes []*nodemanager.Node, exclu return bestNode, nil } +func (b *BestOfK) isCandidate(node *nodemanager.Node, config BestOfKConfig, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) bool { + if _, ok := excludedNodes[node.ID]; ok { + return false + } + if node.Status() != api.NodeStatusReady { + return false + } + if !isNodeCPUCompatible(node, buildMachineInfo) { + return false + } + if filterByLabels && !isNodeLabelsCompatible(node, requiredLabels) { + return false + } + if config.CanFit && !b.CanFit(node, resources, config) { + return false + } + if config.TooManyStarting && node.PlacementMetrics.InProgressCount() > maxStartingInstancesPerNode { + return false + } + + return true +} + // sample returns up to k items chosen uniformly from those passing ok. func (b *BestOfK) sample(items []*nodemanager.Node, config BestOfKConfig, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) []*nodemanager.Node { if config.K <= 0 || len(items) == 0 { @@ -163,40 +212,9 @@ func (b *BestOfK) sample(items []*nodemanager.Node, config BestOfKConfig, exclud n := items[pick] - // Excluded filter - if _, ok := excludedNodes[n.ID]; ok { - continue - } - - // If the node is not ready, skip it - if n.Status() != api.NodeStatusReady { - continue - } - - // Skip if node is not CPU compatible - if !isNodeCPUCompatible(n, buildMachineInfo) { - continue - } - - // Skip if node doesn't have the required labels - if filterByLabels && !isNodeLabelsCompatible(n, requiredLabels) { - continue - } - - if config.CanFit { - if !b.CanFit(n, resources, config) { - continue - } - } - - if config.TooManyStarting { - // To prevent overloading the node - if n.PlacementMetrics.InProgressCount() > maxStartingInstancesPerNode { - continue - } + if b.isCandidate(n, config, excludedNodes, resources, buildMachineInfo, filterByLabels, requiredLabels) { + candidates = append(candidates, n) } - - candidates = append(candidates, n) } return candidates diff --git a/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go b/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go index 87aed15901..4dd9d86c53 100644 --- a/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go +++ b/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go @@ -95,6 +95,46 @@ func TestBestOfK_Score_WithPendingResources(t *testing.T) { assert.Greater(t, scorePending, scoreNormal, "Node with pending resources should receive a higher (worse) score") } +func TestBestOfK_ChooseNode_ConsidersAffinityOutsideRandomSample(t *testing.T) { + t.Parallel() + + ctx := t.Context() + config := BestOfKConfig{ + R: 10, + Alpha: 0.5, + K: 1, + } + algo := NewBestOfK(config).(*BestOfK) + + hot := nodemanager.NewTestNode("hot", api.NodeStatusReady, 0, 4) + cold := nodemanager.NewTestNode("cold", api.NodeStatusReady, 0, 4) + nodes := []*nodemanager.Node{cold, hot} + + selected, err := algo.chooseNode(ctx, nodes, nil, nodemanager.SandboxResources{CPUs: 1, MiBMemory: 512}, machineinfo.MachineInfo{}, false, nil, map[string]float64{"hot": 0.1}) + require.NoError(t, err) + require.Equal(t, "hot", selected.ID) +} + +func TestBestOfK_ChooseNode_IgnoresIneligibleAffinity(t *testing.T) { + t.Parallel() + + ctx := t.Context() + config := BestOfKConfig{ + R: 10, + Alpha: 0.5, + K: 1, + } + algo := NewBestOfK(config).(*BestOfK) + + hot := nodemanager.NewTestNode("hot", api.NodeStatusDraining, 0, 4) + cold := nodemanager.NewTestNode("cold", api.NodeStatusReady, 0, 4) + nodes := []*nodemanager.Node{cold, hot} + + selected, err := algo.chooseNode(ctx, nodes, nil, nodemanager.SandboxResources{CPUs: 1, MiBMemory: 512}, machineinfo.MachineInfo{}, false, nil, map[string]float64{"hot": 0.1}) + require.NoError(t, err) + require.Equal(t, "cold", selected.ID) +} + func TestBestOfK_CanFit(t *testing.T) { t.Parallel() config := DefaultBestOfKConfig() diff --git a/packages/api/internal/orchestrator/placement/placement_test.go b/packages/api/internal/orchestrator/placement/placement_test.go index 3d27d43a8d..a89ea8813a 100644 --- a/packages/api/internal/orchestrator/placement/placement_test.go +++ b/packages/api/internal/orchestrator/placement/placement_test.go @@ -25,7 +25,7 @@ type mockAlgorithm struct { mock.Mock } -func (m *mockAlgorithm) chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildCPUInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) (*nodemanager.Node, error) { +func (m *mockAlgorithm) chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildCPUInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string, affinityScores ...map[string]float64) (*nodemanager.Node, error) { args := m.Called(ctx, nodes, nodesExcluded, requested, buildCPUInfo, filterByLabels, requiredLabels) if args.Get(0) == nil { return nil, args.Error(1) diff --git a/packages/api/internal/orchestrator/placement_affinity.go b/packages/api/internal/orchestrator/placement_affinity.go new file mode 100644 index 0000000000..964c6633ad --- /dev/null +++ b/packages/api/internal/orchestrator/placement_affinity.go @@ -0,0 +1,119 @@ +package orchestrator + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/google/uuid" + "github.com/redis/go-redis/v9" + "go.uber.org/zap" + + "github.com/e2b-dev/infra/packages/shared/pkg/logger" +) + +const ( + placementAffinityTTL = 4 * time.Hour + placementAffinityTop = 20 + placementAffinityReadTimeout = 100 * time.Millisecond + placementAffinityWriteTimeout = time.Second + + buildAffinityWeight = 0.001 + templateAffinityWeight = 0.0005 + baseTemplateAffinityWeight = 0.00025 + maxAffinityScore = 10 + maxTotalAffinityScoreBonus = 0.02 +) + +type placementAffinity struct { + redis redis.UniversalClient +} + +func newPlacementAffinity(redisClient redis.UniversalClient) *placementAffinity { + if redisClient == nil { + return nil + } + + return &placementAffinity{redis: redisClient} +} + +func placementAffinityKey(clusterID uuid.UUID, kind, id string) string { + return fmt.Sprintf("placement-affinity:%s:%s:%s", clusterID, kind, id) +} + +func (a *placementAffinity) record(ctx context.Context, clusterID uuid.UUID, nodeID, buildID, templateID, baseTemplateID string) { + if a == nil || nodeID == "" { + return + } + ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), placementAffinityWriteTimeout) + defer cancel() + + for _, item := range []struct { + kind string + id string + }{ + {kind: "build", id: buildID}, + {kind: "template", id: templateID}, + {kind: "base-template", id: baseTemplateID}, + } { + if item.id == "" { + continue + } + + key := placementAffinityKey(clusterID, item.kind, item.id) + pipe := a.redis.Pipeline() + pipe.ZIncrBy(ctx, key, 1, nodeID) + pipe.ZRemRangeByRank(ctx, key, 0, -placementAffinityTop-1) + pipe.Expire(ctx, key, placementAffinityTTL) + _, err := pipe.Exec(ctx) + if err != nil { + logger.L().Debug(ctx, "failed to record placement affinity", zap.String("key", key), zap.Error(err)) + } + } +} + +func (a *placementAffinity) scores(ctx context.Context, clusterID uuid.UUID, buildID, templateID, baseTemplateID string) map[string]float64 { + if a == nil { + return nil + } + ctx, cancel := context.WithTimeout(ctx, placementAffinityReadTimeout) + defer cancel() + + scores := make(map[string]float64) + for _, item := range []struct { + kind string + id string + weight float64 + }{ + {kind: "build", id: buildID, weight: buildAffinityWeight}, + {kind: "template", id: templateID, weight: templateAffinityWeight}, + {kind: "base-template", id: baseTemplateID, weight: baseTemplateAffinityWeight}, + } { + if item.id == "" { + continue + } + + key := placementAffinityKey(clusterID, item.kind, item.id) + rows, err := a.redis.ZRevRangeWithScores(ctx, key, 0, placementAffinityTop-1).Result() + if err != nil { + logger.L().Debug(ctx, "failed to read placement affinity", zap.String("key", key), zap.Error(err)) + continue + } + + for _, row := range rows { + nodeID, ok := row.Member.(string) + if !ok || nodeID == "" { + continue + } + scores[nodeID] += math.Min(row.Score, maxAffinityScore) * item.weight + scores[nodeID] = math.Min(scores[nodeID], maxTotalAffinityScoreBonus) + } + } + + if len(scores) == 0 { + return nil + } + + return scores +} diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index 068308e811..d0c2c09773 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -157,6 +157,7 @@ var ( PersistentVolumesFlag = NewBoolFlag("can-use-persistent-volumes", env.IsDevelopment()) ExecutionMetricsOnWebhooksFlag = NewBoolFlag("execution-metrics-on-webhooks", false) // TODO: Remove NLT 20250315 SandboxLabelBasedSchedulingFlag = NewBoolFlag("sandbox-label-based-scheduling", false) + SandboxPlacementCacheAffinityFlag = NewBoolFlag("sandbox-placement-cache-affinity", false) OptimisticResourceAccountingFlag = NewBoolFlag("sandbox-placement-optimistic-resource-accounting", false) FreePageReportingFlag = NewBoolFlag("free-page-reporting", false) FreezeUserCgroupFlag = NewBoolFlag("freeze-user-cgroup", env.IsDevelopment()) From 57f7c2776b6810500c0d18fa433240ca413bb754 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Fri, 29 May 2026 18:59:42 -0700 Subject: [PATCH 02/36] fix(api): batch placement affinity redis calls Use a single pipeline for affinity reads and writes to keep the placement path bounded when the flag is enabled. --- .../orchestrator/placement_affinity.go | 46 +++++++++++++++---- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/packages/api/internal/orchestrator/placement_affinity.go b/packages/api/internal/orchestrator/placement_affinity.go index 964c6633ad..9422b6778c 100644 --- a/packages/api/internal/orchestrator/placement_affinity.go +++ b/packages/api/internal/orchestrator/placement_affinity.go @@ -49,6 +49,8 @@ func (a *placementAffinity) record(ctx context.Context, clusterID uuid.UUID, nod ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), placementAffinityWriteTimeout) defer cancel() + pipe := a.redis.Pipeline() + hasCommands := false for _, item := range []struct { kind string id string @@ -62,14 +64,18 @@ func (a *placementAffinity) record(ctx context.Context, clusterID uuid.UUID, nod } key := placementAffinityKey(clusterID, item.kind, item.id) - pipe := a.redis.Pipeline() pipe.ZIncrBy(ctx, key, 1, nodeID) pipe.ZRemRangeByRank(ctx, key, 0, -placementAffinityTop-1) pipe.Expire(ctx, key, placementAffinityTTL) - _, err := pipe.Exec(ctx) - if err != nil { - logger.L().Debug(ctx, "failed to record placement affinity", zap.String("key", key), zap.Error(err)) - } + hasCommands = true + } + if !hasCommands { + return + } + + _, err := pipe.Exec(ctx) + if err != nil { + logger.L().Debug(ctx, "failed to record placement affinity", zap.Error(err)) } } @@ -80,7 +86,12 @@ func (a *placementAffinity) scores(ctx context.Context, clusterID uuid.UUID, bui ctx, cancel := context.WithTimeout(ctx, placementAffinityReadTimeout) defer cancel() - scores := make(map[string]float64) + pipe := a.redis.Pipeline() + type affinityCmd struct { + cmd *redis.ZSliceCmd + weight float64 + } + cmds := make([]affinityCmd, 0, 3) for _, item := range []struct { kind string id string @@ -95,9 +106,26 @@ func (a *placementAffinity) scores(ctx context.Context, clusterID uuid.UUID, bui } key := placementAffinityKey(clusterID, item.kind, item.id) - rows, err := a.redis.ZRevRangeWithScores(ctx, key, 0, placementAffinityTop-1).Result() + cmds = append(cmds, affinityCmd{ + cmd: pipe.ZRevRangeWithScores(ctx, key, 0, placementAffinityTop-1), + weight: item.weight, + }) + } + if len(cmds) == 0 { + return nil + } + + _, err := pipe.Exec(ctx) + if err != nil && err != redis.Nil { + logger.L().Debug(ctx, "failed to read placement affinity", zap.Error(err)) + + return nil + } + + scores := make(map[string]float64) + for _, cmd := range cmds { + rows, err := cmd.cmd.Result() if err != nil { - logger.L().Debug(ctx, "failed to read placement affinity", zap.String("key", key), zap.Error(err)) continue } @@ -106,7 +134,7 @@ func (a *placementAffinity) scores(ctx context.Context, clusterID uuid.UUID, bui if !ok || nodeID == "" { continue } - scores[nodeID] += math.Min(row.Score, maxAffinityScore) * item.weight + scores[nodeID] += math.Min(row.Score, maxAffinityScore) * cmd.weight scores[nodeID] = math.Min(scores[nodeID], maxTotalAffinityScoreBonus) } } From 001c7a9ca3b47a9ac23675be83dc99415a5de34b Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Fri, 29 May 2026 19:04:02 -0700 Subject: [PATCH 03/36] fix(api): resolve placement affinity lint Use error-aware Redis checks and apply the repository formatter to the new feature flag. --- .../orchestrator/placement/placement_test.go | 2 +- .../api/internal/orchestrator/placement_affinity.go | 3 ++- packages/shared/pkg/featureflags/flags.go | 12 ++++++------ 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/packages/api/internal/orchestrator/placement/placement_test.go b/packages/api/internal/orchestrator/placement/placement_test.go index a89ea8813a..d91d49ccd4 100644 --- a/packages/api/internal/orchestrator/placement/placement_test.go +++ b/packages/api/internal/orchestrator/placement/placement_test.go @@ -25,7 +25,7 @@ type mockAlgorithm struct { mock.Mock } -func (m *mockAlgorithm) chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildCPUInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string, affinityScores ...map[string]float64) (*nodemanager.Node, error) { +func (m *mockAlgorithm) chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildCPUInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string, _ ...map[string]float64) (*nodemanager.Node, error) { args := m.Called(ctx, nodes, nodesExcluded, requested, buildCPUInfo, filterByLabels, requiredLabels) if args.Get(0) == nil { return nil, args.Error(1) diff --git a/packages/api/internal/orchestrator/placement_affinity.go b/packages/api/internal/orchestrator/placement_affinity.go index 9422b6778c..2b147bdc69 100644 --- a/packages/api/internal/orchestrator/placement_affinity.go +++ b/packages/api/internal/orchestrator/placement_affinity.go @@ -2,6 +2,7 @@ package orchestrator import ( "context" + "errors" "fmt" "math" "time" @@ -116,7 +117,7 @@ func (a *placementAffinity) scores(ctx context.Context, clusterID uuid.UUID, bui } _, err := pipe.Exec(ctx) - if err != nil && err != redis.Nil { + if err != nil && !errors.Is(err, redis.Nil) { logger.L().Debug(ctx, "failed to read placement affinity", zap.Error(err)) return nil diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index d0c2c09773..0c99dc42ac 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -154,13 +154,13 @@ var ( // of synchronous. Only safe to enable after PeerToPeerChunkTransferFlag is ON. PeerToPeerAsyncCheckpointFlag = NewBoolFlag("peer-to-peer-async-checkpoint", false) - PersistentVolumesFlag = NewBoolFlag("can-use-persistent-volumes", env.IsDevelopment()) - ExecutionMetricsOnWebhooksFlag = NewBoolFlag("execution-metrics-on-webhooks", false) // TODO: Remove NLT 20250315 - SandboxLabelBasedSchedulingFlag = NewBoolFlag("sandbox-label-based-scheduling", false) + PersistentVolumesFlag = NewBoolFlag("can-use-persistent-volumes", env.IsDevelopment()) + ExecutionMetricsOnWebhooksFlag = NewBoolFlag("execution-metrics-on-webhooks", false) // TODO: Remove NLT 20250315 + SandboxLabelBasedSchedulingFlag = NewBoolFlag("sandbox-label-based-scheduling", false) SandboxPlacementCacheAffinityFlag = NewBoolFlag("sandbox-placement-cache-affinity", false) - OptimisticResourceAccountingFlag = NewBoolFlag("sandbox-placement-optimistic-resource-accounting", false) - FreePageReportingFlag = NewBoolFlag("free-page-reporting", false) - FreezeUserCgroupFlag = NewBoolFlag("freeze-user-cgroup", env.IsDevelopment()) + OptimisticResourceAccountingFlag = NewBoolFlag("sandbox-placement-optimistic-resource-accounting", false) + FreePageReportingFlag = NewBoolFlag("free-page-reporting", false) + FreezeUserCgroupFlag = NewBoolFlag("freeze-user-cgroup", env.IsDevelopment()) NetworkTransformRulesFlag = NewBoolFlag("network-transform-rules", env.IsDevelopment()) From 3eb348424bf722d05c1eb7da07883278947970f7 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 13:02:20 -0700 Subject: [PATCH 04/36] fix(api): make placement affinity tunable Move cache affinity rollout knobs into the existing flag so Redis TTL, scoring, limits, and timeouts can be adjusted without a deploy. --- .../internal/orchestrator/create_instance.go | 10 +- .../orchestrator/placement_affinity.go | 102 ++++++++++++++---- packages/shared/pkg/featureflags/flags.go | 26 +++-- 3 files changed, 104 insertions(+), 34 deletions(-) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index 642c6c1574..d25d3d4b44 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -300,10 +300,10 @@ func (o *Orchestrator) CreateSandbox( clusterNodes := o.GetClusterNodes(nodeClusterID) labelFilteringEnabled := o.featureFlagsClient.BoolFlag(ctx, featureflags.SandboxLabelBasedSchedulingFlag, featureflags.TeamContext(team.ID.String()), featureflags.SandboxContext(sandboxID)) - placementCacheAffinityEnabled := o.featureFlagsClient.BoolFlag(ctx, featureflags.SandboxPlacementCacheAffinityFlag, featureflags.TeamContext(team.ID.String()), featureflags.TemplateContext(sbxData.TemplateID), featureflags.SandboxContext(sandboxID)) + placementCacheAffinityConfig := placementAffinityConfigFromFlags(ctx, o.featureFlagsClient, featureflags.TeamContext(team.ID.String()), featureflags.TemplateContext(sbxData.TemplateID), featureflags.SandboxContext(sandboxID)) var affinityScores map[string]float64 - if placementCacheAffinityEnabled { - affinityScores = o.placementAffinity.scores(ctx, nodeClusterID, sbxData.Build.ID.String(), sbxData.TemplateID, sbxData.BaseTemplateID) + if placementCacheAffinityConfig.enabled { + affinityScores = o.placementAffinity.scores(ctx, placementCacheAffinityConfig, nodeClusterID, sbxData.Build.ID.String(), sbxData.TemplateID, sbxData.BaseTemplateID) } node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels, affinityScores) @@ -322,8 +322,8 @@ func (o *Orchestrator) CreateSandbox( attribute.Bool("node_affinity_success", sbxData.NodeID != nil && node.ID == *sbxData.NodeID), } o.createdSandboxesCounter.Add(ctx, 1, metric.WithAttributes(attributes...)) - if placementCacheAffinityEnabled { - o.placementAffinity.record(ctx, nodeClusterID, node.ID, sbxData.Build.ID.String(), sbxData.TemplateID, sbxData.BaseTemplateID) + if placementCacheAffinityConfig.enabled { + o.placementAffinity.record(ctx, placementCacheAffinityConfig, nodeClusterID, node.ID, sbxData.Build.ID.String(), sbxData.TemplateID, sbxData.BaseTemplateID) } telemetry.SetAttributes(ctx, attribute.String("node.id", node.ID)) diff --git a/packages/api/internal/orchestrator/placement_affinity.go b/packages/api/internal/orchestrator/placement_affinity.go index 2b147bdc69..4af08c6e64 100644 --- a/packages/api/internal/orchestrator/placement_affinity.go +++ b/packages/api/internal/orchestrator/placement_affinity.go @@ -8,29 +8,46 @@ import ( "time" "github.com/google/uuid" + "github.com/launchdarkly/go-sdk-common/v3/ldcontext" + "github.com/launchdarkly/go-sdk-common/v3/ldvalue" "github.com/redis/go-redis/v9" "go.uber.org/zap" + "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" "github.com/e2b-dev/infra/packages/shared/pkg/logger" ) const ( - placementAffinityTTL = 4 * time.Hour - placementAffinityTop = 20 - placementAffinityReadTimeout = 100 * time.Millisecond - placementAffinityWriteTimeout = time.Second - - buildAffinityWeight = 0.001 - templateAffinityWeight = 0.0005 - baseTemplateAffinityWeight = 0.00025 - maxAffinityScore = 10 - maxTotalAffinityScoreBonus = 0.02 + placementAffinityMinTimeoutMs = 10 + placementAffinityMaxTimeoutMs = 2000 + defaultPlacementAffinityTTLSeconds = 14400 + defaultPlacementAffinityTopNodes = 20 + defaultPlacementAffinityReadTimeoutMs = 100 + defaultPlacementAffinityWriteTimeoutMs = 1000 + defaultPlacementAffinityMaxScore = 10 + defaultPlacementAffinityMaxScoreBonusPpm = 20000 + defaultPlacementAffinityBuildWeightPpm = 1000 + defaultPlacementAffinityTemplateWeightPpm = 500 + defaultPlacementAffinityBaseTemplateWeightPpm = 250 ) type placementAffinity struct { redis redis.UniversalClient } +type placementAffinityConfig struct { + enabled bool + ttl time.Duration + topNodes int64 + readTimeout time.Duration + writeTimeout time.Duration + maxAffinityScore float64 + maxScoreBonus float64 + buildWeight float64 + templateWeight float64 + baseTemplateWeight float64 +} + func newPlacementAffinity(redisClient redis.UniversalClient) *placementAffinity { if redisClient == nil { return nil @@ -39,15 +56,56 @@ func newPlacementAffinity(redisClient redis.UniversalClient) *placementAffinity return &placementAffinity{redis: redisClient} } +func placementAffinityConfigFromFlags(ctx context.Context, ff *featureflags.Client, contexts ...ldcontext.Context) placementAffinityConfig { + v := ff.JSONFlag(ctx, featureflags.SandboxPlacementCacheAffinityFlag, contexts...) + + return placementAffinityConfig{ + enabled: v.GetByKey("enabled").BoolValue(), + ttl: time.Duration(jsonInt(v, "ttlSeconds", defaultPlacementAffinityTTLSeconds, 60, 86400)) * time.Second, + topNodes: int64(jsonInt(v, "topNodes", defaultPlacementAffinityTopNodes, 1, 100)), + readTimeout: time.Duration(jsonInt(v, "readTimeoutMs", defaultPlacementAffinityReadTimeoutMs, placementAffinityMinTimeoutMs, placementAffinityMaxTimeoutMs)) * time.Millisecond, + writeTimeout: time.Duration(jsonInt(v, "writeTimeoutMs", defaultPlacementAffinityWriteTimeoutMs, placementAffinityMinTimeoutMs, placementAffinityMaxTimeoutMs)) * time.Millisecond, + maxAffinityScore: float64(jsonInt(v, "maxAffinityScore", defaultPlacementAffinityMaxScore, 1, 1000)), + maxScoreBonus: jsonPPM(v, "maxScoreBonusPpm", defaultPlacementAffinityMaxScoreBonusPpm), + buildWeight: jsonPPM(v, "buildWeightPpm", defaultPlacementAffinityBuildWeightPpm), + templateWeight: jsonPPM(v, "templateWeightPpm", defaultPlacementAffinityTemplateWeightPpm), + baseTemplateWeight: jsonPPM(v, "baseTemplateWeightPpm", defaultPlacementAffinityBaseTemplateWeightPpm), + } +} + +func jsonInt(v ldvalue.Value, key string, fallback, minValue, maxValue int) int { + value := v.GetByKey(key) + if value.IsNull() { + return fallback + } + + return clampInt(value.IntValue(), minValue, maxValue) +} + +func jsonPPM(v ldvalue.Value, key string, fallback int) float64 { + return float64(jsonInt(v, key, fallback, 0, 100000)) / 1000000 +} + +func clampInt(v, minValue, maxValue int) int { + if v < minValue { + return minValue + } + if v > maxValue { + return maxValue + } + + return v +} + func placementAffinityKey(clusterID uuid.UUID, kind, id string) string { return fmt.Sprintf("placement-affinity:%s:%s:%s", clusterID, kind, id) } -func (a *placementAffinity) record(ctx context.Context, clusterID uuid.UUID, nodeID, buildID, templateID, baseTemplateID string) { +func (a *placementAffinity) record(ctx context.Context, cfg placementAffinityConfig, clusterID uuid.UUID, nodeID, buildID, templateID, baseTemplateID string) { if a == nil || nodeID == "" { return } - ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), placementAffinityWriteTimeout) + ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), cfg.writeTimeout) defer cancel() pipe := a.redis.Pipeline() @@ -66,8 +124,8 @@ func (a *placementAffinity) record(ctx context.Context, clusterID uuid.UUID, nod key := placementAffinityKey(clusterID, item.kind, item.id) pipe.ZIncrBy(ctx, key, 1, nodeID) - pipe.ZRemRangeByRank(ctx, key, 0, -placementAffinityTop-1) - pipe.Expire(ctx, key, placementAffinityTTL) + pipe.ZRemRangeByRank(ctx, key, 0, -cfg.topNodes-1) + pipe.Expire(ctx, key, cfg.ttl) hasCommands = true } if !hasCommands { @@ -80,11 +138,11 @@ func (a *placementAffinity) record(ctx context.Context, clusterID uuid.UUID, nod } } -func (a *placementAffinity) scores(ctx context.Context, clusterID uuid.UUID, buildID, templateID, baseTemplateID string) map[string]float64 { +func (a *placementAffinity) scores(ctx context.Context, cfg placementAffinityConfig, clusterID uuid.UUID, buildID, templateID, baseTemplateID string) map[string]float64 { if a == nil { return nil } - ctx, cancel := context.WithTimeout(ctx, placementAffinityReadTimeout) + ctx, cancel := context.WithTimeout(ctx, cfg.readTimeout) defer cancel() pipe := a.redis.Pipeline() @@ -98,9 +156,9 @@ func (a *placementAffinity) scores(ctx context.Context, clusterID uuid.UUID, bui id string weight float64 }{ - {kind: "build", id: buildID, weight: buildAffinityWeight}, - {kind: "template", id: templateID, weight: templateAffinityWeight}, - {kind: "base-template", id: baseTemplateID, weight: baseTemplateAffinityWeight}, + {kind: "build", id: buildID, weight: cfg.buildWeight}, + {kind: "template", id: templateID, weight: cfg.templateWeight}, + {kind: "base-template", id: baseTemplateID, weight: cfg.baseTemplateWeight}, } { if item.id == "" { continue @@ -108,7 +166,7 @@ func (a *placementAffinity) scores(ctx context.Context, clusterID uuid.UUID, bui key := placementAffinityKey(clusterID, item.kind, item.id) cmds = append(cmds, affinityCmd{ - cmd: pipe.ZRevRangeWithScores(ctx, key, 0, placementAffinityTop-1), + cmd: pipe.ZRevRangeWithScores(ctx, key, 0, cfg.topNodes-1), weight: item.weight, }) } @@ -135,8 +193,8 @@ func (a *placementAffinity) scores(ctx context.Context, clusterID uuid.UUID, bui if !ok || nodeID == "" { continue } - scores[nodeID] += math.Min(row.Score, maxAffinityScore) * cmd.weight - scores[nodeID] = math.Min(scores[nodeID], maxTotalAffinityScoreBonus) + scores[nodeID] += math.Min(row.Score, cfg.maxAffinityScore) * cmd.weight + scores[nodeID] = math.Min(scores[nodeID], cfg.maxScoreBonus) } } diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index 0c99dc42ac..dff707b71d 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -154,13 +154,25 @@ var ( // of synchronous. Only safe to enable after PeerToPeerChunkTransferFlag is ON. PeerToPeerAsyncCheckpointFlag = NewBoolFlag("peer-to-peer-async-checkpoint", false) - PersistentVolumesFlag = NewBoolFlag("can-use-persistent-volumes", env.IsDevelopment()) - ExecutionMetricsOnWebhooksFlag = NewBoolFlag("execution-metrics-on-webhooks", false) // TODO: Remove NLT 20250315 - SandboxLabelBasedSchedulingFlag = NewBoolFlag("sandbox-label-based-scheduling", false) - SandboxPlacementCacheAffinityFlag = NewBoolFlag("sandbox-placement-cache-affinity", false) - OptimisticResourceAccountingFlag = NewBoolFlag("sandbox-placement-optimistic-resource-accounting", false) - FreePageReportingFlag = NewBoolFlag("free-page-reporting", false) - FreezeUserCgroupFlag = NewBoolFlag("freeze-user-cgroup", env.IsDevelopment()) + PersistentVolumesFlag = NewBoolFlag("can-use-persistent-volumes", env.IsDevelopment()) + ExecutionMetricsOnWebhooksFlag = NewBoolFlag("execution-metrics-on-webhooks", false) // TODO: Remove NLT 20250315 + SandboxLabelBasedSchedulingFlag = NewBoolFlag("sandbox-label-based-scheduling", false) + OptimisticResourceAccountingFlag = NewBoolFlag("sandbox-placement-optimistic-resource-accounting", false) + FreePageReportingFlag = NewBoolFlag("free-page-reporting", false) + FreezeUserCgroupFlag = NewBoolFlag("freeze-user-cgroup", env.IsDevelopment()) + + SandboxPlacementCacheAffinityFlag = NewJSONFlag("sandbox-placement-cache-affinity", ldvalue.FromJSONMarshal(map[string]any{ + "enabled": false, + "ttlSeconds": 14400, + "topNodes": 20, + "readTimeoutMs": 100, + "writeTimeoutMs": 1000, + "maxAffinityScore": 10, + "maxScoreBonusPpm": 20000, + "buildWeightPpm": 1000, + "templateWeightPpm": 500, + "baseTemplateWeightPpm": 250, + })) NetworkTransformRulesFlag = NewBoolFlag("network-transform-rules", env.IsDevelopment()) From 497d6cf6a1a06e9fef1bd0595ef23061a2aea1f6 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 13:06:36 -0700 Subject: [PATCH 05/36] fix(api): avoid high-cardinality resume affinity keys Keep resume origin-node affinity as the primary signal and skip per-snapshot build keys so the Redis placement cache stays bounded. --- .../internal/orchestrator/create_instance.go | 8 ++++++-- .../internal/orchestrator/placement_affinity.go | 17 +++++++++-------- packages/shared/pkg/featureflags/flags.go | 2 +- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index d25d3d4b44..e0151b2adf 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -300,10 +300,14 @@ func (o *Orchestrator) CreateSandbox( clusterNodes := o.GetClusterNodes(nodeClusterID) labelFilteringEnabled := o.featureFlagsClient.BoolFlag(ctx, featureflags.SandboxLabelBasedSchedulingFlag, featureflags.TeamContext(team.ID.String()), featureflags.SandboxContext(sandboxID)) + affinityBuildID := sbxData.Build.ID.String() + if isResume { + affinityBuildID = "" + } placementCacheAffinityConfig := placementAffinityConfigFromFlags(ctx, o.featureFlagsClient, featureflags.TeamContext(team.ID.String()), featureflags.TemplateContext(sbxData.TemplateID), featureflags.SandboxContext(sandboxID)) var affinityScores map[string]float64 if placementCacheAffinityConfig.enabled { - affinityScores = o.placementAffinity.scores(ctx, placementCacheAffinityConfig, nodeClusterID, sbxData.Build.ID.String(), sbxData.TemplateID, sbxData.BaseTemplateID) + affinityScores = o.placementAffinity.scores(ctx, placementCacheAffinityConfig, nodeClusterID, affinityBuildID, sbxData.TemplateID, sbxData.BaseTemplateID) } node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels, affinityScores) @@ -323,7 +327,7 @@ func (o *Orchestrator) CreateSandbox( } o.createdSandboxesCounter.Add(ctx, 1, metric.WithAttributes(attributes...)) if placementCacheAffinityConfig.enabled { - o.placementAffinity.record(ctx, placementCacheAffinityConfig, nodeClusterID, node.ID, sbxData.Build.ID.String(), sbxData.TemplateID, sbxData.BaseTemplateID) + o.placementAffinity.record(ctx, placementCacheAffinityConfig, nodeClusterID, node.ID, affinityBuildID, sbxData.TemplateID, sbxData.BaseTemplateID) } telemetry.SetAttributes(ctx, attribute.String("node.id", node.ID)) diff --git a/packages/api/internal/orchestrator/placement_affinity.go b/packages/api/internal/orchestrator/placement_affinity.go index 4af08c6e64..9245c786cf 100644 --- a/packages/api/internal/orchestrator/placement_affinity.go +++ b/packages/api/internal/orchestrator/placement_affinity.go @@ -20,7 +20,7 @@ import ( const ( placementAffinityMinTimeoutMs = 10 placementAffinityMaxTimeoutMs = 2000 - defaultPlacementAffinityTTLSeconds = 14400 + defaultPlacementAffinityTTLSeconds = 90000 defaultPlacementAffinityTopNodes = 20 defaultPlacementAffinityReadTimeoutMs = 100 defaultPlacementAffinityWriteTimeoutMs = 1000 @@ -61,7 +61,7 @@ func placementAffinityConfigFromFlags(ctx context.Context, ff *featureflags.Clie return placementAffinityConfig{ enabled: v.GetByKey("enabled").BoolValue(), - ttl: time.Duration(jsonInt(v, "ttlSeconds", defaultPlacementAffinityTTLSeconds, 60, 86400)) * time.Second, + ttl: time.Duration(jsonInt(v, "ttlSeconds", defaultPlacementAffinityTTLSeconds, 60, 90000)) * time.Second, topNodes: int64(jsonInt(v, "topNodes", defaultPlacementAffinityTopNodes, 1, 100)), readTimeout: time.Duration(jsonInt(v, "readTimeoutMs", defaultPlacementAffinityReadTimeoutMs, placementAffinityMinTimeoutMs, placementAffinityMaxTimeoutMs)) * time.Millisecond, writeTimeout: time.Duration(jsonInt(v, "writeTimeoutMs", defaultPlacementAffinityWriteTimeoutMs, placementAffinityMinTimeoutMs, placementAffinityMaxTimeoutMs)) * time.Millisecond, @@ -111,14 +111,15 @@ func (a *placementAffinity) record(ctx context.Context, cfg placementAffinityCon pipe := a.redis.Pipeline() hasCommands := false for _, item := range []struct { - kind string - id string + kind string + id string + weight float64 }{ - {kind: "build", id: buildID}, - {kind: "template", id: templateID}, - {kind: "base-template", id: baseTemplateID}, + {kind: "build", id: buildID, weight: cfg.buildWeight}, + {kind: "template", id: templateID, weight: cfg.templateWeight}, + {kind: "base-template", id: baseTemplateID, weight: cfg.baseTemplateWeight}, } { - if item.id == "" { + if item.id == "" || item.weight == 0 { continue } diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index dff707b71d..0fc266f6d8 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -163,7 +163,7 @@ var ( SandboxPlacementCacheAffinityFlag = NewJSONFlag("sandbox-placement-cache-affinity", ldvalue.FromJSONMarshal(map[string]any{ "enabled": false, - "ttlSeconds": 14400, + "ttlSeconds": 90000, "topNodes": 20, "readTimeoutMs": 100, "writeTimeoutMs": 1000, From c8066f5074a385b6c7e7428ca88146bb6ee3001e Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 13:09:03 -0700 Subject: [PATCH 06/36] fix(api): skip unused placement affinity reads Avoid Redis affinity lookups when resume already has a ready origin node and skip disabled affinity dimensions. --- packages/api/internal/orchestrator/create_instance.go | 2 +- packages/api/internal/orchestrator/placement_affinity.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index e0151b2adf..6c5c7ad67a 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -306,7 +306,7 @@ func (o *Orchestrator) CreateSandbox( } placementCacheAffinityConfig := placementAffinityConfigFromFlags(ctx, o.featureFlagsClient, featureflags.TeamContext(team.ID.String()), featureflags.TemplateContext(sbxData.TemplateID), featureflags.SandboxContext(sandboxID)) var affinityScores map[string]float64 - if placementCacheAffinityConfig.enabled { + if placementCacheAffinityConfig.enabled && node == nil { affinityScores = o.placementAffinity.scores(ctx, placementCacheAffinityConfig, nodeClusterID, affinityBuildID, sbxData.TemplateID, sbxData.BaseTemplateID) } diff --git a/packages/api/internal/orchestrator/placement_affinity.go b/packages/api/internal/orchestrator/placement_affinity.go index 9245c786cf..fd96e37851 100644 --- a/packages/api/internal/orchestrator/placement_affinity.go +++ b/packages/api/internal/orchestrator/placement_affinity.go @@ -161,7 +161,7 @@ func (a *placementAffinity) scores(ctx context.Context, cfg placementAffinityCon {kind: "template", id: templateID, weight: cfg.templateWeight}, {kind: "base-template", id: baseTemplateID, weight: cfg.baseTemplateWeight}, } { - if item.id == "" { + if item.id == "" || item.weight == 0 { continue } From 428d6abcf238298028afa07216cd99a85bcb913f Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 13:11:01 -0700 Subject: [PATCH 07/36] fix(api): scope placement affinity by team template Use team-scoped template and base-template affinity as a low-cardinality fallback for resumes when origin-node placement is unavailable. --- .../internal/orchestrator/create_instance.go | 4 ++-- .../orchestrator/placement_affinity.go | 20 +++++++++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index 6c5c7ad67a..f4be26c8f1 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -307,7 +307,7 @@ func (o *Orchestrator) CreateSandbox( placementCacheAffinityConfig := placementAffinityConfigFromFlags(ctx, o.featureFlagsClient, featureflags.TeamContext(team.ID.String()), featureflags.TemplateContext(sbxData.TemplateID), featureflags.SandboxContext(sandboxID)) var affinityScores map[string]float64 if placementCacheAffinityConfig.enabled && node == nil { - affinityScores = o.placementAffinity.scores(ctx, placementCacheAffinityConfig, nodeClusterID, affinityBuildID, sbxData.TemplateID, sbxData.BaseTemplateID) + affinityScores = o.placementAffinity.scores(ctx, placementCacheAffinityConfig, nodeClusterID, team.ID.String(), affinityBuildID, sbxData.TemplateID, sbxData.BaseTemplateID) } node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels, affinityScores) @@ -327,7 +327,7 @@ func (o *Orchestrator) CreateSandbox( } o.createdSandboxesCounter.Add(ctx, 1, metric.WithAttributes(attributes...)) if placementCacheAffinityConfig.enabled { - o.placementAffinity.record(ctx, placementCacheAffinityConfig, nodeClusterID, node.ID, affinityBuildID, sbxData.TemplateID, sbxData.BaseTemplateID) + o.placementAffinity.record(ctx, placementCacheAffinityConfig, nodeClusterID, team.ID.String(), node.ID, affinityBuildID, sbxData.TemplateID, sbxData.BaseTemplateID) } telemetry.SetAttributes(ctx, attribute.String("node.id", node.ID)) diff --git a/packages/api/internal/orchestrator/placement_affinity.go b/packages/api/internal/orchestrator/placement_affinity.go index fd96e37851..7216e41e8f 100644 --- a/packages/api/internal/orchestrator/placement_affinity.go +++ b/packages/api/internal/orchestrator/placement_affinity.go @@ -101,7 +101,15 @@ func placementAffinityKey(clusterID uuid.UUID, kind, id string) string { return fmt.Sprintf("placement-affinity:%s:%s:%s", clusterID, kind, id) } -func (a *placementAffinity) record(ctx context.Context, cfg placementAffinityConfig, clusterID uuid.UUID, nodeID, buildID, templateID, baseTemplateID string) { +func placementAffinityID(teamID, id string) string { + if id == "" || teamID == "" { + return id + } + + return teamID + ":" + id +} + +func (a *placementAffinity) record(ctx context.Context, cfg placementAffinityConfig, clusterID uuid.UUID, teamID, nodeID, buildID, templateID, baseTemplateID string) { if a == nil || nodeID == "" { return } @@ -116,8 +124,8 @@ func (a *placementAffinity) record(ctx context.Context, cfg placementAffinityCon weight float64 }{ {kind: "build", id: buildID, weight: cfg.buildWeight}, - {kind: "template", id: templateID, weight: cfg.templateWeight}, - {kind: "base-template", id: baseTemplateID, weight: cfg.baseTemplateWeight}, + {kind: "template", id: placementAffinityID(teamID, templateID), weight: cfg.templateWeight}, + {kind: "base-template", id: placementAffinityID(teamID, baseTemplateID), weight: cfg.baseTemplateWeight}, } { if item.id == "" || item.weight == 0 { continue @@ -139,7 +147,7 @@ func (a *placementAffinity) record(ctx context.Context, cfg placementAffinityCon } } -func (a *placementAffinity) scores(ctx context.Context, cfg placementAffinityConfig, clusterID uuid.UUID, buildID, templateID, baseTemplateID string) map[string]float64 { +func (a *placementAffinity) scores(ctx context.Context, cfg placementAffinityConfig, clusterID uuid.UUID, teamID, buildID, templateID, baseTemplateID string) map[string]float64 { if a == nil { return nil } @@ -158,8 +166,8 @@ func (a *placementAffinity) scores(ctx context.Context, cfg placementAffinityCon weight float64 }{ {kind: "build", id: buildID, weight: cfg.buildWeight}, - {kind: "template", id: templateID, weight: cfg.templateWeight}, - {kind: "base-template", id: baseTemplateID, weight: cfg.baseTemplateWeight}, + {kind: "template", id: placementAffinityID(teamID, templateID), weight: cfg.templateWeight}, + {kind: "base-template", id: placementAffinityID(teamID, baseTemplateID), weight: cfg.baseTemplateWeight}, } { if item.id == "" || item.weight == 0 { continue From f20296d7bf29f57dd510272940a1633027674c52 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 13:23:39 -0700 Subject: [PATCH 08/36] fix(api): add low-weight team placement affinity Track team-level placement history as the weakest cache-affinity fallback alongside template and base-template signals. --- packages/api/internal/orchestrator/placement_affinity.go | 5 +++++ packages/shared/pkg/featureflags/flags.go | 1 + 2 files changed, 6 insertions(+) diff --git a/packages/api/internal/orchestrator/placement_affinity.go b/packages/api/internal/orchestrator/placement_affinity.go index 7216e41e8f..4a62f656e8 100644 --- a/packages/api/internal/orchestrator/placement_affinity.go +++ b/packages/api/internal/orchestrator/placement_affinity.go @@ -27,6 +27,7 @@ const ( defaultPlacementAffinityMaxScore = 10 defaultPlacementAffinityMaxScoreBonusPpm = 20000 defaultPlacementAffinityBuildWeightPpm = 1000 + defaultPlacementAffinityTeamWeightPpm = 100 defaultPlacementAffinityTemplateWeightPpm = 500 defaultPlacementAffinityBaseTemplateWeightPpm = 250 ) @@ -44,6 +45,7 @@ type placementAffinityConfig struct { maxAffinityScore float64 maxScoreBonus float64 buildWeight float64 + teamWeight float64 templateWeight float64 baseTemplateWeight float64 } @@ -68,6 +70,7 @@ func placementAffinityConfigFromFlags(ctx context.Context, ff *featureflags.Clie maxAffinityScore: float64(jsonInt(v, "maxAffinityScore", defaultPlacementAffinityMaxScore, 1, 1000)), maxScoreBonus: jsonPPM(v, "maxScoreBonusPpm", defaultPlacementAffinityMaxScoreBonusPpm), buildWeight: jsonPPM(v, "buildWeightPpm", defaultPlacementAffinityBuildWeightPpm), + teamWeight: jsonPPM(v, "teamWeightPpm", defaultPlacementAffinityTeamWeightPpm), templateWeight: jsonPPM(v, "templateWeightPpm", defaultPlacementAffinityTemplateWeightPpm), baseTemplateWeight: jsonPPM(v, "baseTemplateWeightPpm", defaultPlacementAffinityBaseTemplateWeightPpm), } @@ -124,6 +127,7 @@ func (a *placementAffinity) record(ctx context.Context, cfg placementAffinityCon weight float64 }{ {kind: "build", id: buildID, weight: cfg.buildWeight}, + {kind: "team", id: teamID, weight: cfg.teamWeight}, {kind: "template", id: placementAffinityID(teamID, templateID), weight: cfg.templateWeight}, {kind: "base-template", id: placementAffinityID(teamID, baseTemplateID), weight: cfg.baseTemplateWeight}, } { @@ -166,6 +170,7 @@ func (a *placementAffinity) scores(ctx context.Context, cfg placementAffinityCon weight float64 }{ {kind: "build", id: buildID, weight: cfg.buildWeight}, + {kind: "team", id: teamID, weight: cfg.teamWeight}, {kind: "template", id: placementAffinityID(teamID, templateID), weight: cfg.templateWeight}, {kind: "base-template", id: placementAffinityID(teamID, baseTemplateID), weight: cfg.baseTemplateWeight}, } { diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index 0fc266f6d8..8b068f8ce2 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -170,6 +170,7 @@ var ( "maxAffinityScore": 10, "maxScoreBonusPpm": 20000, "buildWeightPpm": 1000, + "teamWeightPpm": 100, "templateWeightPpm": 500, "baseTemplateWeightPpm": 250, })) From 17db41809dce919f6b790efaaf74d0e04879f471 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 13:42:45 -0700 Subject: [PATCH 09/36] fix(api): default placement affinity to build keys Keep template/team dimensions available for experiments but disable them by default until base build metadata is available. --- packages/api/internal/orchestrator/placement_affinity.go | 6 +++--- packages/shared/pkg/featureflags/flags.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/api/internal/orchestrator/placement_affinity.go b/packages/api/internal/orchestrator/placement_affinity.go index 4a62f656e8..d3dab2e63a 100644 --- a/packages/api/internal/orchestrator/placement_affinity.go +++ b/packages/api/internal/orchestrator/placement_affinity.go @@ -27,9 +27,9 @@ const ( defaultPlacementAffinityMaxScore = 10 defaultPlacementAffinityMaxScoreBonusPpm = 20000 defaultPlacementAffinityBuildWeightPpm = 1000 - defaultPlacementAffinityTeamWeightPpm = 100 - defaultPlacementAffinityTemplateWeightPpm = 500 - defaultPlacementAffinityBaseTemplateWeightPpm = 250 + defaultPlacementAffinityTeamWeightPpm = 0 + defaultPlacementAffinityTemplateWeightPpm = 0 + defaultPlacementAffinityBaseTemplateWeightPpm = 0 ) type placementAffinity struct { diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index 8b068f8ce2..c04b8dbe83 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -170,9 +170,9 @@ var ( "maxAffinityScore": 10, "maxScoreBonusPpm": 20000, "buildWeightPpm": 1000, - "teamWeightPpm": 100, - "templateWeightPpm": 500, - "baseTemplateWeightPpm": 250, + "teamWeightPpm": 0, + "templateWeightPpm": 0, + "baseTemplateWeightPpm": 0, })) NetworkTransformRulesFlag = NewBoolFlag("network-transform-rules", env.IsDevelopment()) From 922bd06a051cff162a8911af08e2764fd12dff5e Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 13:45:20 -0700 Subject: [PATCH 10/36] fix(api): keep placement affinity build-only Remove proxy affinity dimensions so the first rollout only uses concrete build cache signals. --- .../internal/orchestrator/create_instance.go | 4 +- .../orchestrator/placement_affinity.go | 154 +++++------------- packages/shared/pkg/featureflags/flags.go | 19 +-- 3 files changed, 53 insertions(+), 124 deletions(-) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index f4be26c8f1..554d10dde9 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -307,7 +307,7 @@ func (o *Orchestrator) CreateSandbox( placementCacheAffinityConfig := placementAffinityConfigFromFlags(ctx, o.featureFlagsClient, featureflags.TeamContext(team.ID.String()), featureflags.TemplateContext(sbxData.TemplateID), featureflags.SandboxContext(sandboxID)) var affinityScores map[string]float64 if placementCacheAffinityConfig.enabled && node == nil { - affinityScores = o.placementAffinity.scores(ctx, placementCacheAffinityConfig, nodeClusterID, team.ID.String(), affinityBuildID, sbxData.TemplateID, sbxData.BaseTemplateID) + affinityScores = o.placementAffinity.scores(ctx, placementCacheAffinityConfig, nodeClusterID, affinityBuildID) } node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels, affinityScores) @@ -327,7 +327,7 @@ func (o *Orchestrator) CreateSandbox( } o.createdSandboxesCounter.Add(ctx, 1, metric.WithAttributes(attributes...)) if placementCacheAffinityConfig.enabled { - o.placementAffinity.record(ctx, placementCacheAffinityConfig, nodeClusterID, team.ID.String(), node.ID, affinityBuildID, sbxData.TemplateID, sbxData.BaseTemplateID) + o.placementAffinity.record(ctx, placementCacheAffinityConfig, nodeClusterID, node.ID, affinityBuildID) } telemetry.SetAttributes(ctx, attribute.String("node.id", node.ID)) diff --git a/packages/api/internal/orchestrator/placement_affinity.go b/packages/api/internal/orchestrator/placement_affinity.go index d3dab2e63a..e895d51ca8 100644 --- a/packages/api/internal/orchestrator/placement_affinity.go +++ b/packages/api/internal/orchestrator/placement_affinity.go @@ -18,18 +18,15 @@ import ( ) const ( - placementAffinityMinTimeoutMs = 10 - placementAffinityMaxTimeoutMs = 2000 - defaultPlacementAffinityTTLSeconds = 90000 - defaultPlacementAffinityTopNodes = 20 - defaultPlacementAffinityReadTimeoutMs = 100 - defaultPlacementAffinityWriteTimeoutMs = 1000 - defaultPlacementAffinityMaxScore = 10 - defaultPlacementAffinityMaxScoreBonusPpm = 20000 - defaultPlacementAffinityBuildWeightPpm = 1000 - defaultPlacementAffinityTeamWeightPpm = 0 - defaultPlacementAffinityTemplateWeightPpm = 0 - defaultPlacementAffinityBaseTemplateWeightPpm = 0 + placementAffinityMinTimeoutMs = 10 + placementAffinityMaxTimeoutMs = 2000 + defaultPlacementAffinityTTLSeconds = 90000 + defaultPlacementAffinityTopNodes = 20 + defaultPlacementAffinityReadTimeoutMs = 100 + defaultPlacementAffinityWriteTimeoutMs = 1000 + defaultPlacementAffinityMaxScore = 10 + defaultPlacementAffinityMaxScoreBonusPpm = 20000 + defaultPlacementAffinityBuildWeightPpm = 1000 ) type placementAffinity struct { @@ -37,17 +34,14 @@ type placementAffinity struct { } type placementAffinityConfig struct { - enabled bool - ttl time.Duration - topNodes int64 - readTimeout time.Duration - writeTimeout time.Duration - maxAffinityScore float64 - maxScoreBonus float64 - buildWeight float64 - teamWeight float64 - templateWeight float64 - baseTemplateWeight float64 + enabled bool + ttl time.Duration + topNodes int64 + readTimeout time.Duration + writeTimeout time.Duration + maxAffinityScore float64 + maxScoreBonus float64 + buildWeight float64 } func newPlacementAffinity(redisClient redis.UniversalClient) *placementAffinity { @@ -62,17 +56,14 @@ func placementAffinityConfigFromFlags(ctx context.Context, ff *featureflags.Clie v := ff.JSONFlag(ctx, featureflags.SandboxPlacementCacheAffinityFlag, contexts...) return placementAffinityConfig{ - enabled: v.GetByKey("enabled").BoolValue(), - ttl: time.Duration(jsonInt(v, "ttlSeconds", defaultPlacementAffinityTTLSeconds, 60, 90000)) * time.Second, - topNodes: int64(jsonInt(v, "topNodes", defaultPlacementAffinityTopNodes, 1, 100)), - readTimeout: time.Duration(jsonInt(v, "readTimeoutMs", defaultPlacementAffinityReadTimeoutMs, placementAffinityMinTimeoutMs, placementAffinityMaxTimeoutMs)) * time.Millisecond, - writeTimeout: time.Duration(jsonInt(v, "writeTimeoutMs", defaultPlacementAffinityWriteTimeoutMs, placementAffinityMinTimeoutMs, placementAffinityMaxTimeoutMs)) * time.Millisecond, - maxAffinityScore: float64(jsonInt(v, "maxAffinityScore", defaultPlacementAffinityMaxScore, 1, 1000)), - maxScoreBonus: jsonPPM(v, "maxScoreBonusPpm", defaultPlacementAffinityMaxScoreBonusPpm), - buildWeight: jsonPPM(v, "buildWeightPpm", defaultPlacementAffinityBuildWeightPpm), - teamWeight: jsonPPM(v, "teamWeightPpm", defaultPlacementAffinityTeamWeightPpm), - templateWeight: jsonPPM(v, "templateWeightPpm", defaultPlacementAffinityTemplateWeightPpm), - baseTemplateWeight: jsonPPM(v, "baseTemplateWeightPpm", defaultPlacementAffinityBaseTemplateWeightPpm), + enabled: v.GetByKey("enabled").BoolValue(), + ttl: time.Duration(jsonInt(v, "ttlSeconds", defaultPlacementAffinityTTLSeconds, 60, 90000)) * time.Second, + topNodes: int64(jsonInt(v, "topNodes", defaultPlacementAffinityTopNodes, 1, 100)), + readTimeout: time.Duration(jsonInt(v, "readTimeoutMs", defaultPlacementAffinityReadTimeoutMs, placementAffinityMinTimeoutMs, placementAffinityMaxTimeoutMs)) * time.Millisecond, + writeTimeout: time.Duration(jsonInt(v, "writeTimeoutMs", defaultPlacementAffinityWriteTimeoutMs, placementAffinityMinTimeoutMs, placementAffinityMaxTimeoutMs)) * time.Millisecond, + maxAffinityScore: float64(jsonInt(v, "maxAffinityScore", defaultPlacementAffinityMaxScore, 1, 1000)), + maxScoreBonus: jsonPPM(v, "maxScoreBonusPpm", defaultPlacementAffinityMaxScoreBonusPpm), + buildWeight: jsonPPM(v, "buildWeightPpm", defaultPlacementAffinityBuildWeightPpm), } } @@ -104,46 +95,18 @@ func placementAffinityKey(clusterID uuid.UUID, kind, id string) string { return fmt.Sprintf("placement-affinity:%s:%s:%s", clusterID, kind, id) } -func placementAffinityID(teamID, id string) string { - if id == "" || teamID == "" { - return id - } - - return teamID + ":" + id -} - -func (a *placementAffinity) record(ctx context.Context, cfg placementAffinityConfig, clusterID uuid.UUID, teamID, nodeID, buildID, templateID, baseTemplateID string) { - if a == nil || nodeID == "" { +func (a *placementAffinity) record(ctx context.Context, cfg placementAffinityConfig, clusterID uuid.UUID, nodeID, buildID string) { + if a == nil || nodeID == "" || buildID == "" || cfg.buildWeight == 0 { return } ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), cfg.writeTimeout) defer cancel() pipe := a.redis.Pipeline() - hasCommands := false - for _, item := range []struct { - kind string - id string - weight float64 - }{ - {kind: "build", id: buildID, weight: cfg.buildWeight}, - {kind: "team", id: teamID, weight: cfg.teamWeight}, - {kind: "template", id: placementAffinityID(teamID, templateID), weight: cfg.templateWeight}, - {kind: "base-template", id: placementAffinityID(teamID, baseTemplateID), weight: cfg.baseTemplateWeight}, - } { - if item.id == "" || item.weight == 0 { - continue - } - - key := placementAffinityKey(clusterID, item.kind, item.id) - pipe.ZIncrBy(ctx, key, 1, nodeID) - pipe.ZRemRangeByRank(ctx, key, 0, -cfg.topNodes-1) - pipe.Expire(ctx, key, cfg.ttl) - hasCommands = true - } - if !hasCommands { - return - } + key := placementAffinityKey(clusterID, "build", buildID) + pipe.ZIncrBy(ctx, key, 1, nodeID) + pipe.ZRemRangeByRank(ctx, key, 0, -cfg.topNodes-1) + pipe.Expire(ctx, key, cfg.ttl) _, err := pipe.Exec(ctx) if err != nil { @@ -151,43 +114,15 @@ func (a *placementAffinity) record(ctx context.Context, cfg placementAffinityCon } } -func (a *placementAffinity) scores(ctx context.Context, cfg placementAffinityConfig, clusterID uuid.UUID, teamID, buildID, templateID, baseTemplateID string) map[string]float64 { - if a == nil { +func (a *placementAffinity) scores(ctx context.Context, cfg placementAffinityConfig, clusterID uuid.UUID, buildID string) map[string]float64 { + if a == nil || buildID == "" || cfg.buildWeight == 0 { return nil } ctx, cancel := context.WithTimeout(ctx, cfg.readTimeout) defer cancel() pipe := a.redis.Pipeline() - type affinityCmd struct { - cmd *redis.ZSliceCmd - weight float64 - } - cmds := make([]affinityCmd, 0, 3) - for _, item := range []struct { - kind string - id string - weight float64 - }{ - {kind: "build", id: buildID, weight: cfg.buildWeight}, - {kind: "team", id: teamID, weight: cfg.teamWeight}, - {kind: "template", id: placementAffinityID(teamID, templateID), weight: cfg.templateWeight}, - {kind: "base-template", id: placementAffinityID(teamID, baseTemplateID), weight: cfg.baseTemplateWeight}, - } { - if item.id == "" || item.weight == 0 { - continue - } - - key := placementAffinityKey(clusterID, item.kind, item.id) - cmds = append(cmds, affinityCmd{ - cmd: pipe.ZRevRangeWithScores(ctx, key, 0, cfg.topNodes-1), - weight: item.weight, - }) - } - if len(cmds) == 0 { - return nil - } - + cmd := pipe.ZRevRangeWithScores(ctx, placementAffinityKey(clusterID, "build", buildID), 0, cfg.topNodes-1) _, err := pipe.Exec(ctx) if err != nil && !errors.Is(err, redis.Nil) { logger.L().Debug(ctx, "failed to read placement affinity", zap.Error(err)) @@ -196,20 +131,17 @@ func (a *placementAffinity) scores(ctx context.Context, cfg placementAffinityCon } scores := make(map[string]float64) - for _, cmd := range cmds { - rows, err := cmd.cmd.Result() - if err != nil { + rows, err := cmd.Result() + if err != nil { + return nil + } + for _, row := range rows { + nodeID, ok := row.Member.(string) + if !ok || nodeID == "" { continue } - - for _, row := range rows { - nodeID, ok := row.Member.(string) - if !ok || nodeID == "" { - continue - } - scores[nodeID] += math.Min(row.Score, cfg.maxAffinityScore) * cmd.weight - scores[nodeID] = math.Min(scores[nodeID], cfg.maxScoreBonus) - } + scores[nodeID] += math.Min(row.Score, cfg.maxAffinityScore) * cfg.buildWeight + scores[nodeID] = math.Min(scores[nodeID], cfg.maxScoreBonus) } if len(scores) == 0 { diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index c04b8dbe83..fee99566a1 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -162,17 +162,14 @@ var ( FreezeUserCgroupFlag = NewBoolFlag("freeze-user-cgroup", env.IsDevelopment()) SandboxPlacementCacheAffinityFlag = NewJSONFlag("sandbox-placement-cache-affinity", ldvalue.FromJSONMarshal(map[string]any{ - "enabled": false, - "ttlSeconds": 90000, - "topNodes": 20, - "readTimeoutMs": 100, - "writeTimeoutMs": 1000, - "maxAffinityScore": 10, - "maxScoreBonusPpm": 20000, - "buildWeightPpm": 1000, - "teamWeightPpm": 0, - "templateWeightPpm": 0, - "baseTemplateWeightPpm": 0, + "enabled": false, + "ttlSeconds": 90000, + "topNodes": 20, + "readTimeoutMs": 100, + "writeTimeoutMs": 1000, + "maxAffinityScore": 10, + "maxScoreBonusPpm": 20000, + "buildWeightPpm": 1000, })) NetworkTransformRulesFlag = NewBoolFlag("network-transform-rules", env.IsDevelopment()) From 4afd0cc09757e627afa32cf2b4c2238d2de91845 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 14:56:55 -0700 Subject: [PATCH 11/36] fix(api): skip exhausted affinity nodes Exclude nodes that reject placement as exhausted so affinity retries can fall through to other candidates. --- packages/api/internal/orchestrator/placement/placement.go | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/api/internal/orchestrator/placement/placement.go b/packages/api/internal/orchestrator/placement/placement.go index 2c5bd19876..8cb76ba364 100644 --- a/packages/api/internal/orchestrator/placement/placement.go +++ b/packages/api/internal/orchestrator/placement/placement.go @@ -102,6 +102,7 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node switch statusCode { case codes.ResourceExhausted: + nodesExcluded[failedNode.ID] = struct{}{} failedNode.PlacementMetrics.Skip(sbxRequest.GetSandbox().GetSandboxId()) logger.L().Warn(ctx, "Node exhausted, trying another node", logger.WithSandboxID(sbxRequest.GetSandbox().GetSandboxId()), logger.WithNodeID(failedNode.ID)) default: From efe533c3db7015491d996c97823720c0e28eb70d Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 15:19:53 -0700 Subject: [PATCH 12/36] fix(api): keep single-node placement retries --- packages/api/internal/orchestrator/placement/placement.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/api/internal/orchestrator/placement/placement.go b/packages/api/internal/orchestrator/placement/placement.go index 8cb76ba364..e468697cf7 100644 --- a/packages/api/internal/orchestrator/placement/placement.go +++ b/packages/api/internal/orchestrator/placement/placement.go @@ -102,7 +102,9 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node switch statusCode { case codes.ResourceExhausted: - nodesExcluded[failedNode.ID] = struct{}{} + if len(nodesExcluded)+1 < len(clusterNodes) { + nodesExcluded[failedNode.ID] = struct{}{} + } failedNode.PlacementMetrics.Skip(sbxRequest.GetSandbox().GetSandboxId()) logger.L().Warn(ctx, "Node exhausted, trying another node", logger.WithSandboxID(sbxRequest.GetSandbox().GetSandboxId()), logger.WithNodeID(failedNode.ID)) default: From a4662698023d723e247f57478c7cec6e6eca5736 Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Sat, 30 May 2026 15:30:17 -0700 Subject: [PATCH 13/36] fix(api): bound exhausted-node placement retries --- packages/api/internal/orchestrator/placement/placement.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/api/internal/orchestrator/placement/placement.go b/packages/api/internal/orchestrator/placement/placement.go index e468697cf7..29a6c34b19 100644 --- a/packages/api/internal/orchestrator/placement/placement.go +++ b/packages/api/internal/orchestrator/placement/placement.go @@ -104,6 +104,8 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node case codes.ResourceExhausted: if len(nodesExcluded)+1 < len(clusterNodes) { nodesExcluded[failedNode.ID] = struct{}{} + } else { + attempt++ } failedNode.PlacementMetrics.Skip(sbxRequest.GetSandbox().GetSandboxId()) logger.L().Warn(ctx, "Node exhausted, trying another node", logger.WithSandboxID(sbxRequest.GetSandbox().GetSandboxId()), logger.WithNodeID(failedNode.ID)) From 38d8022e3b68e2a1e4975906030866fcd6095bec Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 15:47:44 -0700 Subject: [PATCH 14/36] fix(api): refresh empty placement node pools --- packages/api/internal/orchestrator/create_instance.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index 554d10dde9..ea6613ea67 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -298,6 +298,10 @@ func (o *Orchestrator) CreateSandbox( nodeClusterID := clusters.WithClusterFallback(team.ClusterID) clusterNodes := o.GetClusterNodes(nodeClusterID) + if len(clusterNodes) == 0 { + o.discoverClusterNode(ctx, nodeClusterID) + clusterNodes = o.GetClusterNodes(nodeClusterID) + } labelFilteringEnabled := o.featureFlagsClient.BoolFlag(ctx, featureflags.SandboxLabelBasedSchedulingFlag, featureflags.TeamContext(team.ID.String()), featureflags.SandboxContext(sandboxID)) affinityBuildID := sbxData.Build.ID.String() From 35c3f9e760015a6d721dd9e5de6664ecab700ec1 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 16:07:17 -0700 Subject: [PATCH 15/36] fix(api): discover local combined orchestrators --- packages/api/internal/orchestrator/client.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/api/internal/orchestrator/client.go b/packages/api/internal/orchestrator/client.go index ba3447db71..8c291f3cf9 100644 --- a/packages/api/internal/orchestrator/client.go +++ b/packages/api/internal/orchestrator/client.go @@ -226,7 +226,12 @@ func (o *Orchestrator) discoverClusterNode(ctx context.Context, clusterID uuid.U var wg sync.WaitGroup defer wg.Wait() - for _, instance := range cluster.GetOrchestrators() { + instances := cluster.GetOrchestrators() + if len(instances) == 0 && cluster.ID == consts.LocalClusterID { + instances = cluster.GetTemplateBuilders() + } + + for _, instance := range instances { wg.Go(func() { o.connectToClusterNode(ctx, cluster, instance) }) From 75d9d736d4fc039d63425a220b1b6f8754bcc81b Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 16:22:47 -0700 Subject: [PATCH 16/36] fix(api): connect local orchestrator on empty placement --- .../api/internal/orchestrator/create_instance.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index ea6613ea67..8b62425463 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "net" "net/http" + "os" "time" "go.opentelemetry.io/otel/attribute" @@ -22,6 +24,7 @@ import ( "github.com/e2b-dev/infra/packages/db/queries" "github.com/e2b-dev/infra/packages/shared/pkg/clusters" "github.com/e2b-dev/infra/packages/shared/pkg/consts" + "github.com/e2b-dev/infra/packages/shared/pkg/env" "github.com/e2b-dev/infra/packages/shared/pkg/fcversion" "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" @@ -302,6 +305,18 @@ func (o *Orchestrator) CreateSandbox( o.discoverClusterNode(ctx, nodeClusterID) clusterNodes = o.GetClusterNodes(nodeClusterID) } + if len(clusterNodes) == 0 && env.IsLocal() { + port := os.Getenv("ORCHESTRATOR_PORT") + if port == "" { + port = "5008" + } + _ = o.connectToNode(ctx, nodemanager.NomadServiceDiscovery{ + NomadNodeShortID: "local", + OrchestratorAddress: net.JoinHostPort("localhost", port), + IPAddress: "localhost", + }) + clusterNodes = o.GetClusterNodes(nodeClusterID) + } labelFilteringEnabled := o.featureFlagsClient.BoolFlag(ctx, featureflags.SandboxLabelBasedSchedulingFlag, featureflags.TeamContext(team.ID.String()), featureflags.SandboxContext(sandboxID)) affinityBuildID := sbxData.Build.ID.String() From 5b6375ce6e44fd45619b11127c2ed6419ad67775 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 16:37:33 -0700 Subject: [PATCH 17/36] fix(api): allow local placement while metrics settle --- .../internal/orchestrator/placement/placement_best_of_K.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/api/internal/orchestrator/placement/placement_best_of_K.go b/packages/api/internal/orchestrator/placement/placement_best_of_K.go index 8fe2e0a6d8..6cdb843d1a 100644 --- a/packages/api/internal/orchestrator/placement/placement_best_of_K.go +++ b/packages/api/internal/orchestrator/placement/placement_best_of_K.go @@ -9,6 +9,8 @@ import ( "github.com/e2b-dev/infra/packages/api/internal/api" "github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager" + "github.com/e2b-dev/infra/packages/shared/pkg/consts" + "github.com/e2b-dev/infra/packages/shared/pkg/env" "github.com/e2b-dev/infra/packages/shared/pkg/machineinfo" ) @@ -168,6 +170,9 @@ func (b *BestOfK) isCandidate(node *nodemanager.Node, config BestOfKConfig, excl if _, ok := excludedNodes[node.ID]; ok { return false } + if env.IsLocal() && node.ClusterID == consts.LocalClusterID { + return true + } if node.Status() != api.NodeStatusReady { return false } From 81d074e64d121921b03b4348c44f003dab2a7444 Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Sat, 30 May 2026 16:42:04 -0700 Subject: [PATCH 18/36] fix(api): use nomad refresh for local placement cache --- .../internal/orchestrator/create_instance.go | 19 ++++--------------- .../placement/placement_best_of_K.go | 2 +- 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index 8b62425463..dd2e8a2ff8 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -4,9 +4,7 @@ import ( "context" "errors" "fmt" - "net" "net/http" - "os" "time" "go.opentelemetry.io/otel/attribute" @@ -24,7 +22,6 @@ import ( "github.com/e2b-dev/infra/packages/db/queries" "github.com/e2b-dev/infra/packages/shared/pkg/clusters" "github.com/e2b-dev/infra/packages/shared/pkg/consts" - "github.com/e2b-dev/infra/packages/shared/pkg/env" "github.com/e2b-dev/infra/packages/shared/pkg/fcversion" "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" @@ -302,19 +299,11 @@ func (o *Orchestrator) CreateSandbox( nodeClusterID := clusters.WithClusterFallback(team.ClusterID) clusterNodes := o.GetClusterNodes(nodeClusterID) if len(clusterNodes) == 0 { - o.discoverClusterNode(ctx, nodeClusterID) - clusterNodes = o.GetClusterNodes(nodeClusterID) - } - if len(clusterNodes) == 0 && env.IsLocal() { - port := os.Getenv("ORCHESTRATOR_PORT") - if port == "" { - port = "5008" + if nodeClusterID == consts.LocalClusterID { + o.discoverNomadNodes(ctx) + } else { + o.discoverClusterNode(ctx, nodeClusterID) } - _ = o.connectToNode(ctx, nodemanager.NomadServiceDiscovery{ - NomadNodeShortID: "local", - OrchestratorAddress: net.JoinHostPort("localhost", port), - IPAddress: "localhost", - }) clusterNodes = o.GetClusterNodes(nodeClusterID) } diff --git a/packages/api/internal/orchestrator/placement/placement_best_of_K.go b/packages/api/internal/orchestrator/placement/placement_best_of_K.go index 6cdb843d1a..83d1d734d4 100644 --- a/packages/api/internal/orchestrator/placement/placement_best_of_K.go +++ b/packages/api/internal/orchestrator/placement/placement_best_of_K.go @@ -65,7 +65,7 @@ func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxRes score := (cpuRequested + float64(reserved) + config.Alpha*usageAvg) / totalCapacity if len(affinityScores) > 0 { - score -= affinityScores[0][node.ID] + score -= affinityScores[0][node.ID] / totalCapacity } return score From a3c5f190e7561d5dbefb6d0e129bffb4342f037c Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Sat, 30 May 2026 16:54:15 -0700 Subject: [PATCH 19/36] fix(api): tighten placement affinity refresh and scoring --- .../internal/orchestrator/create_instance.go | 4 +++- .../placement/placement_best_of_K.go | 6 ++++++ .../orchestrator/placement_affinity.go | 20 +++++++------------ 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index dd2e8a2ff8..18d59914e6 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -310,9 +310,11 @@ func (o *Orchestrator) CreateSandbox( labelFilteringEnabled := o.featureFlagsClient.BoolFlag(ctx, featureflags.SandboxLabelBasedSchedulingFlag, featureflags.TeamContext(team.ID.String()), featureflags.SandboxContext(sandboxID)) affinityBuildID := sbxData.Build.ID.String() if isResume { + // Resume targets the previous sandbox's node first; avoid cache affinity fighting that pin. affinityBuildID = "" } - placementCacheAffinityConfig := placementAffinityConfigFromFlags(ctx, o.featureFlagsClient, featureflags.TeamContext(team.ID.String()), featureflags.TemplateContext(sbxData.TemplateID), featureflags.SandboxContext(sandboxID)) + affinityFlagCtx := featureflags.AddToContext(ctx, featureflags.TeamContext(team.ID.String()), featureflags.TemplateContext(sbxData.TemplateID), featureflags.SandboxContext(sandboxID)) + placementCacheAffinityConfig := placementAffinityConfigFromFlags(affinityFlagCtx, o.featureFlagsClient) var affinityScores map[string]float64 if placementCacheAffinityConfig.enabled && node == nil { affinityScores = o.placementAffinity.scores(ctx, placementCacheAffinityConfig, nodeClusterID, affinityBuildID) diff --git a/packages/api/internal/orchestrator/placement/placement_best_of_K.go b/packages/api/internal/orchestrator/placement/placement_best_of_K.go index 83d1d734d4..3f07176900 100644 --- a/packages/api/internal/orchestrator/placement/placement_best_of_K.go +++ b/packages/api/internal/orchestrator/placement/placement_best_of_K.go @@ -170,21 +170,27 @@ func (b *BestOfK) isCandidate(node *nodemanager.Node, config BestOfKConfig, excl if _, ok := excludedNodes[node.ID]; ok { return false } + // Local nodes are synthetic and may not report the full production status/label set. if env.IsLocal() && node.ClusterID == consts.LocalClusterID { return true } + // If the node is not ready, we don't want to schedule a new sandbox on it. if node.Status() != api.NodeStatusReady { return false } + // If the node CPU doesn't match the requested machine, we skip it. if !isNodeCPUCompatible(node, buildMachineInfo) { return false } + // If label filtering is enabled, the node has to match the team's required labels. if filterByLabels && !isNodeLabelsCompatible(node, requiredLabels) { return false } + // If can-fit is enabled, the node must have enough capacity for the requested resources. if config.CanFit && !b.CanFit(node, resources, config) { return false } + // Avoid placing on nodes that already have too many sandboxes starting. if config.TooManyStarting && node.PlacementMetrics.InProgressCount() > maxStartingInstancesPerNode { return false } diff --git a/packages/api/internal/orchestrator/placement_affinity.go b/packages/api/internal/orchestrator/placement_affinity.go index e895d51ca8..aa641ba016 100644 --- a/packages/api/internal/orchestrator/placement_affinity.go +++ b/packages/api/internal/orchestrator/placement_affinity.go @@ -8,7 +8,6 @@ import ( "time" "github.com/google/uuid" - "github.com/launchdarkly/go-sdk-common/v3/ldcontext" "github.com/launchdarkly/go-sdk-common/v3/ldvalue" "github.com/redis/go-redis/v9" "go.uber.org/zap" @@ -27,6 +26,8 @@ const ( defaultPlacementAffinityMaxScore = 10 defaultPlacementAffinityMaxScoreBonusPpm = 20000 defaultPlacementAffinityBuildWeightPpm = 1000 + + placementAffinityKindBuild = "build" ) type placementAffinity struct { @@ -52,8 +53,8 @@ func newPlacementAffinity(redisClient redis.UniversalClient) *placementAffinity return &placementAffinity{redis: redisClient} } -func placementAffinityConfigFromFlags(ctx context.Context, ff *featureflags.Client, contexts ...ldcontext.Context) placementAffinityConfig { - v := ff.JSONFlag(ctx, featureflags.SandboxPlacementCacheAffinityFlag, contexts...) +func placementAffinityConfigFromFlags(ctx context.Context, ff *featureflags.Client) placementAffinityConfig { + v := ff.JSONFlag(ctx, featureflags.SandboxPlacementCacheAffinityFlag) return placementAffinityConfig{ enabled: v.GetByKey("enabled").BoolValue(), @@ -81,14 +82,7 @@ func jsonPPM(v ldvalue.Value, key string, fallback int) float64 { } func clampInt(v, minValue, maxValue int) int { - if v < minValue { - return minValue - } - if v > maxValue { - return maxValue - } - - return v + return max(minValue, min(v, maxValue)) } func placementAffinityKey(clusterID uuid.UUID, kind, id string) string { @@ -103,7 +97,7 @@ func (a *placementAffinity) record(ctx context.Context, cfg placementAffinityCon defer cancel() pipe := a.redis.Pipeline() - key := placementAffinityKey(clusterID, "build", buildID) + key := placementAffinityKey(clusterID, placementAffinityKindBuild, buildID) pipe.ZIncrBy(ctx, key, 1, nodeID) pipe.ZRemRangeByRank(ctx, key, 0, -cfg.topNodes-1) pipe.Expire(ctx, key, cfg.ttl) @@ -122,7 +116,7 @@ func (a *placementAffinity) scores(ctx context.Context, cfg placementAffinityCon defer cancel() pipe := a.redis.Pipeline() - cmd := pipe.ZRevRangeWithScores(ctx, placementAffinityKey(clusterID, "build", buildID), 0, cfg.topNodes-1) + cmd := pipe.ZRevRangeWithScores(ctx, placementAffinityKey(clusterID, placementAffinityKindBuild, buildID), 0, cfg.topNodes-1) _, err := pipe.Exec(ctx) if err != nil && !errors.Is(err, redis.Nil) { logger.L().Debug(ctx, "failed to read placement affinity", zap.Error(err)) From 579a4414a4a40b62182dd18d6658e5b6521e7e36 Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Sat, 30 May 2026 17:01:36 -0700 Subject: [PATCH 20/36] fix(api): record placement affinity asynchronously --- packages/api/internal/orchestrator/create_instance.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index 18d59914e6..b3004e1e0c 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -337,7 +337,7 @@ func (o *Orchestrator) CreateSandbox( } o.createdSandboxesCounter.Add(ctx, 1, metric.WithAttributes(attributes...)) if placementCacheAffinityConfig.enabled { - o.placementAffinity.record(ctx, placementCacheAffinityConfig, nodeClusterID, node.ID, affinityBuildID) + go o.placementAffinity.record(ctx, placementCacheAffinityConfig, nodeClusterID, node.ID, affinityBuildID) } telemetry.SetAttributes(ctx, attribute.String("node.id", node.ID)) From edbb4d790dd59de5b46f6d0dc28d06a22b0ef6bc Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 17:29:06 -0700 Subject: [PATCH 21/36] fix(api): restore local placement fallback --- packages/api/internal/orchestrator/create_instance.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index b3004e1e0c..9801045835 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "net" "net/http" "time" @@ -22,6 +23,7 @@ import ( "github.com/e2b-dev/infra/packages/db/queries" "github.com/e2b-dev/infra/packages/shared/pkg/clusters" "github.com/e2b-dev/infra/packages/shared/pkg/consts" + "github.com/e2b-dev/infra/packages/shared/pkg/env" "github.com/e2b-dev/infra/packages/shared/pkg/fcversion" "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" @@ -301,6 +303,13 @@ func (o *Orchestrator) CreateSandbox( if len(clusterNodes) == 0 { if nodeClusterID == consts.LocalClusterID { o.discoverNomadNodes(ctx) + if len(o.GetClusterNodes(nodeClusterID)) == 0 { + _ = o.connectToNode(ctx, nodemanager.NomadServiceDiscovery{ + NomadNodeShortID: "local", + OrchestratorAddress: net.JoinHostPort("localhost", env.GetEnv("ORCHESTRATOR_PORT", "5008")), + IPAddress: "localhost", + }) + } } else { o.discoverClusterNode(ctx, nodeClusterID) } From d3376515a3586a88caad43a27558acf43874c1a5 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 17:44:01 -0700 Subject: [PATCH 22/36] fix(api): sync local combined orchestrator nodes --- packages/api/internal/orchestrator/cache.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/api/internal/orchestrator/cache.go b/packages/api/internal/orchestrator/cache.go index f22178f28e..3767bc4a33 100644 --- a/packages/api/internal/orchestrator/cache.go +++ b/packages/api/internal/orchestrator/cache.go @@ -13,6 +13,7 @@ import ( "github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager" "github.com/e2b-dev/infra/packages/api/internal/sandbox" + "github.com/e2b-dev/infra/packages/shared/pkg/consts" "github.com/e2b-dev/infra/packages/shared/pkg/logger" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" ) @@ -147,7 +148,11 @@ func (o *Orchestrator) syncClusterDiscoveredNodes(ctx context.Context) { // Connect clustered nodes that are not in the list, yet // We need to iterate over all clusters and their nodes for _, cluster := range o.clusters.GetClusters() { - for _, n := range cluster.GetOrchestrators() { + instances := cluster.GetOrchestrators() + if len(instances) == 0 && cluster.ID == consts.LocalClusterID { + instances = cluster.GetTemplateBuilders() + } + for _, n := range instances { // If the node is not in the list, connect to it if o.GetNode(cluster.ID, n.NodeID) == nil { wg.Go(func() { From c8b688267bd2c67cee8b2c6680c7f42647459c23 Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Sat, 30 May 2026 17:55:54 -0700 Subject: [PATCH 23/36] fix(api): retry full exhausted pool instead of pinning last node --- .../orchestrator/placement/placement.go | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/packages/api/internal/orchestrator/placement/placement.go b/packages/api/internal/orchestrator/placement/placement.go index 29a6c34b19..9f341c698c 100644 --- a/packages/api/internal/orchestrator/placement/placement.go +++ b/packages/api/internal/orchestrator/placement/placement.go @@ -33,7 +33,8 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node ctx, span := tracer.Start(ctx, "place-sandbox") defer span.End() - nodesExcluded := make(map[string]struct{}) + nodesExcluded := make(map[string]struct{}) // hard failures, never retried + nodesExhausted := make(map[string]struct{}) // capacity-exhausted, retried as a pool var err error var node *nodemanager.Node @@ -53,11 +54,18 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node if node != nil { telemetry.ReportEvent(ctx, "Placing sandbox on the preferred node", telemetry.WithNodeID(node.ID)) } else { - if len(nodesExcluded) >= len(clusterNodes) { + skip := make(map[string]struct{}, len(nodesExcluded)+len(nodesExhausted)) + for id := range nodesExcluded { + skip[id] = struct{}{} + } + for id := range nodesExhausted { + skip[id] = struct{}{} + } + if len(skip) >= len(clusterNodes) { return nil, errors.New("no nodes available") } - node, err = algorithm.chooseNode(ctx, clusterNodes, nodesExcluded, nodemanager.SandboxResources{CPUs: sbxRequest.GetSandbox().GetVcpu(), MiBMemory: sbxRequest.GetSandbox().GetRamMb()}, buildMachineInfo, labelFilteringEnabled, requiredLabels, affinityScores...) + node, err = algorithm.chooseNode(ctx, clusterNodes, skip, nodemanager.SandboxResources{CPUs: sbxRequest.GetSandbox().GetVcpu(), MiBMemory: sbxRequest.GetSandbox().GetRamMb()}, buildMachineInfo, labelFilteringEnabled, requiredLabels, affinityScores...) if err != nil { return nil, err } @@ -102,12 +110,14 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node switch statusCode { case codes.ResourceExhausted: - if len(nodesExcluded)+1 < len(clusterNodes) { - nodesExcluded[failedNode.ID] = struct{}{} - } else { + nodesExhausted[failedNode.ID] = struct{}{} + failedNode.PlacementMetrics.Skip(sbxRequest.GetSandbox().GetSandboxId()) + // Once every node is excluded but some were only capacity-exhausted, + // retry the whole exhausted pool since capacity may free up. + if len(nodesExcluded)+len(nodesExhausted) >= len(clusterNodes) { + clear(nodesExhausted) attempt++ } - failedNode.PlacementMetrics.Skip(sbxRequest.GetSandbox().GetSandboxId()) logger.L().Warn(ctx, "Node exhausted, trying another node", logger.WithSandboxID(sbxRequest.GetSandbox().GetSandboxId()), logger.WithNodeID(failedNode.ID)) default: nodesExcluded[failedNode.ID] = struct{}{} From 8a6080cc64b64a55cdba837b68370778f01301cb Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 18:11:42 -0700 Subject: [PATCH 24/36] fix(api): refresh local cluster before placement --- packages/api/internal/orchestrator/create_instance.go | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index 9801045835..e6f69d2aa9 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -302,6 +302,7 @@ func (o *Orchestrator) CreateSandbox( clusterNodes := o.GetClusterNodes(nodeClusterID) if len(clusterNodes) == 0 { if nodeClusterID == consts.LocalClusterID { + o.discoverClusterNode(ctx, nodeClusterID) o.discoverNomadNodes(ctx) if len(o.GetClusterNodes(nodeClusterID)) == 0 { _ = o.connectToNode(ctx, nodemanager.NomadServiceDiscovery{ From 08c664de5d05a09742a91917cff17a4528b51ed4 Mon Sep 17 00:00:00 2001 From: ValentaTomas Date: Sat, 30 May 2026 18:35:08 -0700 Subject: [PATCH 25/36] fix(api): drive exhausted-pool retry off node selection failure --- .../orchestrator/placement/placement.go | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/packages/api/internal/orchestrator/placement/placement.go b/packages/api/internal/orchestrator/placement/placement.go index 9f341c698c..9b6b524150 100644 --- a/packages/api/internal/orchestrator/placement/placement.go +++ b/packages/api/internal/orchestrator/placement/placement.go @@ -61,12 +61,22 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node for id := range nodesExhausted { skip[id] = struct{}{} } - if len(skip) >= len(clusterNodes) { - return nil, errors.New("no nodes available") - } - node, err = algorithm.chooseNode(ctx, clusterNodes, skip, nodemanager.SandboxResources{CPUs: sbxRequest.GetSandbox().GetVcpu(), MiBMemory: sbxRequest.GetSandbox().GetRamMb()}, buildMachineInfo, labelFilteringEnabled, requiredLabels, affinityScores...) + if len(skip) < len(clusterNodes) { + node, err = algorithm.chooseNode(ctx, clusterNodes, skip, nodemanager.SandboxResources{CPUs: sbxRequest.GetSandbox().GetVcpu(), MiBMemory: sbxRequest.GetSandbox().GetRamMb()}, buildMachineInfo, labelFilteringEnabled, requiredLabels, affinityScores...) + } else { + node, err = nil, errors.New("no nodes available") + } if err != nil { + // No eligible node. If some were only capacity-exhausted, retry the + // whole exhausted pool since capacity may free up. + if len(nodesExhausted) > 0 { + clear(nodesExhausted) + attempt++ + + continue + } + return nil, err } @@ -112,12 +122,6 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node case codes.ResourceExhausted: nodesExhausted[failedNode.ID] = struct{}{} failedNode.PlacementMetrics.Skip(sbxRequest.GetSandbox().GetSandboxId()) - // Once every node is excluded but some were only capacity-exhausted, - // retry the whole exhausted pool since capacity may free up. - if len(nodesExcluded)+len(nodesExhausted) >= len(clusterNodes) { - clear(nodesExhausted) - attempt++ - } logger.L().Warn(ctx, "Node exhausted, trying another node", logger.WithSandboxID(sbxRequest.GetSandbox().GetSandboxId()), logger.WithNodeID(failedNode.ID)) default: nodesExcluded[failedNode.ID] = struct{}{} From c6e1e0c45f2d401aea21b5404d3b39490f56ad76 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 19:27:35 -0700 Subject: [PATCH 26/36] fix(api): connect local builders for placement fallback --- packages/api/internal/orchestrator/create_instance.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index e6f69d2aa9..43265a2f24 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -303,6 +303,13 @@ func (o *Orchestrator) CreateSandbox( if len(clusterNodes) == 0 { if nodeClusterID == consts.LocalClusterID { o.discoverClusterNode(ctx, nodeClusterID) + if len(o.GetClusterNodes(nodeClusterID)) == 0 { + if cluster, ok := o.clusters.GetClusterById(nodeClusterID); ok { + for _, instance := range cluster.GetTemplateBuilders() { + o.connectToClusterNode(ctx, cluster, instance) + } + } + } o.discoverNomadNodes(ctx) if len(o.GetClusterNodes(nodeClusterID)) == 0 { _ = o.connectToNode(ctx, nodemanager.NomadServiceDiscovery{ From fdb1b1e76836707fe7840b5e370920287b1173e3 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 19:42:37 -0700 Subject: [PATCH 27/36] fix(api): keep local placement capacity filters --- .../internal/orchestrator/placement/placement_best_of_K.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/packages/api/internal/orchestrator/placement/placement_best_of_K.go b/packages/api/internal/orchestrator/placement/placement_best_of_K.go index 3f07176900..334f400279 100644 --- a/packages/api/internal/orchestrator/placement/placement_best_of_K.go +++ b/packages/api/internal/orchestrator/placement/placement_best_of_K.go @@ -9,8 +9,6 @@ import ( "github.com/e2b-dev/infra/packages/api/internal/api" "github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager" - "github.com/e2b-dev/infra/packages/shared/pkg/consts" - "github.com/e2b-dev/infra/packages/shared/pkg/env" "github.com/e2b-dev/infra/packages/shared/pkg/machineinfo" ) @@ -170,10 +168,6 @@ func (b *BestOfK) isCandidate(node *nodemanager.Node, config BestOfKConfig, excl if _, ok := excludedNodes[node.ID]; ok { return false } - // Local nodes are synthetic and may not report the full production status/label set. - if env.IsLocal() && node.ClusterID == consts.LocalClusterID { - return true - } // If the node is not ready, we don't want to schedule a new sandbox on it. if node.Status() != api.NodeStatusReady { return false From eab1dc2b23429bbd5bd2521dc3a3946aa09867da Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 19:58:52 -0700 Subject: [PATCH 28/36] fix(api): wait for exhausted placement capacity --- packages/api/internal/orchestrator/placement/placement.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/api/internal/orchestrator/placement/placement.go b/packages/api/internal/orchestrator/placement/placement.go index 9b6b524150..7a13cb54e5 100644 --- a/packages/api/internal/orchestrator/placement/placement.go +++ b/packages/api/internal/orchestrator/placement/placement.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" "go.opentelemetry.io/otel" "go.uber.org/zap" @@ -72,7 +73,7 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node // whole exhausted pool since capacity may free up. if len(nodesExhausted) > 0 { clear(nodesExhausted) - attempt++ + time.Sleep(100 * time.Millisecond) continue } From cc81426548d22723dbe34ddb37141379bbb21481 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 19:59:58 -0700 Subject: [PATCH 29/36] fix(api): ignore builders for placement discovery Do not use template-builder instances as placement nodes when local orchestrator discovery has no candidates. --- packages/api/internal/orchestrator/cache.go | 7 +------ packages/api/internal/orchestrator/client.go | 7 +------ .../api/internal/orchestrator/create_instance.go | 13 +++++-------- 3 files changed, 7 insertions(+), 20 deletions(-) diff --git a/packages/api/internal/orchestrator/cache.go b/packages/api/internal/orchestrator/cache.go index 3767bc4a33..f22178f28e 100644 --- a/packages/api/internal/orchestrator/cache.go +++ b/packages/api/internal/orchestrator/cache.go @@ -13,7 +13,6 @@ import ( "github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager" "github.com/e2b-dev/infra/packages/api/internal/sandbox" - "github.com/e2b-dev/infra/packages/shared/pkg/consts" "github.com/e2b-dev/infra/packages/shared/pkg/logger" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" ) @@ -148,11 +147,7 @@ func (o *Orchestrator) syncClusterDiscoveredNodes(ctx context.Context) { // Connect clustered nodes that are not in the list, yet // We need to iterate over all clusters and their nodes for _, cluster := range o.clusters.GetClusters() { - instances := cluster.GetOrchestrators() - if len(instances) == 0 && cluster.ID == consts.LocalClusterID { - instances = cluster.GetTemplateBuilders() - } - for _, n := range instances { + for _, n := range cluster.GetOrchestrators() { // If the node is not in the list, connect to it if o.GetNode(cluster.ID, n.NodeID) == nil { wg.Go(func() { diff --git a/packages/api/internal/orchestrator/client.go b/packages/api/internal/orchestrator/client.go index 8c291f3cf9..ba3447db71 100644 --- a/packages/api/internal/orchestrator/client.go +++ b/packages/api/internal/orchestrator/client.go @@ -226,12 +226,7 @@ func (o *Orchestrator) discoverClusterNode(ctx context.Context, clusterID uuid.U var wg sync.WaitGroup defer wg.Wait() - instances := cluster.GetOrchestrators() - if len(instances) == 0 && cluster.ID == consts.LocalClusterID { - instances = cluster.GetTemplateBuilders() - } - - for _, instance := range instances { + for _, instance := range cluster.GetOrchestrators() { wg.Go(func() { o.connectToClusterNode(ctx, cluster, instance) }) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index 43265a2f24..259bc1632c 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -303,13 +303,6 @@ func (o *Orchestrator) CreateSandbox( if len(clusterNodes) == 0 { if nodeClusterID == consts.LocalClusterID { o.discoverClusterNode(ctx, nodeClusterID) - if len(o.GetClusterNodes(nodeClusterID)) == 0 { - if cluster, ok := o.clusters.GetClusterById(nodeClusterID); ok { - for _, instance := range cluster.GetTemplateBuilders() { - o.connectToClusterNode(ctx, cluster, instance) - } - } - } o.discoverNomadNodes(ctx) if len(o.GetClusterNodes(nodeClusterID)) == 0 { _ = o.connectToNode(ctx, nodemanager.NomadServiceDiscovery{ @@ -337,7 +330,11 @@ func (o *Orchestrator) CreateSandbox( affinityScores = o.placementAffinity.scores(ctx, placementCacheAffinityConfig, nodeClusterID, affinityBuildID) } - node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels, affinityScores) + if affinityScores == nil { + node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels) + } else { + node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels, affinityScores) + } if err != nil { return sandbox.Sandbox{}, &api.APIError{ Code: http.StatusInternalServerError, From 4317b9153372a4dcc68fa872dc1f0ef8e8ecc35a Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 20:05:57 -0700 Subject: [PATCH 30/36] fix(api): bound exhausted placement retries --- packages/api/internal/orchestrator/placement/config.go | 1 + packages/api/internal/orchestrator/placement/placement.go | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/packages/api/internal/orchestrator/placement/config.go b/packages/api/internal/orchestrator/placement/config.go index 4a0fe17705..3db07ad5b1 100644 --- a/packages/api/internal/orchestrator/placement/config.go +++ b/packages/api/internal/orchestrator/placement/config.go @@ -2,5 +2,6 @@ package placement const ( maxRetries = 3 + maxExhaustedRetries = 100 maxStartingInstancesPerNode = 3 ) diff --git a/packages/api/internal/orchestrator/placement/placement.go b/packages/api/internal/orchestrator/placement/placement.go index 7a13cb54e5..23f63900b7 100644 --- a/packages/api/internal/orchestrator/placement/placement.go +++ b/packages/api/internal/orchestrator/placement/placement.go @@ -36,6 +36,7 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node nodesExcluded := make(map[string]struct{}) // hard failures, never retried nodesExhausted := make(map[string]struct{}) // capacity-exhausted, retried as a pool + exhaustedRetries := 0 var err error var node *nodemanager.Node @@ -72,6 +73,10 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node // No eligible node. If some were only capacity-exhausted, retry the // whole exhausted pool since capacity may free up. if len(nodesExhausted) > 0 { + exhaustedRetries++ + if exhaustedRetries >= maxExhaustedRetries { + return nil, err + } clear(nodesExhausted) time.Sleep(100 * time.Millisecond) From aa3228ac69f21e36fcfba13080562a0ddeae2098 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 20:06:44 -0700 Subject: [PATCH 31/36] fix(api): bound exhausted placement retries Count exhausted-pool retries so a fully saturated cluster exits after the normal retry budget. --- packages/api/internal/orchestrator/placement/placement.go | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/api/internal/orchestrator/placement/placement.go b/packages/api/internal/orchestrator/placement/placement.go index 7a13cb54e5..1aeac78e12 100644 --- a/packages/api/internal/orchestrator/placement/placement.go +++ b/packages/api/internal/orchestrator/placement/placement.go @@ -73,6 +73,7 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node // whole exhausted pool since capacity may free up. if len(nodesExhausted) > 0 { clear(nodesExhausted) + attempt++ time.Sleep(100 * time.Millisecond) continue From 9277fc3cd9f01e5b4816ff3a7cfbfb60badf0930 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 20:08:03 -0700 Subject: [PATCH 32/36] chore(api): trim placement affinity tests --- .../placement/placement_best_of_K_test.go | 40 ------------------- 1 file changed, 40 deletions(-) diff --git a/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go b/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go index 4dd9d86c53..87aed15901 100644 --- a/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go +++ b/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go @@ -95,46 +95,6 @@ func TestBestOfK_Score_WithPendingResources(t *testing.T) { assert.Greater(t, scorePending, scoreNormal, "Node with pending resources should receive a higher (worse) score") } -func TestBestOfK_ChooseNode_ConsidersAffinityOutsideRandomSample(t *testing.T) { - t.Parallel() - - ctx := t.Context() - config := BestOfKConfig{ - R: 10, - Alpha: 0.5, - K: 1, - } - algo := NewBestOfK(config).(*BestOfK) - - hot := nodemanager.NewTestNode("hot", api.NodeStatusReady, 0, 4) - cold := nodemanager.NewTestNode("cold", api.NodeStatusReady, 0, 4) - nodes := []*nodemanager.Node{cold, hot} - - selected, err := algo.chooseNode(ctx, nodes, nil, nodemanager.SandboxResources{CPUs: 1, MiBMemory: 512}, machineinfo.MachineInfo{}, false, nil, map[string]float64{"hot": 0.1}) - require.NoError(t, err) - require.Equal(t, "hot", selected.ID) -} - -func TestBestOfK_ChooseNode_IgnoresIneligibleAffinity(t *testing.T) { - t.Parallel() - - ctx := t.Context() - config := BestOfKConfig{ - R: 10, - Alpha: 0.5, - K: 1, - } - algo := NewBestOfK(config).(*BestOfK) - - hot := nodemanager.NewTestNode("hot", api.NodeStatusDraining, 0, 4) - cold := nodemanager.NewTestNode("cold", api.NodeStatusReady, 0, 4) - nodes := []*nodemanager.Node{cold, hot} - - selected, err := algo.chooseNode(ctx, nodes, nil, nodemanager.SandboxResources{CPUs: 1, MiBMemory: 512}, machineinfo.MachineInfo{}, false, nil, map[string]float64{"hot": 0.1}) - require.NoError(t, err) - require.Equal(t, "cold", selected.ID) -} - func TestBestOfK_CanFit(t *testing.T) { t.Parallel() config := DefaultBestOfKConfig() From e6620efae7ea02fce7d312983708e38602322fa4 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 20:46:06 -0700 Subject: [PATCH 33/36] fix(api): connect local combined service for placement --- packages/api/internal/orchestrator/create_instance.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index 259bc1632c..fc50a87bba 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -303,6 +303,13 @@ func (o *Orchestrator) CreateSandbox( if len(clusterNodes) == 0 { if nodeClusterID == consts.LocalClusterID { o.discoverClusterNode(ctx, nodeClusterID) + if len(o.GetClusterNodes(nodeClusterID)) == 0 { + if cluster, ok := o.clusters.GetClusterById(nodeClusterID); ok { + for _, instance := range cluster.GetTemplateBuilders() { + o.connectToClusterNode(ctx, cluster, instance) + } + } + } o.discoverNomadNodes(ctx) if len(o.GetClusterNodes(nodeClusterID)) == 0 { _ = o.connectToNode(ctx, nodemanager.NomadServiceDiscovery{ From b0d874a8da5c735bdcfdfb75c4b3c59d132b9bbd Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 20:52:24 -0700 Subject: [PATCH 34/36] fix(api): keep exhausted retries separate Let ResourceExhausted pool retries use their own retry cap instead of consuming the create-failure retry budget. --- packages/api/internal/orchestrator/placement/placement.go | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/api/internal/orchestrator/placement/placement.go b/packages/api/internal/orchestrator/placement/placement.go index 534b44d8af..23f63900b7 100644 --- a/packages/api/internal/orchestrator/placement/placement.go +++ b/packages/api/internal/orchestrator/placement/placement.go @@ -78,7 +78,6 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node return nil, err } clear(nodesExhausted) - attempt++ time.Sleep(100 * time.Millisecond) continue From 3d362021fe10a7cbd51d1a19927e180b20d5995f Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 22:10:33 -0700 Subject: [PATCH 35/36] fix(api): avoid builder-only placement fallback --- packages/api/internal/orchestrator/create_instance.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index fc50a87bba..25456abb68 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -302,14 +302,6 @@ func (o *Orchestrator) CreateSandbox( clusterNodes := o.GetClusterNodes(nodeClusterID) if len(clusterNodes) == 0 { if nodeClusterID == consts.LocalClusterID { - o.discoverClusterNode(ctx, nodeClusterID) - if len(o.GetClusterNodes(nodeClusterID)) == 0 { - if cluster, ok := o.clusters.GetClusterById(nodeClusterID); ok { - for _, instance := range cluster.GetTemplateBuilders() { - o.connectToClusterNode(ctx, cluster, instance) - } - } - } o.discoverNomadNodes(ctx) if len(o.GetClusterNodes(nodeClusterID)) == 0 { _ = o.connectToNode(ctx, nodemanager.NomadServiceDiscovery{ From b9916c4da76bf149b251048e2a84b2c7d2a64647 Mon Sep 17 00:00:00 2001 From: Tomas Valenta Date: Sat, 30 May 2026 22:27:52 -0700 Subject: [PATCH 36/36] fix(api): keep affinity bonus on score scale --- .../api/internal/orchestrator/placement/placement_best_of_K.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/api/internal/orchestrator/placement/placement_best_of_K.go b/packages/api/internal/orchestrator/placement/placement_best_of_K.go index 334f400279..adc9f11c7a 100644 --- a/packages/api/internal/orchestrator/placement/placement_best_of_K.go +++ b/packages/api/internal/orchestrator/placement/placement_best_of_K.go @@ -63,7 +63,7 @@ func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxRes score := (cpuRequested + float64(reserved) + config.Alpha*usageAvg) / totalCapacity if len(affinityScores) > 0 { - score -= affinityScores[0][node.ID] / totalCapacity + score -= affinityScores[0][node.ID] } return score