Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions controllers/sandbox/ipreconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package sandbox

import (
"context"
"fmt"
"log/slog"
"net/netip"
"time"

"miren.dev/runtime/pkg/netdb"
)

// IPReconciler keeps the netdb IP lease bookkeeping in agreement with the
// addresses actually live on the bridge. The two can diverge — a release path
// that ran while a container kept running, a lease lost across a restart — and a
// divergence where netdb believes a live address is free leads to that address
// being handed to a second sandbox, a duplicate assignment (MIR-1238).
//
// Each cycle it:
//   - re-reserves any address that is live on the bridge but not reserved in
//     netdb (the repair direction — always safe, only ever adds a reservation
//     for an address already in use); and
//   - releases any address reserved in netdb with no live owner, but only after
//     it has been absent for several consecutive cycles (the reap direction —
//     conservative, to avoid racing a sandbox that is mid-create or mid-teardown).
//
// Populate the exported fields, then call Start to launch the background loop
// and Stop to cancel it.
type IPReconciler struct {
// Log receives every message the reconciler emits. It must be non-nil: Start
// logs through it unconditionally.
Log *slog.Logger
// Subnet is the netdb subnet whose reservations are reconciled. While it is
// nil a reconcile cycle does nothing.
Subnet *netdb.Subnet

// LiveIPs returns the set of addresses currently assigned to running sandbox
// containers, determined independently of netdb. While it is nil a reconcile
// cycle does nothing; an error it returns aborts that cycle.
LiveIPs func(ctx context.Context) (map[netip.Addr]bool, error)

// CheckInterval is how often a reconcile cycle runs. Start substitutes
// 5 minutes when this is left at zero.
CheckInterval time.Duration
// ReleaseAfterMisses is how many consecutive cycles an address must be
// reserved-but-not-live before it is released. Start substitutes 3 when this
// is left at zero. With the default interval this is a multi-minute grace
// window, comfortably longer than any normal sandbox create (whose saga
// releases the reservation on failure anyway).
ReleaseAfterMisses int

// misses counts, per address, the consecutive cycles in which the address was
// reserved in netdb but absent from the live set. Start allocates it.
misses map[netip.Addr]int
// cancel stops the loop launched by Start. It is nil until Start has run,
// which Stop tolerates.
cancel context.CancelFunc
}

// Start fills in defaults for any unset tuning field, resets the miss
// counters, and launches the periodic reconcile loop in a background goroutine.
//
// The loop runs until ctx is cancelled or Stop is called. Log must be non-nil
// before Start is called. Start does not wait for the first reconcile cycle to
// finish.
func (r *IPReconciler) Start(ctx context.Context) {
	// time.NewTicker panics on a non-positive duration, so anything <= 0 (not
	// only the zero value) falls back to the default interval.
	if r.CheckInterval <= 0 {
		r.CheckInterval = 5 * time.Minute
	}
	// A non-positive threshold would let the reap step release a reservation on
	// its very first miss, removing the grace window entirely.
	if r.ReleaseAfterMisses <= 0 {
		r.ReleaseAfterMisses = 3
	}
	r.misses = make(map[netip.Addr]int)

	r.Log.Info("starting ip reconciler",
		"check_interval", r.CheckInterval,
		"release_after_misses", r.ReleaseAfterMisses)

	// Derive a cancellable context so Stop can end the loop independently of
	// the caller's context.
	ctx, r.cancel = context.WithCancel(ctx)
	go r.monitor(ctx)
}

// Stop signals the reconcile loop to exit by cancelling its context. It is a
// no-op when Start has not been called, and it returns immediately without
// waiting for the loop goroutine to finish.
func (r *IPReconciler) Stop() {
	if stop := r.cancel; stop != nil {
		stop()
	}
}

