From b04bc9eb8e36982324eb94eb615619e77447e233 Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Thu, 4 Jun 2026 22:13:59 -0700 Subject: [PATCH 1/3] Issue workload identities on distributed runners The workload identity issuer holds the cluster's Ed25519 signing key and was only wired into the coordinator's embedded runner. Distributed runners built their sandbox controller with a nil issuer, so their sandboxes never received an identity token, the MIREN_* env vars, token refresh, or the token request server. Expose token minting as a coordinator RPC that runners call: - Add WorkloadIssuerInfo and IssueWorkloadToken to the RunnerRegistration service. The coordinator (which holds the signing key) mints tokens on behalf of runners. - Introduce a workloadidentity.TokenIssuer interface. The concrete *Issuer satisfies it on the coordinator; distributed runners use a remoteIssuer that proxies minting over RPC. A background loop keeps the issuer URL in sync (re-registration, coordinator restart) and logs only on transitions. - Guard against boxing a typed-nil *Issuer into the interface field, which would defeat the controller's nil checks. Authorization and integrity hardening on the coordinator side: - IssueWorkloadToken is authorized per sandbox: a runner may only mint tokens for sandboxes scheduled to it, verified via the caller's mTLS client-certificate identity. - The certificate CommonName embeds the full runner ID (not an 8-char prefix) so a runner cannot choose a colliding ID and impersonate another. - The app claim is derived from the sandbox entity server-side rather than trusted from the caller, and dropped from the RPC arguments entirely. Startup queries retry transient failures so a brief blip does not leave a runner permanently without token issuance. --- api/runner/rpc.yml | 41 +++ api/runner/runner_v1alpha/rpc.gen.go | 315 ++++++++++++++++++++++ cli/commands/server.go | 8 +- components/coordinate/coordinate.go | 1 + components/runner/remote_issuer.go | 152 +++++++++++ components/runner/remote_issuer_test.go | 80 ++++++ components/runner/runner.go | 68 ++++- controllers/sandbox/sandbox.go | 4 +- controllers/sandbox/token_server_test.go | 6 +- pkg/workloadidentity/issuer.go | 12 + servers/runner/registration.go | 182 ++++++++++++- servers/runner/registration_authz_test.go | 185 +++++++++++++ 12 files changed, 1041 insertions(+), 13 deletions(-) create mode 100644 components/runner/remote_issuer.go create mode 100644 components/runner/remote_issuer_test.go create mode 100644 servers/runner/registration_authz_test.go diff --git a/api/runner/rpc.yml b/api/runner/rpc.yml index 4e667c920..5bea3cb20 100644 --- a/api/runner/rpc.yml +++ b/api/runner/rpc.yml @@ -159,6 +159,47 @@ interfaces: type: string doc: Error message if removal failed + - name: WorkloadIssuerInfo + index: 6 + doc: | + Report whether the coordinator has a workload identity issuer + configured, and its issuer URL. Distributed runners call this once at + startup to decide whether to mint workload identity tokens via the + coordinator. + results: + - name: enabled + type: bool + doc: Whether a workload identity issuer is configured on the coordinator + - name: issuer_url + type: string + doc: The issuer URL (iss claim anchor) when enabled + + - name: IssueWorkloadToken + index: 7 + doc: | + Mint a workload identity token for a sandbox. Distributed runners, + which do not hold the cluster signing key, call this to obtain tokens + signed by the coordinator. The application identity is derived from the + sandbox by the coordinator, not supplied by the caller. + parameters: + - name: sandbox_id + type: string + doc: Sandbox ID the token is issued for + - name: audience + type: list + element: string + doc: Optional token audiences (defaults to "miren" when empty) + - name: ttl_seconds + type: int64 + doc: Optional token TTL in seconds (0 uses the issuer default) + results: + - name: token + type: string + doc: The signed workload identity token (JWT) + - name: error + type: string + doc: Error message if issuance failed + types: - type: InviteInfo doc: Information about a runner invite diff --git a/api/runner/runner_v1alpha/rpc.gen.go b/api/runner/runner_v1alpha/rpc.gen.go index 4e7c875c0..6d2198d96 100644 --- a/api/runner/runner_v1alpha/rpc.gen.go +++ b/api/runner/runner_v1alpha/rpc.gen.go @@ -927,6 +927,157 @@ func (v *RunnerRegistrationRemoveRunnerResults) UnmarshalJSON(data []byte) error return json.Unmarshal(data, &v.data) } +type runnerRegistrationWorkloadIssuerInfoArgsData struct{} + +type RunnerRegistrationWorkloadIssuerInfoArgs struct { + call rpc.Call + data runnerRegistrationWorkloadIssuerInfoArgsData +} + +func (v *RunnerRegistrationWorkloadIssuerInfoArgs) MarshalCBOR() ([]byte, error) { + return cbor.Marshal(v.data) +} + +func (v *RunnerRegistrationWorkloadIssuerInfoArgs) UnmarshalCBOR(data []byte) error { + return cbor.Unmarshal(data, &v.data) +} + +func (v *RunnerRegistrationWorkloadIssuerInfoArgs) MarshalJSON() ([]byte, error) { + return json.Marshal(v.data) +} + +func (v *RunnerRegistrationWorkloadIssuerInfoArgs) UnmarshalJSON(data []byte) error { + return json.Unmarshal(data, &v.data) +} + +type runnerRegistrationWorkloadIssuerInfoResultsData struct { + Enabled *bool `cbor:"0,keyasint,omitempty" json:"enabled,omitempty"` + IssuerUrl *string `cbor:"1,keyasint,omitempty" json:"issuer_url,omitempty"` +} + +type RunnerRegistrationWorkloadIssuerInfoResults struct { + call rpc.Call + data runnerRegistrationWorkloadIssuerInfoResultsData +} + +func (v *RunnerRegistrationWorkloadIssuerInfoResults) SetEnabled(enabled bool) { + v.data.Enabled = &enabled +} + +func (v *RunnerRegistrationWorkloadIssuerInfoResults) SetIssuerUrl(issuer_url string) { + v.data.IssuerUrl = &issuer_url +} + +func (v *RunnerRegistrationWorkloadIssuerInfoResults) MarshalCBOR() ([]byte, error) { + return cbor.Marshal(v.data) +} + +func (v *RunnerRegistrationWorkloadIssuerInfoResults) UnmarshalCBOR(data []byte) error { + return cbor.Unmarshal(data, &v.data) +} + +func (v *RunnerRegistrationWorkloadIssuerInfoResults) MarshalJSON() ([]byte, error) { + return json.Marshal(v.data) +} + +func (v *RunnerRegistrationWorkloadIssuerInfoResults) UnmarshalJSON(data []byte) error { + return json.Unmarshal(data, &v.data) +} + +type runnerRegistrationIssueWorkloadTokenArgsData struct { + SandboxId *string `cbor:"0,keyasint,omitempty" json:"sandbox_id,omitempty"` + Audience *[]string `cbor:"1,keyasint,omitempty" json:"audience,omitempty"` + TtlSeconds *int64 `cbor:"2,keyasint,omitempty" json:"ttl_seconds,omitempty"` +} + +type RunnerRegistrationIssueWorkloadTokenArgs struct { + call rpc.Call + data runnerRegistrationIssueWorkloadTokenArgsData +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) HasSandboxId() bool { + return v.data.SandboxId != nil +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) SandboxId() string { + if v.data.SandboxId == nil { + return "" + } + return *v.data.SandboxId +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) HasAudience() bool { + return v.data.Audience != nil +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) Audience() []string { + if v.data.Audience == nil { + return nil + } + return *v.data.Audience +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) HasTtlSeconds() bool { + return v.data.TtlSeconds != nil +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) TtlSeconds() int64 { + if v.data.TtlSeconds == nil { + return 0 + } + return *v.data.TtlSeconds +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) MarshalCBOR() ([]byte, error) { + return cbor.Marshal(v.data) +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) UnmarshalCBOR(data []byte) error { + return cbor.Unmarshal(data, &v.data) +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) MarshalJSON() ([]byte, error) { + return json.Marshal(v.data) +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) UnmarshalJSON(data []byte) error { + return json.Unmarshal(data, &v.data) +} + +type runnerRegistrationIssueWorkloadTokenResultsData struct { + Token *string `cbor:"0,keyasint,omitempty" json:"token,omitempty"` + Error *string `cbor:"1,keyasint,omitempty" json:"error,omitempty"` +} + +type RunnerRegistrationIssueWorkloadTokenResults struct { + call rpc.Call + data runnerRegistrationIssueWorkloadTokenResultsData +} + +func (v *RunnerRegistrationIssueWorkloadTokenResults) SetToken(token string) { + v.data.Token = &token +} + +func (v *RunnerRegistrationIssueWorkloadTokenResults) SetError(error string) { + v.data.Error = &error +} + +func (v *RunnerRegistrationIssueWorkloadTokenResults) MarshalCBOR() ([]byte, error) { + return cbor.Marshal(v.data) +} + +func (v *RunnerRegistrationIssueWorkloadTokenResults) UnmarshalCBOR(data []byte) error { + return cbor.Unmarshal(data, &v.data) +} + +func (v *RunnerRegistrationIssueWorkloadTokenResults) MarshalJSON() ([]byte, error) { + return json.Marshal(v.data) +} + +func (v *RunnerRegistrationIssueWorkloadTokenResults) UnmarshalJSON(data []byte) error { + return json.Unmarshal(data, &v.data) +} + type RunnerRegistrationCreateInvite struct { rpc.Call args RunnerRegistrationCreateInviteArgs @@ -1083,6 +1234,58 @@ func (t *RunnerRegistrationRemoveRunner) Results() *RunnerRegistrationRemoveRunn return results } +type RunnerRegistrationWorkloadIssuerInfo struct { + rpc.Call + args RunnerRegistrationWorkloadIssuerInfoArgs + results RunnerRegistrationWorkloadIssuerInfoResults +} + +func (t *RunnerRegistrationWorkloadIssuerInfo) Args() *RunnerRegistrationWorkloadIssuerInfoArgs { + args := &t.args + if args.call != nil { + return args + } + args.call = t.Call + t.Call.Args(args) + return args +} + +func (t *RunnerRegistrationWorkloadIssuerInfo) Results() *RunnerRegistrationWorkloadIssuerInfoResults { + results := &t.results + if results.call != nil { + return results + } + results.call = t.Call + t.Call.Results(results) + return results +} + +type RunnerRegistrationIssueWorkloadToken struct { + rpc.Call + args RunnerRegistrationIssueWorkloadTokenArgs + results RunnerRegistrationIssueWorkloadTokenResults +} + +func (t *RunnerRegistrationIssueWorkloadToken) Args() *RunnerRegistrationIssueWorkloadTokenArgs { + args := &t.args + if args.call != nil { + return args + } + args.call = t.Call + t.Call.Args(args) + return args +} + +func (t *RunnerRegistrationIssueWorkloadToken) Results() *RunnerRegistrationIssueWorkloadTokenResults { + results := &t.results + if results.call != nil { + return results + } + results.call = t.Call + t.Call.Results(results) + return results +} + type RunnerRegistration interface { CreateInvite(ctx context.Context, state *RunnerRegistrationCreateInvite) error Join(ctx context.Context, state *RunnerRegistrationJoin) error @@ -1090,6 +1293,8 @@ type RunnerRegistration interface { RevokeInvite(ctx context.Context, state *RunnerRegistrationRevokeInvite) error ListRunners(ctx context.Context, state *RunnerRegistrationListRunners) error RemoveRunner(ctx context.Context, state *RunnerRegistrationRemoveRunner) error + WorkloadIssuerInfo(ctx context.Context, state *RunnerRegistrationWorkloadIssuerInfo) error + IssueWorkloadToken(ctx context.Context, state *RunnerRegistrationIssueWorkloadToken) error } type reexportRunnerRegistration struct { @@ -1120,6 +1325,14 @@ func (reexportRunnerRegistration) RemoveRunner(ctx context.Context, state *Runne panic("not implemented") } +func (reexportRunnerRegistration) WorkloadIssuerInfo(ctx context.Context, state *RunnerRegistrationWorkloadIssuerInfo) error { + panic("not implemented") +} + +func (reexportRunnerRegistration) IssueWorkloadToken(ctx context.Context, state *RunnerRegistrationIssueWorkloadToken) error { + panic("not implemented") +} + func (t reexportRunnerRegistration) CapabilityClient() rpc.Client { return t.client } @@ -1180,6 +1393,24 @@ func AdaptRunnerRegistration(t RunnerRegistration) *rpc.Interface { return t.RemoveRunner(ctx, &RunnerRegistrationRemoveRunner{Call: call}) }, }, + { + Name: "WorkloadIssuerInfo", + InterfaceName: "RunnerRegistration", + Index: 6, + Public: false, + Handler: func(ctx context.Context, call rpc.Call) error { + return t.WorkloadIssuerInfo(ctx, &RunnerRegistrationWorkloadIssuerInfo{Call: call}) + }, + }, + { + Name: "IssueWorkloadToken", + InterfaceName: "RunnerRegistration", + Index: 7, + Public: false, + Handler: func(ctx context.Context, call rpc.Call) error { + return t.IssueWorkloadToken(ctx, &RunnerRegistrationIssueWorkloadToken{Call: call}) + }, + }, } return rpc.NewInterface(methods, t) @@ -1549,3 +1780,87 @@ func (v RunnerRegistrationClient) RemoveRunner(ctx context.Context, query string return &RunnerRegistrationClientRemoveRunnerResults{client: v.Client, data: ret}, nil } + +type RunnerRegistrationClientWorkloadIssuerInfoResults struct { + client rpc.Client + data runnerRegistrationWorkloadIssuerInfoResultsData +} + +func (v *RunnerRegistrationClientWorkloadIssuerInfoResults) HasEnabled() bool { + return v.data.Enabled != nil +} + +func (v *RunnerRegistrationClientWorkloadIssuerInfoResults) Enabled() bool { + if v.data.Enabled == nil { + return false + } + return *v.data.Enabled +} + +func (v *RunnerRegistrationClientWorkloadIssuerInfoResults) HasIssuerUrl() bool { + return v.data.IssuerUrl != nil +} + +func (v *RunnerRegistrationClientWorkloadIssuerInfoResults) IssuerUrl() string { + if v.data.IssuerUrl == nil { + return "" + } + return *v.data.IssuerUrl +} + +func (v RunnerRegistrationClient) WorkloadIssuerInfo(ctx context.Context) (*RunnerRegistrationClientWorkloadIssuerInfoResults, error) { + args := RunnerRegistrationWorkloadIssuerInfoArgs{} + + var ret runnerRegistrationWorkloadIssuerInfoResultsData + + err := v.Call(ctx, "WorkloadIssuerInfo", &args, &ret) + if err != nil { + return nil, err + } + + return &RunnerRegistrationClientWorkloadIssuerInfoResults{client: v.Client, data: ret}, nil +} + +type RunnerRegistrationClientIssueWorkloadTokenResults struct { + client rpc.Client + data runnerRegistrationIssueWorkloadTokenResultsData +} + +func (v *RunnerRegistrationClientIssueWorkloadTokenResults) HasToken() bool { + return v.data.Token != nil +} + +func (v *RunnerRegistrationClientIssueWorkloadTokenResults) Token() string { + if v.data.Token == nil { + return "" + } + return *v.data.Token +} + +func (v *RunnerRegistrationClientIssueWorkloadTokenResults) HasError() bool { + return v.data.Error != nil +} + +func (v *RunnerRegistrationClientIssueWorkloadTokenResults) Error() string { + if v.data.Error == nil { + return "" + } + return *v.data.Error +} + +func (v RunnerRegistrationClient) IssueWorkloadToken(ctx context.Context, sandbox_id string, audience []string, ttl_seconds int64) (*RunnerRegistrationClientIssueWorkloadTokenResults, error) { + args := RunnerRegistrationIssueWorkloadTokenArgs{} + args.data.SandboxId = &sandbox_id + x := slices.Clone(audience) + args.data.Audience = &x + args.data.TtlSeconds = &ttl_seconds + + var ret runnerRegistrationIssueWorkloadTokenResultsData + + err := v.Call(ctx, "IssueWorkloadToken", &args, &ret) + if err != nil { + return nil, err + } + + return &RunnerRegistrationClientIssueWorkloadTokenResults{client: v.Client, data: ret}, nil +} diff --git a/cli/commands/server.go b/cli/commands/server.go index 6b745f660..b1c2f78fc 100644 --- a/cli/commands/server.go +++ b/cli/commands/server.go @@ -827,7 +827,13 @@ func Server(ctx *Context, opts serverconfig.CLIFlags) error { Resolver: res, SandboxMetrics: ctx.ServerState.SandboxMetrics, IsCoordinator: true, - WorkloadIssuer: workloadIssuer, + } + + // Assign only when non-nil: storing a typed-nil *Issuer into the + // TokenIssuer interface field would make it compare != nil, defeating the + // nil guards in the sandbox controller and panicking on first use. + if workloadIssuer != nil { + deps.WorkloadIssuer = workloadIssuer } rc.DataPath = cfg.Server.GetDataPath() diff --git a/components/coordinate/coordinate.go b/components/coordinate/coordinate.go index 0ddbd1377..67003887a 100644 --- a/components/coordinate/coordinate.go +++ b/components/coordinate/coordinate.go @@ -1163,6 +1163,7 @@ func (c *Coordinator) Start(ctx context.Context) error { NetworkBackend: c.NetworkBackend, VictoriametricsAddress: c.VictoriametricsAddress, VictorialogsAddress: c.VictorialogsAddress, + WorkloadIssuer: c.WorkloadIssuer, }) server.ExposeValue(rpc.ServiceRunner, runner_v1alpha.AdaptRunnerRegistration(runnerReg)) diff --git a/components/runner/remote_issuer.go b/components/runner/remote_issuer.go new file mode 100644 index 000000000..125fac7a5 --- /dev/null +++ b/components/runner/remote_issuer.go @@ -0,0 +1,152 @@ +package runner + +import ( + "context" + "fmt" + "log/slog" + "sync" + "time" + + "miren.dev/runtime/api/runner/runner_v1alpha" + "miren.dev/runtime/pkg/workloadidentity" +) + +const ( + // remoteTokenTimeout bounds a single token-minting RPC to the coordinator. + remoteTokenTimeout = 30 * time.Second + + // issuerURLRefreshInterval is how often the cached issuer URL is re-synced + // with the coordinator. + issuerURLRefreshInterval = 5 * time.Minute +) + +// remoteIssuer satisfies workloadidentity.TokenIssuer by proxying token minting +// to the coordinator over RPC. Distributed runners do not hold the cluster +// signing key, so they cannot mint tokens locally and instead ask the +// coordinator, which holds the key. +// +// The issuer URL is cached but kept in sync by a background loop: it can change +// while the runner is running (e.g. the cluster gains a DNS hostname during +// re-registration), and the RPC client transparently reconnects after a +// coordinator restart, so periodic polling re-syncs without a runner restart. +type remoteIssuer struct { + ctx context.Context + client *runner_v1alpha.RunnerRegistrationClient + log *slog.Logger + + mu sync.RWMutex + issuerURL string + enabled bool +} + +var _ workloadidentity.TokenIssuer = (*remoteIssuer)(nil) + +func newRemoteIssuer(ctx context.Context, log *slog.Logger, client *runner_v1alpha.RunnerRegistrationClient, issuerURL string) *remoteIssuer { + r := &remoteIssuer{ + ctx: ctx, + client: client, + log: log, + issuerURL: issuerURL, + // A remoteIssuer is only constructed once the coordinator has reported + // an enabled issuer, so start in the enabled state. + enabled: true, + } + go r.refreshLoop() + return r +} + +func (r *remoteIssuer) IssuerURL() string { + r.mu.RLock() + defer r.mu.RUnlock() + return r.issuerURL +} + +func (r *remoteIssuer) setIssuerURL(url string) { + r.mu.Lock() + r.issuerURL = url + r.mu.Unlock() +} + +// setEnabled records the latest enabled state and reports whether it changed, +// so transitions can be logged once instead of on every refresh. +func (r *remoteIssuer) setEnabled(enabled bool) bool { + r.mu.Lock() + defer r.mu.Unlock() + if r.enabled == enabled { + return false + } + r.enabled = enabled + return true +} + +func (r *remoteIssuer) IssueToken(app, sandboxID string) (string, error) { + return r.IssueTokenWithOptions(app, sandboxID, workloadidentity.TokenOptions{}) +} + +// IssueTokenWithOptions mints a token via the coordinator. The app argument is +// ignored: the coordinator derives the app identity from the sandbox itself so +// a runner cannot forge it. +func (r *remoteIssuer) IssueTokenWithOptions(_, sandboxID string, opts workloadidentity.TokenOptions) (string, error) { + ctx, cancel := context.WithTimeout(r.ctx, remoteTokenTimeout) + defer cancel() + + var ttlSeconds int64 + if opts.TTL > 0 { + ttlSeconds = int64(opts.TTL / time.Second) + } + + res, err := r.client.IssueWorkloadToken(ctx, sandboxID, opts.Audience, ttlSeconds) + if err != nil { + return "", fmt.Errorf("requesting workload token from coordinator: %w", err) + } + if res.Error() != "" { + return "", fmt.Errorf("coordinator refused workload token: %s", res.Error()) + } + return res.Token(), nil +} + +// refreshLoop keeps the cached issuer URL in sync with the coordinator until the +// runner's context is cancelled. +func (r *remoteIssuer) refreshLoop() { + ticker := time.NewTicker(issuerURLRefreshInterval) + defer ticker.Stop() + + for { + select { + case <-r.ctx.Done(): + return + case <-ticker.C: + r.refreshIssuerURL() + } + } +} + +func (r *remoteIssuer) refreshIssuerURL() { + ctx, cancel := context.WithTimeout(r.ctx, remoteTokenTimeout) + defer cancel() + + info, err := r.client.WorkloadIssuerInfo(ctx) + if err != nil { + r.log.Warn("failed to refresh workload issuer info", "error", err) + return + } + + enabled := info.Enabled() + if r.setEnabled(enabled) { + // Log only on transitions to avoid spamming every refresh interval. + if enabled { + r.log.Info("coordinator re-enabled workload identity issuer") + } else { + r.log.Warn("coordinator disabled workload identity issuer; sandbox token issuance will fail until re-enabled") + } + } + if !enabled { + // Leave the cached URL as-is so already-injected sandbox env values + // stay coherent. + return + } + if url := info.IssuerUrl(); url != "" && url != r.IssuerURL() { + r.log.Info("workload issuer URL updated", "issuer", url) + r.setIssuerURL(url) + } +} diff --git a/components/runner/remote_issuer_test.go b/components/runner/remote_issuer_test.go new file mode 100644 index 000000000..52a385ec8 --- /dev/null +++ b/components/runner/remote_issuer_test.go @@ -0,0 +1,80 @@ +package runner + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "miren.dev/runtime/api/runner/runner_v1alpha" + "miren.dev/runtime/pkg/caauth" + "miren.dev/runtime/pkg/entity/testutils" + "miren.dev/runtime/pkg/rpc" + "miren.dev/runtime/pkg/workloadidentity" + runnersrv "miren.dev/runtime/servers/runner" +) + +// newTestRegistrationClient builds a RunnerRegistration client backed by a real +// in-process RegistrationServer, optionally with a workload identity issuer. +func newTestRegistrationClient(t *testing.T, issuer *workloadidentity.Issuer) *runner_v1alpha.RunnerRegistrationClient { + t.Helper() + + es, cleanup := testutils.NewInMemEntityServer(t) + t.Cleanup(cleanup) + + ca, err := caauth.New(caauth.Options{CommonName: "test-ca", Organization: "test"}) + require.NoError(t, err) + + srv := runnersrv.NewRegistrationServer(runnersrv.RegistrationServerConfig{ + Log: testutils.TestLogger(t), + Authority: ca, + EAC: es.EAC, + WorkloadIssuer: issuer, + }) + + local := rpc.LocalClient(runner_v1alpha.AdaptRunnerRegistration(srv)) + return runner_v1alpha.NewRunnerRegistrationClient(local) +} + +func TestRemoteIssuerRefreshesURL(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + issuer, err := workloadidentity.NewIssuer(workloadidentity.IssuerConfig{ + DataPath: t.TempDir(), + IssuerURL: "https://updated.example", + OrganizationID: "org-test", + ClusterID: "cluster-test", + }) + require.NoError(t, err) + + client := newTestRegistrationClient(t, issuer) + + ri := newRemoteIssuer(ctx, testutils.TestLogger(t), client, "https://stale.example") + require.Equal(t, "https://stale.example", ri.IssuerURL()) + + ri.refreshIssuerURL() + require.Equal(t, "https://updated.example", ri.IssuerURL()) +} + +func TestRemoteIssuerSetEnabledTransitions(t *testing.T) { + // setEnabled reports true only when the state actually changes, so the + // refresh loop logs a transition once rather than on every interval. + ri := &remoteIssuer{enabled: true} + require.False(t, ri.setEnabled(true), "no change should report false") + require.True(t, ri.setEnabled(false), "enabled->disabled should report true") + require.False(t, ri.setEnabled(false), "repeated disabled should report false") + require.True(t, ri.setEnabled(true), "disabled->enabled should report true") +} + +func TestRemoteIssuerRefreshKeepsURLWhenDisabled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // No issuer configured on the coordinator -> WorkloadIssuerInfo reports + // disabled. The cached URL must be preserved rather than cleared. + client := newTestRegistrationClient(t, nil) + + ri := newRemoteIssuer(ctx, testutils.TestLogger(t), client, "https://stale.example") + ri.refreshIssuerURL() + require.Equal(t, "https://stale.example", ri.IssuerURL()) +} diff --git a/components/runner/runner.go b/components/runner/runner.go index b97eb87d7..62cba5463 100644 --- a/components/runner/runner.go +++ b/components/runner/runner.go @@ -22,6 +22,7 @@ import ( "miren.dev/runtime/api/ingress/ingress_v1alpha" "miren.dev/runtime/api/metric/metric_v1alpha" "miren.dev/runtime/api/network/network_v1alpha" + "miren.dev/runtime/api/runner/runner_v1alpha" "miren.dev/runtime/api/storage/storage_v1alpha" "miren.dev/runtime/clientconfig" "miren.dev/runtime/components/coordinate" @@ -109,12 +110,19 @@ type RunnerDeps struct { EtcdTLSKeyFile string // Client private key file path EtcdTLSCAFile string // CA certificate file path - // WorkloadIssuer signs workload identity tokens for sandbox containers - WorkloadIssuer *workloadidentity.Issuer + // WorkloadIssuer mints workload identity tokens for sandbox containers. On + // the coordinator this is the concrete *workloadidentity.Issuer; on a + // distributed runner it is a remote issuer that proxies minting to the + // coordinator over RPC. + WorkloadIssuer workloadidentity.TokenIssuer } const ( DefaulWorkers = 3 + + // Bounded retry for the coordinator's workload-issuer-info query at startup. + issuerInfoMaxAttempts = 3 + issuerInfoRetryDelay = 2 * time.Second ) type shutdownCloser struct{ s interface{ Shutdown() } } @@ -431,6 +439,13 @@ func (r *Runner) Start(ctx context.Context, eg ...*errgroup.Group) error { ec := entityserver.NewClient(r.Log, eas) + // Distributed runners mint workload identity tokens via the coordinator, + // since they do not hold the cluster signing key. A failure here degrades + // to no sandbox tokens rather than blocking runner startup. + if err := r.setupRemoteWorkloadIssuer(ctx, rs); err != nil { + r.Log.Warn("failed to set up workload identity issuer", "error", err) + } + cm, err := r.SetupControllers(ctx, eas, rs.Server()) if err != nil { return err @@ -460,6 +475,55 @@ func (r *Runner) Start(ctx context.Context, eg ...*errgroup.Group) error { return nil } +// setupRemoteWorkloadIssuer wires a remote workload identity issuer for +// distributed runners. Runners do not hold the cluster signing key, so they +// mint tokens by calling the coordinator's RunnerRegistration service. When the +// coordinator reports no issuer is configured, token issuance stays disabled +// (deps.WorkloadIssuer remains nil). The coordinator's embedded runner +// (r.Config == nil) keeps the concrete issuer it was constructed with. +func (r *Runner) setupRemoteWorkloadIssuer(ctx context.Context, rs *rpc.State) error { + if r.Config == nil || r.deps.WorkloadIssuer != nil { + return nil + } + + client, err := rs.Client(string(rpc.ServiceRunner)) + if err != nil { + return fmt.Errorf("connecting to coordinator runner service: %w", err) + } + + regClient := runner_v1alpha.NewRunnerRegistrationClient(client) + + // Retry transient failures: the entities connection was just established, so + // a failure here is usually a brief blip. Giving up immediately would leave + // the runner with no token issuance until it is restarted. + var info *runner_v1alpha.RunnerRegistrationClientWorkloadIssuerInfoResults + for attempt := 1; ; attempt++ { + info, err = regClient.WorkloadIssuerInfo(ctx) + if err == nil { + break + } + if attempt >= issuerInfoMaxAttempts { + return fmt.Errorf("querying workload issuer info after %d attempts: %w", attempt, err) + } + r.Log.Warn("workload issuer info query failed; retrying", + "attempt", attempt, "max", issuerInfoMaxAttempts, "error", err) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(issuerInfoRetryDelay): + } + } + + if !info.Enabled() { + r.Log.Info("coordinator has no workload identity issuer; sandbox tokens disabled") + return nil + } + + r.deps.WorkloadIssuer = newRemoteIssuer(ctx, r.Log, regClient, info.IssuerUrl()) + r.Log.Info("workload identity issuer enabled via coordinator", "issuer", info.IssuerUrl()) + return nil +} + // initializeNetwork sets up the Flannel network for distributed runners. // This is only called when EtcdEndpoints are configured (distributed runner mode). func (r *Runner) initializeNetwork(ctx context.Context, eg ...*errgroup.Group) error { diff --git a/controllers/sandbox/sandbox.go b/controllers/sandbox/sandbox.go index 2d520dfbd..7ae491a5b 100644 --- a/controllers/sandbox/sandbox.go +++ b/controllers/sandbox/sandbox.go @@ -83,7 +83,7 @@ type SandboxControllerDeps struct { StatusMon *observability.StatusMonitor Resolver netresolve.Resolver Metrics *Metrics - WorkloadIssuer *workloadidentity.Issuer + WorkloadIssuer workloadidentity.TokenIssuer } type SandboxController struct { @@ -110,7 +110,7 @@ type SandboxController struct { Resolver netresolve.Resolver Metrics *Metrics - WorkloadIssuer *workloadidentity.Issuer + WorkloadIssuer workloadidentity.TokenIssuer tokenRefresher *tokenRefresher tokenSecrets *tokenSecretRegistry diff --git a/controllers/sandbox/token_server_test.go b/controllers/sandbox/token_server_test.go index 8c4fb8d96..7ae56286e 100644 --- a/controllers/sandbox/token_server_test.go +++ b/controllers/sandbox/token_server_test.go @@ -71,7 +71,7 @@ func TestTokenServer_DefaultToken(t *testing.T) { require.NotEmpty(t, resp.Value) token, err := jwt.ParseWithClaims(resp.Value, &workloadidentity.WorkloadClaims{}, func(tok *jwt.Token) (interface{}, error) { - return c.WorkloadIssuer.PublicKey(), nil + return c.WorkloadIssuer.(*workloadidentity.Issuer).PublicKey(), nil }) require.NoError(t, err) @@ -95,7 +95,7 @@ func TestTokenServer_CustomAudience(t *testing.T) { require.NoError(t, err) token, err := jwt.ParseWithClaims(resp.Value, &workloadidentity.WorkloadClaims{}, func(tok *jwt.Token) (interface{}, error) { - return c.WorkloadIssuer.PublicKey(), nil + return c.WorkloadIssuer.(*workloadidentity.Issuer).PublicKey(), nil }, jwt.WithAudience("sts.amazonaws.com")) require.NoError(t, err) @@ -116,7 +116,7 @@ func TestTokenServer_CustomTTL(t *testing.T) { require.NoError(t, err) token, err := jwt.ParseWithClaims(resp.Value, &workloadidentity.WorkloadClaims{}, func(tok *jwt.Token) (interface{}, error) { - return c.WorkloadIssuer.PublicKey(), nil + return c.WorkloadIssuer.(*workloadidentity.Issuer).PublicKey(), nil }) require.NoError(t, err) diff --git a/pkg/workloadidentity/issuer.go b/pkg/workloadidentity/issuer.go index 550c3fd46..4e1f9d3d1 100644 --- a/pkg/workloadidentity/issuer.go +++ b/pkg/workloadidentity/issuer.go @@ -60,6 +60,18 @@ type Issuer struct { previousKey *jose.JSONWebKey } +// TokenIssuer is the minting surface the sandbox controller depends on. The +// concrete *Issuer satisfies it directly (the coordinator holds the signing +// key). Distributed runners have no signing key, so they supply an +// implementation that proxies minting to the coordinator over RPC. +type TokenIssuer interface { + IssueToken(app, sandboxID string) (string, error) + IssueTokenWithOptions(app, sandboxID string, opts TokenOptions) (string, error) + IssuerURL() string +} + +var _ TokenIssuer = (*Issuer)(nil) + type WorkloadClaims struct { jwt.RegisteredClaims OrganizationID string `json:"organization_id,omitempty"` diff --git a/servers/runner/registration.go b/servers/runner/registration.go index 28d803050..a4f4ced0b 100644 --- a/servers/runner/registration.go +++ b/servers/runner/registration.go @@ -20,7 +20,9 @@ import ( "miren.dev/runtime/pkg/entity" "miren.dev/runtime/pkg/entity/types" "miren.dev/runtime/pkg/joincode" + "miren.dev/runtime/pkg/rpc" "miren.dev/runtime/pkg/rpc/standard" + "miren.dev/runtime/pkg/workloadidentity" ) const ( @@ -42,6 +44,11 @@ type RegistrationServerConfig struct { // Observability endpoints provided to runners at join time VictoriametricsAddress string VictorialogsAddress string + + // WorkloadIssuer mints workload identity tokens. Distributed runners, which + // do not hold the cluster signing key, request tokens from the coordinator + // through this server. May be nil when no issuer is configured. + WorkloadIssuer *workloadidentity.Issuer } type RegistrationServer struct { @@ -240,11 +247,7 @@ func (s *RegistrationServer) Join(ctx context.Context, req *runner_v1alpha.Runne // Now that invite is claimed, issue the certificate with proper SANs // so the coordinator can connect to the runner's API by IP. - runnerIDPrefix := runnerID - if len(runnerIDPrefix) > 8 { - runnerIDPrefix = runnerIDPrefix[:8] - } - certName := fmt.Sprintf("runner-%s", runnerIDPrefix) + certName := runnerCertName(runnerID) ips := []net.IP{ net.ParseIP("127.0.0.1"), @@ -604,6 +607,175 @@ func (s *RegistrationServer) RemoveRunner(ctx context.Context, req *runner_v1alp return nil } +// WorkloadIssuerInfo reports whether the coordinator has a workload identity +// issuer configured and, if so, its issuer URL. Distributed runners call this +// once at startup to decide whether to mint workload identity tokens via the +// coordinator. +func (s *RegistrationServer) WorkloadIssuerInfo(ctx context.Context, req *runner_v1alpha.RunnerRegistrationWorkloadIssuerInfo) error { + results := req.Results() + + if s.WorkloadIssuer == nil { + results.SetEnabled(false) + return nil + } + + results.SetEnabled(true) + results.SetIssuerUrl(s.WorkloadIssuer.IssuerURL()) + return nil +} + +// IssueWorkloadToken mints a workload identity token for a sandbox on behalf of +// a distributed runner, which does not hold the cluster signing key. The caller +// is an mTLS-authenticated runner. +func (s *RegistrationServer) IssueWorkloadToken(ctx context.Context, req *runner_v1alpha.RunnerRegistrationIssueWorkloadToken) error { + args := req.Args() + results := req.Results() + + if s.WorkloadIssuer == nil { + results.SetError("workload identity issuer is not configured") + return nil + } + + if !args.HasSandboxId() || args.SandboxId() == "" { + results.SetError("sandbox_id is required") + return nil + } + sandboxID := args.SandboxId() + + // A runner may only mint tokens for sandboxes scheduled to it. This prevents + // a compromised or buggy runner from obtaining identities for workloads + // running on other runners. + if err := s.authorizeSandboxOwnership(ctx, sandboxID); err != nil { + s.Log.Warn("workload token request denied", "sandbox", sandboxID, "error", err) + results.SetError("not authorized to issue a token for this sandbox") + return nil + } + + // Derive the app identity from the sandbox itself rather than trusting the + // caller. The app is part of the token subject that external verifiers + // federate on, so a runner must not be able to forge it. + appName := s.resolveSandboxApp(ctx, sandboxID) + + opts := workloadidentity.TokenOptions{} + if args.HasAudience() { + opts.Audience = args.Audience() + } + if args.HasTtlSeconds() && args.TtlSeconds() > 0 { + opts.TTL = time.Duration(args.TtlSeconds()) * time.Second + } + + token, err := s.WorkloadIssuer.IssueTokenWithOptions(appName, sandboxID, opts) + if err != nil { + s.Log.Error("failed to issue workload identity token", + "sandbox", sandboxID, "app", appName, "error", err) + results.SetError("failed to issue token") + return nil + } + + results.SetToken(token) + return nil +} + +// runnerCertName is the client-certificate CommonName issued to a runner during +// Join. It embeds the full runner ID so the coordinator can attribute an mTLS +// connection back to a specific runner and authorize per-runner actions. +// +// The full ID (rather than a prefix) is required for authorization: a runner +// chooses its own runner ID at Join, so a short prefix would let a malicious +// runner pick an ID whose prefix collides with a victim's cert name and mint +// tokens for the victim's sandboxes. A runner ID is a UUID, so "runner-" plus +// the ID stays within the certificate CommonName length limit. +func runnerCertName(runnerID string) string { + return fmt.Sprintf("runner-%s", runnerID) +} + +// authorizeSandboxOwnership verifies the authenticated caller is the runner the +// given sandbox is scheduled to. Callers authenticate with the mTLS client +// certificate issued during Join, whose CommonName maps back to the runner via +// runnerCertName. When authentication is disabled (anonymous), there is nothing +// to verify against and the check is skipped. +func (s *RegistrationServer) authorizeSandboxOwnership(ctx context.Context, sandboxID string) error { + identity := rpc.IdentityFromContext(ctx) + if identity == nil { + return fmt.Errorf("no caller identity") + } + if identity.Method == rpc.AuthMethodAnonymous { + // Authentication is disabled on this coordinator; ownership cannot be + // established, so don't block issuance. + return nil + } + if identity.Method != rpc.AuthMethodCert { + return fmt.Errorf("caller must authenticate with a runner certificate, got %q", identity.Method) + } + + sbResp, err := s.EAC.Get(ctx, sandboxID) + if err != nil { + return fmt.Errorf("looking up sandbox %s: %w", sandboxID, err) + } + + var sch compute_v1alpha.Schedule + sch.Decode(sbResp.Entity().Entity()) + if sch.Key.Node == "" { + return fmt.Errorf("sandbox %s is not scheduled to a node", sandboxID) + } + + nodeResp, err := s.EAC.Get(ctx, string(sch.Key.Node)) + if err != nil { + return fmt.Errorf("looking up node %s: %w", sch.Key.Node, err) + } + + var node compute_v1alpha.Node + node.Decode(nodeResp.Entity().Entity()) + if node.RunnerId == "" { + return fmt.Errorf("node %s has no runner id", sch.Key.Node) + } + + if expected := runnerCertName(node.RunnerId); identity.Subject != expected { + return fmt.Errorf("caller %q is not the owner of sandbox %s (owned by %q)", + identity.Subject, sandboxID, expected) + } + + return nil +} + +// resolveSandboxApp derives the application name for a sandbox from the entity +// store (sandbox -> app version -> app metadata name), mirroring the sandbox +// controller's local resolution. The app name is part of the workload identity +// token subject, so it must be derived server-side rather than trusted from the +// calling runner. Returns "" when the app cannot be resolved. +func (s *RegistrationServer) resolveSandboxApp(ctx context.Context, sandboxID string) string { + sbResp, err := s.EAC.Get(ctx, sandboxID) + if err != nil { + return "" + } + + var sb compute_v1alpha.Sandbox + sb.Decode(sbResp.Entity().Entity()) + if sb.Spec.Version == "" { + return "" + } + + versionResp, err := s.EAC.Get(ctx, sb.Spec.Version.String()) + if err != nil { + return "" + } + + var version core_v1alpha.AppVersion + version.Decode(versionResp.Entity().Entity()) + if version.App == "" { + return "" + } + + appResp, err := s.EAC.Get(ctx, version.App.String()) + if err != nil { + return "" + } + + var appMeta core_v1alpha.Metadata + appMeta.Decode(appResp.Entity().Entity()) + return appMeta.Name +} + // findNodeByQuery looks up a node entity by name, runner ID, entity ID, or short ID prefix. // Exact matches (name, runner ID, entity ID) are returned immediately. Prefix // matches are collected and only returned when unambiguous. diff --git a/servers/runner/registration_authz_test.go b/servers/runner/registration_authz_test.go new file mode 100644 index 000000000..3b45d917f --- /dev/null +++ b/servers/runner/registration_authz_test.go @@ -0,0 +1,185 @@ +package runner + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "miren.dev/runtime/api/compute/compute_v1alpha" + "miren.dev/runtime/api/core/core_v1alpha" + "miren.dev/runtime/pkg/caauth" + "miren.dev/runtime/pkg/entity" + "miren.dev/runtime/pkg/entity/testutils" + "miren.dev/runtime/pkg/rpc" +) + +const ( + authzRunnerID = "abcd1234-5678-90ab-cdef-1234567890ab" + authzSandboxID = "sandbox/myapp-web-aaa111" +) + +// newOwnershipTestServer builds a RegistrationServer backed by an in-memory +// entity store containing a node owned by authzRunnerID and a sandbox scheduled +// to it. +func newOwnershipTestServer(t *testing.T) (*RegistrationServer, func()) { + t.Helper() + + es, cleanup := testutils.NewInMemEntityServer(t) + + ca, err := caauth.New(caauth.Options{CommonName: "test-ca", Organization: "test"}) + require.NoError(t, err) + + srv := NewRegistrationServer(RegistrationServerConfig{ + Log: testutils.TestLogger(t), + Authority: ca, + EAC: es.EAC, + }) + + ctx := context.Background() + nodeID := entity.Id("node/" + authzRunnerID) + + _, err = es.EAC.Create(ctx, entity.New( + entity.DBId, nodeID, + (&compute_v1alpha.Node{RunnerId: authzRunnerID}).Encode, + ).Attrs()) + require.NoError(t, err) + + schedule := compute_v1alpha.Schedule{ + Key: compute_v1alpha.Key{Kind: compute_v1alpha.KindSandbox, Node: nodeID}, + } + _, err = es.EAC.Create(ctx, entity.New( + entity.DBId, entity.Id(authzSandboxID), + (&compute_v1alpha.Sandbox{}).Encode, + schedule.Encode, + ).Attrs()) + require.NoError(t, err) + + return srv, cleanup +} + +func ctxWithCert(subject string) context.Context { + return rpc.ContextWithIdentity(context.Background(), &rpc.Identity{ + Subject: subject, + Method: rpc.AuthMethodCert, + }) +} + +func TestAuthorizeSandboxOwnership_Owner(t *testing.T) { + srv, cleanup := newOwnershipTestServer(t) + defer cleanup() + + ctx := ctxWithCert(runnerCertName(authzRunnerID)) + require.NoError(t, srv.authorizeSandboxOwnership(ctx, authzSandboxID)) +} + +func TestAuthorizeSandboxOwnership_OtherRunnerDenied(t *testing.T) { + srv, cleanup := newOwnershipTestServer(t) + defer cleanup() + + ctx := ctxWithCert(runnerCertName("99999999-0000-0000-0000-000000000000")) + require.Error(t, srv.authorizeSandboxOwnership(ctx, authzSandboxID)) +} + +func TestAuthorizeSandboxOwnership_AnonymousSkipped(t *testing.T) { + srv, cleanup := newOwnershipTestServer(t) + defer cleanup() + + ctx := rpc.ContextWithIdentity(context.Background(), &rpc.Identity{ + Subject: "anonymous", + Method: rpc.AuthMethodAnonymous, + }) + require.NoError(t, srv.authorizeSandboxOwnership(ctx, authzSandboxID)) +} + +func TestAuthorizeSandboxOwnership_NonCertDenied(t *testing.T) { + srv, cleanup := newOwnershipTestServer(t) + defer cleanup() + + ctx := rpc.ContextWithIdentity(context.Background(), &rpc.Identity{ + Subject: "user@example.com", + Method: rpc.AuthMethodJWT, + }) + require.Error(t, srv.authorizeSandboxOwnership(ctx, authzSandboxID)) +} + +func TestAuthorizeSandboxOwnership_NoIdentityDenied(t *testing.T) { + srv, cleanup := newOwnershipTestServer(t) + defer cleanup() + + require.Error(t, srv.authorizeSandboxOwnership(context.Background(), authzSandboxID)) +} + +func TestAuthorizeSandboxOwnership_UnscheduledDenied(t *testing.T) { + srv, cleanup := newOwnershipTestServer(t) + defer cleanup() + + ctx := ctxWithCert(runnerCertName(authzRunnerID)) + require.Error(t, srv.authorizeSandboxOwnership(ctx, "sandbox/does-not-exist")) +} + +func TestRunnerCertNameUsesFullID(t *testing.T) { + // Two runner IDs sharing an 8-char prefix must map to distinct cert names, + // otherwise a runner could choose a colliding ID and impersonate another. + a := "abcd1234-1111-1111-1111-111111111111" + b := "abcd1234-2222-2222-2222-222222222222" + require.NotEqual(t, runnerCertName(a), runnerCertName(b)) + require.Equal(t, "runner-"+a, runnerCertName(a)) +} + +func TestResolveSandboxApp(t *testing.T) { + ctx := context.Background() + es, cleanup := testutils.NewInMemEntityServer(t) + defer cleanup() + + ca, err := caauth.New(caauth.Options{CommonName: "test-ca", Organization: "test"}) + require.NoError(t, err) + + srv := NewRegistrationServer(RegistrationServerConfig{ + Log: testutils.TestLogger(t), + Authority: ca, + EAC: es.EAC, + }) + + _, err = es.EAC.Create(ctx, entity.New( + entity.DBId, entity.Id("app/myapp"), + (&core_v1alpha.Metadata{Name: "myapp"}).Encode, + ).Attrs()) + require.NoError(t, err) + + _, err = es.EAC.Create(ctx, entity.New( + entity.DBId, entity.Id("version/v1"), + (&core_v1alpha.AppVersion{App: "app/myapp"}).Encode, + ).Attrs()) + require.NoError(t, err) + + _, err = es.EAC.Create(ctx, entity.New( + entity.DBId, entity.Id("sandbox/s1"), + (&compute_v1alpha.Sandbox{Spec: compute_v1alpha.SandboxSpec{Version: "version/v1"}}).Encode, + ).Attrs()) + require.NoError(t, err) + + require.Equal(t, "myapp", srv.resolveSandboxApp(ctx, "sandbox/s1")) +} + +func TestResolveSandboxApp_NoVersionReturnsEmpty(t *testing.T) { + ctx := context.Background() + es, cleanup := testutils.NewInMemEntityServer(t) + defer cleanup() + + ca, err := caauth.New(caauth.Options{CommonName: "test-ca", Organization: "test"}) + require.NoError(t, err) + + srv := NewRegistrationServer(RegistrationServerConfig{ + Log: testutils.TestLogger(t), + Authority: ca, + EAC: es.EAC, + }) + + _, err = es.EAC.Create(ctx, entity.New( + entity.DBId, entity.Id("sandbox/no-version"), + (&compute_v1alpha.Sandbox{}).Encode, + ).Attrs()) + require.NoError(t, err) + + require.Equal(t, "", srv.resolveSandboxApp(ctx, "sandbox/no-version")) +} From 0a4a853dd76f538c651cbdb8a2a2c96ddba9449d Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Sat, 6 Jun 2026 11:18:39 -0700 Subject: [PATCH 2/3] Address PR review: bound startup query, fix test, simplify remote issuer - Bound each WorkloadIssuerInfo startup attempt with a per-attempt timeout (queryWorkloadIssuerInfo) so a hung coordinator RPC can't stall runner startup and the retry budget can expire. - Fix TestAuthorizeSandboxOwnership_UnscheduledDenied to insert a sandbox that exists but has no scheduling node, exercising the sch.Key.Node == "" branch; the missing-entity case is now its own test. - Simplify remoteIssuer: the coordinator's issuer URL is fixed for a process lifetime, so drop the background refresh loop and enabled-transition flag and cache the URL captured at startup. --- components/runner/remote_issuer.go | 105 +++------------------- components/runner/remote_issuer_test.go | 73 +-------------- components/runner/runner.go | 13 ++- servers/runner/registration_authz_test.go | 29 +++++- 4 files changed, 53 insertions(+), 167 deletions(-) diff --git a/components/runner/remote_issuer.go b/components/runner/remote_issuer.go index 125fac7a5..e65d485e4 100644 --- a/components/runner/remote_issuer.go +++ b/components/runner/remote_issuer.go @@ -3,82 +3,45 @@ package runner import ( "context" "fmt" - "log/slog" - "sync" "time" "miren.dev/runtime/api/runner/runner_v1alpha" "miren.dev/runtime/pkg/workloadidentity" ) -const ( - // remoteTokenTimeout bounds a single token-minting RPC to the coordinator. - remoteTokenTimeout = 30 * time.Second - - // issuerURLRefreshInterval is how often the cached issuer URL is re-synced - // with the coordinator. - issuerURLRefreshInterval = 5 * time.Minute -) +// remoteTokenTimeout bounds a single token-minting RPC to the coordinator. +const remoteTokenTimeout = 30 * time.Second // remoteIssuer satisfies workloadidentity.TokenIssuer by proxying token minting // to the coordinator over RPC. Distributed runners do not hold the cluster // signing key, so they cannot mint tokens locally and instead ask the // coordinator, which holds the key. // -// The issuer URL is cached but kept in sync by a background loop: it can change -// while the runner is running (e.g. the cluster gains a DNS hostname during -// re-registration), and the RPC client transparently reconnects after a -// coordinator restart, so periodic polling re-syncs without a runner restart. +// The issuer URL is captured once at startup. It is the cluster's OIDC issuer +// anchor, derived from the coordinator's configuration at boot, so it is stable +// for a coordinator's process lifetime; a change (e.g. the cluster gaining a DNS +// hostname via re-registration) restarts the coordinator and is picked up when +// the runner reconnects/restarts. type remoteIssuer struct { - ctx context.Context - client *runner_v1alpha.RunnerRegistrationClient - log *slog.Logger - - mu sync.RWMutex + ctx context.Context + client *runner_v1alpha.RunnerRegistrationClient issuerURL string - enabled bool } var _ workloadidentity.TokenIssuer = (*remoteIssuer)(nil) -func newRemoteIssuer(ctx context.Context, log *slog.Logger, client *runner_v1alpha.RunnerRegistrationClient, issuerURL string) *remoteIssuer { - r := &remoteIssuer{ +func newRemoteIssuer(ctx context.Context, client *runner_v1alpha.RunnerRegistrationClient, issuerURL string) *remoteIssuer { + return &remoteIssuer{ ctx: ctx, client: client, - log: log, issuerURL: issuerURL, - // A remoteIssuer is only constructed once the coordinator has reported - // an enabled issuer, so start in the enabled state. - enabled: true, } - go r.refreshLoop() - return r } func (r *remoteIssuer) IssuerURL() string { - r.mu.RLock() - defer r.mu.RUnlock() return r.issuerURL } -func (r *remoteIssuer) setIssuerURL(url string) { - r.mu.Lock() - r.issuerURL = url - r.mu.Unlock() -} - -// setEnabled records the latest enabled state and reports whether it changed, -// so transitions can be logged once instead of on every refresh. -func (r *remoteIssuer) setEnabled(enabled bool) bool { - r.mu.Lock() - defer r.mu.Unlock() - if r.enabled == enabled { - return false - } - r.enabled = enabled - return true -} - func (r *remoteIssuer) IssueToken(app, sandboxID string) (string, error) { return r.IssueTokenWithOptions(app, sandboxID, workloadidentity.TokenOptions{}) } @@ -104,49 +67,3 @@ func (r *remoteIssuer) IssueTokenWithOptions(_, sandboxID string, opts workloadi } return res.Token(), nil } - -// refreshLoop keeps the cached issuer URL in sync with the coordinator until the -// runner's context is cancelled. -func (r *remoteIssuer) refreshLoop() { - ticker := time.NewTicker(issuerURLRefreshInterval) - defer ticker.Stop() - - for { - select { - case <-r.ctx.Done(): - return - case <-ticker.C: - r.refreshIssuerURL() - } - } -} - -func (r *remoteIssuer) refreshIssuerURL() { - ctx, cancel := context.WithTimeout(r.ctx, remoteTokenTimeout) - defer cancel() - - info, err := r.client.WorkloadIssuerInfo(ctx) - if err != nil { - r.log.Warn("failed to refresh workload issuer info", "error", err) - return - } - - enabled := info.Enabled() - if r.setEnabled(enabled) { - // Log only on transitions to avoid spamming every refresh interval. - if enabled { - r.log.Info("coordinator re-enabled workload identity issuer") - } else { - r.log.Warn("coordinator disabled workload identity issuer; sandbox token issuance will fail until re-enabled") - } - } - if !enabled { - // Leave the cached URL as-is so already-injected sandbox env values - // stay coherent. - return - } - if url := info.IssuerUrl(); url != "" && url != r.IssuerURL() { - r.log.Info("workload issuer URL updated", "issuer", url) - r.setIssuerURL(url) - } -} diff --git a/components/runner/remote_issuer_test.go b/components/runner/remote_issuer_test.go index 52a385ec8..facbb0759 100644 --- a/components/runner/remote_issuer_test.go +++ b/components/runner/remote_issuer_test.go @@ -5,76 +5,9 @@ import ( "testing" "github.com/stretchr/testify/require" - "miren.dev/runtime/api/runner/runner_v1alpha" - "miren.dev/runtime/pkg/caauth" - "miren.dev/runtime/pkg/entity/testutils" - "miren.dev/runtime/pkg/rpc" - "miren.dev/runtime/pkg/workloadidentity" - runnersrv "miren.dev/runtime/servers/runner" ) -// newTestRegistrationClient builds a RunnerRegistration client backed by a real -// in-process RegistrationServer, optionally with a workload identity issuer. -func newTestRegistrationClient(t *testing.T, issuer *workloadidentity.Issuer) *runner_v1alpha.RunnerRegistrationClient { - t.Helper() - - es, cleanup := testutils.NewInMemEntityServer(t) - t.Cleanup(cleanup) - - ca, err := caauth.New(caauth.Options{CommonName: "test-ca", Organization: "test"}) - require.NoError(t, err) - - srv := runnersrv.NewRegistrationServer(runnersrv.RegistrationServerConfig{ - Log: testutils.TestLogger(t), - Authority: ca, - EAC: es.EAC, - WorkloadIssuer: issuer, - }) - - local := rpc.LocalClient(runner_v1alpha.AdaptRunnerRegistration(srv)) - return runner_v1alpha.NewRunnerRegistrationClient(local) -} - -func TestRemoteIssuerRefreshesURL(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - issuer, err := workloadidentity.NewIssuer(workloadidentity.IssuerConfig{ - DataPath: t.TempDir(), - IssuerURL: "https://updated.example", - OrganizationID: "org-test", - ClusterID: "cluster-test", - }) - require.NoError(t, err) - - client := newTestRegistrationClient(t, issuer) - - ri := newRemoteIssuer(ctx, testutils.TestLogger(t), client, "https://stale.example") - require.Equal(t, "https://stale.example", ri.IssuerURL()) - - ri.refreshIssuerURL() - require.Equal(t, "https://updated.example", ri.IssuerURL()) -} - -func TestRemoteIssuerSetEnabledTransitions(t *testing.T) { - // setEnabled reports true only when the state actually changes, so the - // refresh loop logs a transition once rather than on every interval. - ri := &remoteIssuer{enabled: true} - require.False(t, ri.setEnabled(true), "no change should report false") - require.True(t, ri.setEnabled(false), "enabled->disabled should report true") - require.False(t, ri.setEnabled(false), "repeated disabled should report false") - require.True(t, ri.setEnabled(true), "disabled->enabled should report true") -} - -func TestRemoteIssuerRefreshKeepsURLWhenDisabled(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // No issuer configured on the coordinator -> WorkloadIssuerInfo reports - // disabled. The cached URL must be preserved rather than cleared. - client := newTestRegistrationClient(t, nil) - - ri := newRemoteIssuer(ctx, testutils.TestLogger(t), client, "https://stale.example") - ri.refreshIssuerURL() - require.Equal(t, "https://stale.example", ri.IssuerURL()) +func TestRemoteIssuerCachesURL(t *testing.T) { + ri := newRemoteIssuer(context.Background(), nil, "https://issuer.example") + require.Equal(t, "https://issuer.example", ri.IssuerURL()) } diff --git a/components/runner/runner.go b/components/runner/runner.go index 62cba5463..ac0290f6b 100644 --- a/components/runner/runner.go +++ b/components/runner/runner.go @@ -498,7 +498,7 @@ func (r *Runner) setupRemoteWorkloadIssuer(ctx context.Context, rs *rpc.State) e // the runner with no token issuance until it is restarted. var info *runner_v1alpha.RunnerRegistrationClientWorkloadIssuerInfoResults for attempt := 1; ; attempt++ { - info, err = regClient.WorkloadIssuerInfo(ctx) + info, err = queryWorkloadIssuerInfo(ctx, regClient) if err == nil { break } @@ -519,11 +519,20 @@ func (r *Runner) setupRemoteWorkloadIssuer(ctx context.Context, rs *rpc.State) e return nil } - r.deps.WorkloadIssuer = newRemoteIssuer(ctx, r.Log, regClient, info.IssuerUrl()) + r.deps.WorkloadIssuer = newRemoteIssuer(ctx, regClient, info.IssuerUrl()) r.Log.Info("workload identity issuer enabled via coordinator", "issuer", info.IssuerUrl()) return nil } +// queryWorkloadIssuerInfo performs a single WorkloadIssuerInfo call bounded by a +// per-attempt timeout, so a hung coordinator RPC cannot stall runner startup +// indefinitely and the retry budget is allowed to expire. +func queryWorkloadIssuerInfo(ctx context.Context, regClient *runner_v1alpha.RunnerRegistrationClient) (*runner_v1alpha.RunnerRegistrationClientWorkloadIssuerInfoResults, error) { + ctx, cancel := context.WithTimeout(ctx, remoteTokenTimeout) + defer cancel() + return regClient.WorkloadIssuerInfo(ctx) +} + // initializeNetwork sets up the Flannel network for distributed runners. // This is only called when EtcdEndpoints are configured (distributed runner mode). func (r *Runner) initializeNetwork(ctx context.Context, eg ...*errgroup.Group) error { diff --git a/servers/runner/registration_authz_test.go b/servers/runner/registration_authz_test.go index 3b45d917f..af3710e20 100644 --- a/servers/runner/registration_authz_test.go +++ b/servers/runner/registration_authz_test.go @@ -109,7 +109,7 @@ func TestAuthorizeSandboxOwnership_NoIdentityDenied(t *testing.T) { require.Error(t, srv.authorizeSandboxOwnership(context.Background(), authzSandboxID)) } -func TestAuthorizeSandboxOwnership_UnscheduledDenied(t *testing.T) { +func TestAuthorizeSandboxOwnership_MissingSandboxDenied(t *testing.T) { srv, cleanup := newOwnershipTestServer(t) defer cleanup() @@ -117,6 +117,33 @@ func TestAuthorizeSandboxOwnership_UnscheduledDenied(t *testing.T) { require.Error(t, srv.authorizeSandboxOwnership(ctx, "sandbox/does-not-exist")) } +func TestAuthorizeSandboxOwnership_UnscheduledDenied(t *testing.T) { + ctx := context.Background() + es, cleanup := testutils.NewInMemEntityServer(t) + defer cleanup() + + ca, err := caauth.New(caauth.Options{CommonName: "test-ca", Organization: "test"}) + require.NoError(t, err) + + srv := NewRegistrationServer(RegistrationServerConfig{ + Log: testutils.TestLogger(t), + Authority: ca, + EAC: es.EAC, + }) + + // A sandbox that exists but has no scheduling node, exercising the + // sch.Key.Node == "" branch rather than the missing-entity path. + _, err = es.EAC.Create(ctx, entity.New( + entity.DBId, entity.Id("sandbox/unscheduled"), + (&compute_v1alpha.Sandbox{}).Encode, + ).Attrs()) + require.NoError(t, err) + + authCtx := ctxWithCert(runnerCertName(authzRunnerID)) + err = srv.authorizeSandboxOwnership(authCtx, "sandbox/unscheduled") + require.ErrorContains(t, err, "not scheduled to a node") +} + func TestRunnerCertNameUsesFullID(t *testing.T) { // Two runner IDs sharing an 8-char prefix must map to distinct cert names, // otherwise a runner could choose a colliding ID and impersonate another. From fc5c614f42c24cc24eab7f76d5b7b6c016a5394c Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Sat, 6 Jun 2026 11:32:50 -0700 Subject: [PATCH 3/3] Update frozen sandbox.go hash for TokenIssuer field migration The WorkloadIssuer field type change (concrete *Issuer -> TokenIssuer interface) altered sandbox.go's contents. The change is a pure type migration on the shared SandboxController; the saga controller wraps that same inner controller and inherits it, so there is no behavioral divergence to mirror. Update the recorded hash. --- controllers/sandbox/sandbox_frozen_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controllers/sandbox/sandbox_frozen_test.go b/controllers/sandbox/sandbox_frozen_test.go index ef9782c85..6cf98595a 100644 --- a/controllers/sandbox/sandbox_frozen_test.go +++ b/controllers/sandbox/sandbox_frozen_test.go @@ -24,7 +24,7 @@ import ( // sha256sum controllers/sandbox/sandbox.go controllers/sandbox/volume.go controllers/sandbox/firewall.go func TestSandboxControllerFrozen(t *testing.T) { frozen := map[string]string{ - "sandbox.go": "65e398f232a887de37250abd8da5f27bf66e9ace6625331145d4e788a08f1707", + "sandbox.go": "bc84259d85ac797bbf7d031cef36b32294183ac082ea4a0fd76b3da730af82bf", "volume.go": "292dbc050cd94901ab704a23605f5537c944787c9e06077a3fc004f40e9c0b6c", "firewall.go": "648cb5d91091d5eb7400152b19695a8045585feae59c5dd36c12d663a27bb91f", }