diff --git a/packages/api/internal/orchestrator/create_instance.go b/packages/api/internal/orchestrator/create_instance.go index c4f2239ee7..25456abb68 100644 --- a/packages/api/internal/orchestrator/create_instance.go +++ b/packages/api/internal/orchestrator/create_instance.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "net" "net/http" "time" @@ -22,6 +23,7 @@ import ( "github.com/e2b-dev/infra/packages/db/queries" "github.com/e2b-dev/infra/packages/shared/pkg/clusters" "github.com/e2b-dev/infra/packages/shared/pkg/consts" + "github.com/e2b-dev/infra/packages/shared/pkg/env" "github.com/e2b-dev/infra/packages/shared/pkg/fcversion" "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" @@ -298,10 +300,40 @@ func (o *Orchestrator) CreateSandbox( nodeClusterID := clusters.WithClusterFallback(team.ClusterID) clusterNodes := o.GetClusterNodes(nodeClusterID) + if len(clusterNodes) == 0 { + if nodeClusterID == consts.LocalClusterID { + o.discoverNomadNodes(ctx) + if len(o.GetClusterNodes(nodeClusterID)) == 0 { + _ = o.connectToNode(ctx, nodemanager.NomadServiceDiscovery{ + NomadNodeShortID: "local", + OrchestratorAddress: net.JoinHostPort("localhost", env.GetEnv("ORCHESTRATOR_PORT", "5008")), + IPAddress: "localhost", + }) + } + } else { + o.discoverClusterNode(ctx, nodeClusterID) + } + clusterNodes = o.GetClusterNodes(nodeClusterID) + } labelFilteringEnabled := o.featureFlagsClient.BoolFlag(ctx, featureflags.SandboxLabelBasedSchedulingFlag, featureflags.TeamContext(team.ID.String()), featureflags.SandboxContext(sandboxID)) + affinityBuildID := sbxData.Build.ID.String() + if isResume { + // Resume targets the previous sandbox's node first; avoid cache affinity fighting that pin. + affinityBuildID = "" + } + affinityFlagCtx := featureflags.AddToContext(ctx, featureflags.TeamContext(team.ID.String()), featureflags.TemplateContext(sbxData.TemplateID), featureflags.SandboxContext(sandboxID)) + placementCacheAffinityConfig := placementAffinityConfigFromFlags(affinityFlagCtx, o.featureFlagsClient) + var affinityScores map[string]float64 + if placementCacheAffinityConfig.enabled && node == nil { + affinityScores = o.placementAffinity.scores(ctx, placementCacheAffinityConfig, nodeClusterID, affinityBuildID) + } - node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels) + if affinityScores == nil { + node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels) + } else { + node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels, affinityScores) + } if err != nil { return sandbox.Sandbox{}, &api.APIError{ Code: http.StatusInternalServerError, @@ -317,6 +349,9 @@ func (o *Orchestrator) CreateSandbox( attribute.Bool("node_affinity_success", sbxData.NodeID != nil && node.ID == *sbxData.NodeID), } o.createdSandboxesCounter.Add(ctx, 1, metric.WithAttributes(attributes...)) + if placementCacheAffinityConfig.enabled { + go o.placementAffinity.record(ctx, placementCacheAffinityConfig, nodeClusterID, node.ID, affinityBuildID) + } telemetry.SetAttributes(ctx, attribute.String("node.id", node.ID)) telemetry.ReportEvent(ctx, "Created sandbox") diff --git a/packages/api/internal/orchestrator/orchestrator.go b/packages/api/internal/orchestrator/orchestrator.go index 472fb34128..9034d74381 100644 --- a/packages/api/internal/orchestrator/orchestrator.go +++ b/packages/api/internal/orchestrator/orchestrator.go @@ -65,6 +65,7 @@ type Orchestrator struct { snapshotUpsertSem *utils.AdjustableSemaphore redisStorage *redisbackend.Storage + placementAffinity *placementAffinity // connectGroup deduplicates concurrent dial+register attempts for the same // physical node. It is keyed by NomadNodeShortID (Nomad-managed nodes) or @@ -147,6 +148,7 @@ func New( tel: tel, clusters: clusters, redisStorage: redisStorage, + placementAffinity: newPlacementAffinity(redisClient), createdCounter: createdCounter, diff --git a/packages/api/internal/orchestrator/placement/config.go b/packages/api/internal/orchestrator/placement/config.go index 4a0fe17705..3db07ad5b1 100644 --- a/packages/api/internal/orchestrator/placement/config.go +++ b/packages/api/internal/orchestrator/placement/config.go @@ -2,5 +2,6 @@ package placement const ( maxRetries = 3 + maxExhaustedRetries = 100 maxStartingInstancesPerNode = 3 ) diff --git a/packages/api/internal/orchestrator/placement/placement.go b/packages/api/internal/orchestrator/placement/placement.go index 54a3c78029..23f63900b7 100644 --- a/packages/api/internal/orchestrator/placement/placement.go +++ b/packages/api/internal/orchestrator/placement/placement.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" "go.opentelemetry.io/otel" "go.uber.org/zap" @@ -26,14 +27,16 @@ var errSandboxCreateFailed = errors.New("failed to create a new sandbox, if the // Implementations should choose an optimal node based on available resources // and current load distribution. type Algorithm interface { - chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) (*nodemanager.Node, error) + chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string, affinityScores ...map[string]float64) (*nodemanager.Node, error) } -func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*nodemanager.Node, preferredNode *nodemanager.Node, sbxRequest *orchestrator.SandboxCreateRequest, buildMachineInfo machineinfo.MachineInfo, labelFilteringEnabled bool, requiredLabels []string) (*nodemanager.Node, error) { +func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*nodemanager.Node, preferredNode *nodemanager.Node, sbxRequest *orchestrator.SandboxCreateRequest, buildMachineInfo machineinfo.MachineInfo, labelFilteringEnabled bool, requiredLabels []string, affinityScores ...map[string]float64) (*nodemanager.Node, error) { ctx, span := tracer.Start(ctx, "place-sandbox") defer span.End() - nodesExcluded := make(map[string]struct{}) + nodesExcluded := make(map[string]struct{}) // hard failures, never retried + nodesExhausted := make(map[string]struct{}) // capacity-exhausted, retried as a pool + exhaustedRetries := 0 var err error var node *nodemanager.Node @@ -53,12 +56,33 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node if node != nil { telemetry.ReportEvent(ctx, "Placing sandbox on the preferred node", telemetry.WithNodeID(node.ID)) } else { - if len(nodesExcluded) >= len(clusterNodes) { - return nil, errors.New("no nodes available") + skip := make(map[string]struct{}, len(nodesExcluded)+len(nodesExhausted)) + for id := range nodesExcluded { + skip[id] = struct{}{} + } + for id := range nodesExhausted { + skip[id] = struct{}{} } - node, err = algorithm.chooseNode(ctx, clusterNodes, nodesExcluded, nodemanager.SandboxResources{CPUs: sbxRequest.GetSandbox().GetVcpu(), MiBMemory: sbxRequest.GetSandbox().GetRamMb()}, buildMachineInfo, labelFilteringEnabled, requiredLabels) + if len(skip) < len(clusterNodes) { + node, err = algorithm.chooseNode(ctx, clusterNodes, skip, nodemanager.SandboxResources{CPUs: sbxRequest.GetSandbox().GetVcpu(), MiBMemory: sbxRequest.GetSandbox().GetRamMb()}, buildMachineInfo, labelFilteringEnabled, requiredLabels, affinityScores...) + } else { + node, err = nil, errors.New("no nodes available") + } if err != nil { + // No eligible node. If some were only capacity-exhausted, retry the + // whole exhausted pool since capacity may free up. + if len(nodesExhausted) > 0 { + exhaustedRetries++ + if exhaustedRetries >= maxExhaustedRetries { + return nil, err + } + clear(nodesExhausted) + time.Sleep(100 * time.Millisecond) + + continue + } + return nil, err } @@ -102,6 +126,7 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node switch statusCode { case codes.ResourceExhausted: + nodesExhausted[failedNode.ID] = struct{}{} failedNode.PlacementMetrics.Skip(sbxRequest.GetSandbox().GetSandboxId()) logger.L().Warn(ctx, "Node exhausted, trying another node", logger.WithSandboxID(sbxRequest.GetSandbox().GetSandboxId()), logger.WithNodeID(failedNode.ID)) default: diff --git a/packages/api/internal/orchestrator/placement/placement_best_of_K.go b/packages/api/internal/orchestrator/placement/placement_best_of_K.go index d06f2fb926..adc9f11c7a 100644 --- a/packages/api/internal/orchestrator/placement/placement_best_of_K.go +++ b/packages/api/internal/orchestrator/placement/placement_best_of_K.go @@ -36,7 +36,7 @@ func DefaultBestOfKConfig() BestOfKConfig { } // Score calculates the placement score for this node -func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxResources, config BestOfKConfig) float64 { +func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxResources, config BestOfKConfig, affinityScores ...map[string]float64) float64 { metrics := node.Metrics() // Get locally recorded resources that haven't been reported yet. @@ -61,7 +61,12 @@ func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxRes cpuRequested := float64(resources.CPUs) - return (cpuRequested + float64(reserved) + config.Alpha*usageAvg) / totalCapacity + score := (cpuRequested + float64(reserved) + config.Alpha*usageAvg) / totalCapacity + if len(affinityScores) > 0 { + score -= affinityScores[0][node.ID] + } + + return score } // CanFit checks if the node can fit a new VM with the given quota @@ -111,19 +116,40 @@ func (b *BestOfK) UpdateConfig(config BestOfKConfig) { } // chooseNode selects the best node for placing a VM with the given quota -func (b *BestOfK) chooseNode(_ context.Context, nodes []*nodemanager.Node, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) (bestNode *nodemanager.Node, err error) { +func (b *BestOfK) chooseNode(_ context.Context, nodes []*nodemanager.Node, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string, affinityScores ...map[string]float64) (bestNode *nodemanager.Node, err error) { // Fix the config, we want to dynamically update it config := b.getConfig() + var affinity map[string]float64 + if len(affinityScores) > 0 { + affinity = affinityScores[0] + } // Filter eligible nodes candidates := b.sample(nodes, config, excludedNodes, resources, buildMachineInfo, filterByLabels, requiredLabels) + if len(affinity) > 0 { + seen := make(map[string]struct{}, len(candidates)) + for _, n := range candidates { + seen[n.ID] = struct{}{} + } + for _, n := range nodes { + if affinity[n.ID] <= 0 { + continue + } + if _, ok := seen[n.ID]; ok { + continue + } + if b.isCandidate(n, config, excludedNodes, resources, buildMachineInfo, filterByLabels, requiredLabels) { + candidates = append(candidates, n) + } + } + } // Find the best node among candidates bestScore := math.MaxFloat64 for _, node := range candidates { // Calculate score - score := b.Score(node, resources, config) + score := b.Score(node, resources, config, affinity) if score < bestScore { bestNode = node @@ -138,6 +164,34 @@ func (b *BestOfK) chooseNode(_ context.Context, nodes []*nodemanager.Node, exclu return bestNode, nil } +func (b *BestOfK) isCandidate(node *nodemanager.Node, config BestOfKConfig, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) bool { + if _, ok := excludedNodes[node.ID]; ok { + return false + } + // If the node is not ready, we don't want to schedule a new sandbox on it. + if node.Status() != api.NodeStatusReady { + return false + } + // If the node CPU doesn't match the requested machine, we skip it. + if !isNodeCPUCompatible(node, buildMachineInfo) { + return false + } + // If label filtering is enabled, the node has to match the team's required labels. + if filterByLabels && !isNodeLabelsCompatible(node, requiredLabels) { + return false + } + // If can-fit is enabled, the node must have enough capacity for the requested resources. + if config.CanFit && !b.CanFit(node, resources, config) { + return false + } + // Avoid placing on nodes that already have too many sandboxes starting. + if config.TooManyStarting && node.PlacementMetrics.InProgressCount() > maxStartingInstancesPerNode { + return false + } + + return true +} + // sample returns up to k items chosen uniformly from those passing ok. func (b *BestOfK) sample(items []*nodemanager.Node, config BestOfKConfig, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) []*nodemanager.Node { if config.K <= 0 || len(items) == 0 { @@ -163,40 +217,9 @@ func (b *BestOfK) sample(items []*nodemanager.Node, config BestOfKConfig, exclud n := items[pick] - // Excluded filter - if _, ok := excludedNodes[n.ID]; ok { - continue - } - - // If the node is not ready, skip it - if n.Status() != api.NodeStatusReady { - continue - } - - // Skip if node is not CPU compatible - if !isNodeCPUCompatible(n, buildMachineInfo) { - continue - } - - // Skip if node doesn't have the required labels - if filterByLabels && !isNodeLabelsCompatible(n, requiredLabels) { - continue - } - - if config.CanFit { - if !b.CanFit(n, resources, config) { - continue - } - } - - if config.TooManyStarting { - // To prevent overloading the node - if n.PlacementMetrics.InProgressCount() > maxStartingInstancesPerNode { - continue - } + if b.isCandidate(n, config, excludedNodes, resources, buildMachineInfo, filterByLabels, requiredLabels) { + candidates = append(candidates, n) } - - candidates = append(candidates, n) } return candidates diff --git a/packages/api/internal/orchestrator/placement/placement_test.go b/packages/api/internal/orchestrator/placement/placement_test.go index 3d27d43a8d..d91d49ccd4 100644 --- a/packages/api/internal/orchestrator/placement/placement_test.go +++ b/packages/api/internal/orchestrator/placement/placement_test.go @@ -25,7 +25,7 @@ type mockAlgorithm struct { mock.Mock } -func (m *mockAlgorithm) chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildCPUInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) (*nodemanager.Node, error) { +func (m *mockAlgorithm) chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildCPUInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string, _ ...map[string]float64) (*nodemanager.Node, error) { args := m.Called(ctx, nodes, nodesExcluded, requested, buildCPUInfo, filterByLabels, requiredLabels) if args.Get(0) == nil { return nil, args.Error(1) diff --git a/packages/api/internal/orchestrator/placement_affinity.go b/packages/api/internal/orchestrator/placement_affinity.go new file mode 100644 index 0000000000..aa641ba016 --- /dev/null +++ b/packages/api/internal/orchestrator/placement_affinity.go @@ -0,0 +1,146 @@ +package orchestrator + +import ( + "context" + "errors" + "fmt" + "math" + "time" + + "github.com/google/uuid" + "github.com/launchdarkly/go-sdk-common/v3/ldvalue" + "github.com/redis/go-redis/v9" + "go.uber.org/zap" + + "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" + "github.com/e2b-dev/infra/packages/shared/pkg/logger" +) + +const ( + placementAffinityMinTimeoutMs = 10 + placementAffinityMaxTimeoutMs = 2000 + defaultPlacementAffinityTTLSeconds = 90000 + defaultPlacementAffinityTopNodes = 20 + defaultPlacementAffinityReadTimeoutMs = 100 + defaultPlacementAffinityWriteTimeoutMs = 1000 + defaultPlacementAffinityMaxScore = 10 + defaultPlacementAffinityMaxScoreBonusPpm = 20000 + defaultPlacementAffinityBuildWeightPpm = 1000 + + placementAffinityKindBuild = "build" +) + +type placementAffinity struct { + redis redis.UniversalClient +} + +type placementAffinityConfig struct { + enabled bool + ttl time.Duration + topNodes int64 + readTimeout time.Duration + writeTimeout time.Duration + maxAffinityScore float64 + maxScoreBonus float64 + buildWeight float64 +} + +func newPlacementAffinity(redisClient redis.UniversalClient) *placementAffinity { + if redisClient == nil { + return nil + } + + return &placementAffinity{redis: redisClient} +} + +func placementAffinityConfigFromFlags(ctx context.Context, ff *featureflags.Client) placementAffinityConfig { + v := ff.JSONFlag(ctx, featureflags.SandboxPlacementCacheAffinityFlag) + + return placementAffinityConfig{ + enabled: v.GetByKey("enabled").BoolValue(), + ttl: time.Duration(jsonInt(v, "ttlSeconds", defaultPlacementAffinityTTLSeconds, 60, 90000)) * time.Second, + topNodes: int64(jsonInt(v, "topNodes", defaultPlacementAffinityTopNodes, 1, 100)), + readTimeout: time.Duration(jsonInt(v, "readTimeoutMs", defaultPlacementAffinityReadTimeoutMs, placementAffinityMinTimeoutMs, placementAffinityMaxTimeoutMs)) * time.Millisecond, + writeTimeout: time.Duration(jsonInt(v, "writeTimeoutMs", defaultPlacementAffinityWriteTimeoutMs, placementAffinityMinTimeoutMs, placementAffinityMaxTimeoutMs)) * time.Millisecond, + maxAffinityScore: float64(jsonInt(v, "maxAffinityScore", defaultPlacementAffinityMaxScore, 1, 1000)), + maxScoreBonus: jsonPPM(v, "maxScoreBonusPpm", defaultPlacementAffinityMaxScoreBonusPpm), + buildWeight: jsonPPM(v, "buildWeightPpm", defaultPlacementAffinityBuildWeightPpm), + } +} + +func jsonInt(v ldvalue.Value, key string, fallback, minValue, maxValue int) int { + value := v.GetByKey(key) + if value.IsNull() { + return fallback + } + + return clampInt(value.IntValue(), minValue, maxValue) +} + +func jsonPPM(v ldvalue.Value, key string, fallback int) float64 { + return float64(jsonInt(v, key, fallback, 0, 100000)) / 1000000 +} + +func clampInt(v, minValue, maxValue int) int { + return max(minValue, min(v, maxValue)) +} + +func placementAffinityKey(clusterID uuid.UUID, kind, id string) string { + return fmt.Sprintf("placement-affinity:%s:%s:%s", clusterID, kind, id) +} + +func (a *placementAffinity) record(ctx context.Context, cfg placementAffinityConfig, clusterID uuid.UUID, nodeID, buildID string) { + if a == nil || nodeID == "" || buildID == "" || cfg.buildWeight == 0 { + return + } + ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), cfg.writeTimeout) + defer cancel() + + pipe := a.redis.Pipeline() + key := placementAffinityKey(clusterID, placementAffinityKindBuild, buildID) + pipe.ZIncrBy(ctx, key, 1, nodeID) + pipe.ZRemRangeByRank(ctx, key, 0, -cfg.topNodes-1) + pipe.Expire(ctx, key, cfg.ttl) + + _, err := pipe.Exec(ctx) + if err != nil { + logger.L().Debug(ctx, "failed to record placement affinity", zap.Error(err)) + } +} + +func (a *placementAffinity) scores(ctx context.Context, cfg placementAffinityConfig, clusterID uuid.UUID, buildID string) map[string]float64 { + if a == nil || buildID == "" || cfg.buildWeight == 0 { + return nil + } + ctx, cancel := context.WithTimeout(ctx, cfg.readTimeout) + defer cancel() + + pipe := a.redis.Pipeline() + cmd := pipe.ZRevRangeWithScores(ctx, placementAffinityKey(clusterID, placementAffinityKindBuild, buildID), 0, cfg.topNodes-1) + _, err := pipe.Exec(ctx) + if err != nil && !errors.Is(err, redis.Nil) { + logger.L().Debug(ctx, "failed to read placement affinity", zap.Error(err)) + + return nil + } + + scores := make(map[string]float64) + rows, err := cmd.Result() + if err != nil { + return nil + } + for _, row := range rows { + nodeID, ok := row.Member.(string) + if !ok || nodeID == "" { + continue + } + scores[nodeID] += math.Min(row.Score, cfg.maxAffinityScore) * cfg.buildWeight + scores[nodeID] = math.Min(scores[nodeID], cfg.maxScoreBonus) + } + + if len(scores) == 0 { + return nil + } + + return scores +} diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index 068308e811..fee99566a1 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -161,6 +161,17 @@ var ( FreePageReportingFlag = NewBoolFlag("free-page-reporting", false) FreezeUserCgroupFlag = NewBoolFlag("freeze-user-cgroup", env.IsDevelopment()) + SandboxPlacementCacheAffinityFlag = NewJSONFlag("sandbox-placement-cache-affinity", ldvalue.FromJSONMarshal(map[string]any{ + "enabled": false, + "ttlSeconds": 90000, + "topNodes": 20, + "readTimeoutMs": 100, + "writeTimeoutMs": 1000, + "maxAffinityScore": 10, + "maxScoreBonusPpm": 20000, + "buildWeightPpm": 1000, + })) + NetworkTransformRulesFlag = NewBoolFlag("network-transform-rules", env.IsDevelopment()) // V4HeaderForUncompressedFlag forces the V4 header layout on uncompressed