// monitor is the body of the background goroutine: one reconcile pass right
// away, then one per CheckInterval tick, until ctx is cancelled. A failed pass
// is logged and the loop carries on.
func (r *IPReconciler) monitor(ctx context.Context) {
	tick := time.NewTicker(r.CheckInterval)
	defer tick.Stop()

	// attempt runs a single reconcile pass, logging any failure under failMsg.
	attempt := func(failMsg string) {
		if err := r.reconcile(ctx); err != nil {
			r.Log.Error(failMsg, "error", err)
		}
	}

	// The startup pass doubles as boot reconciliation: it re-reserves the IPs
	// of every container that survived a restart, trusting the live containers
	// rather than the entity store.
	attempt("initial ip reconcile failed")

	for {
		select {
		case <-ctx.Done():
			r.Log.Info("ip reconciler stopped")
			return
		case <-tick.C:
			attempt("ip reconcile failed")
		}
	}
}

func (r *IPReconciler) reconcile(ctx context.Context) error {
if r.Subnet == nil || r.LiveIPs == nil {
return nil
}

live, err := r.LiveIPs(ctx)
if err != nil {
return fmt.Errorf("enumerating live IPs: %w", err)
}

reserved, err := r.Subnet.ReservedAddrs()
if err != nil {
return fmt.Errorf("listing reserved addresses: %w", err)
}
reservedSet := make(map[netip.Addr]bool, len(reserved))
for _, addr := range reserved {
reservedSet[addr] = true
}

// Repair: an address live on the bridge but not reserved in netdb is the
// exact divergence behind MIR-1238. Re-reserve it so it can never be handed
// to a second sandbox.
for addr := range live {
if reservedSet[addr] {
continue
}
if err := r.Subnet.ReserveSpecificAddr(addr); err != nil {
r.Log.Error("ip reconciler failed to re-reserve live address", "addr", addr, "error", err)
continue
}
r.Log.Warn("ip reconciler re-reserved a live address netdb had lost", "addr", addr)
}

// Reap: an address reserved in netdb with no live owner is a leaked lease.
// Release it only after it has been absent for ReleaseAfterMisses consecutive
// cycles, so a sandbox that has reserved its address but not yet booted its
// container (or is mid-teardown) is not disturbed.
for addr := range reservedSet {
if live[addr] {
delete(r.misses, addr)
continue
}

r.misses[addr]++
if r.misses[addr] < r.ReleaseAfterMisses {
continue
}

if err := r.Subnet.ReleaseAddr(addr); err != nil {
r.Log.Error("ip reconciler failed to release leaked address", "addr", addr, "error", err)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When ReleaseAddr fails, r.misses[addr] is left at the threshold value. On the next cycle, r.misses[addr]++ pushes it above the threshold and another release is attempted — which is probably the desired retry behaviour. However, the miss counter for this address will never be cleaned up by the guard block below (lines 154–158) as long as the address stays reserved, so it grows unboundedly on each failed cycle. Consider either capping the counter at ReleaseAfterMisses after a failure, or adding a comment explaining the retry-forever intent.

// Cap the counter so a persistently failing release retries every
// cycle without letting the miss count (and the map entry) grow
// without bound.
r.misses[addr] = r.ReleaseAfterMisses
continue
}
delete(r.misses, addr)
r.Log.Warn("ip reconciler released leaked reservation with no live owner",
"addr", addr, "after_misses", r.ReleaseAfterMisses)
}

// Forget miss counters for addresses that are no longer reserved (e.g. they
// were released elsewhere) so the map cannot grow without bound.
for addr := range r.misses {
if !reservedSet[addr] {
delete(r.misses, addr)
}
}

return nil
}
100 changes: 100 additions & 0 deletions controllers/sandbox/ipreconciler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package sandbox

import (
"context"
"io"
"log/slog"
"net/netip"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"

"miren.dev/runtime/pkg/netdb"
)

