-
Notifications
You must be signed in to change notification settings - Fork 328
Add cache-aware placement affinity #2867
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e7a75c0
57f7c27
001c7a9
3eb3484
497d6cf
c8066f5
428d6ab
f20296d
17db418
922bd06
4afd0cc
efe533c
a466269
38d8022
35c3f9e
75d9d73
5b6375c
81d074e
a3c5f19
579a441
edbb4d7
d337651
c8b6882
8a6080c
08c664d
c6e1e0c
fdb1b1e
eab1dc2
cc81426
53bfe74
4317b91
aa3228a
dafa27b
9277fc3
e6620ef
b0d874a
202bab4
3d36202
b9916c4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ import ( | |
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "net" | ||
| "net/http" | ||
| "time" | ||
|
|
||
|
|
@@ -22,6 +23,7 @@ import ( | |
| "github.com/e2b-dev/infra/packages/db/queries" | ||
| "github.com/e2b-dev/infra/packages/shared/pkg/clusters" | ||
| "github.com/e2b-dev/infra/packages/shared/pkg/consts" | ||
| "github.com/e2b-dev/infra/packages/shared/pkg/env" | ||
| "github.com/e2b-dev/infra/packages/shared/pkg/fcversion" | ||
| "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" | ||
| "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" | ||
|
|
@@ -298,10 +300,40 @@ func (o *Orchestrator) CreateSandbox( | |
|
|
||
| nodeClusterID := clusters.WithClusterFallback(team.ClusterID) | ||
| clusterNodes := o.GetClusterNodes(nodeClusterID) | ||
| if len(clusterNodes) == 0 { | ||
| if nodeClusterID == consts.LocalClusterID { | ||
| o.discoverNomadNodes(ctx) | ||
| if len(o.GetClusterNodes(nodeClusterID)) == 0 { | ||
| _ = o.connectToNode(ctx, nodemanager.NomadServiceDiscovery{ | ||
| NomadNodeShortID: "local", | ||
| OrchestratorAddress: net.JoinHostPort("localhost", env.GetEnv("ORCHESTRATOR_PORT", "5008")), | ||
| IPAddress: "localhost", | ||
| }) | ||
| } | ||
| } else { | ||
| o.discoverClusterNode(ctx, nodeClusterID) | ||
| } | ||
| clusterNodes = o.GetClusterNodes(nodeClusterID) | ||
| } | ||
|
|
||
| labelFilteringEnabled := o.featureFlagsClient.BoolFlag(ctx, featureflags.SandboxLabelBasedSchedulingFlag, featureflags.TeamContext(team.ID.String()), featureflags.SandboxContext(sandboxID)) | ||
| affinityBuildID := sbxData.Build.ID.String() | ||
| if isResume { | ||
| // Resume targets the previous sandbox's node first; avoid cache affinity fighting that pin. | ||
| affinityBuildID = "" | ||
| } | ||
|
ValentaTomas marked this conversation as resolved.
|
||
| affinityFlagCtx := featureflags.AddToContext(ctx, featureflags.TeamContext(team.ID.String()), featureflags.TemplateContext(sbxData.TemplateID), featureflags.SandboxContext(sandboxID)) | ||
| placementCacheAffinityConfig := placementAffinityConfigFromFlags(affinityFlagCtx, o.featureFlagsClient) | ||
| var affinityScores map[string]float64 | ||
| if placementCacheAffinityConfig.enabled && node == nil { | ||
| affinityScores = o.placementAffinity.scores(ctx, placementCacheAffinityConfig, nodeClusterID, affinityBuildID) | ||
| } | ||
|
|
||
| node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels) | ||
| if affinityScores == nil { | ||
| node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels) | ||
| } else { | ||
| node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels, affinityScores) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the nil check here might not be required |
||
| } | ||
| if err != nil { | ||
| return sandbox.Sandbox{}, &api.APIError{ | ||
| Code: http.StatusInternalServerError, | ||
|
|
@@ -317,6 +349,9 @@ func (o *Orchestrator) CreateSandbox( | |
| attribute.Bool("node_affinity_success", sbxData.NodeID != nil && node.ID == *sbxData.NodeID), | ||
| } | ||
| o.createdSandboxesCounter.Add(ctx, 1, metric.WithAttributes(attributes...)) | ||
| if placementCacheAffinityConfig.enabled { | ||
| go o.placementAffinity.record(ctx, placementCacheAffinityConfig, nodeClusterID, node.ID, affinityBuildID) | ||
| } | ||
|
|
||
| telemetry.SetAttributes(ctx, attribute.String("node.id", node.ID)) | ||
| telemetry.ReportEvent(ctx, "Created sandbox") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,5 +2,6 @@ package placement | |
|
|
||
| const ( | ||
| maxRetries = 3 | ||
| maxExhaustedRetries = 100 | ||
| maxStartingInstancesPerNode = 3 | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,7 +36,7 @@ func DefaultBestOfKConfig() BestOfKConfig { | |
| } | ||
|
|
||
| // Score calculates the placement score for this node | ||
| func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxResources, config BestOfKConfig) float64 { | ||
| func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxResources, config BestOfKConfig, affinityScores ...map[string]float64) float64 { | ||
| metrics := node.Metrics() | ||
|
|
||
| // Get locally recorded resources that haven't been reported yet. | ||
|
|
@@ -61,7 +61,12 @@ func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxRes | |
|
|
||
| cpuRequested := float64(resources.CPUs) | ||
|
|
||
| return (cpuRequested + float64(reserved) + config.Alpha*usageAvg) / totalCapacity | ||
| score := (cpuRequested + float64(reserved) + config.Alpha*usageAvg) / totalCapacity | ||
| if len(affinityScores) > 0 { | ||
| score -= affinityScores[0][node.ID] | ||
| } | ||
|
ValentaTomas marked this conversation as resolved.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Affinity bonus not normalized by totalCapacity in ScoreMedium Severity The base placement score is divided by Reviewed by Cursor Bugbot for commit b9916c4. Configure here. |
||
|
|
||
| return score | ||
| } | ||
|
|
||
| // CanFit checks if the node can fit a new VM with the given quota | ||
|
|
@@ -111,19 +116,40 @@ func (b *BestOfK) UpdateConfig(config BestOfKConfig) { | |
| } | ||
|
|
||
| // chooseNode selects the best node for placing a VM with the given quota | ||
| func (b *BestOfK) chooseNode(_ context.Context, nodes []*nodemanager.Node, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) (bestNode *nodemanager.Node, err error) { | ||
| func (b *BestOfK) chooseNode(_ context.Context, nodes []*nodemanager.Node, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string, affinityScores ...map[string]float64) (bestNode *nodemanager.Node, err error) { | ||
| // Fix the config, we want to dynamically update it | ||
| config := b.getConfig() | ||
| var affinity map[string]float64 | ||
| if len(affinityScores) > 0 { | ||
| affinity = affinityScores[0] | ||
| } | ||
|
|
||
| // Filter eligible nodes | ||
| candidates := b.sample(nodes, config, excludedNodes, resources, buildMachineInfo, filterByLabels, requiredLabels) | ||
| if len(affinity) > 0 { | ||
| seen := make(map[string]struct{}, len(candidates)) | ||
| for _, n := range candidates { | ||
| seen[n.ID] = struct{}{} | ||
| } | ||
| for _, n := range nodes { | ||
| if affinity[n.ID] <= 0 { | ||
| continue | ||
| } | ||
| if _, ok := seen[n.ID]; ok { | ||
| continue | ||
| } | ||
| if b.isCandidate(n, config, excludedNodes, resources, buildMachineInfo, filterByLabels, requiredLabels) { | ||
|
ValentaTomas marked this conversation as resolved.
|
||
| candidates = append(candidates, n) | ||
| } | ||
| } | ||
| } | ||
|
cursor[bot] marked this conversation as resolved.
|
||
|
|
||
| // Find the best node among candidates | ||
| bestScore := math.MaxFloat64 | ||
|
|
||
| for _, node := range candidates { | ||
| // Calculate score | ||
| score := b.Score(node, resources, config) | ||
| score := b.Score(node, resources, config, affinity) | ||
|
|
||
| if score < bestScore { | ||
| bestNode = node | ||
|
|
@@ -138,6 +164,34 @@ func (b *BestOfK) chooseNode(_ context.Context, nodes []*nodemanager.Node, exclu | |
| return bestNode, nil | ||
| } | ||
|
|
||
| func (b *BestOfK) isCandidate(node *nodemanager.Node, config BestOfKConfig, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) bool { | ||
| if _, ok := excludedNodes[node.ID]; ok { | ||
| return false | ||
| } | ||
| // If the node is not ready, we don't want to schedule a new sandbox on it. | ||
| if node.Status() != api.NodeStatusReady { | ||
| return false | ||
| } | ||
| // If the node CPU doesn't match the requested machine, we skip it. | ||
| if !isNodeCPUCompatible(node, buildMachineInfo) { | ||
| return false | ||
| } | ||
| // If label filtering is enabled, the node has to match the team's required labels. | ||
| if filterByLabels && !isNodeLabelsCompatible(node, requiredLabels) { | ||
| return false | ||
| } | ||
| // If can-fit is enabled, the node must have enough capacity for the requested resources. | ||
| if config.CanFit && !b.CanFit(node, resources, config) { | ||
| return false | ||
| } | ||
| // Avoid placing on nodes that already have too many sandboxes starting. | ||
| if config.TooManyStarting && node.PlacementMetrics.InProgressCount() > maxStartingInstancesPerNode { | ||
| return false | ||
| } | ||
|
|
||
| return true | ||
| } | ||
|
|
||
| // sample returns up to k items chosen uniformly from those passing ok. | ||
| func (b *BestOfK) sample(items []*nodemanager.Node, config BestOfKConfig, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) []*nodemanager.Node { | ||
| if config.K <= 0 || len(items) == 0 { | ||
|
|
@@ -163,40 +217,9 @@ func (b *BestOfK) sample(items []*nodemanager.Node, config BestOfKConfig, exclud | |
|
|
||
| n := items[pick] | ||
|
|
||
| // Excluded filter | ||
| if _, ok := excludedNodes[n.ID]; ok { | ||
| continue | ||
| } | ||
|
|
||
| // If the node is not ready, skip it | ||
| if n.Status() != api.NodeStatusReady { | ||
| continue | ||
| } | ||
|
|
||
| // Skip if node is not CPU compatible | ||
| if !isNodeCPUCompatible(n, buildMachineInfo) { | ||
| continue | ||
| } | ||
|
|
||
| // Skip if node doesn't have the required labels | ||
| if filterByLabels && !isNodeLabelsCompatible(n, requiredLabels) { | ||
| continue | ||
| } | ||
|
|
||
| if config.CanFit { | ||
| if !b.CanFit(n, resources, config) { | ||
| continue | ||
| } | ||
| } | ||
|
|
||
| if config.TooManyStarting { | ||
| // To prevent overloading the node | ||
|
ValentaTomas marked this conversation as resolved.
|
||
| if n.PlacementMetrics.InProgressCount() > maxStartingInstancesPerNode { | ||
| continue | ||
| } | ||
| if b.isCandidate(n, config, excludedNodes, resources, buildMachineInfo, filterByLabels, requiredLabels) { | ||
| candidates = append(candidates, n) | ||
| } | ||
|
|
||
| candidates = append(candidates, n) | ||
| } | ||
|
|
||
| return candidates | ||
|
|
||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does this fix?