-
Notifications
You must be signed in to change notification settings - Fork 4
sandbox: prevent duplicate bridge IP assignment (MIR-1238) #856
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,165 @@ | ||
| package sandbox | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "log/slog" | ||
| "net/netip" | ||
| "time" | ||
|
|
||
| "miren.dev/runtime/pkg/netdb" | ||
| ) | ||
|
|
||
| // IPReconciler keeps the netdb IP lease bookkeeping in agreement with the | ||
| // addresses actually live on the bridge. The two can diverge — a release path | ||
| // that ran while a container kept running, a lease lost across a restart — and a | ||
| // divergence where netdb believes a live address is free leads to that address | ||
| // being handed to a second sandbox, a duplicate assignment (MIR-1238). | ||
| // | ||
| // Each cycle it: | ||
| // - re-reserves any address that is live on the bridge but not reserved in | ||
| // netdb (the repair direction — always safe, only ever adds a reservation | ||
| // for an address already in use); and | ||
| // - releases any address reserved in netdb with no live owner, but only after | ||
| // it has been absent for several consecutive cycles (the reap direction — | ||
| // conservative, to avoid racing a sandbox that is mid-create or mid-teardown). | ||
| type IPReconciler struct { | ||
| Log *slog.Logger | ||
| Subnet *netdb.Subnet | ||
|
|
||
| // LiveIPs returns the set of addresses currently assigned to running sandbox | ||
| // containers, determined independently of netdb. | ||
| LiveIPs func(ctx context.Context) (map[netip.Addr]bool, error) | ||
|
|
||
| // CheckInterval is how often a reconcile cycle runs. | ||
| CheckInterval time.Duration | ||
| // ReleaseAfterMisses is how many consecutive cycles an address must be | ||
| // reserved-but-not-live before it is released. With the default interval this | ||
| // is a multi-minute grace window, comfortably longer than any normal sandbox | ||
| // create (whose saga releases the reservation on failure anyway). | ||
| ReleaseAfterMisses int | ||
|
|
||
| misses map[netip.Addr]int | ||
| cancel context.CancelFunc | ||
| } | ||
|
|
||
| // Start begins the periodic reconcile loop. | ||
| func (r *IPReconciler) Start(ctx context.Context) { | ||
| if r.CheckInterval == 0 { | ||
| r.CheckInterval = 5 * time.Minute | ||
| } | ||
| if r.ReleaseAfterMisses == 0 { | ||
| r.ReleaseAfterMisses = 3 | ||
| } | ||
| r.misses = make(map[netip.Addr]int) | ||
|
|
||
| r.Log.Info("starting ip reconciler", | ||
| "check_interval", r.CheckInterval, | ||
| "release_after_misses", r.ReleaseAfterMisses) | ||
|
|
||
| ctx, r.cancel = context.WithCancel(ctx) | ||
| go r.monitor(ctx) | ||
| } | ||
|
|
||
| // Stop gracefully stops the reconciler. | ||
| func (r *IPReconciler) Stop() { | ||
| if r.cancel != nil { | ||
| r.cancel() | ||
| } | ||
| } | ||
|
|
||
| func (r *IPReconciler) monitor(ctx context.Context) { | ||
| ticker := time.NewTicker(r.CheckInterval) | ||
| defer ticker.Stop() | ||
|
|
||
| // Run an initial reconcile on startup. This also serves boot reconciliation: | ||
| // it re-reserves the IPs of every container that survived a restart, using the | ||
| // live containers as the source of truth rather than the entity store. | ||
| if err := r.reconcile(ctx); err != nil { | ||
| r.Log.Error("initial ip reconcile failed", "error", err) | ||
| } | ||
|
|
||
| for { | ||
| select { | ||
| case <-ticker.C: | ||
| if err := r.reconcile(ctx); err != nil { | ||
| r.Log.Error("ip reconcile failed", "error", err) | ||
| } | ||
| case <-ctx.Done(): | ||
| r.Log.Info("ip reconciler stopped") | ||
| return | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (r *IPReconciler) reconcile(ctx context.Context) error { | ||
| if r.Subnet == nil || r.LiveIPs == nil { | ||
| return nil | ||
| } | ||
|
|
||
| live, err := r.LiveIPs(ctx) | ||
| if err != nil { | ||
| return fmt.Errorf("enumerating live IPs: %w", err) | ||
| } | ||
|
|
||
| reserved, err := r.Subnet.ReservedAddrs() | ||
| if err != nil { | ||
| return fmt.Errorf("listing reserved addresses: %w", err) | ||
| } | ||
| reservedSet := make(map[netip.Addr]bool, len(reserved)) | ||
| for _, addr := range reserved { | ||
| reservedSet[addr] = true | ||
| } | ||
|
|
||
| // Repair: an address live on the bridge but not reserved in netdb is the | ||
| // exact divergence behind MIR-1238. Re-reserve it so it can never be handed | ||
| // to a second sandbox. | ||
| for addr := range live { | ||
| if reservedSet[addr] { | ||
| continue | ||
| } | ||
| if err := r.Subnet.ReserveSpecificAddr(addr); err != nil { | ||
| r.Log.Error("ip reconciler failed to re-reserve live address", "addr", addr, "error", err) | ||
| continue | ||
| } | ||
| r.Log.Warn("ip reconciler re-reserved a live address netdb had lost", "addr", addr) | ||
| } | ||
|
|
||
| // Reap: an address reserved in netdb with no live owner is a leaked lease. | ||
| // Release it only after it has been absent for ReleaseAfterMisses consecutive | ||
| // cycles, so a sandbox that has reserved its address but not yet booted its | ||
| // container (or is mid-teardown) is not disturbed. | ||
| for addr := range reservedSet { | ||
| if live[addr] { | ||
| delete(r.misses, addr) | ||
| continue | ||
| } | ||
|
|
||
| r.misses[addr]++ | ||
| if r.misses[addr] < r.ReleaseAfterMisses { | ||
| continue | ||
| } | ||
|
|
||
| if err := r.Subnet.ReleaseAddr(addr); err != nil { | ||
| r.Log.Error("ip reconciler failed to release leaked address", "addr", addr, "error", err) | ||
| // Cap the counter so a persistently failing release retries every | ||
| // cycle without letting the miss count (and the map entry) grow | ||
| // without bound. | ||
| r.misses[addr] = r.ReleaseAfterMisses | ||
| continue | ||
| } | ||
| delete(r.misses, addr) | ||
| r.Log.Warn("ip reconciler released leaked reservation with no live owner", | ||
| "addr", addr, "after_misses", r.ReleaseAfterMisses) | ||
| } | ||
|
|
||
| // Forget miss counters for addresses that are no longer reserved (e.g. they | ||
| // were released elsewhere) so the map cannot grow without bound. | ||
| for addr := range r.misses { | ||
| if !reservedSet[addr] { | ||
| delete(r.misses, addr) | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| package sandbox | ||
|
|
||
| import ( | ||
| "context" | ||
| "io" | ||
| "log/slog" | ||
| "net/netip" | ||
| "path/filepath" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
|
|
||
| "miren.dev/runtime/pkg/netdb" | ||
| ) | ||
|
|
||
| func TestIPReconciler(t *testing.T) { | ||
| newSubnet := func(t *testing.T) *netdb.Subnet { | ||
| n, err := netdb.New(filepath.Join(t.TempDir(), "net.db")) | ||
| require.NoError(t, err) | ||
| s, err := n.Subnet("10.8.64.0/24") | ||
| require.NoError(t, err) | ||
| return s | ||
| } | ||
| log := slog.New(slog.NewTextHandler(io.Discard, nil)) | ||
|
|
||
| reservedSet := func(t *testing.T, s *netdb.Subnet) map[string]bool { | ||
| reserved, err := s.ReservedAddrs() | ||
| require.NoError(t, err) | ||
| got := make(map[string]bool, len(reserved)) | ||
| for _, a := range reserved { | ||
| got[a.String()] = true | ||
| } | ||
| return got | ||
| } | ||
|
|
||
| t.Run("re-reserves a live address netdb lost", func(t *testing.T) { | ||
| r := require.New(t) | ||
| subnet := newSubnet(t) | ||
|
|
||
| live := map[netip.Addr]bool{netip.MustParseAddr("10.8.64.5"): true} | ||
| rec := &IPReconciler{ | ||
| Log: log, | ||
| Subnet: subnet, | ||
| LiveIPs: func(context.Context) (map[netip.Addr]bool, error) { return live, nil }, | ||
| ReleaseAfterMisses: 2, | ||
| misses: make(map[netip.Addr]int), | ||
| } | ||
|
|
||
| r.NoError(rec.reconcile(context.Background())) | ||
|
|
||
| r.True(reservedSet(t, subnet)["10.8.64.5"], "live-but-unreserved address should be re-reserved") | ||
| }) | ||
|
|
||
| t.Run("reaps a leaked reservation only after the miss threshold", func(t *testing.T) { | ||
| r := require.New(t) | ||
| subnet := newSubnet(t) | ||
|
|
||
| _, err := subnet.Reserve() // .2 reserved, but never live | ||
| r.NoError(err) | ||
|
|
||
| rec := &IPReconciler{ | ||
| Log: log, | ||
| Subnet: subnet, | ||
| LiveIPs: func(context.Context) (map[netip.Addr]bool, error) { | ||
| return map[netip.Addr]bool{}, nil // nothing live | ||
| }, | ||
| ReleaseAfterMisses: 2, | ||
| misses: make(map[netip.Addr]int), | ||
| } | ||
|
|
||
| // First cycle: miss = 1 (< threshold), still reserved. | ||
| r.NoError(rec.reconcile(context.Background())) | ||
| r.Len(reservedSet(t, subnet), 1, "should not reap on the first miss") | ||
|
|
||
| // Second cycle: miss = 2 (== threshold), released. | ||
| r.NoError(rec.reconcile(context.Background())) | ||
| r.Empty(reservedSet(t, subnet), "should reap after reaching the miss threshold") | ||
| }) | ||
|
|
||
| t.Run("never reaps an address that is live", func(t *testing.T) { | ||
| r := require.New(t) | ||
| subnet := newSubnet(t) | ||
|
|
||
| ip, err := subnet.Reserve() // .2 | ||
| r.NoError(err) | ||
|
|
||
| live := map[netip.Addr]bool{ip.Addr(): true} | ||
| rec := &IPReconciler{ | ||
| Log: log, | ||
| Subnet: subnet, | ||
| LiveIPs: func(context.Context) (map[netip.Addr]bool, error) { return live, nil }, | ||
| ReleaseAfterMisses: 1, | ||
| misses: make(map[netip.Addr]int), | ||
| } | ||
|
|
||
| r.NoError(rec.reconcile(context.Background())) | ||
| r.NoError(rec.reconcile(context.Background())) | ||
| r.Len(reservedSet(t, subnet), 1, "a live address must never be reaped") | ||
| }) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -131,6 +131,7 @@ type SandboxController struct { | |
|
|
||
| watchdog *ContainerWatchdog | ||
| imageWatchdog *ImageWatchdog | ||
| ipReconciler *IPReconciler | ||
|
|
||
| // writeTracker tracks entity write revisions to skip self-generated watch events | ||
| writeTracker controller.WriteTracker | ||
|
|
@@ -555,6 +556,21 @@ func (c *SandboxController) Init(ctx context.Context) error { | |
| } | ||
| c.imageWatchdog.Start(c.topCtx) | ||
|
|
||
| // Initialize and start the IP reconciler, which keeps netdb lease bookkeeping | ||
| // in agreement with the addresses actually live on the bridge (MIR-1238). Its | ||
| // initial run also re-reserves the IPs of containers that survived a restart. | ||
| if c.Subnet != nil { | ||
| c.ipReconciler = &IPReconciler{ | ||
| Log: c.Log.With("module", "ip-reconciler"), | ||
| Subnet: c.Subnet, | ||
| LiveIPs: func(ctx context.Context) (map[netip.Addr]bool, error) { | ||
| return c.liveBridgeIPs(ctx) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| }, | ||
| // CheckInterval is left at the IPReconciler default (see Start). | ||
| } | ||
| c.ipReconciler.Start(c.topCtx) | ||
| } | ||
|
|
||
| // Start workload identity token refresh loop and token request server | ||
| // (tokenRefresher and tokenSecrets were created earlier, before reconcile) | ||
| if c.WorkloadIssuer != nil { | ||
|
|
@@ -592,6 +608,10 @@ func (c *SandboxController) Close() error { | |
| c.imageWatchdog.Stop() | ||
| } | ||
|
|
||
| if c.ipReconciler != nil { | ||
| c.ipReconciler.Stop() | ||
| } | ||
|
|
||
| c.running.Wait() | ||
|
|
||
| // Shutdown DNS and other network services | ||
|
|
@@ -1350,7 +1370,9 @@ func (c *SandboxController) AllocateNetwork( | |
| } | ||
|
|
||
| } else { | ||
| ep, err = network.AllocateOnBridge(c.Bridge, c.Subnet) | ||
| ep, err = network.AllocateOnBridge(c.Log, c.Bridge, c.Subnet, func() (map[netip.Addr]bool, error) { | ||
| return c.liveBridgeIPs(ctx) | ||
| }) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
@@ -1366,6 +1388,48 @@ func (c *SandboxController) AllocateNetwork( | |
| return ep, nil | ||
| } | ||
|
|
||
| // liveBridgeIPs returns the set of bridge IP addresses currently assigned to | ||
| // sandbox containers, read from their containerd labels. It is the in-use set | ||
| // used by AllocateOnBridge to avoid handing out an address that netdb's lease | ||
| // bookkeeping has lost track of but a sandbox is still using (MIR-1238). The | ||
| // labels are the same source the watchdog trusts and require no netns entry. | ||
| // | ||
| // It does not inspect task state, so a container whose task has exited but whose | ||
| // containerd record still exists (e.g. between SIGKILL and removeContainer) | ||
| // still contributes its IP. This is deliberately conservative: counting a | ||
| // not-quite-gone address as live at worst keeps it reserved for one extra | ||
| // reconciler cycle, whereas missing a live address risks a duplicate assignment. | ||
| func (c *SandboxController) liveBridgeIPs(ctx context.Context) (map[netip.Addr]bool, error) { | ||
| ctx = namespaces.WithNamespace(ctx, c.Namespace) | ||
|
|
||
| containerList, err := c.CC.Containers(ctx) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to list containers: %w", err) | ||
| } | ||
|
|
||
| live := make(map[netip.Addr]bool) | ||
| for _, cont := range containerList { | ||
| labels, err := cont.Labels(ctx) | ||
| if err != nil { | ||
| c.Log.Warn("failed to read container labels while enumerating live IPs", "id", cont.ID(), "error", err) | ||
| continue | ||
| } | ||
| for label, value := range labels { | ||
| if !strings.HasPrefix(label, "runtime.computer/ip") { | ||
| continue | ||
| } | ||
| addr, err := netip.ParseAddr(value) | ||
| if err != nil { | ||
| c.Log.Warn("failed to parse IP from container label", "id", cont.ID(), "label", label, "value", value, "error", err) | ||
| continue | ||
| } | ||
| live[addr] = true | ||
| } | ||
| } | ||
|
|
||
| return live, nil | ||
| } | ||
|
|
||
| func (c *SandboxController) setupHosts(sb *compute.Sandbox, name string, ep *network.EndpointConfig) error { | ||
| if ep == nil || len(ep.Addresses) == 0 { | ||
| return fmt.Errorf("no addresses allocated for sandbox %s", sb.ID) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When
ReleaseAddrfails,r.misses[addr]is left at the threshold value. On the next cycle,r.misses[addr]++pushes it above the threshold and another release is attempted — which is probably the desired retry behaviour. However, the miss counter for this address will never be cleaned up by the guard block below (lines 154–158) as long as the address stays reserved, so it grows unboundedly on each failed cycle. Consider either capping the counter atReleaseAfterMissesafter a failure, or adding a comment explaining the retry-forever intent.