// TestIPReconciler calls reconcile directly (no background loop) against a
// netdb subnet stored in the test's temp directory, covering the repair
// direction, the miss-threshold reap, and the guarantee that a live address is
// never released.
func TestIPReconciler(t *testing.T) {
	quiet := slog.New(slog.NewTextHandler(io.Discard, nil))
	bg := context.Background()

	// openSubnet creates a fresh netdb file and returns its 10.8.64.0/24 subnet.
	openSubnet := func(t *testing.T) *netdb.Subnet {
		db, err := netdb.New(filepath.Join(t.TempDir(), "net.db"))
		require.NoError(t, err)
		subnet, err := db.Subnet("10.8.64.0/24")
		require.NoError(t, err)
		return subnet
	}

	// reservedStrings returns the subnet's reserved addresses keyed by their
	// string form, for easy membership checks.
	reservedStrings := func(t *testing.T, s *netdb.Subnet) map[string]bool {
		addrs, err := s.ReservedAddrs()
		require.NoError(t, err)
		set := make(map[string]bool, len(addrs))
		for _, addr := range addrs {
			set[addr.String()] = true
		}
		return set
	}

	// newReconciler builds a reconciler ready for direct reconcile calls; the
	// miss map is initialised here because Start is never invoked.
	newReconciler := func(
		s *netdb.Subnet,
		threshold int,
		liveFn func(context.Context) (map[netip.Addr]bool, error),
	) *IPReconciler {
		return &IPReconciler{
			Log:                quiet,
			Subnet:             s,
			LiveIPs:            liveFn,
			ReleaseAfterMisses: threshold,
			misses:             map[netip.Addr]int{},
		}
	}

	t.Run("re-reserves a live address netdb lost", func(t *testing.T) {
		must := require.New(t)
		subnet := openSubnet(t)

		liveSet := map[netip.Addr]bool{netip.MustParseAddr("10.8.64.5"): true}
		rec := newReconciler(subnet, 2, func(context.Context) (map[netip.Addr]bool, error) {
			return liveSet, nil
		})

		must.NoError(rec.reconcile(bg))

		must.True(reservedStrings(t, subnet)["10.8.64.5"], "live-but-unreserved address should be re-reserved")
	})

	t.Run("reaps a leaked reservation only after the miss threshold", func(t *testing.T) {
		must := require.New(t)
		subnet := openSubnet(t)

		// Reserve one address that will never show up as live.
		_, err := subnet.Reserve()
		must.NoError(err)

		rec := newReconciler(subnet, 2, func(context.Context) (map[netip.Addr]bool, error) {
			return map[netip.Addr]bool{}, nil // nothing live
		})

		// Pass one: a single miss is below the threshold, so the lease stays.
		must.NoError(rec.reconcile(bg))
		must.Len(reservedStrings(t, subnet), 1, "should not reap on the first miss")

		// Pass two: the second miss reaches the threshold and the lease goes.
		must.NoError(rec.reconcile(bg))
		must.Empty(reservedStrings(t, subnet), "should reap after reaching the miss threshold")
	})

	t.Run("never reaps an address that is live", func(t *testing.T) {
		must := require.New(t)
		subnet := openSubnet(t)

		lease, err := subnet.Reserve()
		must.NoError(err)

		liveSet := map[netip.Addr]bool{lease.Addr(): true}
		rec := newReconciler(subnet, 1, func(context.Context) (map[netip.Addr]bool, error) {
			return liveSet, nil
		})

		// Even with a threshold of one, repeated passes must leave it alone.
		must.NoError(rec.reconcile(bg))
		must.NoError(rec.reconcile(bg))
		must.Len(reservedStrings(t, subnet), 1, "a live address must never be reaped")
	})
}
66 changes: 65 additions & 1 deletion controllers/sandbox/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type SandboxController struct {

watchdog *ContainerWatchdog
imageWatchdog *ImageWatchdog
ipReconciler *IPReconciler

// writeTracker tracks entity write revisions to skip self-generated watch events
writeTracker controller.WriteTracker
Expand Down Expand Up @@ -555,6 +556,21 @@ func (c *SandboxController) Init(ctx context.Context) error {
}
c.imageWatchdog.Start(c.topCtx)

// Initialize and start the IP reconciler, which keeps netdb lease bookkeeping
// in agreement with the addresses actually live on the bridge (MIR-1238). Its
// initial run also re-reserves the IPs of containers that survived a restart.
if c.Subnet != nil {
c.ipReconciler = &IPReconciler{
Log: c.Log.With("module", "ip-reconciler"),
Subnet: c.Subnet,
LiveIPs: func(ctx context.Context) (map[netip.Addr]bool, error) {
return c.liveBridgeIPs(ctx)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CheckInterval: 5 * time.Minute is already the default applied in IPReconciler.Start() when CheckInterval == 0. Setting it here makes the two places diverge silently when one is changed. Either remove the explicit assignment here and rely on the default, or add a named constant that both sites share.

},
// CheckInterval is left at the IPReconciler default (see Start).
}
c.ipReconciler.Start(c.topCtx)
}

// Start workload identity token refresh loop and token request server
// (tokenRefresher and tokenSecrets were created earlier, before reconcile)
if c.WorkloadIssuer != nil {
Expand Down Expand Up @@ -592,6 +608,10 @@ func (c *SandboxController) Close() error {
c.imageWatchdog.Stop()
}

if c.ipReconciler != nil {
c.ipReconciler.Stop()
}

c.running.Wait()

// Shutdown DNS and other network services
Expand Down Expand Up @@ -1350,7 +1370,9 @@ func (c *SandboxController) AllocateNetwork(
}

} else {
ep, err = network.AllocateOnBridge(c.Bridge, c.Subnet)
ep, err = network.AllocateOnBridge(c.Log, c.Bridge, c.Subnet, func() (map[netip.Addr]bool, error) {
return c.liveBridgeIPs(ctx)
})
if err != nil {
return nil, err
}
Expand All @@ -1366,6 +1388,48 @@ func (c *SandboxController) AllocateNetwork(
return ep, nil
}

// liveBridgeIPs collects the bridge addresses recorded on sandbox containers in
// the controller's containerd namespace. It reads every container label whose
// key starts with "runtime.computer/ip" and parses the value as an IP address.
// The result is the "in use" set consulted when allocating on the bridge and by
// the IP reconciler, so that an address netdb has lost track of but a sandbox
// still holds is not handed out again (MIR-1238).
//
// Only the container records are consulted, never task state. A container
// whose task has exited but whose record has not yet been removed therefore
// still contributes its address. Erring toward "live" is deliberate: missing a
// live address risks a duplicate assignment.
//
// A container whose labels cannot be read, or a label value that does not
// parse, is logged and skipped. Only a failure to list containers is returned
// as an error.
func (c *SandboxController) liveBridgeIPs(ctx context.Context) (map[netip.Addr]bool, error) {
	const ipLabelPrefix = "runtime.computer/ip"

	nsCtx := namespaces.WithNamespace(ctx, c.Namespace)

	containers, err := c.CC.Containers(nsCtx)
	if err != nil {
		return nil, fmt.Errorf("failed to list containers: %w", err)
	}

	inUse := map[netip.Addr]bool{}
	for _, ctr := range containers {
		ctrLabels, lerr := ctr.Labels(nsCtx)
		if lerr != nil {
			c.Log.Warn("failed to read container labels while enumerating live IPs", "id", ctr.ID(), "error", lerr)
			continue
		}

		for key, raw := range ctrLabels {
			if strings.HasPrefix(key, ipLabelPrefix) {
				ip, perr := netip.ParseAddr(raw)
				if perr == nil {
					inUse[ip] = true
				} else {
					c.Log.Warn("failed to parse IP from container label", "id", ctr.ID(), "label", key, "value", raw, "error", perr)
				}
			}
		}
	}

	return inUse, nil
}

func (c *SandboxController) setupHosts(sb *compute.Sandbox, name string, ep *network.EndpointConfig) error {
if ep == nil || len(ep.Addresses) == 0 {
return fmt.Errorf("no addresses allocated for sandbox %s", sb.ID)
Expand Down
2 changes: 1 addition & 1 deletion controllers/sandbox/sandbox_frozen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
// sha256sum controllers/sandbox/sandbox.go controllers/sandbox/volume.go controllers/sandbox/firewall.go
func TestSandboxControllerFrozen(t *testing.T) {
frozen := map[string]string{
"sandbox.go": "2cb139828e42cae2a41459c3e8a699dfdcd60e9e30d191abcbc4a2c432497c9e",
"sandbox.go": "9568271c07785d6b3fd15fe4ac6956a811e9944e7959e8787e35ebe9addcf4db",
"volume.go": "b4697764d48a90adc04ce47968ccef11ceba50da8d19c889906c5c3a539065b3",
"firewall.go": "648cb5d91091d5eb7400152b19695a8045585feae59c5dd36c12d663a27bb91f",
}
Expand Down
Loading
Loading