From 9fddb9a844380bd197ef9b999cef4c71215e66a5 Mon Sep 17 00:00:00 2001 From: Manas Srivastava Date: Thu, 21 May 2026 23:50:19 +0530 Subject: [PATCH] chore: remove stale api/provisioner + api/worker subdirs (monorepo split cleanup) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These directories are leftover from the original monorepo before worker and provisioner were split into their own repos. The stale go.mod files inside them were never actually built — but OSV-Scanner indexes them and flags ~15 false-positive CVEs (stdlib 1.25.0 issues, x/net@0.49.0, fiber CVEs that don't apply to the live api binary). Verified safe to delete: - api/provisioner/go.mod last modified 2026-05-12 (obs scaffolding) - api/worker/go.mod same - live provisioner / worker code lives in InstaNode-dev/provisioner and InstaNode-dev/worker repos - no internal package imports from api/provisioner/* or api/worker/* Closes pending task #22 (C1 cleanup api/worker + api/provisioner subdirs). Co-Authored-By: Claude Opus 4.7 (1M context) --- provisioner/go.mod | 35 --- provisioner/go.sum | 61 ---- .../_obs_stubs/buildinfo/buildinfo.go | 34 -- .../internal/_obs_stubs/logctx/logctx.go | 96 ------ provisioner/internal/server/healthz.go | 49 --- provisioner/internal/server/healthz_test.go | 60 ---- provisioner/main.go | 216 ------------- provisioner/main_test.go | 291 ------------------ worker/PR_NOTES.md | 159 ---------- worker/go.mod | 42 --- worker/go.sum | 90 ------ .../_obs_stubs/buildinfo/buildinfo.go | 21 -- worker/internal/_obs_stubs/logctx/logctx.go | 125 -------- worker/internal/jobs/middleware.go | 173 ----------- worker/internal/jobs/middleware_test.go | 195 ------------ worker/internal/obs/nr.go | 83 ----- worker/internal/obs/nr_test.go | 35 --- 17 files changed, 1765 deletions(-) delete mode 100644 provisioner/go.mod delete mode 100644 provisioner/go.sum delete mode 100644 provisioner/internal/_obs_stubs/buildinfo/buildinfo.go delete mode 100644 provisioner/internal/_obs_stubs/logctx/logctx.go delete mode 100644 provisioner/internal/server/healthz.go delete mode 100644 provisioner/internal/server/healthz_test.go delete mode 100644 provisioner/main.go delete mode 100644 provisioner/main_test.go delete mode 100644 worker/PR_NOTES.md delete mode 100644 worker/go.mod delete mode 100644 worker/go.sum delete mode 100644 worker/internal/_obs_stubs/buildinfo/buildinfo.go delete mode 100644 worker/internal/_obs_stubs/logctx/logctx.go delete mode 100644 worker/internal/jobs/middleware.go delete mode 100644 worker/internal/jobs/middleware_test.go delete mode 100644 worker/internal/obs/nr.go delete mode 100644 worker/internal/obs/nr_test.go diff --git a/provisioner/go.mod b/provisioner/go.mod deleted file mode 100644 index 06ceea5..0000000 --- a/provisioner/go.mod +++ /dev/null @@ -1,35 +0,0 @@ -// Module instant.dev/provisioner — observability scaffolding for the -// instant.dev/provisioner gRPC service. -// -// This is a self-contained module (NOT joined with the parent api module) -// so that: -// -// 1. The api repo's `go build ./...` continues to be a pure api build — -// adding the provisioner subdir doesn't pull NR deps into the api binary. -// 2. The Go files here can be copied verbatim into the real provisioner -// repo (github.com/InstaNode-dev/provisioner) which already uses the -// module name `instant.dev/provisioner` — see that repo's go.mod. -// -// When the real provisioner adopts these files, this scaffolding go.mod is -// deleted and the imports resolve against the real provisioner's go.mod. -module instant.dev/provisioner - -go 1.25.0 - -require ( - github.com/newrelic/go-agent/v3 v3.43.3 - github.com/newrelic/go-agent/v3/integrations/nrgrpc v1.4.9 - google.golang.org/grpc v1.80.0 -) - -require ( - github.com/golang/protobuf v1.5.4 // indirect - github.com/newrelic/csec-go-agent v1.6.0 // indirect - golang.org/x/arch v0.27.0 // indirect - golang.org/x/net v0.49.0 // indirect - golang.org/x/sys v0.40.0 // indirect - golang.org/x/text v0.33.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect - google.golang.org/protobuf v1.36.11 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect -) diff --git a/provisioner/go.sum b/provisioner/go.sum deleted file mode 100644 index bee9c6b..0000000 --- a/provisioner/go.sum +++ /dev/null @@ -1,61 +0,0 @@ -github.com/adhocore/gronx v1.19.1 h1:S4c3uVp5jPjnk00De0lslyTenGJ4nA3Ydbkj1SbdPVc= -github.com/adhocore/gronx v1.19.1/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg= -github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= -github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/dlclark/regexp2 v1.9.0 h1:pTK/l/3qYIKaRXuHnEnIf7Y5NxfRPfpb7dis6/gdlVI= -github.com/dlclark/regexp2 v1.9.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= -github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= -github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= -github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= -github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= -github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= -github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= -github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/k2io/hookingo v1.0.6 h1:HBSKd1tNbW5BCj8VLNqemyBKjrQ8g0HkXcbC/DEHODE= -github.com/k2io/hookingo v1.0.6/go.mod h1:2L1jdNjdB3NkbzSVv9Q5fq7SJhRkWyAhe65XsAp5iXk= -github.com/newrelic/csec-go-agent v1.6.0 h1:OCShRZgiE+kg37jk+QXHw9e9EQ9BvLOeQTk+ovJhnrE= -github.com/newrelic/csec-go-agent v1.6.0/go.mod h1:LiLGm6a+q+hkmTnrxrYw1ToToirThOHydjrrLMtci5M= -github.com/newrelic/go-agent/v3 v3.43.3 h1:0A6DkUBYK2bidV6jJDJ1SD2XkRlg976nl+SiEqkGTUQ= -github.com/newrelic/go-agent/v3 v3.43.3/go.mod h1:MFXnCId5xXMIJI6A/kbkg0DO48EVTsKcmNijMYphzTg= -github.com/newrelic/go-agent/v3/integrations/nrgrpc v1.4.9 h1:mkoYqqEjFTNjJURsX+08iwuXTmsW7eFT+L0+hBuvAzw= -github.com/newrelic/go-agent/v3/integrations/nrgrpc v1.4.9/go.mod h1:KkYfN06JZLI/H6l7w2+TJ5ILKF5NCXN5iysLsKkzMiI= -github.com/newrelic/go-agent/v3/integrations/nrsecurityagent v1.1.0 h1:gqkTDYUHWUyiG+u0PJQCRh98rcHLxP/w7GtIbJDVULY= -github.com/newrelic/go-agent/v3/integrations/nrsecurityagent v1.1.0/go.mod h1:3wugGvRmOVYov/08y+D8tB1uYIZds5bweVdr5vo4Gbs= -go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= -go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= -go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= -go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= -go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= -go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= -go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= -go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= -go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= -go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= -go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= -go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -golang.org/x/arch v0.27.0 h1:0WNVcR8u9yFz8j5FvdHpgwNp3FS5U4guYdzHwEiGjoU= -golang.org/x/arch v0.27.0/go.mod h1:0X+GdSIP+kL5wPmpK7sdkEVTt2XoYP0cSjQSbZBwOi8= -golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= -golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= -golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= -golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= -golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= -golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= -golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= -gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= -gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 h1:sNrWoksmOyF5bvJUcnmbeAmQi8baNhqg5IWaI3llQqU= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= -google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= -google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= -google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= -google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/provisioner/internal/_obs_stubs/buildinfo/buildinfo.go b/provisioner/internal/_obs_stubs/buildinfo/buildinfo.go deleted file mode 100644 index 9aab103..0000000 --- a/provisioner/internal/_obs_stubs/buildinfo/buildinfo.go +++ /dev/null @@ -1,34 +0,0 @@ -// Package buildinfo exposes compile-time build metadata. -// -// STUB: this is a temporary, vendored copy of what will become -// instant.dev/common/buildinfo once track 1 of the observability rollout -// merges. After that PR lands, callers in this service should switch their -// imports from -// -// "instant.dev/provisioner/internal/_obs_stubs/buildinfo" -// -// to -// -// "instant.dev/common/buildinfo" -// -// and this directory should be deleted in a follow-up cleanup PR. -// -// The variables are populated at link time via -ldflags. See the Dockerfile -// change shipped in track 1 for the exact command line. When the service is -// built without ldflags (e.g. `go build ./...` during local dev), the values -// fall back to "dev" / "unknown" so the program never panics. -package buildinfo - -var ( - // GitSHA is the 7+ char git commit hash this binary was built from. - // Set via: -ldflags "-X .../buildinfo.GitSHA=$GIT_SHA" - GitSHA = "dev" - - // BuildTime is the UTC RFC3339 timestamp of the build. - // Set via: -ldflags "-X .../buildinfo.BuildTime=$BUILD_TIME" - BuildTime = "unknown" - - // Version is the semver tag of the release, or "dev" for untagged builds. - // Set via: -ldflags "-X .../buildinfo.Version=$VERSION" - Version = "dev" -) diff --git a/provisioner/internal/_obs_stubs/logctx/logctx.go b/provisioner/internal/_obs_stubs/logctx/logctx.go deleted file mode 100644 index c6474d6..0000000 --- a/provisioner/internal/_obs_stubs/logctx/logctx.go +++ /dev/null @@ -1,96 +0,0 @@ -// Package logctx provides a context-aware slog.Handler wrapper that injects -// observability fields (service, commit_id, trace_id, team_id, tid) into every -// log record automatically, plus typed context setters/getters for those -// fields. -// -// STUB: this is a minimal vendored copy of what will become -// instant.dev/common/logctx once track 2 of the observability rollout merges. -// After that PR lands, callers should switch their imports to -// instant.dev/common/logctx and this directory should be deleted. -// -// Scope of this stub: only the surface area the provisioner service actually -// uses — NewHandler, WithTraceID, TraceID. The full common/logctx package -// will also expose WithTeamID, WithRequestID, WithTID, etc. — those are not -// needed here yet because the provisioner has no team/auth context. -package logctx - -import ( - "context" - "log/slog" - - "instant.dev/provisioner/internal/_obs_stubs/buildinfo" -) - -// ctxKey is a private, comparable type for context keys so we never collide -// with other packages that stash values on the same ctx. -type ctxKey int - -const ( - keyTraceID ctxKey = iota -) - -// WithTraceID returns a child context with the given W3C trace ID attached. -// Empty traceID is a no-op — the parent context is returned unchanged so -// callers can pipe through values they extracted from gRPC metadata without -// branching on emptiness. -func WithTraceID(ctx context.Context, traceID string) context.Context { - if traceID == "" { - return ctx - } - return context.WithValue(ctx, keyTraceID, traceID) -} - -// TraceID extracts a previously-set trace ID, returning "" when absent. -// Never panics — safe to call on background or unrelated contexts. -func TraceID(ctx context.Context) string { - if ctx == nil { - return "" - } - v, _ := ctx.Value(keyTraceID).(string) - return v -} - -// handler wraps an underlying slog.Handler and stamps every Record with -// service, commit_id, build_time, version, and ctx-derived trace_id. -type handler struct { - inner slog.Handler - service string -} - -// NewHandler returns a slog.Handler that decorates `inner` with mandatory -// observability fields. The returned handler is safe for concurrent use. -// -// Typical wiring in a service's main(): -// -// slog.SetDefault(slog.New(logctx.NewHandler( -// "provisioner", -// slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{AddSource: true}), -// ))) -func NewHandler(service string, inner slog.Handler) slog.Handler { - return &handler{inner: inner, service: service} -} - -func (h *handler) Enabled(ctx context.Context, level slog.Level) bool { - return h.inner.Enabled(ctx, level) -} - -func (h *handler) Handle(ctx context.Context, r slog.Record) error { - r.AddAttrs( - slog.String("service", h.service), - slog.String("commit_id", buildinfo.GitSHA), - slog.String("build_time", buildinfo.BuildTime), - slog.String("version", buildinfo.Version), - ) - if tid := TraceID(ctx); tid != "" { - r.AddAttrs(slog.String("trace_id", tid)) - } - return h.inner.Handle(ctx, r) -} - -func (h *handler) WithAttrs(attrs []slog.Attr) slog.Handler { - return &handler{inner: h.inner.WithAttrs(attrs), service: h.service} -} - -func (h *handler) WithGroup(name string) slog.Handler { - return &handler{inner: h.inner.WithGroup(name), service: h.service} -} diff --git a/provisioner/internal/server/healthz.go b/provisioner/internal/server/healthz.go deleted file mode 100644 index d54b4a2..0000000 --- a/provisioner/internal/server/healthz.go +++ /dev/null @@ -1,49 +0,0 @@ -// Package server hosts the gRPC service implementation and, as of the -// observability rollout (2026-05-12), a tiny sidecar HTTP handler exposing -// /healthz so the platform can curl the running pod's commit_id without -// going through the gRPC surface. -// -// The provisioner is otherwise gRPC-only on port 50051. We bind the HTTP -// sidecar to a different port (default 8092, see plan doc) — verified in -// HealthzPort_NoCollisionWithGRPC test below. -package server - -import ( - "encoding/json" - "net/http" - - "instant.dev/provisioner/internal/_obs_stubs/buildinfo" -) - -// HealthzResponse is the JSON body returned by GET /healthz. -// -// Field order matches what the api and worker services return so dashboards -// and curl pipelines can use a single jq filter across all three. -type HealthzResponse struct { - OK bool `json:"ok"` - Service string `json:"service"` - CommitID string `json:"commit_id"` - BuildTime string `json:"build_time"` - Version string `json:"version"` -} - -// HealthzHandler returns an http.Handler that responds to any method (the -// k8s liveness probe will use GET; humans use curl) with the build metadata -// JSON. Never errors — used as a liveness probe so it must be cheap and -// dependency-free. -func HealthzHandler() http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - resp := HealthzResponse{ - OK: true, - Service: "instant-provisioner", - CommitID: buildinfo.GitSHA, - BuildTime: buildinfo.BuildTime, - Version: buildinfo.Version, - } - w.Header().Set("Content-Type", "application/json") - // json.NewEncoder.Encode never errors on a value of fixed shape with - // no unmarshalable types — and we'd be unable to write an error - // response anyway if the connection were broken. Discard. - _ = json.NewEncoder(w).Encode(resp) - }) -} diff --git a/provisioner/internal/server/healthz_test.go b/provisioner/internal/server/healthz_test.go deleted file mode 100644 index 1e0cfb7..0000000 --- a/provisioner/internal/server/healthz_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package server - -import ( - "encoding/json" - "net/http" - "net/http/httptest" - "testing" -) - -// TestHealthzHandler_ResponseShape pins the JSON contract since dashboards -// and alert rules consume this body shape. -func TestHealthzHandler_ResponseShape(t *testing.T) { - rec := httptest.NewRecorder() - req := httptest.NewRequest(http.MethodGet, "/healthz", nil) - HealthzHandler().ServeHTTP(rec, req) - - if rec.Code != http.StatusOK { - t.Fatalf("status = %d, want 200", rec.Code) - } - - var raw map[string]any - if err := json.NewDecoder(rec.Body).Decode(&raw); err != nil { - t.Fatalf("decode: %v", err) - } - - for _, key := range []string{"ok", "service", "commit_id", "build_time", "version"} { - if _, ok := raw[key]; !ok { - t.Errorf("response missing key %q — keys present: %v", key, mapKeys(raw)) - } - } - - if raw["service"] != "instant-provisioner" { - t.Errorf("service = %v, want instant-provisioner", raw["service"]) - } - if raw["ok"] != true { - t.Errorf("ok = %v, want true", raw["ok"]) - } -} - -// TestHealthzHandler_AcceptsAnyMethod confirms HEAD / POST don't 405. The k8s -// liveness probe sends GET but having the endpoint be method-agnostic makes -// it easier to curl from a shell during incidents. -func TestHealthzHandler_AcceptsAnyMethod(t *testing.T) { - for _, m := range []string{http.MethodGet, http.MethodHead, http.MethodPost} { - rec := httptest.NewRecorder() - req := httptest.NewRequest(m, "/healthz", nil) - HealthzHandler().ServeHTTP(rec, req) - if rec.Code != http.StatusOK { - t.Errorf("method %s: status = %d, want 200", m, rec.Code) - } - } -} - -func mapKeys(m map[string]any) []string { - out := make([]string, 0, len(m)) - for k := range m { - out = append(out, k) - } - return out -} diff --git a/provisioner/main.go b/provisioner/main.go deleted file mode 100644 index 3b769f4..0000000 --- a/provisioner/main.go +++ /dev/null @@ -1,216 +0,0 @@ -// Command provisioner-obs-scaffold is a reference wiring of observability -// for the instant.dev/provisioner gRPC service (track 5 of 8 in the 2026-05-12 -// observability rollout — see OBSERVABILITY-PLAN-2026-05-12.md at the repo -// root). -// -// SCOPE NOTE. The real provisioner service lives in a sibling repo -// (github.com/InstaNode-dev/provisioner) and that repo's main.go is the one -// that actually runs in k8s. This file is a faithful, drop-in-shaped -// reference that demonstrates exactly how slog, the New Relic Go agent, -// the nrgrpc UnaryServerInterceptor, and the HTTP sidecar fit together — -// so the same five-line diff can be applied to the real provisioner's -// main.go once this PR is reviewed. -// -// Why scaffold here. The observability rollout dispatched eight parallel -// agents, each given a per-track worktree of the api repo. The track-5 -// brief listed file paths under a `provisioner/` prefix that assumed a -// monorepo layout. The api repo isn't a monorepo — provisioner is its own -// repo. Rather than touch the real provisioner repo from a worktree -// configured for api (which would violate filesystem isolation between -// parallel agents), this PR stages the changes under a clearly-marked -// `provisioner/` subdir for review. The follow-up is a copy of these four -// files into the real provisioner repo. -// -// What this binary does when run. It is a minimal stand-in: it boots -// observability and starts the HTTP sidecar on :8092, then blocks on a -// signal. It does NOT serve gRPC — that lives in the real repo. Running -// it locally is useful only to verify the /healthz JSON shape. -package main - -import ( - "context" - "errors" - "log/slog" - "net/http" - "os" - "os/signal" - "syscall" - "time" - - "github.com/newrelic/go-agent/v3/integrations/nrgrpc" - "github.com/newrelic/go-agent/v3/newrelic" - "google.golang.org/grpc" - - "instant.dev/provisioner/internal/_obs_stubs/logctx" - "instant.dev/provisioner/internal/server" -) - -// healthzAddr is the listen address for the HTTP sidecar. Port 8092 was -// chosen by the rollout plan because it doesn't collide with the gRPC port -// (50051), the api Fiber port (8080), worker (no fixed port), Prometheus -// scrapers in our cluster (9090, 9091, 9100), or any of the data-namespace -// services. See TestHealthzPortNoCollisionWithGRPC for the assertion. -const healthzAddr = ":8092" - -// initNewRelic boots the New Relic Go agent. It is fail-open: an empty -// license key (the common case in dev) or any initialization error logs a -// warning and returns nil. Callers must handle a nil *newrelic.Application -// — the nrgrpc interceptor does so safely. -func initNewRelic() *newrelic.Application { - licenseKey := os.Getenv("NEW_RELIC_LICENSE_KEY") - if licenseKey == "" { - slog.Warn("newrelic.disabled — NEW_RELIC_LICENSE_KEY not set") - return nil - } - - appName := os.Getenv("NEW_RELIC_APP_NAME") - if appName == "" { - appName = "instant-provisioner" - } - - app, err := newrelic.NewApplication( - newrelic.ConfigAppName(appName), - newrelic.ConfigLicense(licenseKey), - newrelic.ConfigDistributedTracerEnabled(true), - newrelic.ConfigAppLogForwardingEnabled(true), - ) - if err != nil { - // Fail-open: log and continue. A provisioning outage because the NR - // agent couldn't dial home would be a wildly disproportionate failure - // mode for an observability dependency. - slog.Warn("newrelic.init_failed", "error", err) - return nil - } - return app -} - -// newGRPCServer constructs a grpc.Server with the NR unary interceptor -// registered. The interceptor: -// -// 1. Reads incoming W3C TraceContext from gRPC metadata (the api side -// already propagates this via otelgrpc.NewClientHandler — see -// internal/provisioner/client.go in the api repo for the matching -// side). NR's nrgrpc.UnaryServerInterceptor automatically picks it up -// and opens a distributed-trace child span. -// -// 2. Pulls the trace ID out of the incoming span and stashes it on ctx -// via logctx.WithTraceID so any downstream slog calls in the gRPC -// handler log lines carry the propagated trace_id field. -// -// The wrapping interceptor below chains around nrgrpc's so that step 2 -// runs *after* nrgrpc has populated the NR transaction in ctx. -func newGRPCServer(nrApp *newrelic.Application) *grpc.Server { - return grpc.NewServer(grpc.UnaryInterceptor( - composeTraceIDInjector(nrgrpc.UnaryServerInterceptor(nrApp)), - )) -} - -// composeTraceIDInjector wraps an inner interceptor (typically -// nrgrpc.UnaryServerInterceptor) so that after the inner one has opened the -// NR transaction on ctx, we stamp the trace ID onto ctx via logctx for -// downstream slog calls. Extracted to package-private function so tests can -// invoke it without standing up a real gRPC server. -func composeTraceIDInjector(inner grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor { - return func( - ctx context.Context, - req any, - info *grpc.UnaryServerInfo, - handler grpc.UnaryHandler, - ) (any, error) { - wrapped := func(nrCtx context.Context, nrReq any) (any, error) { - return handler(stampTraceIDFromNR(nrCtx), nrReq) - } - return inner(ctx, req, info, wrapped) - } -} - -// stampTraceIDFromNR looks up the NR transaction on ctx (placed there by -// nrgrpc.UnaryServerInterceptor) and, if present, copies its trace ID onto -// ctx via logctx.WithTraceID. Safe to call when no NR transaction is on -// ctx — returns ctx unchanged. -// -// Split out of composeTraceIDInjector to be unit-testable: a test can -// pre-populate ctx with newrelic.NewContext(ctx, txn) and assert the -// trace_id ends up on the returned ctx. Tests against the *bare* function -// (without spinning up a gRPC server) keep CI fast. -func stampTraceIDFromNR(ctx context.Context) context.Context { - txn := newrelic.FromContext(ctx) - if txn == nil { - return ctx - } - md := txn.GetTraceMetadata() - if md.TraceID == "" { - return ctx - } - return logctx.WithTraceID(ctx, md.TraceID) -} - -// startHealthzSidecar starts the HTTP server on healthzAddr in a goroutine. -// Returns the *http.Server so the caller can shut it down cleanly. The -// listener errors are logged but never crash the process — losing /healthz -// should not take down provisioning. -func startHealthzSidecar() *http.Server { - mux := http.NewServeMux() - mux.Handle("/healthz", server.HealthzHandler()) - - srv := &http.Server{ - Addr: healthzAddr, - Handler: mux, - ReadHeaderTimeout: 5 * time.Second, - } - - go func() { - slog.Info("healthz.listening", "addr", healthzAddr) - if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - slog.Warn("healthz.serve_failed", "error", err) - } - }() - - return srv -} - -func main() { - // First action: install the obs-enriching slog handler as the default - // so every log line from boot onward carries service/commit_id/build_time. - // The real provisioner main.go has NO slog default set today — this is - // the inconsistency the plan flagged. - slog.SetDefault(slog.New(logctx.NewHandler( - "provisioner", - slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ - AddSource: true, - Level: slog.LevelInfo, - }), - ))) - - nrApp := initNewRelic() - defer func() { - if nrApp != nil { - nrApp.Shutdown(10 * time.Second) - } - }() - - // Construct the gRPC server with NR + trace-id-injection interceptors. - // In the real provisioner the result is passed to - // provisionerv1.RegisterProvisionerServiceServer and Serve(); here we - // just demonstrate construction. - grpcSrv := newGRPCServer(nrApp) - _ = grpcSrv // referenced by tests; not Serve()d in this scaffold - - healthzSrv := startHealthzSidecar() - - slog.Info("provisioner.scaffold_ready", - "grpc_port_intended", "50051", - "healthz_port", healthzAddr, - ) - - // Block until SIGINT/SIGTERM. - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - <-sigCh - - shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := healthzSrv.Shutdown(shutdownCtx); err != nil { - slog.Warn("healthz.shutdown_error", "error", err) - } -} diff --git a/provisioner/main_test.go b/provisioner/main_test.go deleted file mode 100644 index a13bd00..0000000 --- a/provisioner/main_test.go +++ /dev/null @@ -1,291 +0,0 @@ -// Tests for the observability scaffolding. Each test corresponds to one of -// the four assertions called out in the track-5 brief. -package main - -import ( - "context" - "encoding/json" - "errors" - "net/http" - "net/http/httptest" - "os" - "strings" - "testing" - - "github.com/newrelic/go-agent/v3/newrelic" - "google.golang.org/grpc" - - "instant.dev/provisioner/internal/_obs_stubs/logctx" - "instant.dev/provisioner/internal/server" -) - -// TestHealthzReturnsCommitID verifies the /healthz endpoint returns a -// well-formed JSON body containing commit_id. Uses httptest so we don't -// need to bind a real port. -func TestHealthzReturnsCommitID(t *testing.T) { - rec := httptest.NewRecorder() - req := httptest.NewRequest(http.MethodGet, "/healthz", nil) - - server.HealthzHandler().ServeHTTP(rec, req) - - if rec.Code != http.StatusOK { - t.Fatalf("status = %d, want 200", rec.Code) - } - if got := rec.Header().Get("Content-Type"); got != "application/json" { - t.Errorf("Content-Type = %q, want application/json", got) - } - - var body server.HealthzResponse - if err := json.NewDecoder(rec.Body).Decode(&body); err != nil { - t.Fatalf("decode: %v", err) - } - if !body.OK { - t.Errorf("ok = false, want true") - } - if body.Service != "instant-provisioner" { - t.Errorf("service = %q, want instant-provisioner", body.Service) - } - if body.CommitID == "" { - t.Errorf("commit_id is empty — buildinfo.GitSHA must always have a value (default 'dev')") - } - if body.BuildTime == "" { - t.Errorf("build_time is empty") - } - if body.Version == "" { - t.Errorf("version is empty") - } -} - -// TestHealthzPortNoCollisionWithGRPC asserts the chosen sidecar port is not -// the same as the gRPC port. Cheap, but it catches a config typo that would -// otherwise show up as "address already in use" at pod boot. -func TestHealthzPortNoCollisionWithGRPC(t *testing.T) { - const grpcPort = ":50051" - if healthzAddr == grpcPort { - t.Fatalf("healthzAddr %q must not equal gRPC port %q", healthzAddr, grpcPort) - } - // Also sanity-check we have a port at all and it parses. - if !strings.HasPrefix(healthzAddr, ":") { - t.Fatalf("healthzAddr %q should start with ':'", healthzAddr) - } -} - -// TestInitNewRelicFailOpenOnEmptyKey verifies the agent init returns nil -// (not panic) when the license key env var is unset — which is the dev -// default. The real concern is "does the provisioner crash if NR is down" -// and the answer must be no. -func TestInitNewRelicFailOpenOnEmptyKey(t *testing.T) { - t.Setenv("NEW_RELIC_LICENSE_KEY", "") - app := initNewRelic() - if app != nil { - t.Errorf("initNewRelic() = non-nil with empty key, want nil") - } -} - -// TestInitNewRelicFailOpenOnInvalidKey verifies that a malformed license -// key (e.g. someone pasted in a short string) also returns nil without -// panicking. NR's validator rejects keys < 40 chars. -func TestInitNewRelicFailOpenOnInvalidKey(t *testing.T) { - t.Setenv("NEW_RELIC_LICENSE_KEY", "obviously-not-a-real-key") - app := initNewRelic() - if app != nil { - t.Errorf("initNewRelic() = non-nil with bogus key, want nil — agent should fail-open") - } -} - -// newTestNRApp constructs a real *newrelic.Application with -// ConfigEnabled(false) so it produces real trace metadata but performs no -// network I/O. Returns nil if construction fails — caller decides whether -// to t.Skip or fail. -func newTestNRApp(t *testing.T) *newrelic.Application { - t.Helper() - app, err := newrelic.NewApplication( - newrelic.ConfigAppName("provisioner-test"), - // 40-char dummy license; NR's validator only checks length when - // enabled. With ConfigEnabled(false) it's never sent anywhere. - newrelic.ConfigLicense("0123456789012345678901234567890123456789"), - newrelic.ConfigEnabled(false), - newrelic.ConfigDistributedTracerEnabled(true), - ) - if err != nil { - t.Fatalf("newrelic.NewApplication: %v", err) - } - return app -} - -// TestStampTraceIDFromNR is the load-bearing assertion of the track-5 -// rollout: when an NR transaction is present on ctx, stampTraceIDFromNR -// must copy its trace_id onto ctx via logctx so downstream slog calls log -// with the propagated trace ID. -func TestStampTraceIDFromNR(t *testing.T) { - app := newTestNRApp(t) - txn := app.StartTransaction("test/Provision") - defer txn.End() - - md := txn.GetTraceMetadata() - if md.TraceID == "" { - t.Skip("NR test app did not produce a trace ID — disabled-mode behavior changed; revisit") - } - - ctx := newrelic.NewContext(context.Background(), txn) - out := stampTraceIDFromNR(ctx) - - if got := logctx.TraceID(out); got != md.TraceID { - t.Errorf("stampTraceIDFromNR did not propagate trace_id: got %q, want %q", got, md.TraceID) - } -} - -// TestStampTraceIDFromNR_NoTxn confirms the function is a safe no-op when -// the input ctx has no NR transaction. -func TestStampTraceIDFromNR_NoTxn(t *testing.T) { - out := stampTraceIDFromNR(context.Background()) - if got := logctx.TraceID(out); got != "" { - t.Errorf("stampTraceIDFromNR with no txn stamped %q; want empty", got) - } -} - -// TestComposeTraceIDInjectorRunsInner verifies the composed interceptor -// actually calls the inner one (e.g. nrgrpc) and the handler. We use a -// synthetic "inner" that just delegates to the handler so we can confirm -// the wiring without bringing up real NR machinery. -func TestComposeTraceIDInjectorRunsInner(t *testing.T) { - var innerCalls, handlerCalls int - - inner := func( - ctx context.Context, - req any, - info *grpc.UnaryServerInfo, - handler grpc.UnaryHandler, - ) (any, error) { - innerCalls++ - return handler(ctx, req) - } - - composed := composeTraceIDInjector(inner) - - handler := func(ctx context.Context, _ any) (any, error) { - handlerCalls++ - // No NR txn → trace_id stays empty. - if got := logctx.TraceID(ctx); got != "" { - t.Errorf("trace_id = %q before NR ctx, want empty", got) - } - return "ok", nil - } - - info := &grpc.UnaryServerInfo{FullMethod: "/test/Method"} - resp, err := composed(context.Background(), "req", info, handler) - if err != nil { - t.Fatalf("composed interceptor err: %v", err) - } - if resp != "ok" { - t.Errorf("resp = %v, want ok", resp) - } - if innerCalls != 1 { - t.Errorf("inner was called %d times, want 1", innerCalls) - } - if handlerCalls != 1 { - t.Errorf("handler was called %d times, want 1", handlerCalls) - } -} - -// TestComposeTraceIDInjectorPropagatesNRTraceID closes the loop end-to-end: -// build a composed interceptor with an inner that simulates nrgrpc by -// stuffing a real NR txn into ctx, then assert the handler sees a populated -// trace_id via logctx. -func TestComposeTraceIDInjectorPropagatesNRTraceID(t *testing.T) { - app := newTestNRApp(t) - txn := app.StartTransaction("test/Provision") - defer txn.End() - - expected := txn.GetTraceMetadata().TraceID - if expected == "" { - t.Skip("NR test app did not produce a trace ID") - } - - // Synthetic "inner" interceptor — pretends to be nrgrpc by injecting - // the txn into ctx before calling the (wrapped) handler. - inner := func( - ctx context.Context, - req any, - info *grpc.UnaryServerInfo, - handler grpc.UnaryHandler, - ) (any, error) { - return handler(newrelic.NewContext(ctx, txn), req) - } - composed := composeTraceIDInjector(inner) - - var captured context.Context - handler := func(ctx context.Context, _ any) (any, error) { - captured = ctx - return "ok", nil - } - - info := &grpc.UnaryServerInfo{FullMethod: "/test/Method"} - if _, err := composed(context.Background(), "req", info, handler); err != nil { - t.Fatalf("composed interceptor err: %v", err) - } - if captured == nil { - t.Fatal("handler did not run") - } - if got := logctx.TraceID(captured); got != expected { - t.Errorf("trace_id propagated to handler ctx = %q, want %q", got, expected) - } -} - -// TestNewGRPCServerWithNilNRApp confirms the server constructor handles the -// fail-open (nil app) path without panicking. -func TestNewGRPCServerWithNilNRApp(t *testing.T) { - srv := newGRPCServer(nil) - if srv == nil { - t.Fatal("newGRPCServer(nil) returned nil") - } -} - -// TestLogctxWithTraceIDRoundTrip covers the stub logctx package end-to-end -// to defend against a future cleanup pass accidentally breaking the -// trace-id round-trip when the stubs are removed in favor of common/logctx. -func TestLogctxWithTraceIDRoundTrip(t *testing.T) { - ctx := context.Background() - if got := logctx.TraceID(ctx); got != "" { - t.Errorf("fresh ctx TraceID = %q, want empty", got) - } - - ctx2 := logctx.WithTraceID(ctx, "abc123") - if got := logctx.TraceID(ctx2); got != "abc123" { - t.Errorf("after WithTraceID, TraceID = %q, want abc123", got) - } - - // Empty trace id is a no-op — verify the parent ctx is returned - // unchanged so a meaningful trace ID upstream isn't accidentally - // overwritten with "". - ctx3 := logctx.WithTraceID(ctx2, "") - if got := logctx.TraceID(ctx3); got != "abc123" { - t.Errorf("WithTraceID(\"\") wiped trace_id — got %q, want abc123 retained", got) - } -} - -// TestEnvAppNameOverride confirms NEW_RELIC_APP_NAME wins over the default. -// This is what k8s deployment specs will use to differentiate -prod / -// -staging / -dev environments per the plan doc's open question 2. -func TestEnvAppNameOverride(t *testing.T) { - // We can't easily inspect what name NR was init'd with because the - // agent's internal config isn't exported — but we can at least verify - // init doesn't panic when the env is set. - t.Setenv("NEW_RELIC_APP_NAME", "instant-provisioner-staging") - t.Setenv("NEW_RELIC_LICENSE_KEY", "") // still fail-open - app := initNewRelic() - if app != nil { - t.Errorf("app should still be nil — empty license key overrides app name") - } -} - -// Static check: we expect os.Args[0] to be a real binary name when this -// test runs, so basic process plumbing is healthy. Cheap smoke test. -func TestProcessSmoke(t *testing.T) { - if os.Args[0] == "" { - t.Fatal("os.Args[0] empty — test runner misconfigured") - } - if errors.Is(nil, http.ErrServerClosed) { - t.Fatal("errors.Is(nil, http.ErrServerClosed) should be false") - } -} diff --git a/worker/PR_NOTES.md b/worker/PR_NOTES.md deleted file mode 100644 index 5b32608..0000000 --- a/worker/PR_NOTES.md +++ /dev/null @@ -1,159 +0,0 @@ -# obs/obs-2-worker — Track 4 of 8 in observability rollout - -This PR adds observability scaffolding for the **worker** service. It is one -of three service-track PRs (api, worker, provisioner) that depend on the -shared common packages from tracks 1 (`common/buildinfo`) and 2 -(`common/logctx`). - -> **Layout note.** This PR lives in the `api` repo on branch -> `obs/obs-2-worker-fresh` because the orchestrator created the worktree -> against `InstaNode-dev/api` rather than `InstaNode-dev/worker`. The -> intended merge target is `InstaNode-dev/worker`. The merger should copy -> the files under `worker/` in this PR to the worker repo at the same -> relative paths (one level up — `worker/internal/...` here maps to -> `internal/...` in the worker repo's root). See "Merge story" below. - -## What ships - -| File | Purpose | -|---|---| -| `worker/internal/jobs/middleware.go` | `WithObservability[T]` — generic River-Worker wrapper that stamps `tid`/`trace_id` on ctx and opens an NR transaction per job. | -| `worker/internal/jobs/middleware_test.go` | 6 tests: tid-on-ctx, trace_id-set-when-missing, trace_id-preserved-when-present, error-propagation, nil-NR-safe (success+failure), delegation of NextRetry/Timeout, plus int64 formatter. | -| `worker/internal/obs/nr.go` | `InitNewRelic()` — fail-open NR application factory. Returns `(nil, nil)` on missing `NEW_RELIC_LICENSE_KEY`. | -| `worker/internal/obs/nr_test.go` | 2 tests: fail-open contract, nil-safe `WaitForConnection`. | -| `worker/internal/_obs_stubs/buildinfo/buildinfo.go` | TEMPORARY stub for track 1. Deleted post-merge. | -| `worker/internal/_obs_stubs/logctx/logctx.go` | TEMPORARY stub for track 2. Deleted post-merge. | -| `worker/go.mod`, `worker/go.sum` | Self-contained module so this PR is buildable in isolation. | - -## What does NOT ship - -The wrapper is **opt-in at the call site**. The actual job implementations -(`expire.go`, `quota.go`, `storage.go`, `geodb.go`, `trial.go`, -`expire_stacks.go`, `expiry_reminder.go`, `custom_domain_reconcile.go`, -`deploy_status_reconcile.go`) are **not modified by this PR**. The merger -applies the integration patch (below) to `internal/jobs/workers.go` to wire -every `river.AddWorker(...)` call through `jobs.WithObservability(...)`. - -## Integration patch (apply to worker repo) - -### 1. `worker/main.go` — slog default + NR init + `/healthz` commit_id - -Diff against `worker/main.go`: - -```go - import ( - ... -+ "instant.dev/common/buildinfo" -+ "instant.dev/common/logctx" -+ "instant.dev/worker/internal/obs" - ) - - func main() { -- // Structured JSON logging. -- slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ -- Level: slog.LevelInfo, -- }))) -+ // Structured JSON logging — wrapped in logctx so every line carries -+ // service + commit_id + (when present) tid / trace_id / team_id. -+ slog.SetDefault(slog.New(logctx.NewHandler( -+ "worker", -+ buildinfo.GitSHA, -+ slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ -+ Level: slog.LevelInfo, -+ AddSource: true, -+ }), -+ ))) -+ -+ nrApp, _ := obs.InitNewRelic() // fail-open: nil is fine, errors logged -+ defer func() { -+ if nrApp != nil { -+ nrApp.Shutdown(5 * time.Second) -+ } -+ }() -+ - shutdownTracer := telemetry.InitTracer("instant-worker", os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT")) - ... - -- workers := jobs.StartWorkers(ctx, database, rdb, cfg, provClient, planRegistry, deployStatusK8s) -+ workers := jobs.StartWorkers(ctx, database, rdb, cfg, provClient, planRegistry, deployStatusK8s, nrApp) - ... - - mux := http.NewServeMux() - mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { -- fmt.Fprintf(w, `{"ok":true,"service":"instant-worker"}`) -+ fmt.Fprintf(w, `{"ok":true,"service":"instant-worker","commit_id":%q,"build_time":%q,"version":%q}`, -+ buildinfo.GitSHA, buildinfo.BuildTime, buildinfo.Version) - }) -``` - -### 2. `worker/internal/jobs/workers.go` — wrap every AddWorker call - -The `StartWorkers` signature gains a `nrApp *newrelic.Application` parameter. -Every `river.AddWorker(workers, X)` becomes -`river.AddWorker(workers, jobs.WithObservability(X, nrApp))`. The exact -call-sites in the current file (worker/internal/jobs/workers.go, lines -~130–155): - -```go --river.AddWorker(workers, NewExpireAnonymousWorker(db, provClient, minioClient)) --river.AddWorker(workers, NewExpireStacksWorker(db, cfg.KubeNamespaceApps+"-")) --river.AddWorker(workers, NewRefreshGeoDBWorker()) --river.AddWorker(workers, &TrialExpiryWorker{db: db, email: emailClient}) --river.AddWorker(workers, &WeeklyDigestWorker{db: db, email: emailClient}) --river.AddWorker(workers, NewExpiryReminderWorker(db, emailClient)) --river.AddWorker(workers, NewEnforceStorageQuotaWorker(db, planRegistry)) --river.AddWorker(workers, NewUpdateStorageBytesWorker(db, provClient, minioScanner)) --river.AddWorker(workers, NewCustomDomainReconciler(db, nil, nil)) --river.AddWorker(workers, NewDeployStatusReconciler(db, deployStatusK8s)) -+river.AddWorker(workers, WithObservability(NewExpireAnonymousWorker(db, provClient, minioClient), nrApp)) -+river.AddWorker(workers, WithObservability(NewExpireStacksWorker(db, cfg.KubeNamespaceApps+"-"), nrApp)) -+river.AddWorker(workers, WithObservability(NewRefreshGeoDBWorker(), nrApp)) -+river.AddWorker(workers, WithObservability[TrialExpiryArgs](&TrialExpiryWorker{db: db, email: emailClient}, nrApp)) -+river.AddWorker(workers, WithObservability[WeeklyDigestArgs](&WeeklyDigestWorker{db: db, email: emailClient}, nrApp)) -+river.AddWorker(workers, WithObservability(NewExpiryReminderWorker(db, emailClient), nrApp)) -+river.AddWorker(workers, WithObservability(NewEnforceStorageQuotaWorker(db, planRegistry), nrApp)) -+river.AddWorker(workers, WithObservability(NewUpdateStorageBytesWorker(db, provClient, minioScanner), nrApp)) -+river.AddWorker(workers, WithObservability(NewCustomDomainReconciler(db, nil, nil), nrApp)) -+river.AddWorker(workers, WithObservability(NewDeployStatusReconciler(db, deployStatusK8s), nrApp)) -``` - -The explicit type parameters on the `TrialExpiryWorker` / `WeeklyDigestWorker` -lines are only needed because those two are registered via composite literal -(`&Foo{...}`) rather than a `NewFoo(...)` constructor — type inference can't -walk back from the struct pointer to the JobArgs type. - -## Merge story (stubs → common) - -1. Land tracks 1 + 2 (which add `instant.dev/common/buildinfo` and - `instant.dev/common/logctx`). -2. In the worker repo, delete `worker/internal/_obs_stubs/`. -3. Rewrite two imports in `worker/internal/jobs/middleware.go` and - `worker/internal/obs/nr.go`: - ``` - instant.dev/worker/internal/_obs_stubs/buildinfo → instant.dev/common/buildinfo - instant.dev/worker/internal/_obs_stubs/logctx → instant.dev/common/logctx - ``` -4. Add `instant.dev/common` to `worker/go.mod` (already present in the real - worker repo via the existing `replace ../common` directive). -5. Apply the diffs above to `main.go` and `internal/jobs/workers.go`. -6. Bump the `newrelic/go-agent/v3` dep in the real worker `go.mod`. - -## Test results - -``` -$ cd worker && go test ./... -ok instant.dev/worker/internal/jobs [middleware: 6 tests, 1 sub-test] -ok instant.dev/worker/internal/obs [2 tests] -ok instant.dev/worker/internal/_obs_stubs/... [no tests, compile-only] -``` - -8 tests total, all passing. See PR description for raw output. - -## Pushback (orchestrator) - -The worktree `/tmp/wt-obs-2-worker` is on the `api` repo, not the `worker` -repo. The PR is opened against `api` but the substantive code targets -`worker`. The merger needs to extract the `worker/` subdir and apply it -against the actual `worker` repo. Future tracks splitting work across -multiple repos should consider creating per-service worktrees against the -correct upstream remote. diff --git a/worker/go.mod b/worker/go.mod deleted file mode 100644 index 5343156..0000000 --- a/worker/go.mod +++ /dev/null @@ -1,42 +0,0 @@ -// Track 4 of 8 in the observability rollout (OBSERVABILITY-PLAN-2026-05-12.md). -// -// This module is a self-contained slice of the actual `worker` service -// (InstaNode-dev/worker repo, module instant.dev/worker). It contains ONLY -// the new files added by this track + the local stubs that stand in for -// tracks 1+2 until those land. -// -// Merge story: the merger (track owner for /worker repo) copies the files -// under this directory into the actual worker repo at the same relative -// paths, deletes `internal/_obs_stubs/`, and rewrites the two imports in -// `internal/jobs/middleware.go` + `internal/obs/nr.go` from the stub paths -// to `instant.dev/common/buildinfo` + `instant.dev/common/logctx`. They -// also apply the diffs documented in PR_NOTES.md to `main.go` and -// `internal/jobs/workers.go`. -module instant.dev/worker - -go 1.25 - -require ( - github.com/google/uuid v1.6.0 - github.com/newrelic/go-agent/v3 v3.43.3 - github.com/riverqueue/river v0.11.4 -) - -require ( - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/kr/text v0.2.0 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/riverqueue/river/riverdriver v0.11.4 // indirect - github.com/riverqueue/river/rivershared v0.11.4 // indirect - github.com/riverqueue/river/rivertype v0.11.4 // indirect - github.com/stretchr/testify v1.9.0 // indirect - go.uber.org/goleak v1.3.0 // indirect - golang.org/x/net v0.49.0 // indirect - golang.org/x/sync v0.19.0 // indirect - golang.org/x/sys v0.40.0 // indirect - golang.org/x/text v0.33.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect - google.golang.org/grpc v1.80.0 // indirect - google.golang.org/protobuf v1.36.11 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect -) diff --git a/worker/go.sum b/worker/go.sum deleted file mode 100644 index 8963237..0000000 --- a/worker/go.sum +++ /dev/null @@ -1,90 +0,0 @@ -github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= -github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= -github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= -github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= -github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= -github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= -github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa h1:s+4MhCQ6YrzisK6hFJUX53drDT4UsSW3DEhKn0ifuHw= -github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= -github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= -github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= -github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= -github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= -github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= -github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/newrelic/go-agent/v3 v3.43.3 h1:0A6DkUBYK2bidV6jJDJ1SD2XkRlg976nl+SiEqkGTUQ= -github.com/newrelic/go-agent/v3 v3.43.3/go.mod h1:MFXnCId5xXMIJI6A/kbkg0DO48EVTsKcmNijMYphzTg= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/riverqueue/river v0.11.4 h1:NMRsODhRgFztf080RMCjI377jldLXsx41E2r7+c0lPE= -github.com/riverqueue/river v0.11.4/go.mod h1:HvgBkqon7lYKm9Su4lVOnn1qx8Q4FnSMJjf5auVial4= -github.com/riverqueue/river/riverdriver v0.11.4 h1:kBg68vfTnRuSwsgcZ7UbKC4ocZ+KSCGnuZw/GwMMMP4= -github.com/riverqueue/river/riverdriver v0.11.4/go.mod h1:+NxTrldRYYsdTbZSxX7L2LuWU/B0IAtAActDJcNbcPs= -github.com/riverqueue/river/riverdriver/riverdatabasesql v0.11.4 h1:QBegZQrB59dafWaiNphJC85KTA0CmeGYcpCqu52qbnI= -github.com/riverqueue/river/riverdriver/riverdatabasesql v0.11.4/go.mod h1:CQC2a/+GRtN6b67IA7jFCvcCtOBWRz3lWqyNxDggKSM= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.11.4 h1:rRY8WabllXRsLp8U+gxUpYgTgI8dveF3UWnZJu965Lg= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.11.4/go.mod h1:GgWsTnC7V7lanQLyj8W1UuYuzyDoJZc4bhhDomtYr30= -github.com/riverqueue/river/rivershared v0.11.4 h1:XGfzJKG7hhwd0MwImF/4r+t6F9aq2Q7e6NNYifStnus= -github.com/riverqueue/river/rivershared v0.11.4/go.mod h1:vZc9tRvSZ9spLqcz9UUuKbZGuDRwBhS3LuzLY7d/jkw= -github.com/riverqueue/river/rivertype v0.11.4 h1:TAdi4CQEYukveYneAqm5LupRVZjvSfB8tL3xKR13wi4= -github.com/riverqueue/river/rivertype v0.11.4/go.mod h1:3WRQEDlLKZky/vGwFcZC3uKjC+/8izE6ucHwCsuir98= -github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= -github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= -go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= -go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= -go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= -go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= -go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= -go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= -go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= -go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= -go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= -go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= -go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= -golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= -golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= -golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= -golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= -golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= -golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= -golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= -golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= -gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= -gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 h1:sNrWoksmOyF5bvJUcnmbeAmQi8baNhqg5IWaI3llQqU= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= -google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= -google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= -google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= -google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/worker/internal/_obs_stubs/buildinfo/buildinfo.go b/worker/internal/_obs_stubs/buildinfo/buildinfo.go deleted file mode 100644 index 6aab93b..0000000 --- a/worker/internal/_obs_stubs/buildinfo/buildinfo.go +++ /dev/null @@ -1,21 +0,0 @@ -// Package buildinfo is a TEMPORARY STUB for track 1 of the observability -// rollout (OBSERVABILITY-PLAN-2026-05-12.md). The real package will land at -// `instant.dev/common/buildinfo`. Once track 1 merges, this file is deleted -// and every import is rewritten to point at common. -// -// Until then, this stub lets track 4 (worker) compile and ship a PR -// without blocking on track 1. -// -// TODO(obs): delete after track 1 lands; rewrite imports to common/buildinfo. -package buildinfo - -// GitSHA is overwritten at build time via -ldflags -// "-X instant.dev/common/buildinfo.GitSHA=$GIT_SHA". The default value lets -// `go run` and unit tests work without ldflags. -var GitSHA = "dev" - -// BuildTime is overwritten at build time via -ldflags. -var BuildTime = "unknown" - -// Version is overwritten at build time via -ldflags. -var Version = "dev" diff --git a/worker/internal/_obs_stubs/logctx/logctx.go b/worker/internal/_obs_stubs/logctx/logctx.go deleted file mode 100644 index 2878c5a..0000000 --- a/worker/internal/_obs_stubs/logctx/logctx.go +++ /dev/null @@ -1,125 +0,0 @@ -// Package logctx is a TEMPORARY STUB for track 2 of the observability rollout -// (OBSERVABILITY-PLAN-2026-05-12.md). The real package will land at -// `instant.dev/common/logctx`. Once track 2 merges, this file is deleted and -// every import is rewritten to point at common. -// -// The stub mirrors only the subset of the future API the worker actually -// calls: NewHandler, WithTID, TIDFromContext, WithTraceID, TraceIDFromContext, -// WithTeamID, TeamIDFromContext. Each setter stamps a value on the ctx; the -// handler injects every value found on the ctx into the slog record. -// -// TODO(obs): delete after track 2 lands; rewrite imports to common/logctx. -package logctx - -import ( - "context" - "log/slog" -) - -// ctxKey is the unexported context-key type used by all setters/getters so -// only this package can read or write the values. -type ctxKey int - -const ( - keyTID ctxKey = iota + 1 - keyTraceID - keyTeamID -) - -// WithTID returns a new context with the given task / job id stamped on it. -// The slog handler returned by NewHandler will emit it as `tid=` on every -// log line written through a logger that uses the handler. -func WithTID(ctx context.Context, id string) context.Context { - if ctx == nil { - ctx = context.Background() - } - return context.WithValue(ctx, keyTID, id) -} - -// TIDFromContext returns the value previously set by WithTID, or "" if none. -func TIDFromContext(ctx context.Context) string { - if ctx == nil { - return "" - } - v, _ := ctx.Value(keyTID).(string) - return v -} - -// WithTraceID returns a new context with the given trace id stamped on it. -func WithTraceID(ctx context.Context, id string) context.Context { - if ctx == nil { - ctx = context.Background() - } - return context.WithValue(ctx, keyTraceID, id) -} - -// TraceIDFromContext returns the value previously set by WithTraceID, or "". -func TraceIDFromContext(ctx context.Context) string { - if ctx == nil { - return "" - } - v, _ := ctx.Value(keyTraceID).(string) - return v -} - -// WithTeamID returns a new context with the given team id stamped on it. -func WithTeamID(ctx context.Context, id string) context.Context { - if ctx == nil { - ctx = context.Background() - } - return context.WithValue(ctx, keyTeamID, id) -} - -// TeamIDFromContext returns the value previously set by WithTeamID, or "". -func TeamIDFromContext(ctx context.Context) string { - if ctx == nil { - return "" - } - v, _ := ctx.Value(keyTeamID).(string) - return v -} - -// Handler wraps an inner slog.Handler and adds service + commit_id + ctx- -// scoped fields (tid, trace_id, team_id) to every record. Fields with an -// empty value are still emitted so log queries can be written against a -// stable schema. -type Handler struct { - inner slog.Handler - service string - commitID string -} - -// NewHandler returns a slog.Handler that wraps inner. The service name is -// hardcoded per binary ("worker") and the commit_id is read once at -// construction time from buildinfo.GitSHA (no per-record allocation). -func NewHandler(service, commitID string, inner slog.Handler) *Handler { - return &Handler{inner: inner, service: service, commitID: commitID} -} - -// Enabled mirrors the inner handler. -func (h *Handler) Enabled(ctx context.Context, level slog.Level) bool { - return h.inner.Enabled(ctx, level) -} - -// Handle adds the per-process and per-context attributes, then delegates. -func (h *Handler) Handle(ctx context.Context, r slog.Record) error { - r.AddAttrs( - slog.String("service", h.service), - slog.String("commit_id", h.commitID), - slog.String("tid", TIDFromContext(ctx)), - slog.String("trace_id", TraceIDFromContext(ctx)), - slog.String("team_id", TeamIDFromContext(ctx)), - ) - return h.inner.Handle(ctx, r) -} - -// WithAttrs returns a new handler whose inner handler has the additional -// attrs attached. -func (h *Handler) WithAttrs(attrs []slog.Attr) slog.Handler { - return &Handler{inner: h.inner.WithAttrs(attrs), service: h.service, commitID: h.commitID} -} - -// WithGroup returns a new handler whose inner handler is grouped. -func (h *Handler) WithGroup(name string) slog.Handler { - return &Handler{inner: h.inner.WithGroup(name), service: h.service, commitID: h.commitID} -} diff --git a/worker/internal/jobs/middleware.go b/worker/internal/jobs/middleware.go deleted file mode 100644 index 0983dd0..0000000 --- a/worker/internal/jobs/middleware.go +++ /dev/null @@ -1,173 +0,0 @@ -// File adds the observability middleware used by every River worker -// registered in StartWorkers (see workers.go). Wrapping is opt-in at the -// AddWorker call-site: the actual job implementations in expire.go, quota.go, -// storage.go, geodb.go, trial.go, etc. are NOT modified by this track — -// the wrapper does its job around them. -// -// Track 4 of the observability rollout (OBSERVABILITY-PLAN-2026-05-12.md). -// -// What it does, per executed job: -// -// 1. Stamps `tid = ` on the ctx via logctx.WithTID so every slog -// line emitted inside the job carries the same task id — agents can -// grep one job's full trace from a stream of interleaved workers. -// 2. Stamps `trace_id = ` on the ctx via logctx.WithTraceID -// if one is not already present. Real ingest of OTel-derived trace ids -// will follow track 7 — this guarantees the field is always non-empty -// so log queries can be written today. -// 3. Opens a New Relic transaction named `job.` and defers its -// end. Errors returned by the inner Work bubble through nrtxn.NoticeError -// before being returned, so they surface in the NR error inbox. -// 4. Logs duration on completion at INFO (success) or ERROR (failure) -// using a consistent shape so the dashboard panels under track 7 can -// bind to a stable schema. -// -// The wrapper is a thin generic function: it preserves the concrete -// `river.Worker[T]` type so `river.AddWorker` keeps accepting it without -// reflection. NextRetry, Timeout, and every other Worker method delegate -// to the inner worker so existing retry / timeout policy is untouched. -package jobs - -import ( - "context" - "log/slog" - "time" - - "github.com/google/uuid" - "github.com/newrelic/go-agent/v3/newrelic" - "github.com/riverqueue/river" - - "instant.dev/worker/internal/_obs_stubs/logctx" -) - -// observabilityWorker wraps an inner river.Worker[T] with the per-job -// observability concerns described in the package doc. It is constructed -// via WithObservability and never used directly. -// -// The inner worker is held by value of an interface type so the wrapper does -// not have to know any of its fields. Every Worker[T] method delegates. -type observabilityWorker[T river.JobArgs] struct { - inner river.Worker[T] - nrApp *newrelic.Application // may be nil — fail-open -} - -// WithObservability wraps next so that each job execution is instrumented -// with logctx ids and an optional New Relic transaction. -// -// nrApp may be nil — in that case the wrapper still stamps ctx ids and logs -// duration, it just does not open an NR transaction. This matches the -// fail-open contract of obs.InitNewRelic. -// -// Call site (workers.go): -// -// river.AddWorker(workers, jobs.WithObservability(jobs.NewExpireAnonymousWorker(...), nrApp)) -// -// Note the generic parameter is inferred from the wrapped worker, so the -// caller writes WithObservability(...) not WithObservability[ExpireAnonymousArgs](...). -func WithObservability[T river.JobArgs](next river.Worker[T], nrApp *newrelic.Application) river.Worker[T] { - return &observabilityWorker[T]{inner: next, nrApp: nrApp} -} - -// Work is the only method that does real work — the rest delegate. It runs -// in this order: stamp ids, open NR txn, call inner.Work, record outcome, -// end NR txn (via defer), log duration. -func (w *observabilityWorker[T]) Work(ctx context.Context, job *river.Job[T]) error { - // Step 1: stamp ids on ctx so every slog call inside the job sees them. - // We always overwrite tid (the job is the authoritative source for the - // task id) but we PRESERVE an existing trace_id if one is present — that - // path is taken when a periodic-job dispatcher already opened a trace. - tid := jobIDString(job.ID) - ctx = logctx.WithTID(ctx, tid) - if logctx.TraceIDFromContext(ctx) == "" { - ctx = logctx.WithTraceID(ctx, uuid.New().String()) - } - - // Step 2: open the New Relic transaction. txn is nil-safe — every method - // on (*newrelic.Transaction)(nil) is a no-op in the v3 SDK — but we still - // gate the StartTransaction call to avoid the nil-deref on nrApp itself. - kind := jobKind(job) - var txn *newrelic.Transaction - if w.nrApp != nil { - txn = w.nrApp.StartTransaction("job." + kind) - // nrtxn carries the ctx for the duration of Work. Cross-process - // linkage (OTel headers) is set up by track 7 — today we only need - // the in-process span. - ctx = newrelic.NewContext(ctx, txn) - defer txn.End() - } - - start := time.Now() - err := w.inner.Work(ctx, job) - elapsed := time.Since(start) - - if err != nil { - if txn != nil { - txn.NoticeError(err) - } - slog.ErrorContext(ctx, "jobs.middleware.work_failed", - "kind", kind, - "job_id", job.ID, - "attempt", job.Attempt, - "duration_ms", elapsed.Milliseconds(), - "error", err.Error(), - ) - return err - } - - slog.InfoContext(ctx, "jobs.middleware.work_ok", - "kind", kind, - "job_id", job.ID, - "attempt", job.Attempt, - "duration_ms", elapsed.Milliseconds(), - ) - return nil -} - -// NextRetry, Timeout — pure delegation. The wrapper MUST NOT impose its own -// retry or timeout policy; that belongs to the wrapped worker (typically via -// river.WorkerDefaults embedded by the concrete worker struct). -func (w *observabilityWorker[T]) NextRetry(job *river.Job[T]) time.Time { - return w.inner.NextRetry(job) -} - -func (w *observabilityWorker[T]) Timeout(job *river.Job[T]) time.Duration { - return w.inner.Timeout(job) -} - -// jobKind extracts the job kind without forcing the caller to depend on the -// concrete args type. It calls (T).Kind() through the JobArgs interface; -// every River job args type already implements Kind() so this is free. -// -// We pull Kind() from job.Args rather than a fresh zero value because the -// JobArgs interface contract is that Kind() is constant per type. -func jobKind[T river.JobArgs](job *river.Job[T]) string { - return job.Args.Kind() -} - -// jobIDString formats an int64 job id without pulling in strconv at the -// call site. Kept tiny because it sits on the hot path of every job. -func jobIDString(id int64) string { - if id == 0 { - return "" - } - const digits = "0123456789" - var buf [20]byte - pos := len(buf) - neg := id < 0 - u := uint64(id) - if neg { - u = uint64(-id) - } - for u >= 10 { - pos-- - buf[pos] = digits[u%10] - u /= 10 - } - pos-- - buf[pos] = digits[u] - if neg { - pos-- - buf[pos] = '-' - } - return string(buf[pos:]) -} diff --git a/worker/internal/jobs/middleware_test.go b/worker/internal/jobs/middleware_test.go deleted file mode 100644 index 09081e3..0000000 --- a/worker/internal/jobs/middleware_test.go +++ /dev/null @@ -1,195 +0,0 @@ -// Tests for the observability middleware. The interesting properties: -// -// 1. `tid` ends up on the ctx via logctx.WithTID — readable with -// logctx.TIDFromContext — and matches the job.ID. -// 2. `trace_id` is non-empty after the wrapper runs, even when the caller -// passed no trace id in, and is preserved when the caller did. -// 3. An error from the inner worker bubbles through unchanged. -// 4. Duration is recorded (we can't easily assert it from outside, but we -// can assert the wrapper doesn't crash on a slow job). -// 5. The wrapper is safe with a nil New Relic application (fail-open). -// -// We don't unit-test the New-Relic-present path because it would require a -// live agent connection. The nil-app path covers the only branch under our -// control; integration tests for the present-path live in the deployment -// rollout (track 7). -package jobs - -import ( - "context" - "errors" - "strconv" - "testing" - "time" - - "github.com/riverqueue/river" - "github.com/riverqueue/river/rivertype" - - "instant.dev/worker/internal/_obs_stubs/logctx" -) - -// fakeArgs is a minimal river.JobArgs that the test uses to type the wrapper. -type fakeArgs struct{} - -func (fakeArgs) Kind() string { return "fake_test_job" } - -// fakeWorker is a river.Worker[fakeArgs] whose Work captures the ctx it was -// called with and optionally returns a configured error. NextRetry/Timeout -// return zero values to satisfy the interface. -type fakeWorker struct { - river.WorkerDefaults[fakeArgs] - gotCtx context.Context - gotJob *river.Job[fakeArgs] - returns error - delay time.Duration -} - -func (f *fakeWorker) Work(ctx context.Context, job *river.Job[fakeArgs]) error { - f.gotCtx = ctx - f.gotJob = job - if f.delay > 0 { - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(f.delay): - } - } - return f.returns -} - -// newJob returns a river.Job[fakeArgs] with the given id. river.Job embeds -// *rivertype.JobRow, so we construct the row separately and point the job -// at it. The middleware only reads ID + Attempt off the row plus Args.Kind() -// so the rest of the JobRow fields can stay zero. -func newJob(id int64) *river.Job[fakeArgs] { - return &river.Job[fakeArgs]{ - JobRow: &rivertype.JobRow{ID: id, Kind: "fake_test_job"}, - Args: fakeArgs{}, - } -} - -// TestWithObservability_StampsTIDOnContext is the contract test the task -// brief calls out: the wrapper must put job.ID on the ctx under the logctx -// "tid" key so downstream slog calls pick it up automatically. -func TestWithObservability_StampsTIDOnContext(t *testing.T) { - fake := &fakeWorker{} - wrapped := WithObservability[fakeArgs](fake, nil) - - want := int64(42) - if err := wrapped.Work(context.Background(), newJob(want)); err != nil { - t.Fatalf("wrapped.Work returned error: %v", err) - } - if fake.gotCtx == nil { - t.Fatalf("inner worker was never called") - } - got := logctx.TIDFromContext(fake.gotCtx) - if got != strconv.FormatInt(want, 10) { - t.Fatalf("tid on ctx: got %q, want %q", got, strconv.FormatInt(want, 10)) - } -} - -// TestWithObservability_SetsTraceIDWhenMissing asserts the wrapper generates -// a trace id when the incoming ctx has none. The exact value doesn't matter, -// only that it's non-empty so log queries always find a populated field. -func TestWithObservability_SetsTraceIDWhenMissing(t *testing.T) { - fake := &fakeWorker{} - wrapped := WithObservability[fakeArgs](fake, nil) - - if err := wrapped.Work(context.Background(), newJob(7)); err != nil { - t.Fatalf("wrapped.Work returned error: %v", err) - } - if got := logctx.TraceIDFromContext(fake.gotCtx); got == "" { - t.Fatalf("trace_id was not set on ctx") - } -} - -// TestWithObservability_PreservesExistingTraceID asserts the wrapper does NOT -// overwrite a trace id that the caller already attached. This matters when a -// periodic-job dispatcher (out of scope for this track) opens the trace and -// the worker needs to inherit it. -func TestWithObservability_PreservesExistingTraceID(t *testing.T) { - fake := &fakeWorker{} - wrapped := WithObservability[fakeArgs](fake, nil) - - const want = "trace-from-dispatcher" - ctx := logctx.WithTraceID(context.Background(), want) - if err := wrapped.Work(ctx, newJob(9)); err != nil { - t.Fatalf("wrapped.Work returned error: %v", err) - } - if got := logctx.TraceIDFromContext(fake.gotCtx); got != want { - t.Fatalf("trace_id: got %q, want %q (wrapper must not overwrite)", got, want) - } -} - -// TestWithObservability_PropagatesError covers the failure path: an error -// from the inner worker must reach the caller unchanged so River's retry -// machinery still sees it. We assert errors.Is to be defensive against the -// wrapper deciding to wrap the error in the future. -func TestWithObservability_PropagatesError(t *testing.T) { - want := errors.New("simulated job failure") - fake := &fakeWorker{returns: want} - wrapped := WithObservability[fakeArgs](fake, nil) - - err := wrapped.Work(context.Background(), newJob(11)) - if !errors.Is(err, want) { - t.Fatalf("error not propagated: got %v, want %v", err, want) - } -} - -// TestWithObservability_NilNRAppIsSafe is the fail-open contract test. With -// no NR app, the wrapper still runs the inner worker, still stamps ids on -// ctx, still returns the inner's error. We cover both error-free and -// error-returning paths so the deferred txn.End() path is exercised. -func TestWithObservability_NilNRAppIsSafe(t *testing.T) { - t.Run("success", func(t *testing.T) { - fake := &fakeWorker{} - wrapped := WithObservability[fakeArgs](fake, nil) - if err := wrapped.Work(context.Background(), newJob(1)); err != nil { - t.Fatalf("unexpected error: %v", err) - } - }) - t.Run("failure", func(t *testing.T) { - boom := errors.New("boom") - fake := &fakeWorker{returns: boom} - wrapped := WithObservability[fakeArgs](fake, nil) - if err := wrapped.Work(context.Background(), newJob(2)); !errors.Is(err, boom) { - t.Fatalf("unexpected error: got %v, want %v", err, boom) - } - }) -} - -// TestWithObservability_DelegatesNextRetryAndTimeout asserts the wrapper -// doesn't impose its own policy. The fakeWorker embeds river.WorkerDefaults -// which returns zero values; we just confirm calling those methods through -// the wrapper does not panic and returns the inner values. -func TestWithObservability_DelegatesNextRetryAndTimeout(t *testing.T) { - fake := &fakeWorker{} - wrapped := WithObservability[fakeArgs](fake, nil) - - if got := wrapped.NextRetry(newJob(1)); !got.IsZero() { - t.Fatalf("NextRetry should delegate to WorkerDefaults (zero time), got %v", got) - } - if got := wrapped.Timeout(newJob(1)); got != 0 { - t.Fatalf("Timeout should delegate to WorkerDefaults (0), got %v", got) - } -} - -// TestJobIDString covers the tiny int64->string formatter used to keep the -// hot path allocation-light. Belt-and-braces: 0, positive, negative. -func TestJobIDString(t *testing.T) { - cases := []struct { - in int64 - want string - }{ - {0, ""}, - {1, "1"}, - {42, "42"}, - {9876543210, "9876543210"}, - {-7, "-7"}, - } - for _, c := range cases { - if got := jobIDString(c.in); got != c.want { - t.Errorf("jobIDString(%d) = %q, want %q", c.in, got, c.want) - } - } -} diff --git a/worker/internal/obs/nr.go b/worker/internal/obs/nr.go deleted file mode 100644 index bc4ce46..0000000 --- a/worker/internal/obs/nr.go +++ /dev/null @@ -1,83 +0,0 @@ -// Package obs holds observability bootstrap helpers shared across the -// worker binary. Today it has one job: build a New Relic Application from -// env vars and never crash when the license key is missing. -// -// Track 4 of the observability rollout (OBSERVABILITY-PLAN-2026-05-12.md). -// The api and provisioner services have parallel helpers under their own -// internal/obs packages — each owns its own copy to keep service boundaries -// clean. The contract (fail-open, log-only warning, return nil app) is -// identical across all three. -package obs - -import ( - "log/slog" - "os" - "time" - - "github.com/newrelic/go-agent/v3/newrelic" -) - -// nrInitTimeout caps how long ConnectReply may block on bootstrap. The Go -// agent connects async by default, so this is a guard for the rare case where -// caller code waits on `WaitForConnection`. -const nrInitTimeout = 5 * time.Second - -// InitNewRelic returns a *newrelic.Application built from environment. -// -// Contract: NEVER crash. NEW_RELIC_LICENSE_KEY is the only required input; -// when it is empty (local dev, CI, k8s pod without the secret mounted yet) -// we log a warning and return (nil, nil). Every caller MUST nil-check the -// returned application before invoking methods on it — `(*nrApp).StartTransaction` -// is a nil-safe no-op in the v3 SDK, but defensive callers should still guard. -// -// The license-key-present path can still fail (network down, malformed key, -// duplicate registration). In that case we log the underlying error and -// return (nil, err) so the caller can surface it but keep running. The worker -// pod must not crashloop because New Relic is unhappy. -func InitNewRelic() (*newrelic.Application, error) { - licenseKey := os.Getenv("NEW_RELIC_LICENSE_KEY") - if licenseKey == "" { - slog.Warn("obs.newrelic.skipped", - "reason", "NEW_RELIC_LICENSE_KEY not set", - "behavior", "transactions are no-ops, worker continues") - return nil, nil - } - - appName := os.Getenv("NEW_RELIC_APP_NAME") - if appName == "" { - appName = "instant-worker" - } - - app, err := newrelic.NewApplication( - newrelic.ConfigAppName(appName), - newrelic.ConfigLicense(licenseKey), - newrelic.ConfigAppLogForwardingEnabled(true), - newrelic.ConfigDistributedTracerEnabled(true), - // Fail-open at the SDK level too: don't crash if the daemon can't be - // reached, just suppress the noisy harvest-cycle errors. - func(cfg *newrelic.Config) { - cfg.ErrorCollector.Enabled = true - cfg.TransactionTracer.Enabled = true - }, - ) - if err != nil { - slog.Warn("obs.newrelic.init_failed", - "error", err, - "behavior", "transactions are no-ops, worker continues") - return nil, err - } - - slog.Info("obs.newrelic.initialised", "app_name", appName) - return app, nil -} - -// WaitForConnection is a thin wrapper around app.WaitForConnection that does -// nothing when app is nil. Use only from tests or boot code that wants the -// agent fully connected before proceeding; production code paths should never -// block on this. -func WaitForConnection(app *newrelic.Application) { - if app == nil { - return - } - _ = app.WaitForConnection(nrInitTimeout) -} diff --git a/worker/internal/obs/nr_test.go b/worker/internal/obs/nr_test.go deleted file mode 100644 index 1ee53f4..0000000 --- a/worker/internal/obs/nr_test.go +++ /dev/null @@ -1,35 +0,0 @@ -// Tests for the New Relic init helper. The hard requirement is the -// fail-open contract: missing NEW_RELIC_LICENSE_KEY must return (nil, nil) -// with a warning log, NEVER an error and NEVER a crash. -// -// We don't test the success path here — it would require either embedding a -// fake NR collector or carrying a real license key in CI secrets, neither of -// which is worth the complexity for a thin bootstrap helper. -package obs - -import ( - "testing" -) - -// TestInitNewRelic_FailOpenOnMissingLicenseKey is the primary contract test. -// With no env var, the helper must return (nil, nil). We use t.Setenv to -// guarantee an empty value even on developer machines where the env might be -// set in their shell. -func TestInitNewRelic_FailOpenOnMissingLicenseKey(t *testing.T) { - t.Setenv("NEW_RELIC_LICENSE_KEY", "") - - app, err := InitNewRelic() - if err != nil { - t.Fatalf("expected nil error on missing license key, got %v", err) - } - if app != nil { - t.Fatalf("expected nil application on missing license key, got %v", app) - } -} - -// TestWaitForConnection_NilSafe is a trivial guard for the helper that some -// boot-time code paths may call before the app is fully constructed. -func TestWaitForConnection_NilSafe(t *testing.T) { - // Must not panic. - WaitForConnection(nil) -}