refactor(orchestrator): Fix leaking resources on shutdown and add framework for better lifecycle tracking#2770
refactor(orchestrator): Fix leaking resources on shutdown and add framework for better lifecycle tracking#2770wj-e2b wants to merge 7 commits into
Conversation
PR SummaryHigh Risk Overview It also changes teardown semantics in ways that can cause cross-sandbox impact if incorrect, notably deleting the nftables table by a constant name in Reviewed by Cursor Bugbot for commit 6443f22. Bugbot is set up for automated code reviews on this repo. Configure here. |
❌ 3 Tests Failed:
View the full list of 7 ❄️ flaky test(s)
To view more test analytics, go to the Test Analytics Dashboard |
There was a problem hiding this comment.
Code Review
The use of wg.Go in ForceStopSandboxes will cause a compilation error because sync.WaitGroup does not provide that method. In doStop, returning early when WaitWithContext fails prevents memory resource cleanup, which can lead to leaked userfaultfd processes.
7324b01 to
8c5b901
Compare
8c5b901 to
6443f22
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 6443f22c3b
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| if err := closer.close(closeCtx); err != nil { | ||
| clog.Error(ctx, "error during shutdown", zap.Error(err)) | ||
| success = false | ||
| registerClose("sandbox drain", []string{"orchestrator server", "network pool", "device pool"}, func(context.Context) error { |
There was a problem hiding this comment.
Close request ingress before starting sandbox drain
shutdown.Stop runs units in reverse topological order, and this registers sandbox drain after grpc server without any dependency between them, so drain executes while gRPC is still serving. In that window new template build requests can still start sandboxes (the earlier tmpl.Wait only waits for currently running builds and does not stop new ones), which can keep the sandbox count non-zero until timeout and trigger unnecessary forced shutdown. Add ordering so gRPC/HTTP listeners are stopped before entering the drain phase.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
To double down on codex's comment, this close needs to happen as literally the first thing to happen on any signals, before any other closing can happen, and it needs to happen on its own (no concurrent closing can happen at the same time). it's not clear that's happening here.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 6443f22. Configure here.
| return s.waitSandboxLifecycles(ctx) | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
ForceStopSandboxes uses canceled closeCtx for waitSandboxLifecycles indirectly
Medium Severity
DrainSandboxes checks s.sandboxFactory.Sandboxes.Count() which only counts the live map. Once the count reaches zero, it calls waitSandboxLifecycles(ctx) to wait for lifecycle goroutines. However, the drain loop polls only every 5 seconds (sandboxDrainLogInterval). If the last sandbox exits immediately after a tick fires and shows remaining > 0, the drain will wait up to 5 additional seconds before detecting completion. More importantly, Count() reflects the live map (cleared by MarkStopping), but sandboxes could be removed from live by the normal stop path while their lifecycle goroutine is still running Close. The drain correctly calls waitSandboxLifecycles afterward, but the gap between "live count is 0" and "all lifecycle goroutines done" is only bridged if the drain context doesn't expire in between.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 6443f22. Configure here.
| } | ||
|
|
||
| return nil | ||
| }) |
There was a problem hiding this comment.
Sandbox drain closure captures outer closeCtx ignoring parameter
Low Severity
The closure passed to registerClose for "sandbox drain" has an unnamed context.Context parameter but references closeCtx from the outer scope instead of using its parameter. While they happen to be the same value today (since shutdown.Stop(closeCtx) passes it through), this creates a fragile coupling. If the lifecycle manager's Stop method ever passes a different context (e.g., with additional deadline), the closure would ignore it.
Reviewed by Cursor Bugbot for commit 6443f22. Configure here.
| registerUnit := func(unit lifecycle.Unit) { | ||
| if err := shutdown.Register(unit); err != nil { | ||
| logger.L().Fatal(ctx, "failed to register lifecycle unit", zap.String("unit", unit.Name), zap.Error(err)) | ||
| } | ||
| } | ||
| registerClose := func(name string, after []string, closeFn func(context.Context) error) { | ||
| registerUnit(lifecycle.Unit{ | ||
| Name: name, | ||
| After: after, | ||
| Stop: func(closeCtx context.Context) error { | ||
| clog := globalLogger.With(zap.String("service", name), zap.Bool("forced", config.ForceStop)) | ||
| clog.Info(ctx, "closing") | ||
|
|
||
| if err := closeFn(closeCtx); err != nil { | ||
| clog.Error(ctx, "error during shutdown", zap.Error(err)) | ||
|
|
||
| return err | ||
| } | ||
|
|
||
| return nil | ||
| }, | ||
| }) | ||
| } |
There was a problem hiding this comment.
These feel like they should be part of the lifecycle.Manager struct, rather than helper functions here, no?
| } | ||
| templateCache.Start(ctx) | ||
| closers = append(closers, closer{"template cache", func(context.Context) error { | ||
| registerClose("template cache", []string{"feature flags", "limiter"}, func(context.Context) error { |
There was a problem hiding this comment.
The dependencies-as-strings make me a little nervous, in terms of resiliency to future changes. At the least they should be consts, but if you're going to build dependency injection from scratch, may as well use types. Or use fx!
| if unit.Stop == nil { | ||
| registerUnit(unit) | ||
| } else { | ||
| registerClose(unit.Name, unit.After, unit.Stop) | ||
| } |
| } | ||
|
|
||
| if _, ok := m.names[unit.Name]; ok { | ||
| return fmt.Errorf("duplicate lifecycle unit %q", unit.Name) |
There was a problem hiding this comment.
If we allow these units to be replaced, it gives us an extension point to swap out implementations before executing in the future.
| } | ||
|
|
||
| m.names[unit.Name] = struct{}{} | ||
| unit.After = append([]string(nil), unit.After...) |
| return errors.Join(errs...) | ||
| } | ||
|
|
||
| func (m *Manager) startOrder() ([]Unit, error) { |
There was a problem hiding this comment.
This function in particular makes me want to use a library sooner rather than later.
| live *smap.Map[*Sandbox] | ||
| network *smap.Map[*Sandbox] | ||
| live *smap.Map[*Sandbox] | ||
| lifecycles *smap.Map[lifecycleEntry] |
There was a problem hiding this comment.
Is this a superset of the live and network maps? Can we remove either of those and exclusively use this new map?
| // MaxStartingInstancesPerNode feature flag and resize the semaphore. | ||
| const startingSandboxesLimitRefreshInterval = 30 * time.Second | ||
|
|
||
| const sandboxDrainLogInterval = 5 * time.Second |
There was a problem hiding this comment.
For sandboxes that may remain up for 48 hours or more, logging every 5 seconds "still not done" is a lot of logs. maybe once a minute is better? or maybe a sliding scale:
- every 5 seconds for a minute
- every minute for an hour
- every 15 minutes for the next 48 hours
| if err := s.proxy.RemoveFromPool(sbx.LifecycleID); err != nil { | ||
| sbxLog.Warn(ctx, "failed to remove sandbox from proxy pool after force stop", zap.Error(err)) | ||
| } |
There was a problem hiding this comment.
Why is this necessary on force stop but not drain?
| if err := closer.close(closeCtx); err != nil { | ||
| clog.Error(ctx, "error during shutdown", zap.Error(err)) | ||
| success = false | ||
| registerClose("sandbox drain", []string{"orchestrator server", "network pool", "device pool"}, func(context.Context) error { |
There was a problem hiding this comment.
To double down on codex's comment, this close needs to happen as literally the first thing to happen on any signals, before any other closing can happen, and it needs to happen on its own (no concurrent closing can happen at the same time). it's not clear that's happening here.
|
Before merging, we should do the following:
That should give us enough confidence to say "this is fixed" |


No description provided.