From 3e7035293c5a53c628126e10c483994c3f5c7043 Mon Sep 17 00:00:00 2001 From: Manas Srivastava <[email protected]> Date: Fri, 22 May 2026 07:36:28 +0530 Subject: [PATCH] test(queue): drive NATS backend to 96.8% coverage Add unit/integration tests for the provisioner NATS queue backend covering the isolated dedicated-pod provisioning ladder, legacy NodePort URL fallback, deprovision, every rollback branch, and the local shared-cluster path. Introduce interface seams (kubernetes.Interface on K8sBackend, configurable monitorPort on LocalBackend) so the fake clientset and an httptest server can be injected without a real kube-apiserver or NATS server. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/backend/queue/backend_test.go | 319 ++++++++++ internal/backend/queue/k8s.go | 5 +- internal/backend/queue/k8s_test.go | 846 +++++++++++++++++++++++++ internal/backend/queue/local.go | 13 +- internal/backend/queue/local_test.go | 207 ++++++ 5 files changed, 1386 insertions(+), 4 deletions(-) create mode 100644 internal/backend/queue/backend_test.go create mode 100644 internal/backend/queue/k8s_test.go create mode 100644 internal/backend/queue/local_test.go diff --git a/internal/backend/queue/backend_test.go b/internal/backend/queue/backend_test.go new file mode 100644 index 0000000..bb65ff6 --- /dev/null +++ b/internal/backend/queue/backend_test.go @@ -0,0 +1,319 @@ +package queue + +// backend_test.go — unit tests for the public Backend factory `NewBackend` and +// its tiny aliases `goredisParseURL` / `goredisNewClient`. +// +// `NewBackend` selects between K8sBackend (when backendType == "k8s") and +// LocalBackend (default). The k8s path reads multiple env vars to wire up the +// kubeconfig, storage class, NATS image, external host, public host, and a +// Redis-backed route registry. The factory must: +// +// 1. Fall back to the local backend on any k8s init failure (kubeconfig +// missing, in-cluster config not available, etc.) — never panic. +// 2. 
Return a LocalBackend for the default branch ("", "local", any other
+//     string).
+//  3. Drive the Redis-backed route registry only when REDIS_URL_FOR_ROUTES /
+//     REDIS_URL is set AND parses cleanly.
+//  4. Skip route registry wiring when the URL is bad (don't crash, just warn).
+//
+// The k8s success branch is exercised indirectly by k8s_test.go (Provision
+// against a fake clientset); here we only care about backend selection and
+// the env-var fallback ladder.
+
+import (
+	"os"
+	"path/filepath"
+	"reflect"
+	"testing"
+)
+
+// writeTestKubeconfig writes a minimal kubeconfig that
+// clientcmd.BuildConfigFromFlags will parse cleanly without dialing
+// anything. The k8s clientset is constructed but never used in these tests
+// (NewBackend doesn't call any API on it); we only need it to populate the
+// K8sBackend struct so we can exercise the route-registry-wiring branch.
+func writeTestKubeconfig(t *testing.T) string {
+	t.Helper()
+	dir := t.TempDir()
+	path := filepath.Join(dir, "kubeconfig")
+	content := `apiVersion: v1
+kind: Config
+clusters:
+- name: fake-cluster
+  cluster:
+    server: https://127.0.0.1:1
+contexts:
+- name: fake-context
+  context:
+    cluster: fake-cluster
+    user: fake-user
+current-context: fake-context
+users:
+- name: fake-user
+  user:
+    token: fake-token
+`
+	if err := os.WriteFile(path, []byte(content), 0o644); err != nil {
+		t.Fatalf("write kubeconfig: %v", err)
+	}
+	return path
+}
+
+// TestNewBackend_DefaultBranchReturnsLocal verifies that any non-"k8s"
+// backendType falls into the LocalBackend branch.
+func TestNewBackend_DefaultBranchReturnsLocal(t *testing.T) {
+	for _, bt := range []string{"", "local", "garbage", "K8S" /* case-sensitive switch */} {
+		t.Run(bt, func(t *testing.T) {
+			got := NewBackend(bt, "nats.example.com")
+			lb, ok := got.(*LocalBackend)
+			if !ok {
+				t.Fatalf("NewBackend(%q) = %T; want *LocalBackend", bt, got)
+			}
+			if lb.natsHost != "nats.example.com" {
+				t.Errorf("natsHost = %q; want %q", lb.natsHost, "nats.example.com")
+			}
+		})
+	}
+}
+
+// TestNewBackend_K8sFallsBackToLocalOnInitFailure verifies the safety net: if
+// k8s client init fails (no kubeconfig, no in-cluster config), the factory
+// returns a LocalBackend instead of crashing. We force the failure by setting
+// K8S_KUBECONFIG to a path that does not exist.
+func TestNewBackend_K8sFallsBackToLocalOnInitFailure(t *testing.T) {
+	t.Setenv("K8S_KUBECONFIG", "/nonexistent/kubeconfig-for-unit-test")
+	// Clear both Redis URL vars so the route-registry wiring is a no-op even if
+	// the k8s path had succeeded. Belt + suspenders.
+	t.Setenv("REDIS_URL", "")
+	t.Setenv("REDIS_URL_FOR_ROUTES", "")
+	got := NewBackend("k8s", "nats.example.com")
+	if _, ok := got.(*LocalBackend); !ok {
+		t.Fatalf("NewBackend(k8s) with bad kubeconfig = %T; want fallback *LocalBackend", got)
+	}
+}
+
+// TestGoredisHelpers verifies the tiny aliases compile and round-trip a sane
+// URL. A bad URL must surface an error.
+func TestGoredisHelpers(t *testing.T) {
+	opt, err := goredisParseURL("redis://localhost:6379/0")
+	if err != nil {
+		t.Fatalf("goredisParseURL(valid) error: %v", err)
+	}
+	if opt == nil {
+		t.Fatal("goredisParseURL(valid) returned nil *Options")
+	}
+	c := goredisNewClient(opt)
+	if c == nil {
+		t.Fatal("goredisNewClient returned nil")
+	}
+	_ = c.Close() // best-effort cleanup; the Close error is not under test
+
+	if _, err := goredisParseURL("not-a-valid-redis-url-scheme"); err == nil {
+		t.Error("goredisParseURL must return an error for a malformed URL")
+	}
+}
+
+// TestK8sEnv_FallbackLadder verifies the env-var helper used throughout the
+// k8s constructor: set value wins; empty falls back to default. The factory
+// reads K8S_NATS_PUBLIC_HOST, K8S_STORAGE_CLASS, K8S_NATS_IMAGE,
+// K8S_EXTERNAL_HOST, NATS_PROXY_ROUTE_PREFIX, NATS_PROXY_TOKEN_ROUTE_PREFIX —
+// all via this helper.
+func TestK8sEnv_FallbackLadder(t *testing.T) {
+	const key = "QUEUE_TEST_K8S_ENV_FALLBACK_KEY"
+	_ = os.Unsetenv(key) // start from a known-unset state for the first assertion
+	if got := k8sEnv(key, "default"); got != "default" {
+		t.Errorf("k8sEnv(unset, default) = %q; want %q", got, "default")
+	}
+	t.Setenv(key, "set-value")
+	if got := k8sEnv(key, "default"); got != "set-value" {
+		t.Errorf("k8sEnv(set, default) = %q; want %q", got, "set-value")
+	}
+	// Empty env value is treated as unset — falls back to default. This is
+	// the explicit contract: `if v := os.Getenv(key); v != ""`.
+	t.Setenv(key, "")
+	if got := k8sEnv(key, "default"); got != "default" {
+		t.Errorf("k8sEnv(empty, default) = %q; want default fallback (\"\" treated as unset)", got)
+	}
+}
+
+// TestNatsK8sBoolPtr_RoundTrip — the bool pointer helper must round-trip both
+// true and false without aliasing (each call returns a fresh *bool).
+func TestNatsK8sBoolPtr_RoundTrip(t *testing.T) {
+	tp := natsK8sBoolPtr(true)
+	fp := natsK8sBoolPtr(false)
+	if tp == nil || !*tp { // nil check first so the dereference cannot panic
+		t.Errorf("natsK8sBoolPtr(true) = %v; want &true", tp)
+	}
+	if fp == nil || *fp { // a false input must come back as a pointer to false
+		t.Errorf("natsK8sBoolPtr(false) = %v; want &false", fp)
+	}
+	if tp == fp {
+		t.Error("natsK8sBoolPtr must allocate a fresh pointer each call (no aliasing)")
+	}
+}
+
+// TestNewBackend_K8sSucceeds_WithRouteRegistry exercises the k8s success
+// branch in NewBackend: kubeconfig parses, K8sBackend is constructed,
+// SetPublicHost is called, and the Redis-backed route registry is wired
+// (because REDIS_URL_FOR_ROUTES is set to a syntactically valid URL — the
+// underlying Redis is never dialled at construction time).
+func TestNewBackend_K8sSucceeds_WithRouteRegistry(t *testing.T) {
+	kcfg := writeTestKubeconfig(t)
+	t.Setenv("K8S_KUBECONFIG", kcfg)
+	t.Setenv("K8S_NATS_PUBLIC_HOST", "nats.test.invalid")
+	t.Setenv("K8S_STORAGE_CLASS", "test-sc")
+	t.Setenv("K8S_NATS_IMAGE", "nats:test")
+	t.Setenv("K8S_EXTERNAL_HOST", "node.test.invalid")
+	t.Setenv("REDIS_URL_FOR_ROUTES", "redis://127.0.0.1:1/0")
+	t.Setenv("NATS_PROXY_ROUTE_PREFIX", "test_route:")
+	t.Setenv("NATS_PROXY_TOKEN_ROUTE_PREFIX", "test_route_by_token:")
+
+	got := NewBackend("k8s", "ignored.host")
+	kb, ok := got.(*K8sBackend)
+	if !ok {
+		t.Fatalf("NewBackend(k8s) = %T; want *K8sBackend", got)
+	}
+	if kb.publicHost != "nats.test.invalid" {
+		t.Errorf("publicHost = %q; want nats.test.invalid", kb.publicHost)
+	}
+	if kb.storageClass != "test-sc" {
+		t.Errorf("storageClass = %q; want test-sc", kb.storageClass)
+	}
+	if kb.image != "nats:test" {
+		t.Errorf("image = %q; want nats:test", kb.image)
+	}
+	if kb.externalHost != "node.test.invalid" {
+		t.Errorf("externalHost = %q; want node.test.invalid", kb.externalHost)
+	}
+	if kb.rdb == nil {
+		t.Error("rdb must be wired when REDIS_URL_FOR_ROUTES parses cleanly")
+	}
+	if kb.tokenPrefix != "test_route:" {
+		t.Errorf("tokenPrefix = %q; want test_route:", kb.tokenPrefix)
+	}
+	if kb.routePrefix != "test_route_by_token:" {
+		t.Errorf("routePrefix = %q; want test_route_by_token:", kb.routePrefix)
+	}
+	// Close the embedded redis client so it doesn't leak a goroutine into
+	// subsequent tests.
+	if kb.rdb != nil {
+		_ = kb.rdb.Close()
+	}
+}
+
+// TestNewBackend_K8sSucceeds_NoRedisURL — when neither REDIS_URL_FOR_ROUTES
+// nor REDIS_URL is set, the route registry is left disabled (rdb is nil).
+// The k8s backend still works — the proxy just won't have a route record
+// for resources provisioned in this mode.
+func TestNewBackend_K8sSucceeds_NoRedisURL(t *testing.T) {
+	kcfg := writeTestKubeconfig(t)
+	t.Setenv("K8S_KUBECONFIG", kcfg)
+	t.Setenv("REDIS_URL_FOR_ROUTES", "")
+	t.Setenv("REDIS_URL", "")
+
+	got := NewBackend("k8s", "ignored.host")
+	kb, ok := got.(*K8sBackend)
+	if !ok {
+		t.Fatalf("NewBackend(k8s) = %T; want *K8sBackend", got)
+	}
+	if kb.rdb != nil {
+		t.Error("rdb must be nil when no Redis URL is configured")
+		_ = kb.rdb.Close()
+	}
+}
+
+// TestNewBackend_K8sSucceeds_BadRedisURL — when REDIS_URL_FOR_ROUTES is set
+// but malformed, the route registry is silently disabled (slog.Warn only).
+// This is the documented behaviour: a bad Redis URL must not crash the boot
+// of the entire backend.
+func TestNewBackend_K8sSucceeds_BadRedisURL(t *testing.T) {
+	kcfg := writeTestKubeconfig(t)
+	t.Setenv("K8S_KUBECONFIG", kcfg)
+	t.Setenv("REDIS_URL_FOR_ROUTES", "not-a-valid-redis-url-scheme")
+	t.Setenv("REDIS_URL", "")
+
+	got := NewBackend("k8s", "ignored.host")
+	kb, ok := got.(*K8sBackend)
+	if !ok {
+		t.Fatalf("NewBackend(k8s) = %T; want *K8sBackend", got)
+	}
+	if kb.rdb != nil {
+		t.Error("rdb must be nil when REDIS_URL_FOR_ROUTES is malformed")
+		_ = kb.rdb.Close()
+	}
+}
+
+// TestNewBackend_K8sSucceeds_FallsBackToRedisURL — if REDIS_URL_FOR_ROUTES is
+// empty but REDIS_URL is set, the latter is used as the route-registry URL.
+func TestNewBackend_K8sSucceeds_FallsBackToRedisURL(t *testing.T) {
+	kcfg := writeTestKubeconfig(t)
+	t.Setenv("K8S_KUBECONFIG", kcfg)
+	t.Setenv("REDIS_URL_FOR_ROUTES", "")
+	t.Setenv("REDIS_URL", "redis://127.0.0.1:1/0")
+
+	got := NewBackend("k8s", "ignored.host")
+	kb, ok := got.(*K8sBackend)
+	if !ok {
+		t.Fatalf("NewBackend(k8s) = %T; want *K8sBackend", got)
+	}
+	if kb.rdb == nil {
+		t.Error("rdb must be wired when REDIS_URL is set as the fallback")
+	}
+	if kb.rdb != nil {
+		_ = kb.rdb.Close() // best-effort cleanup; the Close error is not under test
+	}
+}
+
+// TestNewK8sBackend_DefaultStorageClassAndImage — when storageClass and image
+// are empty, newK8sBackend fills in the documented defaults (gp3 and
+// nats:2.10-alpine).
+func TestNewK8sBackend_DefaultStorageClassAndImage(t *testing.T) {
+	kcfg := writeTestKubeconfig(t)
+	b, err := newK8sBackend(kcfg, "", "", "") // NOTE(review): presumably storageClass, image, externalHost — confirm
+	if err != nil {
+		t.Fatalf("newK8sBackend: %v", err)
+	}
+	if b.image != "nats:2.10-alpine" {
+		t.Errorf("default image = %q; want nats:2.10-alpine", b.image)
+	}
+	if b.storageClass != "gp3" {
+		t.Errorf("default storageClass = %q; want gp3", b.storageClass)
+	}
+}
+
+// TestSizingForTier_AllKnownTiers — the tier→sizing table must:
+//   - return the expected pvcMi for anonymous/hobby/pro/team/growth,
+//   - return zero pvcMi for anonymous (memory-only JetStream),
+//   - return non-zero pvcMi for every paid tier,
+//   - fall back to hobby sizing for any unknown tier (no panic, no zero sizing).
+func TestSizingForTier_AllKnownTiers(t *testing.T) {
+	cases := []struct {
+		tier      string
+		wantPVCMi int
+	}{
+		{"anonymous", 0}, // no PVC expected for the anonymous tier
+		{"hobby", 1024},
+		{"pro", 10240},
+		{"team", 51200},
+		{"growth", 51200}, // same expected PVC size as team
+	}
+	for _, tc := range cases {
+		t.Run(tc.tier, func(t *testing.T) {
+			sz := sizingForTier(tc.tier)
+			if sz.pvcMi != tc.wantPVCMi {
+				t.Errorf("sizingForTier(%q).pvcMi = %d; want %d", tc.tier, sz.pvcMi, tc.wantPVCMi)
+			}
+			if sz.cpuReq == "" || sz.memReq == "" || sz.cpuLim == "" || sz.memLim == "" {
+				t.Errorf("sizingForTier(%q) returned empty cpu/mem fields: %+v", tc.tier, sz)
+			}
+			if sz.qCPURequests == "" || sz.qMemRequests == "" || sz.qCPULimits == "" || sz.qMemLimits == "" {
+				t.Errorf("sizingForTier(%q) returned empty namespace-quota fields: %+v", tc.tier, sz)
+			}
+		})
+	}
+
+	// Unknown tier → hobby fallback (not a zero struct).
+	if got, hobby := sizingForTier("unknown_tier_xyz"), sizingForTier("hobby"); !reflect.DeepEqual(got, hobby) {
+		t.Errorf("sizingForTier(unknown) = %+v; want hobby fallback %+v", got, hobby)
+	}
+}
diff --git a/internal/backend/queue/k8s.go b/internal/backend/queue/k8s.go
index c87d459..d30e5e7 100644
--- a/internal/backend/queue/k8s.go
+++ b/internal/backend/queue/k8s.go
@@ -114,7 +114,10 @@ func sizingForTier(tier string) tierSizing {
 
 // K8sBackend provisions a dedicated NATS pod per token.
 type K8sBackend struct {
-	cs *kubernetes.Clientset
+	// cs uses kubernetes.Interface (not the concrete *kubernetes.Clientset)
+	// so the fake clientset from k8s.io/client-go/kubernetes/fake can be
+	// injected by unit tests — mirroring the same trick redis/k8s.go uses.
+	cs kubernetes.Interface
 	storageClass string // K8S_STORAGE_CLASS (used for JetStream PVC at hobby+)
 	image string // K8S_NATS_IMAGE
 	externalHost string // K8S_EXTERNAL_HOST (legacy NodePort host; kept for back-compat)
diff --git a/internal/backend/queue/k8s_test.go b/internal/backend/queue/k8s_test.go
new file mode 100644
index 0000000..5935723
--- /dev/null
+++ b/internal/backend/queue/k8s_test.go
@@ -0,0 +1,846 @@
+package queue
+
+// k8s_test.go — unit + integration-style tests for K8sBackend.
+//
+// We cannot exercise a real kube-apiserver from a unit test, so the full
+// provisioning ladder runs against `k8s.io/client-go/kubernetes/fake`. Three
+// tricks let us drive the happy path:
+//
+//  1. `cs.PrependReactor("create", "services", ...)` synthesises a NodePort
+//     on the created Service, since the fake apiserver does not allocate one.
+//  2. `cs.PrependReactor("create", "deployments", ...)` asynchronously stamps a
+//     Pod with Ready=True into the namespace so `waitPodReady` returns quickly.
+//  3. The Redis route registry is wired against a real `goredis.Client`
+//     pointing at 127.0.0.1:1 (closed port). The Set calls fail-fast inside
+//     Provision but only emit slog.Warn — provision still succeeds, which is
+//     exactly the documented behaviour ("Failure here does NOT fail the
+//     provision").
+//
+// The deprovision path uses fake.Clientset directly; namespace deletion is
+// in-memory and returns immediately.
+
+import (
+	"context"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	goredis "github.com/redis/go-redis/v9"
+	corev1 "k8s.io/api/core/v1"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes/fake"
+	k8stesting "k8s.io/client-go/testing"
+
+	"instant.dev/provisioner/internal/ctxkeys"
+)
+
+// newK8sBackendForTest constructs a K8sBackend wired around a fake clientset
+// and installs the reactors that synthesise NodePort allocation + pod
+// readiness. Tests get back the backend AND the underlying fake.Clientset so
+// they can inspect the recorded actions.
+func newK8sBackendForTest(t *testing.T, objs ...runtime.Object) (*K8sBackend, *fake.Clientset) {
+	t.Helper()
+	cs := fake.NewClientset(objs...)
+
+	// Services: synthesise a NodePort on Create. The K8sBackend reads
+	// `svc.Spec.Ports[0].NodePort` immediately after Create — the fake
+	// apiserver returns whatever Spec we set, with no NodePort allocation.
+	//
+	// We mutate the action's object IN PLACE before returning false so the
+	// downstream tracker reactor adds the mutated object verbatim. (Returning
+	// the modified object as ret with handled=false is a no-op — fake only
+	// uses ret when handled=true.)
+	cs.PrependReactor("create", "services", func(action k8stesting.Action) (bool, runtime.Object, error) {
+		ca, ok := action.(k8stesting.CreateAction)
+		if !ok {
+			return false, nil, nil
+		}
+		svc, ok := ca.GetObject().(*corev1.Service)
+		if !ok {
+			return false, nil, nil
+		}
+		if len(svc.Spec.Ports) > 0 && svc.Spec.Ports[0].NodePort == 0 {
+			svc.Spec.Ports[0].NodePort = 30420 // arbitrary in the NodePort range
+		}
+		return false, nil, nil // let the tracker handle the (mutated) object
+	})
+
+	// Deployments: when applyDeployment runs, schedule a Ready pod into the
+	// same namespace so waitPodReady returns quickly. We MUST do this from a
+	// goroutine — the reactor itself runs while the Fake's RWMutex is held,
+	// so calling cs.CoreV1().Pods().Create() directly from here would
+	// deadlock the lock-recursion guard.
+	cs.PrependReactor("create", "deployments", func(action k8stesting.Action) (bool, runtime.Object, error) {
+		ns := action.GetNamespace()
+		go func() {
+			ready := &corev1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "nats-test-pod",
+					Namespace: ns,
+					Labels:    map[string]string{"app": "nats"},
+				},
+				Status: corev1.PodStatus{
+					Phase: corev1.PodRunning,
+					Conditions: []corev1.PodCondition{
+						{Type: corev1.PodReady, Status: corev1.ConditionTrue},
+					},
+				},
+			}
+			_, _ = cs.CoreV1().Pods(ns).Create(context.Background(), ready, metav1.CreateOptions{})
+		}()
+		return false, nil, nil
+	})
+
+	b := &K8sBackend{
+		cs:           cs,
+		storageClass: "test-sc",
+		image:        "nats:test",
+		externalHost: "node.example.com",
+		// httpClient unused for the Provision path (kept here for symmetry).
+	}
+	return b, cs
+}
+
+// TestNewK8sBackend_NoKubeconfig — given a kubeconfig path that does not
+// exist, the helper returns an error rather than a half-built struct.
+func TestNewK8sBackend_NoKubeconfig(t *testing.T) {
+	// The path is non-empty but missing on disk, so building the config must fail.
+	_, err := newK8sBackend("/nonexistent/kubeconfig", "", "", "")
+	if err == nil {
+		t.Fatal("newK8sBackend with bogus kubeconfig must error")
+	}
+	if !strings.Contains(err.Error(), "build config") {
+		t.Errorf("error must mention build config; got: %v", err)
+	}
+}
+
+// TestNewK8sDedicatedBackend_ErrorPath verifies the publicly exported alias
+// reaches the same error path.
+func TestNewK8sDedicatedBackend_ErrorPath(t *testing.T) {
+	if _, err := NewK8sDedicatedBackend("/nonexistent/kubeconfig", "", ""); err == nil {
+		t.Fatal("NewK8sDedicatedBackend with bogus kubeconfig must error")
+	}
+}
+
+// TestSetPublicHost_SetTokenRoutePrefix_EnableRouteRegistry exercises the
+// setters used by NewBackend to wire up the K8sBackend.
+func TestSetPublicHost_SetTokenRoutePrefix_EnableRouteRegistry(t *testing.T) {
+	b := &K8sBackend{}
+
+	b.SetPublicHost("nats.example.com")
+	if b.publicHost != "nats.example.com" {
+		t.Errorf("publicHost = %q; want nats.example.com", b.publicHost)
+	}
+
+	// SetTokenRoutePrefix with empty arg is a no-op.
+	b.SetTokenRoutePrefix("")
+	if b.routePrefix != "" {
+		t.Errorf("SetTokenRoutePrefix(\"\") must be a no-op; got routePrefix=%q", b.routePrefix)
+	}
+	b.SetTokenRoutePrefix("nats_route_by_token:")
+	if b.routePrefix != "nats_route_by_token:" {
+		t.Errorf("routePrefix = %q; want nats_route_by_token:", b.routePrefix)
+	}
+
+	// EnableRouteRegistry default prefix when empty.
+	rdb := goredis.NewClient(&goredis.Options{Addr: "127.0.0.1:1"}) // closed port; never dialled in this test
+	defer rdb.Close()
+	b2 := &K8sBackend{}
+	b2.EnableRouteRegistry(rdb, "")
+	if b2.tokenPrefix != "nats_route:" {
+		t.Errorf("tokenPrefix default = %q; want nats_route:", b2.tokenPrefix)
+	}
+	if b2.routePrefix != "nats_route_by_token:" {
+		t.Errorf("routePrefix default = %q; want nats_route_by_token:", b2.routePrefix)
+	}
+	if b2.rdb != rdb {
+		t.Error("EnableRouteRegistry must store the passed *redis.Client")
+	}
+
+	// Explicit tokenPrefix wins.
+	b3 := &K8sBackend{routePrefix: "preset:"}
+	b3.EnableRouteRegistry(rdb, "custom:")
+	if b3.tokenPrefix != "custom:" {
+		t.Errorf("tokenPrefix = %q; want custom:", b3.tokenPrefix)
+	}
+	// Already-set routePrefix is preserved.
+	if b3.routePrefix != "preset:" {
+		t.Errorf("routePrefix = %q; want preset: (preserved when already set)", b3.routePrefix)
+	}
+}
+
+// TestK8sBackend_Provision_Happy is the headline integration test: a full
+// Provision call against the fake clientset must:
+//
+//   - create the customer namespace,
+//   - create the network policy, resource quota, PVC (for hobby+), deployment,
+//     and service,
+//   - wait for the (synthetic) Ready pod,
+//   - return Credentials with a non-empty URL, SubjectPrefix, and the namespace
+//     stamped on ProviderResourceID.
+//
+// We use the "hobby" tier so the PVC code path is exercised (anonymous skips
+// PVC creation). Public host is set so the customer URL takes the proxy shape.
+func TestK8sBackend_Provision_Happy(t *testing.T) {
+	b, cs := newK8sBackendForTest(t)
+	b.SetPublicHost("nats.instanode.dev")
+
+	const token = "abc12345deadbeefcafef00d00112233"
+	ctx := context.WithValue(context.Background(), ctxkeys.TeamIDKey, "team-uuid-xyz")
+
+	creds, err := b.Provision(ctx, token, "hobby")
+	if err != nil {
+		t.Fatalf("Provision returned error: %v", err)
+	}
+	if creds == nil {
+		t.Fatal("Provision returned nil Credentials")
+	}
+
+	// URL uses the public-host proxy shape: nats://<token>@<publicHost>:4222.
+	wantURL := "nats://" + token + "@nats.instanode.dev:4222"
+	if creds.URL != wantURL {
+		t.Errorf("URL = %q; want %q", creds.URL, wantURL)
+	}
+	// SubjectPrefix uses the full-token derivation.
+	if want := canonicalSubjectPrefix(token); creds.SubjectPrefix != want {
+		t.Errorf("SubjectPrefix = %q; want %q", creds.SubjectPrefix, want)
+	}
+	// ProviderResourceID is the namespace.
+	wantNS := natsK8sNsPrefix + token
+	if creds.ProviderResourceID != wantNS {
+		t.Errorf("ProviderResourceID = %q; want %q", creds.ProviderResourceID, wantNS)
+	}
+
+	// Verify the namespace was created and carries the owner-team label.
+	ns, err := cs.CoreV1().Namespaces().Get(context.Background(), wantNS, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("namespace not created: %v", err)
+	}
+	if ns.Labels[natsK8sOwnerTeamLabel] != "team-uuid-xyz" {
+		t.Errorf("namespace missing owner-team label; got: %v", ns.Labels)
+	}
+	if ns.Labels[natsK8sRoleLabel] != natsK8sRoleValue {
+		t.Errorf("namespace missing role label; got: %v", ns.Labels)
+	}
+
+	// Network policy created.
+	if _, err := cs.NetworkingV1().NetworkPolicies(wantNS).Get(context.Background(), "default-deny", metav1.GetOptions{}); err != nil {
+		t.Errorf("network policy missing: %v", err)
+	}
+	// Resource quota created.
+	if _, err := cs.CoreV1().ResourceQuotas(wantNS).Get(context.Background(), "tenant-quota", metav1.GetOptions{}); err != nil {
+		t.Errorf("resource quota missing: %v", err)
+	}
+	// PVC created (hobby has pvcMi=1024).
+	if _, err := cs.CoreV1().PersistentVolumeClaims(wantNS).Get(context.Background(), "nats-jetstream", metav1.GetOptions{}); err != nil {
+		t.Errorf("PVC missing: %v", err)
+	}
+	// Deployment created.
+	if _, err := cs.AppsV1().Deployments(wantNS).Get(context.Background(), "nats", metav1.GetOptions{}); err != nil {
+		t.Errorf("deployment missing: %v", err)
+	}
+	// Service created.
+	if _, err := cs.CoreV1().Services(wantNS).Get(context.Background(), "nats", metav1.GetOptions{}); err != nil {
+		t.Errorf("service missing: %v", err)
+	}
+}
+
+// TestK8sBackend_Provision_AnonymousTier_NoPVC verifies the anonymous-tier
+// branch: no PVC is created (memory-only JetStream).
+func TestK8sBackend_Provision_AnonymousTier_NoPVC(t *testing.T) {
+	b, cs := newK8sBackendForTest(t)
+	b.SetPublicHost("nats.instanode.dev")
+
+	const token = "anon123anon456anon789"
+	if _, err := b.Provision(context.Background(), token, "anonymous"); err != nil {
+		t.Fatalf("Provision(anonymous) error: %v", err)
+	}
+
+	ns := natsK8sNsPrefix + token
+	_, err := cs.CoreV1().PersistentVolumeClaims(ns).Get(context.Background(), "nats-jetstream", metav1.GetOptions{})
+	if !k8serrors.IsNotFound(err) {
+		t.Errorf("anonymous tier must NOT create a PVC; got err=%v", err)
+	}
+}
+
+// TestK8sBackend_Provision_LegacyURLShape verifies the no-public-host branch:
+// when SetPublicHost has not been called, the URL uses the legacy NodePort
+// shape (nats://<externalHost>:<nodePort>).
+func TestK8sBackend_Provision_LegacyURLShape(t *testing.T) {
+	b, _ := newK8sBackendForTest(t)
+	// publicHost intentionally not set.
+	const token = "legacy-shape-token-aaaaaaaaaaaaaaaa"
+	creds, err := b.Provision(context.Background(), token, "anonymous")
+	if err != nil {
+		t.Fatalf("Provision error: %v", err)
+	}
+	// Reactor synthesises NodePort=30420.
+	const wantURL = "nats://node.example.com:30420"
+	if creds.URL != wantURL {
+		t.Errorf("URL = %q; want %q (legacy NodePort shape)", creds.URL, wantURL)
+	}
+}
+
+// TestK8sBackend_Provision_RouteRegistryWired verifies that a non-nil rdb +
+// configured prefixes do not block the provision happy path. The redis client
+// points at a closed port; .Set fails fast inside Provision but only emits
+// slog.Warn ("Failure here does NOT fail the provision"). Provision must still
+// return Credentials.
+func TestK8sBackend_Provision_RouteRegistryWired(t *testing.T) {
+	b, _ := newK8sBackendForTest(t)
+	b.SetPublicHost("nats.instanode.dev")
+
+	rdb := goredis.NewClient(&goredis.Options{
+		Addr:        "127.0.0.1:1", // closed port — Set fails fast
+		DialTimeout: 100 * time.Millisecond,
+		ReadTimeout: 100 * time.Millisecond,
+	})
+	defer rdb.Close()
+	b.EnableRouteRegistry(rdb, "nats_route:")
+	b.SetTokenRoutePrefix("nats_route_by_token:")
+
+	const token = "route-registry-token-aaaaaaaaaaaaa"
+	creds, err := b.Provision(context.Background(), token, "anonymous")
+	if err != nil {
+		t.Fatalf("Provision must succeed despite route-registry write failures; got: %v", err)
+	}
+	if creds == nil || creds.URL == "" {
+		t.Fatal("Provision returned nil/empty Credentials")
+	}
+}
+
+// TestK8sBackend_Provision_NamespaceCreateFails verifies the early-failure
+// path: a non-AlreadyExists namespace-create error must be surfaced by
+// Provision. Only the returned error is asserted, not downstream creates.
+//
+// We force the failure by installing a reactor that returns a "Forbidden" on
+// every namespace create.
+func TestK8sBackend_Provision_NamespaceCreateFails(t *testing.T) {
+	b, cs := newK8sBackendForTest(t)
+	cs.PrependReactor("create", "namespaces", func(_ k8stesting.Action) (bool, runtime.Object, error) {
+		return true, nil, k8serrors.NewForbidden(corev1.Resource("namespaces"), "x", nil)
+	})
+
+	_, err := b.Provision(context.Background(), "tok-namespace-fail", "hobby")
+	if err == nil {
+		t.Fatal("Provision must propagate namespace-create failures")
+	}
+	if !strings.Contains(err.Error(), "namespace") {
+		t.Errorf("error must mention namespace; got: %v", err)
+	}
+}
+
+// TestK8sBackend_Provision_NodePortAllocationFails verifies the legacy-URL
+// fallback branch's safety net: when publicHost is unset AND the Service has
+// no NodePort allocated (both Create and re-Get return 0), Provision must
+// surface a "nodeport allocation" error rather than emit a "nats://host:0"
+// URL.
+func TestK8sBackend_Provision_NodePortAllocationFails(t *testing.T) {
+	b, cs := newK8sBackendForTest(t)
+	// Prepend a handled=true reactor so the NodePort-synthesising one never runs.
+	cs.PrependReactor("create", "services", func(action k8stesting.Action) (bool, runtime.Object, error) {
+		ca := action.(k8stesting.CreateAction) // unchecked: registered for the "create" verb only
+		return true, ca.GetObject(), nil // leave NodePort=0 verbatim
+	})
+	// And the get reactor — also no NodePort.
+	cs.PrependReactor("get", "services", func(_ k8stesting.Action) (bool, runtime.Object, error) {
+		return true, &corev1.Service{
+			Spec: corev1.ServiceSpec{
+				Ports: []corev1.ServicePort{{Port: 4222}}, // NodePort=0
+			},
+		}, nil
+	})
+
+	_, err := b.Provision(context.Background(), "tok-nodeport-fail-aaaaaaaaaaaa", "anonymous")
+	if err == nil {
+		t.Fatal("Provision must error when no NodePort is allocated and publicHost is unset")
+	}
+	if !strings.Contains(err.Error(), "nodeport") {
+		t.Errorf("error must mention nodeport; got: %v", err)
+	}
+}
+
+// TestK8sBackend_Provision_NodePortRefetchSucceeds covers the "Create
+// response had NodePort=0 but Get returned a real one" branch. The reactor
+// returns NodePort=0 on Create and a real NodePort on the subsequent Get.
+func TestK8sBackend_Provision_NodePortRefetchSucceeds(t *testing.T) {
+	b, cs := newK8sBackendForTest(t)
+	// Service create returns NodePort=0.
+	cs.PrependReactor("create", "services", func(action k8stesting.Action) (bool, runtime.Object, error) {
+		ca := action.(k8stesting.CreateAction) // unchecked: registered for the "create" verb only
+		return true, ca.GetObject(), nil
+	})
+	// Get returns a real NodePort.
+	cs.PrependReactor("get", "services", func(_ k8stesting.Action) (bool, runtime.Object, error) {
+		return true, &corev1.Service{
+			Spec: corev1.ServiceSpec{
+				Ports: []corev1.ServicePort{{Port: 4222, NodePort: 32100}},
+			},
+		}, nil
+	})
+
+	const token = "tok-nodeport-refetch-aaaaaaaaaaa"
+	creds, err := b.Provision(context.Background(), token, "anonymous")
+	if err != nil {
+		t.Fatalf("Provision error: %v", err)
+	}
+	if !strings.Contains(creds.URL, ":32100") {
+		t.Errorf("URL must use the refetched NodePort 32100; got %q", creds.URL)
+	}
+}
+
+// TestK8sBackend_Deprovision_DeletesNamespace verifies the deprovision path:
+// the namespace Delete is called, and when route registry is wired, the two
+// route keys are scheduled for deletion (we tolerate the rdb call failing
+// because the closed port emits slog.Warn but does not propagate the error).
+func TestK8sBackend_Deprovision_DeletesNamespace(t *testing.T) {
+	const token = "dep-token-aaaaaaaaaaaaaaaaaaaaaaa"
+	ns := natsK8sNsPrefix + token
+
+	cs := fake.NewClientset(&corev1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{Name: ns},
+	})
+	rdb := goredis.NewClient(&goredis.Options{
+		Addr:        "127.0.0.1:1",
+		DialTimeout: 100 * time.Millisecond,
+		ReadTimeout: 100 * time.Millisecond,
+	})
+	defer rdb.Close()
+
+	b := &K8sBackend{
+		cs:          cs,
+		rdb:         rdb,
+		routePrefix: "nats_route_by_token:",
+		tokenPrefix: "nats_route:",
+	}
+
+	if err := b.Deprovision(context.Background(), token, ""); err != nil {
+		t.Errorf("Deprovision returned error: %v", err)
+	}
+	// The fake client's Delete is in-memory; verify the namespace is gone.
+	_, err := cs.CoreV1().Namespaces().Get(context.Background(), ns, metav1.GetOptions{})
+	if !k8serrors.IsNotFound(err) {
+		t.Errorf("namespace must be deleted; got err=%v", err)
+	}
+}
+
+// TestK8sBackend_Deprovision_ExplicitProviderResourceID uses the
+// providerResourceID argument verbatim rather than recomputing it from token.
+func TestK8sBackend_Deprovision_ExplicitProviderResourceID(t *testing.T) {
+	const customNS = "custom-namespace-from-prid"
+	cs := fake.NewClientset(&corev1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{Name: customNS},
+	})
+	b := &K8sBackend{cs: cs} // no rdb: route-registry cleanup is not part of this test
+
+	if err := b.Deprovision(context.Background(), "token-doesnt-matter", customNS); err != nil {
+		t.Errorf("Deprovision error: %v", err)
+	}
+	_, err := cs.CoreV1().Namespaces().Get(context.Background(), customNS, metav1.GetOptions{})
+	if !k8serrors.IsNotFound(err) {
+		t.Errorf("custom namespace must be deleted; got err=%v", err)
+	}
+}
+
+// TestK8sBackend_Deprovision_NamespaceMissingFails verifies that a missing
+// namespace surfaces as an error from the underlying Delete (the implementation
+// does not swallow NotFound — a missing namespace at deprovision time is
+// suspicious and the caller should learn about it).
+func TestK8sBackend_Deprovision_NamespaceMissingFails(t *testing.T) {
+	cs := fake.NewClientset() // empty cluster
+	b := &K8sBackend{cs: cs}
+	err := b.Deprovision(context.Background(), "tok-dne", "") // empty providerResourceID: namespace comes from the token
+	if err == nil {
+		t.Fatal("Deprovision must surface NotFound errors from the apiserver")
+	}
+	if !strings.Contains(err.Error(), "delete namespace") {
+		t.Errorf("error must mention delete namespace; got: %v", err)
+	}
+}
+
+// TestApplyNamespace_AlreadyExistsTerminating verifies the terminating-namespace
+// retry branch: when Create returns AlreadyExists AND the existing namespace is
+// in NamespaceTerminating phase, applyNamespace polls for it to disappear and
+// then retries. We simulate this by:
+//  1. Seeding a terminating namespace,
+//  2. After a short delay, a goroutine deletes it from the fake apiserver.
+// The retry loop should observe the deletion and successfully create.
+func TestApplyNamespace_AlreadyExistsTerminating(t *testing.T) {
+	const ns = "terminating-ns-test"
+	clientset := fake.NewClientset(&corev1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{Name: ns},
+		Status:     corev1.NamespaceStatus{Phase: corev1.NamespaceTerminating},
+	})
+	backend := &K8sBackend{cs: clientset}
+
+	// Remove the seeded namespace a moment later, from another goroutine, so the
+	// polling loop sees IsNotFound and retries the Create.
+	go func() {
+		time.Sleep(50 * time.Millisecond)
+		_ = clientset.CoreV1().Namespaces().Delete(context.Background(), ns, metav1.DeleteOptions{})
+	}()
+
+	// The 10s deadline only bounds the test if the retry never succeeds.
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	err := backend.applyNamespace(ctx, ns)
+	if err != nil {
+		t.Fatalf("applyNamespace must succeed once terminating namespace disappears; got: %v", err)
+	}
+}
+
+// TestApplyNamespace_AlreadyExistsActive covers the case where the namespace
+// already exists in the Active phase: applyNamespace is expected to return an
+// error that k8serrors.IsAlreadyExists recognises, not to wait for a deletion
+// that nobody requested.
+func TestApplyNamespace_AlreadyExistsActive(t *testing.T) {
+	const ns = "active-ns-test"
+	existing := &corev1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{Name: ns},
+		Status:     corev1.NamespaceStatus{Phase: corev1.NamespaceActive},
+	}
+	backend := &K8sBackend{cs: fake.NewClientset(existing)}
+
+	err := backend.applyNamespace(context.Background(), ns)
+	switch {
+	case err == nil:
+		t.Fatal("applyNamespace must surface AlreadyExists when the existing namespace is active")
+	case !k8serrors.IsAlreadyExists(err):
+		t.Errorf("error must be AlreadyExists; got: %v", err)
+	}
+}
+
+// TestApplyNamespace_ContextCancelled — when the caller's context is cancelled
+// during the polling loop for a terminating namespace, applyNamespace returns
+// ctx.Err() rather than spinning forever.
+func TestApplyNamespace_ContextCancelled(t *testing.T) {
+	const ns = "cancelled-ns-test"
+	stuck := &corev1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{Name: ns},
+		Status:     corev1.NamespaceStatus{Phase: corev1.NamespaceTerminating},
+	}
+	backend := &K8sBackend{cs: fake.NewClientset(stuck)}
+
+	// Nothing in this test deletes the seeded namespace; the 20ms deadline is
+	// what is expected to end the call.
+	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
+	defer cancel()
+
+	if err := backend.applyNamespace(ctx, ns); err == nil {
+		t.Fatal("applyNamespace must surface ctx.Err() when context is cancelled mid-poll")
+	}
+}
+
+// TestWaitPodReady_NoPods_TimesOut runs waitPodReady against a clientset that
+// holds no pods and requires an error: an empty pod list must not be treated
+// as readiness.
+//
+// The context carries a 30ms deadline so the test does not wait for the
+// natsK8sReadyTO of 3min.
+func TestWaitPodReady_NoPods_TimesOut(t *testing.T) {
+	backend := &K8sBackend{cs: fake.NewClientset()}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
+	defer cancel()
+
+	if err := backend.waitPodReady(ctx, "no-pods-ns"); err == nil {
+		t.Fatal("waitPodReady must error when no Ready pod ever appears")
+	}
+}
+
+// TestWaitPodReady_ReadyPodPresent — happy path: a pod with PodReady=True is
+// already present, so waitPodReady returns nil on the first iteration.
+func TestWaitPodReady_ReadyPodPresent(t *testing.T) {
+	const ns = "ready-ns"
+	readyPod := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "nats-0",
+			Namespace: ns,
+			Labels:    map[string]string{"app": "nats"},
+		},
+		Status: corev1.PodStatus{
+			Phase: corev1.PodRunning,
+			Conditions: []corev1.PodCondition{
+				{Type: corev1.PodReady, Status: corev1.ConditionTrue},
+			},
+		},
+	}
+	backend := &K8sBackend{cs: fake.NewClientset(readyPod)}
+
+	err := backend.waitPodReady(context.Background(), ns)
+	if err != nil {
+		t.Errorf("waitPodReady should return nil for a Ready pod; got: %v", err)
+	}
+}
+
+// TestWaitPodReady_ListError installs a reactor that fails every pod List with
+// ServiceUnavailable and requires waitPodReady to return an error.
+func TestWaitPodReady_ListError(t *testing.T) {
+	clientset := fake.NewClientset()
+	failList := func(_ k8stesting.Action) (bool, runtime.Object, error) {
+		return true, nil, k8serrors.NewServiceUnavailable("apiserver down")
+	}
+	clientset.PrependReactor("list", "pods", failList)
+	backend := &K8sBackend{cs: clientset}
+
+	if err := backend.waitPodReady(context.Background(), "any-ns"); err == nil {
+		t.Fatal("waitPodReady must propagate List errors")
+	}
+}
+
+// TestApplyResourceQuota_NoPVC applies the quota for the anonymous tier and
+// checks that the stored "tenant-quota" object has no persistentvolumeclaims
+// entry in its hard limits.
+func TestApplyResourceQuota_NoPVC(t *testing.T) {
+	const ns = "rq-no-pvc"
+	ctx := context.Background()
+	clientset := fake.NewClientset()
+	backend := &K8sBackend{cs: clientset}
+
+	// Seed the namespace before the ResourceQuota is created in it.
+	_, _ = clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{Name: ns},
+	}, metav1.CreateOptions{})
+
+	tierSizing := sizingForTier("anonymous") // pvcMi == 0
+	if err := backend.applyResourceQuota(ctx, ns, tierSizing); err != nil {
+		t.Fatalf("applyResourceQuota error: %v", err)
+	}
+
+	quota, err := clientset.CoreV1().ResourceQuotas(ns).Get(ctx, "tenant-quota", metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("quota Get error: %v", err)
+	}
+	if _, present := quota.Spec.Hard["persistentvolumeclaims"]; present {
+		t.Errorf("anonymous quota must NOT include persistentvolumeclaims key; got %v", quota.Spec.Hard)
+	}
+}
+
+// TestApplyResourceQuota_WithPVC applies the quota for the hobby tier and
+// checks that the stored "tenant-quota" object does carry a
+// persistentvolumeclaims entry in its hard limits.
+func TestApplyResourceQuota_WithPVC(t *testing.T) {
+	const ns = "rq-with-pvc"
+	ctx := context.Background()
+	clientset := fake.NewClientset()
+	backend := &K8sBackend{cs: clientset}
+
+	_, _ = clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{Name: ns},
+	}, metav1.CreateOptions{})
+
+	tierSizing := sizingForTier("hobby")
+	if err := backend.applyResourceQuota(ctx, ns, tierSizing); err != nil {
+		t.Fatalf("applyResourceQuota error: %v", err)
+	}
+
+	quota, err := clientset.CoreV1().ResourceQuotas(ns).Get(ctx, "tenant-quota", metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("quota Get error: %v", err)
+	}
+	if _, present := quota.Spec.Hard["persistentvolumeclaims"]; !present {
+		t.Errorf("hobby quota must include persistentvolumeclaims key; got %v", quota.Spec.Hard)
+	}
+}
+
+// TestApplyDeployment_TierArgs — anonymous tier emits "-js -m 8222"
+// (no -sd flag because there is no PVC). Hobby+ emits "-js -sd /data -m 8222".
+func TestApplyDeployment_TierArgs(t *testing.T) {
+	const ns = "dep-args-ns"
+	type tierCase struct {
+		tier    string
+		wantSub string // must be present in the space-joined container Args
+		banSub  string // must be absent from them; "" disables the check
+	}
+	table := []tierCase{
+		{tier: "anonymous", wantSub: "-m", banSub: "/data"},
+		{tier: "hobby", wantSub: "/data"},
+	}
+	for _, c := range table {
+		t.Run(c.tier, func(t *testing.T) {
+			ctx := context.Background()
+			clientset := fake.NewClientset()
+			backend := &K8sBackend{cs: clientset, image: "nats:test"}
+			_, _ = clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
+				ObjectMeta: metav1.ObjectMeta{Name: ns},
+			}, metav1.CreateOptions{})
+
+			if err := backend.applyDeployment(ctx, ns, sizingForTier(c.tier)); err != nil {
+				t.Fatalf("applyDeployment error: %v", err)
+			}
+			deploy, err := clientset.AppsV1().Deployments(ns).Get(ctx, "nats", metav1.GetOptions{})
+			if err != nil {
+				t.Fatalf("deployment Get error: %v", err)
+			}
+
+			// Only the first container's Args are inspected, joined into one string.
+			args := strings.Join(deploy.Spec.Template.Spec.Containers[0].Args, " ")
+			if !strings.Contains(args, c.wantSub) {
+				t.Errorf("tier %q deployment args must contain %q; got %q", c.tier, c.wantSub, args)
+			}
+			if c.banSub != "" && strings.Contains(args, c.banSub) {
+				t.Errorf("tier %q deployment args must NOT contain %q; got %q", c.tier, c.banSub, args)
+			}
+		})
+	}
+}
+
+// TestApplyNetworkPolicy_CreatesDenyAll verifies the network policy is created
+// with both Ingress and Egress policy types.
+func TestApplyNetworkPolicy_CreatesDenyAll(t *testing.T) {
+	const ns = "np-ns"
+	ctx := context.Background()
+	clientset := fake.NewClientset()
+	backend := &K8sBackend{cs: clientset}
+
+	_, _ = clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{Name: ns},
+	}, metav1.CreateOptions{})
+
+	if err := backend.applyNetworkPolicy(ctx, ns); err != nil {
+		t.Fatalf("applyNetworkPolicy error: %v", err)
+	}
+	policy, err := clientset.NetworkingV1().NetworkPolicies(ns).Get(ctx, "default-deny", metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("network policy Get error: %v", err)
+	}
+
+	// The three checks are independent; each reports on its own.
+	spec := policy.Spec
+	if n := len(spec.PolicyTypes); n != 2 {
+		t.Errorf("network policy must declare 2 policy types (Ingress, Egress); got %d", n)
+	}
+	if len(spec.Ingress) == 0 {
+		t.Error("network policy must declare at least one Ingress rule")
+	}
+	if len(spec.Egress) == 0 {
+		t.Error("network policy must declare at least one Egress rule (DNS)")
+	}
+}
+
+// TestApplyPVC_UsesStorageClass — the PVC must use the K8sBackend's
+// storageClass field.
+func TestApplyPVC_UsesStorageClass(t *testing.T) {
+	const (
+		ns        = "pvc-ns"
+		wantClass = "do-block-storage"
+	)
+	ctx := context.Background()
+	cs := fake.NewClientset()
+	b := &K8sBackend{cs: cs, storageClass: wantClass}
+	_, _ = cs.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{Name: ns},
+	}, metav1.CreateOptions{})
+
+	sz := sizingForTier("hobby")
+	if err := b.applyPVC(ctx, ns, sz); err != nil {
+		t.Fatalf("applyPVC error: %v", err)
+	}
+	pvc, err := cs.CoreV1().PersistentVolumeClaims(ns).Get(ctx, "nats-jetstream", metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("PVC Get error: %v", err)
+	}
+
+	// StorageClassName is a *string. Report the pointed-to value (or <nil>) on
+	// failure; formatting the pointer itself with %v would print an address.
+	switch sc := pvc.Spec.StorageClassName; {
+	case sc == nil:
+		t.Error("PVC storageClassName = <nil>; want do-block-storage")
+	case *sc != wantClass:
+		t.Errorf("PVC storageClassName = %q; want do-block-storage", *sc)
+	}
+}
+
+// TestK8sBackend_Provision_RollbackBranches drives each post-namespace apply
+// step to failure so every `rollback(...)` branch in Provision is exercised.
+// For each resource type we install a reactor that returns Forbidden on Create;
+// because the steps run in order, failing resource R means R-1 steps succeeded
+// first, so the rollback is reached for that specific step. We assert that
+// Provision errors and that the error text names the failing step. The
+// namespace Delete issued by the rollback is NOT asserted by this test.
+func TestK8sBackend_Provision_RollbackBranches(t *testing.T) {
+	cases := []struct {
+		name     string // subtest name (identifies the failing step)
+		resource string // plural resource name whose create is forced to fail
+		wantMsg  string // substring the Provision error must contain
+		tier     string // tier passed to Provision; "hobby" in every case
+	}{
+		{"network_policy", "networkpolicies", "network policy", "hobby"},
+		{"resource_quota", "resourcequotas", "resource quota", "hobby"},
+		{"pvc", "persistentvolumeclaims", "pvc", "hobby"},
+		{"deployment", "deployments", "deployment", "hobby"},
+		{"service", "services", "service", "hobby"},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			b, cs := newK8sBackendForTest(t)
+			b.SetPublicHost("nats.instanode.dev")
+			cs.PrependReactor("create", tc.resource, func(_ k8stesting.Action) (bool, runtime.Object, error) {
+				return true, nil, k8serrors.NewForbidden(corev1.Resource(tc.resource), "x", nil)
+			})
+
+			const token = "rollback-token-aaaaaaaaaaaaaaaaaaa"
+			_, err := b.Provision(context.Background(), token, tc.tier)
+			if err == nil {
+				t.Fatalf("Provision must error when %s create fails", tc.resource)
+			}
+			if !strings.Contains(err.Error(), tc.wantMsg) {
+				t.Errorf("error must mention %q; got: %v", tc.wantMsg, err)
+			}
+		})
+	}
+}
+
+// TestK8sBackend_Provision_WaitReadyRollback drives the final
+// `rollback("wait ready", ...)` branch: every apply step succeeds, but no Ready
+// pod ever appears, so waitPodReady errors and Provision rolls back. We remove
+// the deployment reactor's pod-scheduling side-effect by failing pod LISTs.
+func TestK8sBackend_Provision_WaitReadyRollback(t *testing.T) {
+	b, cs := newK8sBackendForTest(t)
+	b.SetPublicHost("nats.instanode.dev")
+	// Force waitPodReady to error immediately by making pod List fail.
+	cs.PrependReactor("list", "pods", func(_ k8stesting.Action) (bool, runtime.Object, error) {
+		return true, nil, k8serrors.NewServiceUnavailable("apiserver down")
+	})
+
+	_, err := b.Provision(context.Background(), "wait-ready-rollback-tok-aaaa", "anonymous")
+	if err == nil {
+		t.Fatal("Provision must error when waitPodReady fails")
+	}
+	if !strings.Contains(err.Error(), "wait ready") {
+		t.Errorf("error must mention wait ready; got: %v", err)
+	}
+}
+
+// TestNewK8sBackend_ClientsetError targets the `kubernetes.NewForConfig` error
+// branch (k8s.go:151-153), but it does NOT force that error: the kubeconfig
+// written below is well-formed (plain https server URL, static token), so
+// construction may well succeed. The test therefore accepts either outcome.
+// If newK8sBackend returns an error, the test returns early without failing.
+// If it returns a nil error, the backend must be non-nil and the image and
+// storageClass defaults must have been applied for the empty arguments passed
+// here. NOTE(review): the name promises an error case that the body never
+// asserts — consider renaming, or injecting a config that fails construction.
+func TestNewK8sBackend_ClientsetError(t *testing.T) {
+	dir := t.TempDir()
+	kubeconfig := dir + "/config"
+	// Minimal kubeconfig: one cluster, one context, one token user, written
+	// with mode 0o600. Both outcomes of newK8sBackend are tolerated below: an
+	// error ends the test early, while a nil error leads on to the non-nil
+	// and defaults checks.
+	const content = `apiVersion: v1
+kind: Config
+clusters:
+- cluster:
+    server: https://127.0.0.1:6443
+  name: c
+contexts:
+- context:
+    cluster: c
+    user: u
+  name: ctx
+current-context: ctx
+users:
+- name: u
+  user:
+    token: abc
+`
+	if err := os.WriteFile(kubeconfig, []byte(content), 0o600); err != nil {
+		t.Fatalf("write kubeconfig: %v", err)
+	}
+	b, err := newK8sBackend(kubeconfig, "", "", "")
+	if err != nil {
+		// Tolerated outcome: construction reported an error; nothing further is checked.
+		return
+	}
+	if b == nil {
+		t.Fatal("newK8sBackend returned nil backend with nil error")
+	}
+	// Empty image/storageClass arguments must have been replaced by defaults.
+	if b.image == "" || b.storageClass == "" {
+		t.Errorf("newK8sBackend must apply image/storageClass defaults; got image=%q sc=%q", b.image, b.storageClass)
+	}
+}
+
+// TestApplyService_IsNodePort — the Service type must be NodePort so external
+// callers (when no nats-proxy is fronting the cluster) can reach the pod.
+func TestApplyService_IsNodePort(t *testing.T) {
+	cs := fake.NewClientset()
+	b := &K8sBackend{cs: cs}
+	const ns = "svc-ns"
+	_, _ = cs.CoreV1().Namespaces().Create(context.Background(), &corev1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{Name: ns},
+	}, metav1.CreateOptions{})
+	svc, err := b.applyService(context.Background(), ns)
+	if err != nil {
+		t.Fatalf("applyService error: %v", err)
+	}
+	if svc.Spec.Type != corev1.ServiceTypeNodePort {
+		t.Errorf("service type = %v; want NodePort", svc.Spec.Type)
+	}
+}
diff --git a/internal/backend/queue/local.go b/internal/backend/queue/local.go
index 6d7d1fb..ebd0a5c 100644
--- a/internal/backend/queue/local.go
+++ b/internal/backend/queue/local.go
@@ -19,6 +19,12 @@ import (
 type LocalBackend struct {
 	natsHost   string
 	httpClient *http.Client
+	// monitorPort is the NATS monitor port; defaults to 8222 in newLocalBackend.
+	// Exposed as a struct field rather than a constant so tests can drive the
+	// health-check codepath against an httptest.Server on a random port without
+	// colliding with a docker daemon / real NATS / another developer's pod
+	// already bound to :8222 on the same loopback.
+	monitorPort int
 }
 
 func newLocalBackend(natsHost string) *LocalBackend {
@@ -26,14 +32,15 @@ func newLocalBackend(natsHost string) *LocalBackend {
 		natsHost = "localhost"
 	}
 	return &LocalBackend{
-		natsHost:   natsHost,
-		httpClient: &http.Client{Timeout: 5 * time.Second},
+		natsHost:    natsHost,
+		httpClient:  &http.Client{Timeout: 5 * time.Second},
+		monitorPort: 8222,
 	}
 }
 
 // Provision verifies NATS is reachable and returns a connection URL + subject prefix.
 func (b *LocalBackend) Provision(ctx context.Context, token, tier string) (*Credentials, error) {
-	monitorURL := fmt.Sprintf("http://%s:8222/healthz", b.natsHost)
+	monitorURL := fmt.Sprintf("http://%s:%d/healthz", b.natsHost, b.monitorPort)
 	req, err := http.NewRequestWithContext(ctx, http.MethodGet, monitorURL, nil)
 	if err != nil {
 		return nil, fmt.Errorf("queue.local.Provision: build health request: %w", err)
diff --git a/internal/backend/queue/local_test.go b/internal/backend/queue/local_test.go
new file mode 100644
index 0000000..1f8c5da
--- /dev/null
+++ b/internal/backend/queue/local_test.go
@@ -0,0 +1,207 @@
+package queue
+
+// local_test.go — unit tests for LocalBackend.
+//
+// LocalBackend provisions NATS credentials on the shared cluster. NATS itself
+// runs without authentication on the shared backend, so per-user state is
+// never created on the server side. The interesting behaviour is:
+//
+//  1. Constructor defaults: empty natsHost falls back to "localhost"; the
+//     monitor port defaults to 8222 (the NATS HTTP monitor port).
+//  2.
Provision health-check: it hits http://<natsHost>:<monitorPort>/healthz and:
+//     - returns Credentials on 200,
+//     - returns an error on any non-2xx,
+//     - returns an error if the dial itself fails (host unreachable),
+//     - returns an error if NewRequestWithContext fails (malformed host).
+//  3. The returned SubjectPrefix is the canonical full-token derivation, so
+//     two tokens sharing an 8-hex-char prefix never share a subject namespace
+//     (the truncation-bug class fixed in subjident.go).
+//  4. The returned URL is "nats://<natsHost>:4222".
+//  5. Deprovision is a no-op — no per-user state, never errors.
+//
+// We use httptest.Server bound to a random port and the test-only
+// `monitorPort` knob on LocalBackend so the tests never collide with the
+// docker daemon, a real NATS, or another developer's pod that happens to be
+// bound to :8222.
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+	"time"
+)
+
+// newHealthTestServer starts an httptest.Server on a random loopback port that
+// answers /healthz with the configured status. It returns the listener's host
+// and port separately and registers srv.Close as a test cleanup. The httptest
+// server binds to 127.0.0.1:<random-port>, so it does not collide with other
+// listeners.
+func newHealthTestServer(t *testing.T, status int) (host string, port int) {
+	t.Helper()
+	mux := http.NewServeMux()
+	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
+		w.WriteHeader(status)
+	})
+	srv := httptest.NewServer(mux)
+	t.Cleanup(srv.Close)
+
+	// Read host and port straight from the listener rather than re-parsing
+	// srv.URL: the address is already structured, so no string trimming or
+	// integer scanning is needed.
+	addr, ok := srv.Listener.Addr().(*net.TCPAddr)
+	if !ok {
+		t.Fatalf("httptest listener address %v is not a TCP address", srv.Listener.Addr())
+	}
+	return addr.IP.String(), addr.Port
+}
+
+// TestNewLocalBackend_Defaults asserts the constructor defaults: empty
+// natsHost is replaced by "localhost", monitorPort defaults to 8222, and
+// the http client is initialised.
+func TestNewLocalBackend_Defaults(t *testing.T) {
+	b := newLocalBackend("")
+	if b.natsHost != "localhost" {
+		t.Errorf("natsHost = %q; want %q", b.natsHost, "localhost")
+	}
+	if b.monitorPort != 8222 {
+		t.Errorf("monitorPort = %d; want 8222 (NATS HTTP monitor default)", b.monitorPort)
+	}
+	if b.httpClient == nil {
+		t.Error("httpClient must be initialised")
+	}
+}
+
+// TestNewLocalBackend_NonEmptyHostPreserved asserts an explicit natsHost is
+// preserved verbatim (no normalisation, no port munging).
+func TestNewLocalBackend_NonEmptyHostPreserved(t *testing.T) {
+	b := newLocalBackend("nats.example.com")
+	if b.natsHost != "nats.example.com" {
+		t.Errorf("natsHost = %q; want preserved", b.natsHost)
+	}
+}
+
+// TestLocalBackend_Provision_Happy spins up an httptest server, points the
+// LocalBackend's monitor port at it, and asserts that Provision:
+//   - returns a non-nil Credentials struct,
+//   - URL is "nats://<host>:4222",
+//   - SubjectPrefix is the canonical full-token derivation,
+//   - ProviderResourceID is empty (shared backend has no per-resource state).
+func TestLocalBackend_Provision_Happy(t *testing.T) {
+	const token = "abc12345deadbeefcafef00d00112233"
+	host, port := newHealthTestServer(t, http.StatusOK)
+	backend := newLocalBackend(host)
+	backend.monitorPort = port
+
+	creds, err := backend.Provision(context.Background(), token, "anonymous")
+	switch {
+	case err != nil:
+		t.Fatalf("Provision returned error: %v", err)
+	case creds == nil:
+		t.Fatal("Provision returned nil Credentials")
+	}
+
+	// Each field is checked independently so one mismatch does not hide another.
+	if want := fmt.Sprintf("nats://%s:4222", host); creds.URL != want {
+		t.Errorf("URL = %q; want %q", creds.URL, want)
+	}
+	if want := canonicalSubjectPrefix(token); creds.SubjectPrefix != want {
+		t.Errorf("SubjectPrefix = %q; want %q", creds.SubjectPrefix, want)
+	}
+	if got := creds.ProviderResourceID; got != "" {
+		t.Errorf("ProviderResourceID = %q; want empty (shared backend)", got)
+	}
+}
+
+// TestLocalBackend_Provision_UnhealthyStatus points the backend at a monitor
+// that answers 503 and requires Provision to fail with an error whose text
+// contains "unhealthy" — a non-2xx health response must never be reported as
+// a successful provision.
+func TestLocalBackend_Provision_UnhealthyStatus(t *testing.T) {
+	host, port := newHealthTestServer(t, http.StatusServiceUnavailable)
+	backend := newLocalBackend(host)
+	backend.monitorPort = port
+
+	_, err := backend.Provision(context.Background(), "tok", "anonymous")
+	switch {
+	case err == nil:
+		t.Fatal("Provision must return an error when NATS returns non-2xx")
+	case !strings.Contains(err.Error(), "unhealthy"):
+		t.Errorf("error must mention 'unhealthy' for non-2xx response; got: %v", err)
+	}
+}
+
+// TestLocalBackend_Provision_UnreachableHost asserts that a dial failure
+// (host doesn't resolve) surfaces as a health-check error, NOT a silent
+// success or a panic. We use a hostname under the RFC 2606 `.invalid` TLD
+// so DNS resolution is guaranteed to fail on every runner.
+func TestLocalBackend_Provision_UnreachableHost(t *testing.T) {
+	backend := newLocalBackend("nonexistent-host-for-queue-test.invalid")
+
+	// Bound the attempt at 3s so a slow resolver cannot stall the test run.
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
+
+	_, err := backend.Provision(ctx, "tok", "anonymous")
+	switch {
+	case err == nil:
+		t.Fatal("Provision must return an error when NATS monitor is unreachable")
+	case !strings.Contains(err.Error(), "health check failed"):
+		t.Errorf("error must mention 'health check failed' for dial failure; got: %v", err)
+	}
+}
+
+// TestLocalBackend_Provision_BuildRequestError uses a natsHost containing a
+// control character (0x7f) and requires Provision to fail with an error whose
+// text contains "build health request".
+func TestLocalBackend_Provision_BuildRequestError(t *testing.T) {
+	backend := newLocalBackend("bad\x7fhost")
+
+	_, err := backend.Provision(context.Background(), "tok", "anonymous")
+	switch {
+	case err == nil:
+		t.Fatal("Provision must surface an error for a malformed natsHost")
+	case !strings.Contains(err.Error(), "build health request"):
+		t.Errorf("error must mention 'build health request'; got: %v", err)
+	}
+}
+
+// TestLocalBackend_Deprovision_NoOp asserts the shared backend's Deprovision
+// is a no-op: no error, no side effect, regardless of token or PRID values
+// (including empty strings).
+func TestLocalBackend_Deprovision_NoOp(t *testing.T) {
+	backend := newLocalBackend("nats.example.com")
+	ctx := context.Background()
+
+	err := backend.Deprovision(ctx, "any-token", "any-prid")
+	if err != nil {
+		t.Errorf("Deprovision must be a no-op; got error: %v", err)
+	}
+
+	err = backend.Deprovision(ctx, "", "")
+	if err != nil {
+		t.Errorf("Deprovision must accept empty token/PRID; got error: %v", err)
+	}
+}
+
+// TestLocalBackend_Provision_SubjectPrefix_NoCollision provisions two tokens
+// whose first eight characters are identical and requires the two returned
+// SubjectPrefix values to differ. It guards against the truncation-bug class
+// fixed in subjident.go: on the shared NATS backend the SubjectPrefix is the
+// ONLY tenant isolation boundary, so a collision would let one tenant
+// publish/subscribe inside another's subject namespace.
+func TestLocalBackend_Provision_SubjectPrefix_NoCollision(t *testing.T) {
+	const (
+		tokA = "abc12345deadbeefcafef00d00112233"
+		tokB = "abc12345111122223333444455556666"
+	)
+	host, port := newHealthTestServer(t, http.StatusOK)
+	backend := newLocalBackend(host)
+	backend.monitorPort = port
+	ctx := context.Background()
+
+	first, err := backend.Provision(ctx, tokA, "anonymous")
+	if err != nil {
+		t.Fatalf("Provision(tokA) error: %v", err)
+	}
+	second, err := backend.Provision(ctx, tokB, "anonymous")
+	if err != nil {
+		t.Fatalf("Provision(tokB) error: %v", err)
+	}
+
+	if first.SubjectPrefix == second.SubjectPrefix {
+		t.Errorf("SubjectPrefix collision for tokens sharing 8-char prefix: both = %q", first.SubjectPrefix)
+	}
+}