Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
e7a75c0
feat(api): add cache-aware placement affinity
ValentaTomas May 30, 2026
57f7c27
fix(api): batch placement affinity redis calls
ValentaTomas May 30, 2026
001c7a9
fix(api): resolve placement affinity lint
ValentaTomas May 30, 2026
3eb3484
fix(api): make placement affinity tunable
ValentaTomas May 30, 2026
497d6cf
fix(api): avoid high-cardinality resume affinity keys
ValentaTomas May 30, 2026
c8066f5
fix(api): skip unused placement affinity reads
ValentaTomas May 30, 2026
428d6ab
fix(api): scope placement affinity by team template
ValentaTomas May 30, 2026
f20296d
fix(api): add low-weight team placement affinity
ValentaTomas May 30, 2026
17db418
fix(api): default placement affinity to build keys
ValentaTomas May 30, 2026
922bd06
fix(api): keep placement affinity build-only
ValentaTomas May 30, 2026
4afd0cc
fix(api): skip exhausted affinity nodes
ValentaTomas May 30, 2026
efe533c
fix(api): keep single-node placement retries
ValentaTomas May 30, 2026
a466269
fix(api): bound exhausted-node placement retries
ValentaTomas May 30, 2026
38d8022
fix(api): refresh empty placement node pools
ValentaTomas May 30, 2026
35c3f9e
fix(api): discover local combined orchestrators
ValentaTomas May 30, 2026
75d9d73
fix(api): connect local orchestrator on empty placement
ValentaTomas May 30, 2026
5b6375c
fix(api): allow local placement while metrics settle
ValentaTomas May 30, 2026
81d074e
fix(api): use nomad refresh for local placement cache
ValentaTomas May 30, 2026
a3c5f19
fix(api): tighten placement affinity refresh and scoring
ValentaTomas May 30, 2026
579a441
fix(api): record placement affinity asynchronously
ValentaTomas May 31, 2026
edbb4d7
fix(api): restore local placement fallback
ValentaTomas May 31, 2026
d337651
fix(api): sync local combined orchestrator nodes
ValentaTomas May 31, 2026
c8b6882
fix(api): retry full exhausted pool instead of pinning last node
ValentaTomas May 31, 2026
8a6080c
fix(api): refresh local cluster before placement
ValentaTomas May 31, 2026
08c664d
fix(api): drive exhausted-pool retry off node selection failure
ValentaTomas May 31, 2026
c6e1e0c
fix(api): connect local builders for placement fallback
ValentaTomas May 31, 2026
fdb1b1e
fix(api): keep local placement capacity filters
ValentaTomas May 31, 2026
eab1dc2
fix(api): wait for exhausted placement capacity
ValentaTomas May 31, 2026
cc81426
fix(api): ignore builders for placement discovery
ValentaTomas May 31, 2026
53bfe74
Merge branch 'valenta/api-placement-affinity' of https://github.com/e…
ValentaTomas May 31, 2026
4317b91
fix(api): bound exhausted placement retries
ValentaTomas May 31, 2026
aa3228a
fix(api): bound exhausted placement retries
ValentaTomas May 31, 2026
dafa27b
Merge branch 'valenta/api-placement-affinity' of https://github.com/e…
ValentaTomas May 31, 2026
9277fc3
chore(api): trim placement affinity tests
ValentaTomas May 31, 2026
e6620ef
fix(api): connect local combined service for placement
ValentaTomas May 31, 2026
b0d874a
fix(api): keep exhausted retries separate
ValentaTomas May 31, 2026
202bab4
Merge branch 'valenta/api-placement-affinity' of https://github.com/e…
ValentaTomas May 31, 2026
3d36202
fix(api): avoid builder-only placement fallback
ValentaTomas May 31, 2026
b9916c4
fix(api): keep affinity bonus on score scale
ValentaTomas May 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion packages/api/internal/orchestrator/create_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"time"

