From 194b69f2297867712cc4f2384787839c864f7938 Mon Sep 17 00:00:00 2001
From: Evan Phoenix
Date: Thu, 11 Jun 2026 21:00:38 -0700
Subject: [PATCH 1/2] sandbox: prevent duplicate bridge IP assignment (MIR-1238)

The sandbox IPAM handed the same rt0 address to two live sandboxes on a
runner, causing ARP conflicts, martian-source drops, and connection
flapping for the affected apps.

netdb cannot itself hold two reserved rows for one IP (ip is a PRIMARY
KEY); the bug is a divergence between netdb's lease bookkeeping and the
addresses actually live on the bridge. A sandbox's lease was released in
netdb while its container kept running, and the now-"free" address was
later handed to a second sandbox.

The container watchdog is the source of the premature release: it freed
an IP whenever a labeled container was missing from the node-scoped
sandbox list (which is built from an index that can lag), released it
before the container was actually removed, and released it even when
removal failed.

Defense in depth:

- watchdog: release an IP only after the container is confirmed removed
  (leave it reserved on failure); before treating a container as
  orphaned, re-fetch its sandbox entity directly by ID (a linearizable
  key lookup, not an index read) so a sandbox the index transiently
  omitted is never killed and never has its IP released.
- AllocateOnBridge: verify a freshly reserved address is not already
  live on the bridge (from running-container labels); on conflict keep
  it quarantined and retry, so a divergence can never surface as a
  duplicate handed to a new sandbox.
- IPReconciler: periodically (and on boot) re-reserve any address live
  on the bridge but free in netdb, and conservatively reap reservations
  with no live owner after several consecutive misses.