Expand All @@ -22,6 +23,7 @@ import (
"github.com/e2b-dev/infra/packages/db/queries"
"github.com/e2b-dev/infra/packages/shared/pkg/clusters"
"github.com/e2b-dev/infra/packages/shared/pkg/consts"
"github.com/e2b-dev/infra/packages/shared/pkg/env"
"github.com/e2b-dev/infra/packages/shared/pkg/fcversion"
"github.com/e2b-dev/infra/packages/shared/pkg/featureflags"
"github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator"
Expand Down Expand Up @@ -298,10 +300,40 @@ func (o *Orchestrator) CreateSandbox(

nodeClusterID := clusters.WithClusterFallback(team.ClusterID)
clusterNodes := o.GetClusterNodes(nodeClusterID)
if len(clusterNodes) == 0 {
if nodeClusterID == consts.LocalClusterID {
o.discoverNomadNodes(ctx)
if len(o.GetClusterNodes(nodeClusterID)) == 0 {
_ = o.connectToNode(ctx, nodemanager.NomadServiceDiscovery{
NomadNodeShortID: "local",
OrchestratorAddress: net.JoinHostPort("localhost", env.GetEnv("ORCHESTRATOR_PORT", "5008")),
IPAddress: "localhost",
})
}
} else {
o.discoverClusterNode(ctx, nodeClusterID)
}
clusterNodes = o.GetClusterNodes(nodeClusterID)
}
Comment on lines +303 to +317
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this fix?


labelFilteringEnabled := o.featureFlagsClient.BoolFlag(ctx, featureflags.SandboxLabelBasedSchedulingFlag, featureflags.TeamContext(team.ID.String()), featureflags.SandboxContext(sandboxID))
affinityBuildID := sbxData.Build.ID.String()
if isResume {
// Resume targets the previous sandbox's node first; avoid cache affinity fighting that pin.
affinityBuildID = ""
}
Comment thread
ValentaTomas marked this conversation as resolved.
affinityFlagCtx := featureflags.AddToContext(ctx, featureflags.TeamContext(team.ID.String()), featureflags.TemplateContext(sbxData.TemplateID), featureflags.SandboxContext(sandboxID))
placementCacheAffinityConfig := placementAffinityConfigFromFlags(affinityFlagCtx, o.featureFlagsClient)
var affinityScores map[string]float64
if placementCacheAffinityConfig.enabled && node == nil {
affinityScores = o.placementAffinity.scores(ctx, placementCacheAffinityConfig, nodeClusterID, affinityBuildID)
}

node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels)
if affinityScores == nil {
node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels)
} else {
node, err = placement.PlaceSandbox(ctx, o.placementAlgorithm, clusterNodes, node, sbxRequest, builds.ToMachineInfo(sbxData.Build), labelFilteringEnabled, team.SandboxSchedulingLabels, affinityScores)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the nil check here might not be required

}
if err != nil {
return sandbox.Sandbox{}, &api.APIError{
Code: http.StatusInternalServerError,
Expand All @@ -317,6 +349,9 @@ func (o *Orchestrator) CreateSandbox(
attribute.Bool("node_affinity_success", sbxData.NodeID != nil && node.ID == *sbxData.NodeID),
}
o.createdSandboxesCounter.Add(ctx, 1, metric.WithAttributes(attributes...))
if placementCacheAffinityConfig.enabled {
go o.placementAffinity.record(ctx, placementCacheAffinityConfig, nodeClusterID, node.ID, affinityBuildID)
}

telemetry.SetAttributes(ctx, attribute.String("node.id", node.ID))
telemetry.ReportEvent(ctx, "Created sandbox")
Expand Down
2 changes: 2 additions & 0 deletions packages/api/internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Orchestrator struct {

snapshotUpsertSem *utils.AdjustableSemaphore
redisStorage *redisbackend.Storage
placementAffinity *placementAffinity

// connectGroup deduplicates concurrent dial+register attempts for the same
// physical node. It is keyed by NomadNodeShortID (Nomad-managed nodes) or
Expand Down Expand Up @@ -147,6 +148,7 @@ func New(
tel: tel,
clusters: clusters,
redisStorage: redisStorage,
placementAffinity: newPlacementAffinity(redisClient),

createdCounter: createdCounter,

Expand Down
1 change: 1 addition & 0 deletions packages/api/internal/orchestrator/placement/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ package placement

const (
maxRetries = 3
maxExhaustedRetries = 100
maxStartingInstancesPerNode = 3
)
37 changes: 31 additions & 6 deletions packages/api/internal/orchestrator/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"go.opentelemetry.io/otel"
"go.uber.org/zap"
Expand All @@ -26,14 +27,16 @@ var errSandboxCreateFailed = errors.New("failed to create a new sandbox, if the
// Implementations should choose an optimal node based on available resources
// and current load distribution.
type Algorithm interface {
chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) (*nodemanager.Node, error)
chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string, affinityScores ...map[string]float64) (*nodemanager.Node, error)
}

func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*nodemanager.Node, preferredNode *nodemanager.Node, sbxRequest *orchestrator.SandboxCreateRequest, buildMachineInfo machineinfo.MachineInfo, labelFilteringEnabled bool, requiredLabels []string) (*nodemanager.Node, error) {
func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*nodemanager.Node, preferredNode *nodemanager.Node, sbxRequest *orchestrator.SandboxCreateRequest, buildMachineInfo machineinfo.MachineInfo, labelFilteringEnabled bool, requiredLabels []string, affinityScores ...map[string]float64) (*nodemanager.Node, error) {
ctx, span := tracer.Start(ctx, "place-sandbox")
defer span.End()

nodesExcluded := make(map[string]struct{})
nodesExcluded := make(map[string]struct{}) // hard failures, never retried
nodesExhausted := make(map[string]struct{}) // capacity-exhausted, retried as a pool
exhaustedRetries := 0
var err error

var node *nodemanager.Node
Expand All @@ -53,12 +56,33 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node
if node != nil {
telemetry.ReportEvent(ctx, "Placing sandbox on the preferred node", telemetry.WithNodeID(node.ID))
} else {
if len(nodesExcluded) >= len(clusterNodes) {
return nil, errors.New("no nodes available")
skip := make(map[string]struct{}, len(nodesExcluded)+len(nodesExhausted))
for id := range nodesExcluded {
skip[id] = struct{}{}
}
for id := range nodesExhausted {
skip[id] = struct{}{}
}

node, err = algorithm.chooseNode(ctx, clusterNodes, nodesExcluded, nodemanager.SandboxResources{CPUs: sbxRequest.GetSandbox().GetVcpu(), MiBMemory: sbxRequest.GetSandbox().GetRamMb()}, buildMachineInfo, labelFilteringEnabled, requiredLabels)
if len(skip) < len(clusterNodes) {
node, err = algorithm.chooseNode(ctx, clusterNodes, skip, nodemanager.SandboxResources{CPUs: sbxRequest.GetSandbox().GetVcpu(), MiBMemory: sbxRequest.GetSandbox().GetRamMb()}, buildMachineInfo, labelFilteringEnabled, requiredLabels, affinityScores...)
} else {
node, err = nil, errors.New("no nodes available")
}
if err != nil {
// No eligible node. If some were only capacity-exhausted, retry the
// whole exhausted pool since capacity may free up.
if len(nodesExhausted) > 0 {
exhaustedRetries++
if exhaustedRetries >= maxExhaustedRetries {
return nil, err
}
clear(nodesExhausted)
time.Sleep(100 * time.Millisecond)

continue
Comment thread
ValentaTomas marked this conversation as resolved.
}

return nil, err
}

Expand Down Expand Up @@ -102,6 +126,7 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node

switch statusCode {
case codes.ResourceExhausted:
nodesExhausted[failedNode.ID] = struct{}{}
failedNode.PlacementMetrics.Skip(sbxRequest.GetSandbox().GetSandboxId())
logger.L().Warn(ctx, "Node exhausted, trying another node", logger.WithSandboxID(sbxRequest.GetSandbox().GetSandboxId()), logger.WithNodeID(failedNode.ID))
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func DefaultBestOfKConfig() BestOfKConfig {
}

// Score calculates the placement score for this node
func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxResources, config BestOfKConfig) float64 {
func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxResources, config BestOfKConfig, affinityScores ...map[string]float64) float64 {
metrics := node.Metrics()

// Get locally recorded resources that haven't been reported yet.
Expand All @@ -61,7 +61,12 @@ func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxRes

cpuRequested := float64(resources.CPUs)

return (cpuRequested + float64(reserved) + config.Alpha*usageAvg) / totalCapacity
score := (cpuRequested + float64(reserved) + config.Alpha*usageAvg) / totalCapacity
if len(affinityScores) > 0 {
score -= affinityScores[0][node.ID]
}
Comment thread
ValentaTomas marked this conversation as resolved.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Affinity bonus not normalized by totalCapacity in Score

Medium Severity

The base placement score is divided by totalCapacity (R * cpuCount), but the affinity bonus subtraction on line 66 is applied as a raw absolute value without the same normalization. The PR discussion explicitly states this was fixed ("normalizing the affinity bonus in Score on the same totalCapacity denominator as the base score"), but the code subtracts affinityScores[0][node.ID] directly from the already-normalized score. This causes the affinity bonus to have a disproportionately larger relative effect on high-CPU nodes (where base scores are smaller) compared to low-CPU nodes.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit b9916c4. Configure here.


return score
}

// CanFit checks if the node can fit a new VM with the given quota
Expand Down Expand Up @@ -111,19 +116,40 @@ func (b *BestOfK) UpdateConfig(config BestOfKConfig) {
}

// chooseNode selects the best node for placing a VM with the given quota
func (b *BestOfK) chooseNode(_ context.Context, nodes []*nodemanager.Node, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) (bestNode *nodemanager.Node, err error) {
func (b *BestOfK) chooseNode(_ context.Context, nodes []*nodemanager.Node, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string, affinityScores ...map[string]float64) (bestNode *nodemanager.Node, err error) {
// Fix the config, we want to dynamically update it
config := b.getConfig()
var affinity map[string]float64
if len(affinityScores) > 0 {
affinity = affinityScores[0]
}

// Filter eligible nodes
candidates := b.sample(nodes, config, excludedNodes, resources, buildMachineInfo, filterByLabels, requiredLabels)
if len(affinity) > 0 {
seen := make(map[string]struct{}, len(candidates))
for _, n := range candidates {
seen[n.ID] = struct{}{}
}
for _, n := range nodes {
if affinity[n.ID] <= 0 {
continue
}
if _, ok := seen[n.ID]; ok {
continue
}
if b.isCandidate(n, config, excludedNodes, resources, buildMachineInfo, filterByLabels, requiredLabels) {
Comment thread
ValentaTomas marked this conversation as resolved.
candidates = append(candidates, n)
}
}
}
Comment thread
cursor[bot] marked this conversation as resolved.

// Find the best node among candidates
bestScore := math.MaxFloat64

for _, node := range candidates {
// Calculate score
score := b.Score(node, resources, config)
score := b.Score(node, resources, config, affinity)

if score < bestScore {
bestNode = node
Expand All @@ -138,6 +164,34 @@ func (b *BestOfK) chooseNode(_ context.Context, nodes []*nodemanager.Node, exclu
return bestNode, nil
}

func (b *BestOfK) isCandidate(node *nodemanager.Node, config BestOfKConfig, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) bool {
if _, ok := excludedNodes[node.ID]; ok {
return false
}
// If the node is not ready, we don't want to schedule a new sandbox on it.
if node.Status() != api.NodeStatusReady {
return false
}
// If the node CPU doesn't match the requested machine, we skip it.
if !isNodeCPUCompatible(node, buildMachineInfo) {
return false
}
// If label filtering is enabled, the node has to match the team's required labels.
if filterByLabels && !isNodeLabelsCompatible(node, requiredLabels) {
return false
}
// If can-fit is enabled, the node must have enough capacity for the requested resources.
if config.CanFit && !b.CanFit(node, resources, config) {
return false
}
// Avoid placing on nodes that already have too many sandboxes starting.
if config.TooManyStarting && node.PlacementMetrics.InProgressCount() > maxStartingInstancesPerNode {
return false
}

return true
}

// sample returns up to k items chosen uniformly from those passing ok.
func (b *BestOfK) sample(items []*nodemanager.Node, config BestOfKConfig, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) []*nodemanager.Node {
if config.K <= 0 || len(items) == 0 {
Expand All @@ -163,40 +217,9 @@ func (b *BestOfK) sample(items []*nodemanager.Node, config BestOfKConfig, exclud

n := items[pick]

// Excluded filter
if _, ok := excludedNodes[n.ID]; ok {
continue
}

// If the node is not ready, skip it
if n.Status() != api.NodeStatusReady {
continue
}

// Skip if node is not CPU compatible
if !isNodeCPUCompatible(n, buildMachineInfo) {
continue
}

// Skip if node doesn't have the required labels
if filterByLabels && !isNodeLabelsCompatible(n, requiredLabels) {
continue
}

if config.CanFit {
if !b.CanFit(n, resources, config) {
continue
}
}

if config.TooManyStarting {
// To prevent overloading the node
Comment thread
ValentaTomas marked this conversation as resolved.
if n.PlacementMetrics.InProgressCount() > maxStartingInstancesPerNode {
continue
}
if b.isCandidate(n, config, excludedNodes, resources, buildMachineInfo, filterByLabels, requiredLabels) {
candidates = append(candidates, n)
}

candidates = append(candidates, n)
}

return candidates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type mockAlgorithm struct {
mock.Mock
}

func (m *mockAlgorithm) chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildCPUInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string) (*nodemanager.Node, error) {
func (m *mockAlgorithm) chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildCPUInfo machineinfo.MachineInfo, filterByLabels bool, requiredLabels []string, _ ...map[string]float64) (*nodemanager.Node, error) {
args := m.Called(ctx, nodes, nodesExcluded, requested, buildCPUInfo, filterByLabels, requiredLabels)
if args.Get(0) == nil {
return nil, args.Error(1)
Expand Down
Loading
Loading