Adds unit tests for netdb concurrency/ReservedAddrs, the conflict-free
reserve loop, and the reconciler, plus an iso regression test that a
running sandbox missing from the node index is kept.
--- controllers/sandbox/ipreconciler.go | 161 ++++++++++++++++++ controllers/sandbox/ipreconciler_test.go | 100 +++++++++++ controllers/sandbox/sandbox.go | 61 ++++++- controllers/sandbox/sandbox_frozen_test.go | 2 +- controllers/sandbox/watchdog.go | 155 +++++++++++++---- .../sandbox/watchdogtest/container_test.go | 45 +++++ network/config.go | 55 +++++- network/config_test.go | 61 +++++++ pkg/netdb/netdb.go | 31 ++++ pkg/netdb/netdb_test.go | 86 ++++++++++ 10 files changed, 720 insertions(+), 37 deletions(-) create mode 100644 controllers/sandbox/ipreconciler.go create mode 100644 controllers/sandbox/ipreconciler_test.go create mode 100644 network/config_test.go diff --git a/controllers/sandbox/ipreconciler.go b/controllers/sandbox/ipreconciler.go new file mode 100644 index 000000000..d53695215 --- /dev/null +++ b/controllers/sandbox/ipreconciler.go @@ -0,0 +1,161 @@ +package sandbox + +import ( + "context" + "fmt" + "log/slog" + "net/netip" + "time" + + "miren.dev/runtime/pkg/netdb" +) + +// IPReconciler keeps the netdb IP lease bookkeeping in agreement with the +// addresses actually live on the bridge. The two can diverge — a release path +// that ran while a container kept running, a lease lost across a restart — and a +// divergence where netdb believes a live address is free leads to that address +// being handed to a second sandbox, a duplicate assignment (MIR-1238). +// +// Each cycle it: +// - re-reserves any address that is live on the bridge but not reserved in +// netdb (the repair direction — always safe, only ever adds a reservation +// for an address already in use); and +// - releases any address reserved in netdb with no live owner, but only after +// it has been absent for several consecutive cycles (the reap direction — +// conservative, to avoid racing a sandbox that is mid-create or mid-teardown). 
+type IPReconciler struct { + Log *slog.Logger + Subnet *netdb.Subnet + + // LiveIPs returns the set of addresses currently assigned to running sandbox + // containers, determined independently of netdb. + LiveIPs func(ctx context.Context) (map[netip.Addr]bool, error) + + // CheckInterval is how often a reconcile cycle runs. + CheckInterval time.Duration + // ReleaseAfterMisses is how many consecutive cycles an address must be + // reserved-but-not-live before it is released. With the default interval this + // is a multi-minute grace window, comfortably longer than any normal sandbox + // create (whose saga releases the reservation on failure anyway). + ReleaseAfterMisses int + + misses map[netip.Addr]int + cancel context.CancelFunc +} + +// Start begins the periodic reconcile loop. +func (r *IPReconciler) Start(ctx context.Context) { + if r.CheckInterval == 0 { + r.CheckInterval = 5 * time.Minute + } + if r.ReleaseAfterMisses == 0 { + r.ReleaseAfterMisses = 3 + } + r.misses = make(map[netip.Addr]int) + + r.Log.Info("starting ip reconciler", + "check_interval", r.CheckInterval, + "release_after_misses", r.ReleaseAfterMisses) + + ctx, r.cancel = context.WithCancel(ctx) + go r.monitor(ctx) +} + +// Stop gracefully stops the reconciler. +func (r *IPReconciler) Stop() { + if r.cancel != nil { + r.cancel() + } +} + +func (r *IPReconciler) monitor(ctx context.Context) { + ticker := time.NewTicker(r.CheckInterval) + defer ticker.Stop() + + // Run an initial reconcile on startup. This also serves boot reconciliation: + // it re-reserves the IPs of every container that survived a restart, using the + // live containers as the source of truth rather than the entity store. 
+ if err := r.reconcile(ctx); err != nil { + r.Log.Error("initial ip reconcile failed", "error", err) + } + + for { + select { + case <-ticker.C: + if err := r.reconcile(ctx); err != nil { + r.Log.Error("ip reconcile failed", "error", err) + } + case <-ctx.Done(): + r.Log.Info("ip reconciler stopped") + return + } + } +} + +func (r *IPReconciler) reconcile(ctx context.Context) error { + if r.Subnet == nil || r.LiveIPs == nil { + return nil + } + + live, err := r.LiveIPs(ctx) + if err != nil { + return fmt.Errorf("enumerating live IPs: %w", err) + } + + reserved, err := r.Subnet.ReservedAddrs() + if err != nil { + return fmt.Errorf("listing reserved addresses: %w", err) + } + reservedSet := make(map[netip.Addr]bool, len(reserved)) + for _, addr := range reserved { + reservedSet[addr] = true + } + + // Repair: an address live on the bridge but not reserved in netdb is the + // exact divergence behind MIR-1238. Re-reserve it so it can never be handed + // to a second sandbox. + for addr := range live { + if reservedSet[addr] { + continue + } + if err := r.Subnet.ReserveSpecificAddr(addr); err != nil { + r.Log.Error("ip reconciler failed to re-reserve live address", "addr", addr, "error", err) + continue + } + r.Log.Warn("ip reconciler re-reserved a live address netdb had lost", "addr", addr) + } + + // Reap: an address reserved in netdb with no live owner is a leaked lease. + // Release it only after it has been absent for ReleaseAfterMisses consecutive + // cycles, so a sandbox that has reserved its address but not yet booted its + // container (or is mid-teardown) is not disturbed. 
+ for addr := range reservedSet { + if live[addr] { + delete(r.misses, addr) + continue + } + + r.misses[addr]++ + if r.misses[addr] < r.ReleaseAfterMisses { + continue + } + + if err := r.Subnet.ReleaseAddr(addr); err != nil { + r.Log.Error("ip reconciler failed to release leaked address", "addr", addr, "error", err) + continue + } + delete(r.misses, addr) + r.Log.Warn("ip reconciler released leaked reservation with no live owner", + "addr", addr, "after_misses", r.ReleaseAfterMisses) + } + + // Forget miss counters for addresses that are no longer reserved (e.g. they + // were released elsewhere) so the map cannot grow without bound. + for addr := range r.misses { + if !reservedSet[addr] { + delete(r.misses, addr) + } + } + + return nil +} diff --git a/controllers/sandbox/ipreconciler_test.go b/controllers/sandbox/ipreconciler_test.go new file mode 100644 index 000000000..034fbcb45 --- /dev/null +++ b/controllers/sandbox/ipreconciler_test.go @@ -0,0 +1,100 @@ +package sandbox + +import ( + "context" + "io" + "log/slog" + "net/netip" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "miren.dev/runtime/pkg/netdb" +) + +func TestIPReconciler(t *testing.T) { + newSubnet := func(t *testing.T) *netdb.Subnet { + n, err := netdb.New(filepath.Join(t.TempDir(), "net.db")) + require.NoError(t, err) + s, err := n.Subnet("10.8.64.0/24") + require.NoError(t, err) + return s + } + log := slog.New(slog.NewTextHandler(io.Discard, nil)) + + reservedSet := func(t *testing.T, s *netdb.Subnet) map[string]bool { + reserved, err := s.ReservedAddrs() + require.NoError(t, err) + got := make(map[string]bool, len(reserved)) + for _, a := range reserved { + got[a.String()] = true + } + return got + } + + t.Run("re-reserves a live address netdb lost", func(t *testing.T) { + r := require.New(t) + subnet := newSubnet(t) + + live := map[netip.Addr]bool{netip.MustParseAddr("10.8.64.5"): true} + rec := &IPReconciler{ + Log: log, + Subnet: subnet, + LiveIPs: 
func(context.Context) (map[netip.Addr]bool, error) { return live, nil }, + ReleaseAfterMisses: 2, + misses: make(map[netip.Addr]int), + } + + r.NoError(rec.reconcile(context.Background())) + + r.True(reservedSet(t, subnet)["10.8.64.5"], "live-but-unreserved address should be re-reserved") + }) + + t.Run("reaps a leaked reservation only after the miss threshold", func(t *testing.T) { + r := require.New(t) + subnet := newSubnet(t) + + _, err := subnet.Reserve() // .2 reserved, but never live + r.NoError(err) + + rec := &IPReconciler{ + Log: log, + Subnet: subnet, + LiveIPs: func(context.Context) (map[netip.Addr]bool, error) { + return map[netip.Addr]bool{}, nil // nothing live + }, + ReleaseAfterMisses: 2, + misses: make(map[netip.Addr]int), + } + + // First cycle: miss = 1 (< threshold), still reserved. + r.NoError(rec.reconcile(context.Background())) + r.Len(reservedSet(t, subnet), 1, "should not reap on the first miss") + + // Second cycle: miss = 2 (== threshold), released. + r.NoError(rec.reconcile(context.Background())) + r.Empty(reservedSet(t, subnet), "should reap after reaching the miss threshold") + }) + + t.Run("never reaps an address that is live", func(t *testing.T) { + r := require.New(t) + subnet := newSubnet(t) + + ip, err := subnet.Reserve() // .2 + r.NoError(err) + + live := map[netip.Addr]bool{ip.Addr(): true} + rec := &IPReconciler{ + Log: log, + Subnet: subnet, + LiveIPs: func(context.Context) (map[netip.Addr]bool, error) { return live, nil }, + ReleaseAfterMisses: 1, + misses: make(map[netip.Addr]int), + } + + r.NoError(rec.reconcile(context.Background())) + r.NoError(rec.reconcile(context.Background())) + r.Len(reservedSet(t, subnet), 1, "a live address must never be reaped") + }) +} diff --git a/controllers/sandbox/sandbox.go b/controllers/sandbox/sandbox.go index be98d890a..0c4867aa9 100644 --- a/controllers/sandbox/sandbox.go +++ b/controllers/sandbox/sandbox.go @@ -131,6 +131,7 @@ type SandboxController struct { watchdog *ContainerWatchdog 
imageWatchdog *ImageWatchdog + ipReconciler *IPReconciler // writeTracker tracks entity write revisions to skip self-generated watch events writeTracker controller.WriteTracker @@ -555,6 +556,21 @@ func (c *SandboxController) Init(ctx context.Context) error { } c.imageWatchdog.Start(c.topCtx) + // Initialize and start the IP reconciler, which keeps netdb lease bookkeeping + // in agreement with the addresses actually live on the bridge (MIR-1238). Its + // initial run also re-reserves the IPs of containers that survived a restart. + if c.Subnet != nil { + c.ipReconciler = &IPReconciler{ + Log: c.Log.With("module", "ip-reconciler"), + Subnet: c.Subnet, + LiveIPs: func(ctx context.Context) (map[netip.Addr]bool, error) { + return c.liveBridgeIPs(ctx) + }, + CheckInterval: 5 * time.Minute, + } + c.ipReconciler.Start(c.topCtx) + } + // Start workload identity token refresh loop and token request server // (tokenRefresher and tokenSecrets were created earlier, before reconcile) if c.WorkloadIssuer != nil { @@ -592,6 +608,10 @@ func (c *SandboxController) Close() error { c.imageWatchdog.Stop() } + if c.ipReconciler != nil { + c.ipReconciler.Stop() + } + c.running.Wait() // Shutdown DNS and other network services @@ -1350,7 +1370,9 @@ func (c *SandboxController) AllocateNetwork( } } else { - ep, err = network.AllocateOnBridge(c.Bridge, c.Subnet) + ep, err = network.AllocateOnBridge(c.Log, c.Bridge, c.Subnet, func() (map[netip.Addr]bool, error) { + return c.liveBridgeIPs(ctx) + }) if err != nil { return nil, err } @@ -1366,6 +1388,43 @@ func (c *SandboxController) AllocateNetwork( return ep, nil } +// liveBridgeIPs returns the set of bridge IP addresses currently assigned to +// running sandbox containers, read from their containerd labels. It is the +// authoritative in-use set used by AllocateOnBridge to avoid handing out an +// address that netdb's lease bookkeeping has lost track of but a live sandbox is +// still using (MIR-1238). 
The labels are the same source the watchdog trusts and +// require no netns entry. +func (c *SandboxController) liveBridgeIPs(ctx context.Context) (map[netip.Addr]bool, error) { + ctx = namespaces.WithNamespace(ctx, c.Namespace) + + containerList, err := c.CC.Containers(ctx) + if err != nil { + return nil, fmt.Errorf("failed to list containers: %w", err) + } + + live := make(map[netip.Addr]bool) + for _, cont := range containerList { + labels, err := cont.Labels(ctx) + if err != nil { + c.Log.Warn("failed to read container labels while enumerating live IPs", "id", cont.ID(), "error", err) + continue + } + for label, value := range labels { + if !strings.HasPrefix(label, "runtime.computer/ip") { + continue + } + addr, err := netip.ParseAddr(value) + if err != nil { + c.Log.Warn("failed to parse IP from container label", "id", cont.ID(), "label", label, "value", value, "error", err) + continue + } + live[addr] = true + } + } + + return live, nil +} + func (c *SandboxController) setupHosts(sb *compute.Sandbox, name string, ep *network.EndpointConfig) error { if ep == nil || len(ep.Addresses) == 0 { return fmt.Errorf("no addresses allocated for sandbox %s", sb.ID) diff --git a/controllers/sandbox/sandbox_frozen_test.go b/controllers/sandbox/sandbox_frozen_test.go index bb63be4a7..2ce60c0ac 100644 --- a/controllers/sandbox/sandbox_frozen_test.go +++ b/controllers/sandbox/sandbox_frozen_test.go @@ -24,7 +24,7 @@ import ( // sha256sum controllers/sandbox/sandbox.go controllers/sandbox/volume.go controllers/sandbox/firewall.go func TestSandboxControllerFrozen(t *testing.T) { frozen := map[string]string{ - "sandbox.go": "2cb139828e42cae2a41459c3e8a699dfdcd60e9e30d191abcbc4a2c432497c9e", + "sandbox.go": "8fea7bd970871a0f2b2c4759296822c98473ca13b93ba38e6244c438c9224e7a", "volume.go": "b4697764d48a90adc04ce47968ccef11ceba50da8d19c889906c5c3a539065b3", "firewall.go": "648cb5d91091d5eb7400152b19695a8045585feae59c5dd36c12d663a27bb91f", } diff --git 
a/controllers/sandbox/watchdog.go b/controllers/sandbox/watchdog.go index 3344c149a..94f4eae47 100644 --- a/controllers/sandbox/watchdog.go +++ b/controllers/sandbox/watchdog.go @@ -2,6 +2,7 @@ package sandbox import ( "context" + "errors" "fmt" "log/slog" "net/netip" @@ -14,6 +15,7 @@ import ( compute "miren.dev/runtime/api/compute/compute_v1alpha" "miren.dev/runtime/api/entityserver/entityserver_v1alpha" + "miren.dev/runtime/pkg/cond" "miren.dev/runtime/pkg/entity" "miren.dev/runtime/pkg/netdb" ) @@ -180,20 +182,21 @@ func (w *ContainerWatchdog) CleanupOrphanedContainers(ctx context.Context) (*Cle } } - // Identify orphaned containers and store their labels for IP cleanup + // Collect the sandbox containers (those carrying our entity label) up front so + // we can reason about the overall picture before killing or releasing anything. type orphanedContainer struct { container containerd.Container labels map[string]string } - var orphanedContainers []orphanedContainer + type sandboxContainer struct { + container containerd.Container + labels map[string]string + sandboxID string + } + var sandboxContainers []sandboxContainer for _, container := range containerList { containerID := container.ID() - // Skip if this is a valid container - if validContainers[containerID] { - continue - } - // Check labels to see if this is a sandbox container labels, err := container.Labels(cleanupCtx) if err != nil { @@ -202,12 +205,52 @@ func (w *ContainerWatchdog) CleanupOrphanedContainers(ctx context.Context) (*Cle } // Skip if not a sandbox container (check for our labels) - if _, ok := labels[sandboxEntityLabel]; !ok { + sandboxID, ok := labels[sandboxEntityLabel] + if !ok { + continue + } + + sandboxContainers = append(sandboxContainers, sandboxContainer{ + container: container, + labels: labels, + sandboxID: sandboxID, + }) + } + + // An empty node-scoped sandbox list while sandbox containers are clearly + // running is a strong signal that the node index is lagging. 
We do not act on + // the empty list directly — the per-container re-fetch below is authoritative + // and both protects index-missed live sandboxes and still reclaims genuinely + // drained ones — but surface it so the staleness is visible (MIR-1238). + if len(resp.Values()) == 0 && len(sandboxContainers) > 0 { + w.Log.Warn("watchdog: node sandbox list empty but sandbox containers exist; relying on per-container entity re-fetch", + "sandbox_containers", len(sandboxContainers)) + } + + // Identify orphaned containers and store their labels for IP cleanup. + var orphanedContainers []orphanedContainer + for _, sc := range sandboxContainers { + containerID := sc.container.ID() + + // Skip if this is a valid container + if validContainers[containerID] { continue } - w.Log.Info("watchdog found orphaned container", "id", containerID, "labels", labels) - orphanedContainers = append(orphanedContainers, orphanedContainer{container: container, labels: labels}) + // The node-scoped list did not include this container's sandbox. That + // list is built from a node index that can transiently lag; before + // treating the container as orphaned, re-fetch the sandbox entity + // directly by ID — a linearizable key lookup, not an index read. If the + // sandbox still exists and is not DEAD past the grace window, the index + // merely missed it: never kill it or release its IP (MIR-1238). 
+ if w.sandboxStillValid(cleanupCtx, sc.sandboxID, now, graceWindow) { + w.Log.Warn("watchdog: container missing from node list but sandbox entity is still valid; skipping (stale index)", + "id", containerID, "sandbox_id", sc.sandboxID) + continue + } + + w.Log.Info("watchdog found orphaned container", "id", containerID, "labels", sc.labels) + orphanedContainers = append(orphanedContainers, orphanedContainer{container: sc.container, labels: sc.labels}) } if len(orphanedContainers) == 0 { @@ -247,37 +290,83 @@ func (w *ContainerWatchdog) CleanupOrphanedContainers(ctx context.Context) (*Cle } } - // Release IPs from container labels before removing the container - if w.Subnet != nil { - for label, value := range oc.labels { - if strings.HasPrefix(label, "runtime.computer/ip") { - addr, err := netip.ParseAddr(value) - if err != nil { - w.Log.Warn("watchdog failed to parse IP from label", "id", containerID, "label", label, "value", value, "error", err) - continue - } - if err := w.Subnet.ReleaseAddr(addr); err != nil { - w.Log.Error("watchdog failed to release IP", "id", containerID, "addr", addr, "error", err) - } else { - w.Log.Debug("watchdog released IP", "id", containerID, "addr", addr) - } - } - } - } - - // Aggressively remove the container + // Aggressively remove the container. Only once removal succeeds — and the + // container's network namespace and veth are actually torn down — do we + // return its IP to the pool. Releasing before confirmed removal can hand + // a still-live sandbox's IP to a new sandbox, producing a duplicate + // assignment on the bridge (MIR-1238). 
if err := w.removeContainer(cleanupCtx, oc.container); err != nil { - w.Log.Error("watchdog failed to remove orphaned container", "id", containerID, "error", err) + w.Log.Error("watchdog failed to remove orphaned container; leaving its IP reserved", "id", containerID, "error", err) result.FailedContainers[containerID] = err - } else { - w.Log.Info("watchdog successfully removed orphaned container", "id", containerID) - result.DeletedContainers = append(result.DeletedContainers, containerID) + continue } + + w.Log.Info("watchdog successfully removed orphaned container", "id", containerID) + result.DeletedContainers = append(result.DeletedContainers, containerID) + + w.releaseContainerIPs(containerID, oc.labels) } return result, nil } +// releaseContainerIPs returns the IP addresses recorded in a container's labels +// to the subnet pool. It must only be called after the container has been +// confirmed removed, so the addresses are no longer live on the bridge. +func (w *ContainerWatchdog) releaseContainerIPs(containerID string, labels map[string]string) { + if w.Subnet == nil { + return + } + for label, value := range labels { + if !strings.HasPrefix(label, "runtime.computer/ip") { + continue + } + addr, err := netip.ParseAddr(value) + if err != nil { + w.Log.Warn("watchdog failed to parse IP from label", "id", containerID, "label", label, "value", value, "error", err) + continue + } + if err := w.Subnet.ReleaseAddr(addr); err != nil { + w.Log.Error("watchdog failed to release IP", "id", containerID, "addr", addr, "error", err) + } else { + w.Log.Debug("watchdog released IP", "id", containerID, "addr", addr) + } + } +} + +// sandboxStillValid reports whether the sandbox with the given ID should still be +// treated as live. 
It re-fetches the entity directly by ID — a linearizable key +// lookup that, unlike the node-scoped index used to build the valid set, does not +// lag — so it reliably distinguishes a sandbox the index transiently omitted from +// one that is genuinely gone. A sandbox is valid if it exists and is not DEAD, or +// is DEAD but was updated within the grace window. On any non not-found error it +// returns true (fail-safe: keep the sandbox rather than risk reclaiming a live +// one). +func (w *ContainerWatchdog) sandboxStillValid(ctx context.Context, sandboxID string, now time.Time, graceWindow time.Duration) bool { + resp, err := w.EAC.Get(ctx, sandboxID) + if err != nil { + if errors.Is(err, cond.ErrNotFound{}) { + return false + } + w.Log.Warn("watchdog: failed to re-fetch sandbox entity; treating as valid (fail-safe)", + "sandbox_id", sandboxID, "error", err) + return true + } + if !resp.HasEntity() { + return false + } + + ent := resp.Entity().Entity() + + var sb compute.Sandbox + sb.Decode(ent) + + if sb.Status != compute.DEAD { + return true + } + return now.Sub(ent.GetUpdatedAt()) < graceWindow +} + // removeContainer removes a container and its task. // Note: The task should already have been killed before calling this function. 
func (w *ContainerWatchdog) removeContainer(ctx context.Context, container containerd.Container) error { diff --git a/controllers/sandbox/watchdogtest/container_test.go b/controllers/sandbox/watchdogtest/container_test.go index 232bf8073..f2bcf4fd7 100644 --- a/controllers/sandbox/watchdogtest/container_test.go +++ b/controllers/sandbox/watchdogtest/container_test.go @@ -312,6 +312,51 @@ func TestContainerWatchdog(t *testing.T) { r.NoError(err, "recently DEAD sandbox container should NOT be removed due to grace period") }) + t.Run("keeps a running sandbox missing from the node index", func(t *testing.T) { + r := require.New(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + ctx = namespaces.WithNamespace(ctx, ns) + + // A RUNNING sandbox WITHOUT a schedule key for this node, so the + // node-scoped list the watchdog builds its valid set from will not + // return it — simulating a lagging or incomplete index. The direct + // per-ID entity re-fetch must still recognize it as live and keep its + // container, rather than orphaning it and releasing its IP (MIR-1238). 
+ sbID := entity.Id(idgen.GenNS("sb")) + sb := &compute.Sandbox{ID: sbID, Status: compute.RUNNING} + + var rpcE entityserver_v1alpha.Entity + rpcE.SetId(sbID.String()) + rpcE.SetAttrs(entity.New(entity.DBId, sbID, sb.Encode).Attrs()) + _, err := eac.Put(ctx, &rpcE) + r.NoError(err) + + containerID := sandbox.PauseContainerID(sbID) + container, err := createTestContainer(ctx, cc, containerID, map[string]string{ + sandbox.SandboxEntityLabel: sbID.String(), + }) + r.NoError(err) + defer testutils.ClearContainer(ctx, container) + + watchdog := &sandbox.ContainerWatchdog{ + Log: slog.Default(), + CC: cc, + EAC: eac, + Namespace: ns, + NodeId: testNodeId, + } + + result, err := watchdog.CleanupOrphanedContainers(ctx) + r.NoError(err) + r.Empty(result.DeletedContainers, "a running sandbox missing from the node index must not be removed") + + _, err = cc.LoadContainer(ctx, containerID) + r.NoError(err, "container for an index-missed running sandbox must be kept") + }) + t.Run("starts and stops gracefully", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/network/config.go b/network/config.go index de2d8a2ec..3cd4eeaeb 100644 --- a/network/config.go +++ b/network/config.go @@ -2,6 +2,7 @@ package network import ( "fmt" + "log/slog" "net/netip" "github.com/vishvananda/netlink" @@ -81,7 +82,45 @@ type BridgeConfig struct { PromiscMode bool } -func AllocateOnBridge(name string, subnet *netdb.Subnet) (*EndpointConfig, error) { +// InUseFunc returns the set of IP addresses currently live on the bridge, +// determined independently of the netdb bookkeeping (e.g. from running +// containers). It lets AllocateOnBridge detect and skip an address that netdb +// believes is free but that a running sandbox is actually using — a bookkeeping +// divergence that would otherwise produce a duplicate assignment (MIR-1238). 
+type InUseFunc func() (map[netip.Addr]bool, error) + +// allocBridgeMaxAttempts bounds how many live-but-unreserved addresses +// AllocateOnBridge will skip past before giving up. +const allocBridgeMaxAttempts = 32 + +// reserveConflictFree reserves an address from the subnet that is not already +// live on the bridge. netdb believes every address it hands out is free, but its +// bookkeeping can diverge from reality (MIR-1238); when Reserve returns an +// address that live shows in use, that reservation is simply retained (Reserve +// already marked it reserved, so it is quarantined and won't be handed out +// again) and the next address is tried. +func reserveConflictFree(log *slog.Logger, bridge string, subnet *netdb.Subnet, live map[netip.Addr]bool) (netip.Prefix, error) { + for attempt := 0; ; attempt++ { + ep, err := subnet.Reserve() + if err != nil { + return netip.Prefix{}, err + } + + if live == nil || !live[ep.Addr()] { + return ep, nil + } + + log.Error("netdb allocated an address already live on the bridge; quarantining and retrying", + "bridge", bridge, "addr", ep.Addr()) + + if attempt+1 >= allocBridgeMaxAttempts { + return netip.Prefix{}, fmt.Errorf("no conflict-free address available on bridge %s after %d attempts", + bridge, allocBridgeMaxAttempts) + } + } +} + +func AllocateOnBridge(log *slog.Logger, name string, subnet *netdb.Subnet, inUse InUseFunc) (*EndpointConfig, error) { if name == "" { return nil, fmt.Errorf("bridge name must be provided") } @@ -93,7 +132,19 @@ func AllocateOnBridge(name string, subnet *netdb.Subnet) (*EndpointConfig, error bridge := subnet.Router() - ep, err := subnet.Reserve() + var live map[netip.Addr]bool + if inUse != nil { + live, err = inUse() + if err != nil { + // A failure to enumerate live addresses must not block allocation; + // fall back to netdb-only behavior (the prior contract). 
+ log.Warn("failed to enumerate in-use bridge addresses; allocating without conflict check", + "bridge", name, "error", err) + live = nil + } + } + + ep, err := reserveConflictFree(log, name, subnet, live) if err != nil { return nil, err } diff --git a/network/config_test.go b/network/config_test.go new file mode 100644 index 000000000..9ddde26bf --- /dev/null +++ b/network/config_test.go @@ -0,0 +1,61 @@ +package network + +import ( + "io" + "log/slog" + "net/netip" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "miren.dev/runtime/pkg/netdb" +) + +func discardLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +func TestReserveConflictFree(t *testing.T) { + t.Run("skips an address that is live on the bridge but free in netdb", func(t *testing.T) { + r := require.New(t) + + n, err := netdb.New(filepath.Join(t.TempDir(), "net.db")) + r.NoError(err) + subnet, err := n.Subnet("10.8.64.0/24") + r.NoError(err) + + // .2 is in use on the wire even though netdb thinks it is free. + live := map[netip.Addr]bool{netip.MustParseAddr("10.8.64.2"): true} + + ep, err := reserveConflictFree(discardLogger(), "rt0", subnet, live) + r.NoError(err) + r.Equal("10.8.64.3/24", ep.String(), "should skip the live .2 and return .3") + + // .2 must stay reserved (quarantined), so it is never handed out later. 
+ ep2, err := subnet.Reserve() + r.NoError(err) + r.Equal("10.8.64.4/24", ep2.String()) + + reserved, err := subnet.ReservedAddrs() + r.NoError(err) + got := make(map[string]bool) + for _, a := range reserved { + got[a.String()] = true + } + r.True(got["10.8.64.2"], "conflicting address must remain quarantined as reserved") + }) + + t.Run("returns the first free address when nothing is live", func(t *testing.T) { + r := require.New(t) + + n, err := netdb.New(filepath.Join(t.TempDir(), "net.db")) + r.NoError(err) + subnet, err := n.Subnet("10.8.64.0/24") + r.NoError(err) + + ep, err := reserveConflictFree(discardLogger(), "rt0", subnet, nil) + r.NoError(err) + r.Equal("10.8.64.2/24", ep.String()) + }) +} diff --git a/pkg/netdb/netdb.go b/pkg/netdb/netdb.go index 2340435fa..d7f3d3c12 100644 --- a/pkg/netdb/netdb.go +++ b/pkg/netdb/netdb.go @@ -416,6 +416,37 @@ func (s *Subnet) ReserveSpecificAddr(addr netip.Addr) error { return err } +// ReservedAddrs returns all currently reserved addresses within this subnet. +func (s *Subnet) ReservedAddrs() ([]netip.Addr, error) { + s.mu.Lock() + defer s.mu.Unlock() + + rows, err := s.db.Query("SELECT ip FROM ips WHERE reserved = 1 AND subnet = ?", s.net.String()) + if err != nil { + return nil, err + } + defer rows.Close() + + var addrs []netip.Addr + for rows.Next() { + var ipStr string + if err := rows.Scan(&ipStr); err != nil { + return nil, err + } + addr, err := netip.ParseAddr(ipStr) + if err != nil { + // Skip rows that can't be parsed rather than failing the whole scan. 
+ continue + } + addrs = append(addrs, addr) + } + if err := rows.Err(); err != nil { + return nil, err + } + + return addrs, nil +} + func (s *Subnet) Release(prefix netip.Prefix) error { return s.ReleaseAddr(prefix.Addr()) } diff --git a/pkg/netdb/netdb_test.go b/pkg/netdb/netdb_test.go index 168b1240f..907fde39e 100644 --- a/pkg/netdb/netdb_test.go +++ b/pkg/netdb/netdb_test.go @@ -4,6 +4,7 @@ import ( "fmt" "net/netip" "path/filepath" + "sync" "testing" "time" @@ -269,6 +270,91 @@ func TestNetDB(t *testing.T) { r.Equal("rt1", iface3) }) + t.Run("ReservedAddrs returns only currently reserved addresses", func(t *testing.T) { + r := require.New(t) + + n, err := New(filepath.Join(t.TempDir(), "net.db")) + r.NoError(err) + + subnet, err := n.Subnet("172.16.8.0/24") + r.NoError(err) + + ip1, err := subnet.Reserve() + r.NoError(err) + ip2, err := subnet.Reserve() + r.NoError(err) + ip3, err := subnet.Reserve() + r.NoError(err) + + // Release the middle one; it should drop out of the reserved set. + r.NoError(subnet.Release(ip2)) + + reserved, err := subnet.ReservedAddrs() + r.NoError(err) + + got := make(map[string]bool) + for _, a := range reserved { + got[a.String()] = true + } + r.True(got[ip1.Addr().String()]) + r.False(got[ip2.Addr().String()], "released address must not be reported as reserved") + r.True(got[ip3.Addr().String()]) + r.Len(reserved, 2) + }) + + t.Run("concurrent Reserve never hands out a duplicate address", func(t *testing.T) { + r := require.New(t) + + n, err := New(filepath.Join(t.TempDir(), "net.db")) + r.NoError(err) + + // Each goroutine fetches its own Subnet handle from the same NetDB — + // Subnet returns a fresh struct (and mutex) per call, so this exercises + // the SQLite-level uniqueness guarantee, not just the per-handle mutex. 
+ const workers = 32 + + var ( + mu sync.Mutex + seen = make(map[string]bool) + dup string + wg sync.WaitGroup + ) + + wg.Add(workers) + for i := 0; i < workers; i++ { + go func() { + defer wg.Done() + + subnet, err := n.Subnet("172.16.8.0/24") + if err != nil { + return + } + // Retry past transient SQLite busy errors; the property under + // test is uniqueness of successful reservations, not liveness. + var ip netip.Prefix + for attempt := 0; attempt < 100; attempt++ { + ip, err = subnet.Reserve() + if err == nil { + break + } + } + if err != nil { + return + } + + mu.Lock() + if seen[ip.Addr().String()] { + dup = ip.Addr().String() + } + seen[ip.Addr().String()] = true + mu.Unlock() + }() + } + wg.Wait() + + r.Empty(dup, "the same address was reserved by two workers") + }) + t.Run("persists and retrieves leased subnet", func(t *testing.T) { r := require.New(t) From 01d7300304faefbd60c8d1e3865d121e94f4d472 Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Thu, 11 Jun 2026 21:12:54 -0700 Subject: [PATCH 2/2] sandbox: address PR review feedback on duplicate-IP fix - ipreconciler: cap the miss counter on a failed ReleaseAddr so a persistently failing release retries every cycle without the counter (and map entry) growing without bound. - watchdog: only re-fetch a sandbox entity when the node index has no record of it; when the index returned it but it is DEAD past grace, that verdict is authoritative, so skip the redundant per-cycle RPC. - sandbox: rely on the IPReconciler's default CheckInterval instead of duplicating the 5-minute value; document that liveBridgeIPs does not filter on task state and why that is conservative. - network: test the allocBridgeMaxAttempts exhaustion path returns a clear error. 
--- controllers/sandbox/ipreconciler.go | 4 ++++ controllers/sandbox/sandbox.go | 17 +++++++++------ controllers/sandbox/sandbox_frozen_test.go | 2 +- controllers/sandbox/watchdog.go | 24 +++++++++++++++------- network/config_test.go | 23 +++++++++++++++++++++ 5 files changed, 56 insertions(+), 14 deletions(-) diff --git a/controllers/sandbox/ipreconciler.go b/controllers/sandbox/ipreconciler.go index d53695215..f011e47b2 100644 --- a/controllers/sandbox/ipreconciler.go +++ b/controllers/sandbox/ipreconciler.go @@ -142,6 +142,10 @@ func (r *IPReconciler) reconcile(ctx context.Context) error { if err := r.Subnet.ReleaseAddr(addr); err != nil { r.Log.Error("ip reconciler failed to release leaked address", "addr", addr, "error", err) + // Cap the counter so a persistently failing release retries every + // cycle without letting the miss count (and the map entry) grow + // without bound. + r.misses[addr] = r.ReleaseAfterMisses continue } delete(r.misses, addr) diff --git a/controllers/sandbox/sandbox.go b/controllers/sandbox/sandbox.go index 0c4867aa9..7eff80648 100644 --- a/controllers/sandbox/sandbox.go +++ b/controllers/sandbox/sandbox.go @@ -566,7 +566,7 @@ func (c *SandboxController) Init(ctx context.Context) error { LiveIPs: func(ctx context.Context) (map[netip.Addr]bool, error) { return c.liveBridgeIPs(ctx) }, - CheckInterval: 5 * time.Minute, + // CheckInterval is left at the IPReconciler default (see Start). } c.ipReconciler.Start(c.topCtx) } @@ -1389,11 +1389,16 @@ func (c *SandboxController) AllocateNetwork( } // liveBridgeIPs returns the set of bridge IP addresses currently assigned to -// running sandbox containers, read from their containerd labels. It is the -// authoritative in-use set used by AllocateOnBridge to avoid handing out an -// address that netdb's lease bookkeeping has lost track of but a live sandbox is -// still using (MIR-1238). The labels are the same source the watchdog trusts and -// require no netns entry. 
+// sandbox containers, read from their containerd labels. It is the in-use set +// used by AllocateOnBridge to avoid handing out an address that netdb's lease +// bookkeeping has lost track of but a sandbox is still using (MIR-1238). The +// labels are the same source the watchdog trusts and require no netns entry. +// +// It does not inspect task state, so a container whose task has exited but whose +// containerd record still exists (e.g. between SIGKILL and removeContainer) +// still contributes its IP. This is deliberately conservative: counting a +// not-quite-gone address as live at worst keeps it reserved for one extra +// reconciler cycle, whereas missing a live address risks a duplicate assignment. func (c *SandboxController) liveBridgeIPs(ctx context.Context) (map[netip.Addr]bool, error) { ctx = namespaces.WithNamespace(ctx, c.Namespace) diff --git a/controllers/sandbox/sandbox_frozen_test.go b/controllers/sandbox/sandbox_frozen_test.go index 2ce60c0ac..30f312e4b 100644 --- a/controllers/sandbox/sandbox_frozen_test.go +++ b/controllers/sandbox/sandbox_frozen_test.go @@ -24,7 +24,7 @@ import ( // sha256sum controllers/sandbox/sandbox.go controllers/sandbox/volume.go controllers/sandbox/firewall.go func TestSandboxControllerFrozen(t *testing.T) { frozen := map[string]string{ - "sandbox.go": "8fea7bd970871a0f2b2c4759296822c98473ca13b93ba38e6244c438c9224e7a", + "sandbox.go": "9568271c07785d6b3fd15fe4ac6956a811e9944e7959e8787e35ebe9addcf4db", "volume.go": "b4697764d48a90adc04ce47968ccef11ceba50da8d19c889906c5c3a539065b3", "firewall.go": "648cb5d91091d5eb7400152b19695a8045585feae59c5dd36c12d663a27bb91f", } diff --git a/controllers/sandbox/watchdog.go b/controllers/sandbox/watchdog.go index 94f4eae47..7254208b0 100644 --- a/controllers/sandbox/watchdog.go +++ b/controllers/sandbox/watchdog.go @@ -129,8 +129,13 @@ func (w *ContainerWatchdog) CleanupOrphanedContainers(ctx context.Context) (*Cle return result, fmt.Errorf("failed to list containers: %w", err) } - 
// Build a set of valid container IDs from sandboxes scheduled to this node. + // Build a set of valid container IDs from sandboxes scheduled to this node, + // plus the set of sandbox IDs the index returned at all (valid or expired) so + // we can tell "the index has no record of this sandbox" (which warrants a + // direct re-fetch) apart from "the index returned it but it is past grace" + // (already authoritative — no re-fetch needed). validContainers := make(map[string]bool) + indexedSandboxes := make(map[string]bool) resp, err := w.EAC.List(cleanupCtx, compute.Index(compute.KindSandbox, entity.Id("node/"+w.NodeId))) if err != nil { @@ -149,6 +154,8 @@ func (w *ContainerWatchdog) CleanupOrphanedContainers(ctx context.Context) (*Cle ent := e.Entity() + indexedSandboxes[sb.ID.String()] = true + // Consider all non DEAD sandboes as valid. isRunning := sb.Status != compute.DEAD @@ -237,14 +244,17 @@ func (w *ContainerWatchdog) CleanupOrphanedContainers(ctx context.Context) (*Cle continue } - // The node-scoped list did not include this container's sandbox. That - // list is built from a node index that can transiently lag; before - // treating the container as orphaned, re-fetch the sandbox entity - // directly by ID — a linearizable key lookup, not an index read. If the + // If the index returned this sandbox at all, it was already evaluated + // above and found not valid (DEAD past the grace window) — that verdict + // is authoritative, so reclaim it without a redundant re-fetch. + // + // If the index has no record of it, the node index may simply be lagging. + // Re-fetch the sandbox entity directly by ID — a linearizable key lookup, + // not an index read — before treating the container as orphaned. If the // sandbox still exists and is not DEAD past the grace window, the index // merely missed it: never kill it or release its IP (MIR-1238). 
- if w.sandboxStillValid(cleanupCtx, sc.sandboxID, now, graceWindow) { - w.Log.Warn("watchdog: container missing from node list but sandbox entity is still valid; skipping (stale index)", + if !indexedSandboxes[sc.sandboxID] && w.sandboxStillValid(cleanupCtx, sc.sandboxID, now, graceWindow) { + w.Log.Warn("watchdog: container missing from node index but sandbox entity is still valid; skipping (stale index)", "id", containerID, "sandbox_id", sc.sandboxID) continue } diff --git a/network/config_test.go b/network/config_test.go index 9ddde26bf..89c94d6f7 100644 --- a/network/config_test.go +++ b/network/config_test.go @@ -46,6 +46,29 @@ func TestReserveConflictFree(t *testing.T) { r.True(got["10.8.64.2"], "conflicting address must remain quarantined as reserved") }) + t.Run("errors when every candidate up to the attempt limit is live", func(t *testing.T) { + r := require.New(t) + + n, err := netdb.New(filepath.Join(t.TempDir(), "net.db")) + r.NoError(err) + subnet, err := n.Subnet("10.8.64.0/24") + r.NoError(err) + + // Mark the first allocBridgeMaxAttempts host addresses (.2 .. ) as live, + // so every reservation the loop makes collides and it gives up. + live := make(map[netip.Addr]bool) + addr := netip.MustParseAddr("10.8.64.2") + for i := 0; i < allocBridgeMaxAttempts; i++ { + live[addr] = true + addr = addr.Next() + } + + ep, err := reserveConflictFree(discardLogger(), "rt0", subnet, live) + r.Error(err, "should fail rather than return a live address") + r.False(ep.IsValid()) + r.Contains(err.Error(), "no conflict-free address") + }) + t.Run("returns the first free address when nothing is live", func(t *testing.T) { r := require.New(t)