diff --git a/api/runner/rpc.yml b/api/runner/rpc.yml index 4e667c920..5bea3cb20 100644 --- a/api/runner/rpc.yml +++ b/api/runner/rpc.yml @@ -159,6 +159,47 @@ interfaces: type: string doc: Error message if removal failed + - name: WorkloadIssuerInfo + index: 6 + doc: | + Report whether the coordinator has a workload identity issuer + configured, and its issuer URL. Distributed runners call this once at + startup to decide whether to mint workload identity tokens via the + coordinator. + results: + - name: enabled + type: bool + doc: Whether a workload identity issuer is configured on the coordinator + - name: issuer_url + type: string + doc: The issuer URL (iss claim anchor) when enabled + + - name: IssueWorkloadToken + index: 7 + doc: | + Mint a workload identity token for a sandbox. Distributed runners, + which do not hold the cluster signing key, call this to obtain tokens + signed by the coordinator. The application identity is derived from the + sandbox by the coordinator, not supplied by the caller. + parameters: + - name: sandbox_id + type: string + doc: Sandbox ID the token is issued for + - name: audience + type: list + element: string + doc: Optional token audiences (defaults to "miren" when empty) + - name: ttl_seconds + type: int64 + doc: Optional token TTL in seconds (0 uses the issuer default) + results: + - name: token + type: string + doc: The signed workload identity token (JWT) + - name: error + type: string + doc: Error message if issuance failed + types: - type: InviteInfo doc: Information about a runner invite diff --git a/api/runner/runner_v1alpha/rpc.gen.go b/api/runner/runner_v1alpha/rpc.gen.go index 4e7c875c0..6d2198d96 100644 --- a/api/runner/runner_v1alpha/rpc.gen.go +++ b/api/runner/runner_v1alpha/rpc.gen.go @@ -927,6 +927,157 @@ func (v *RunnerRegistrationRemoveRunnerResults) UnmarshalJSON(data []byte) error return json.Unmarshal(data, &v.data) } +type runnerRegistrationWorkloadIssuerInfoArgsData struct{} + +type RunnerRegistrationWorkloadIssuerInfoArgs struct { + call rpc.Call + data runnerRegistrationWorkloadIssuerInfoArgsData +} + +func (v *RunnerRegistrationWorkloadIssuerInfoArgs) MarshalCBOR() ([]byte, error) { + return cbor.Marshal(v.data) +} + +func (v *RunnerRegistrationWorkloadIssuerInfoArgs) UnmarshalCBOR(data []byte) error { + return cbor.Unmarshal(data, &v.data) +} + +func (v *RunnerRegistrationWorkloadIssuerInfoArgs) MarshalJSON() ([]byte, error) { + return json.Marshal(v.data) +} + +func (v *RunnerRegistrationWorkloadIssuerInfoArgs) UnmarshalJSON(data []byte) error { + return json.Unmarshal(data, &v.data) +} + +type runnerRegistrationWorkloadIssuerInfoResultsData struct { + Enabled *bool `cbor:"0,keyasint,omitempty" json:"enabled,omitempty"` + IssuerUrl *string `cbor:"1,keyasint,omitempty" json:"issuer_url,omitempty"` +} + +type RunnerRegistrationWorkloadIssuerInfoResults struct { + call rpc.Call + data runnerRegistrationWorkloadIssuerInfoResultsData +} + +func (v *RunnerRegistrationWorkloadIssuerInfoResults) SetEnabled(enabled bool) { + v.data.Enabled = &enabled +} + +func (v *RunnerRegistrationWorkloadIssuerInfoResults) SetIssuerUrl(issuer_url string) { + v.data.IssuerUrl = &issuer_url +} + +func (v *RunnerRegistrationWorkloadIssuerInfoResults) MarshalCBOR() ([]byte, error) { + return cbor.Marshal(v.data) +} + +func (v *RunnerRegistrationWorkloadIssuerInfoResults) UnmarshalCBOR(data []byte) error { + return cbor.Unmarshal(data, &v.data) +} + +func (v *RunnerRegistrationWorkloadIssuerInfoResults) MarshalJSON() ([]byte, error) { + return json.Marshal(v.data) +} + +func (v *RunnerRegistrationWorkloadIssuerInfoResults) UnmarshalJSON(data []byte) error { + return json.Unmarshal(data, &v.data) +} + +type runnerRegistrationIssueWorkloadTokenArgsData struct { + SandboxId *string `cbor:"0,keyasint,omitempty" json:"sandbox_id,omitempty"` + Audience *[]string `cbor:"1,keyasint,omitempty" json:"audience,omitempty"` + TtlSeconds *int64 `cbor:"2,keyasint,omitempty" json:"ttl_seconds,omitempty"` +} + +type RunnerRegistrationIssueWorkloadTokenArgs struct { + call rpc.Call + data runnerRegistrationIssueWorkloadTokenArgsData +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) HasSandboxId() bool { + return v.data.SandboxId != nil +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) SandboxId() string { + if v.data.SandboxId == nil { + return "" + } + return *v.data.SandboxId +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) HasAudience() bool { + return v.data.Audience != nil +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) Audience() []string { + if v.data.Audience == nil { + return nil + } + return *v.data.Audience +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) HasTtlSeconds() bool { + return v.data.TtlSeconds != nil +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) TtlSeconds() int64 { + if v.data.TtlSeconds == nil { + return 0 + } + return *v.data.TtlSeconds +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) MarshalCBOR() ([]byte, error) { + return cbor.Marshal(v.data) +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) UnmarshalCBOR(data []byte) error { + return cbor.Unmarshal(data, &v.data) +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) MarshalJSON() ([]byte, error) { + return json.Marshal(v.data) +} + +func (v *RunnerRegistrationIssueWorkloadTokenArgs) UnmarshalJSON(data []byte) error { + return json.Unmarshal(data, &v.data) +} + +type runnerRegistrationIssueWorkloadTokenResultsData struct { + Token *string `cbor:"0,keyasint,omitempty" json:"token,omitempty"` + Error *string `cbor:"1,keyasint,omitempty" json:"error,omitempty"` +} + +type RunnerRegistrationIssueWorkloadTokenResults struct { + call rpc.Call + data runnerRegistrationIssueWorkloadTokenResultsData +} + +func (v *RunnerRegistrationIssueWorkloadTokenResults) SetToken(token string) { + v.data.Token = &token +} + +func (v *RunnerRegistrationIssueWorkloadTokenResults) SetError(error string) { + v.data.Error = &error +} + +func (v *RunnerRegistrationIssueWorkloadTokenResults) MarshalCBOR() ([]byte, error) { + return cbor.Marshal(v.data) +} + +func (v *RunnerRegistrationIssueWorkloadTokenResults) UnmarshalCBOR(data []byte) error { + return cbor.Unmarshal(data, &v.data) +} + +func (v *RunnerRegistrationIssueWorkloadTokenResults) MarshalJSON() ([]byte, error) { + return json.Marshal(v.data) +} + +func (v *RunnerRegistrationIssueWorkloadTokenResults) UnmarshalJSON(data []byte) error { + return json.Unmarshal(data, &v.data) +} + type RunnerRegistrationCreateInvite struct { rpc.Call args RunnerRegistrationCreateInviteArgs @@ -1083,6 +1234,58 @@ func (t *RunnerRegistrationRemoveRunner) Results() *RunnerRegistrationRemoveRunn return results } +type RunnerRegistrationWorkloadIssuerInfo struct { + rpc.Call + args RunnerRegistrationWorkloadIssuerInfoArgs + results RunnerRegistrationWorkloadIssuerInfoResults +} + +func (t *RunnerRegistrationWorkloadIssuerInfo) Args() *RunnerRegistrationWorkloadIssuerInfoArgs { + args := &t.args + if args.call != nil { + return args + } + args.call = t.Call + t.Call.Args(args) + return args +} + +func (t *RunnerRegistrationWorkloadIssuerInfo) Results() *RunnerRegistrationWorkloadIssuerInfoResults { + results := &t.results + if results.call != nil { + return results + } + results.call = t.Call + t.Call.Results(results) + return results +} + +type RunnerRegistrationIssueWorkloadToken struct { + rpc.Call + args RunnerRegistrationIssueWorkloadTokenArgs + results RunnerRegistrationIssueWorkloadTokenResults +} + +func (t *RunnerRegistrationIssueWorkloadToken) Args() *RunnerRegistrationIssueWorkloadTokenArgs { + args := &t.args + if args.call != nil { + return args + } + args.call = t.Call + t.Call.Args(args) + return args +} + +func (t *RunnerRegistrationIssueWorkloadToken) Results() *RunnerRegistrationIssueWorkloadTokenResults { + results := &t.results + if results.call != nil { + return results + } + results.call = t.Call + t.Call.Results(results) + return results +} + type RunnerRegistration interface { CreateInvite(ctx context.Context, state *RunnerRegistrationCreateInvite) error Join(ctx context.Context, state *RunnerRegistrationJoin) error @@ -1090,6 +1293,8 @@ type RunnerRegistration interface { RevokeInvite(ctx context.Context, state *RunnerRegistrationRevokeInvite) error ListRunners(ctx context.Context, state *RunnerRegistrationListRunners) error RemoveRunner(ctx context.Context, state *RunnerRegistrationRemoveRunner) error + WorkloadIssuerInfo(ctx context.Context, state *RunnerRegistrationWorkloadIssuerInfo) error + IssueWorkloadToken(ctx context.Context, state *RunnerRegistrationIssueWorkloadToken) error } type reexportRunnerRegistration struct { @@ -1120,6 +1325,14 @@ func (reexportRunnerRegistration) RemoveRunner(ctx context.Context, state *Runne panic("not implemented") } +func (reexportRunnerRegistration) WorkloadIssuerInfo(ctx context.Context, state *RunnerRegistrationWorkloadIssuerInfo) error { + panic("not implemented") +} + +func (reexportRunnerRegistration) IssueWorkloadToken(ctx context.Context, state *RunnerRegistrationIssueWorkloadToken) error { + panic("not implemented") +} + func (t reexportRunnerRegistration) CapabilityClient() rpc.Client { return t.client } @@ -1180,6 +1393,24 @@ func AdaptRunnerRegistration(t RunnerRegistration) *rpc.Interface { return t.RemoveRunner(ctx, &RunnerRegistrationRemoveRunner{Call: call}) }, }, + { + Name: "WorkloadIssuerInfo", + InterfaceName: "RunnerRegistration", + Index: 6, + Public: false, + Handler: func(ctx context.Context, call rpc.Call) error { + return t.WorkloadIssuerInfo(ctx, &RunnerRegistrationWorkloadIssuerInfo{Call: call}) + }, + }, + { + Name: "IssueWorkloadToken", + InterfaceName: "RunnerRegistration", + Index: 7, + Public: false, + Handler: func(ctx context.Context, call rpc.Call) error { + return t.IssueWorkloadToken(ctx, &RunnerRegistrationIssueWorkloadToken{Call: call}) + }, + }, } return rpc.NewInterface(methods, t) @@ -1549,3 +1780,87 @@ func (v RunnerRegistrationClient) RemoveRunner(ctx context.Context, query string return &RunnerRegistrationClientRemoveRunnerResults{client: v.Client, data: ret}, nil } + +type RunnerRegistrationClientWorkloadIssuerInfoResults struct { + client rpc.Client + data runnerRegistrationWorkloadIssuerInfoResultsData +} + +func (v *RunnerRegistrationClientWorkloadIssuerInfoResults) HasEnabled() bool { + return v.data.Enabled != nil +} + +func (v *RunnerRegistrationClientWorkloadIssuerInfoResults) Enabled() bool { + if v.data.Enabled == nil { + return false + } + return *v.data.Enabled +} + +func (v *RunnerRegistrationClientWorkloadIssuerInfoResults) HasIssuerUrl() bool { + return v.data.IssuerUrl != nil +} + +func (v *RunnerRegistrationClientWorkloadIssuerInfoResults) IssuerUrl() string { + if v.data.IssuerUrl == nil { + return "" + } + return *v.data.IssuerUrl +} + +func (v RunnerRegistrationClient) WorkloadIssuerInfo(ctx context.Context) (*RunnerRegistrationClientWorkloadIssuerInfoResults, error) { + args := RunnerRegistrationWorkloadIssuerInfoArgs{} + + var ret runnerRegistrationWorkloadIssuerInfoResultsData + + err := v.Call(ctx, "WorkloadIssuerInfo", &args, &ret) + if err != nil { + return nil, err + } + + return &RunnerRegistrationClientWorkloadIssuerInfoResults{client: v.Client, data: ret}, nil +} + +type RunnerRegistrationClientIssueWorkloadTokenResults struct { + client rpc.Client + data runnerRegistrationIssueWorkloadTokenResultsData +} + +func (v *RunnerRegistrationClientIssueWorkloadTokenResults) HasToken() bool { + return v.data.Token != nil +} + +func (v *RunnerRegistrationClientIssueWorkloadTokenResults) Token() string { + if v.data.Token == nil { + return "" + } + return *v.data.Token +} + +func (v *RunnerRegistrationClientIssueWorkloadTokenResults) HasError() bool { + return v.data.Error != nil +} + +func (v *RunnerRegistrationClientIssueWorkloadTokenResults) Error() string { + if v.data.Error == nil { + return "" + } + return *v.data.Error +} + +func (v RunnerRegistrationClient) IssueWorkloadToken(ctx context.Context, sandbox_id string, audience []string, ttl_seconds int64) (*RunnerRegistrationClientIssueWorkloadTokenResults, error) { + args := RunnerRegistrationIssueWorkloadTokenArgs{} + args.data.SandboxId = &sandbox_id + x := slices.Clone(audience) + args.data.Audience = &x + args.data.TtlSeconds = &ttl_seconds + + var ret runnerRegistrationIssueWorkloadTokenResultsData + + err := v.Call(ctx, "IssueWorkloadToken", &args, &ret) + if err != nil { + return nil, err + } + + return &RunnerRegistrationClientIssueWorkloadTokenResults{client: v.Client, data: ret}, nil +} diff --git a/cli/commands/server.go b/cli/commands/server.go index 6b745f660..b1c2f78fc 100644 --- a/cli/commands/server.go +++ b/cli/commands/server.go @@ -827,7 +827,13 @@ func Server(ctx *Context, opts serverconfig.CLIFlags) error { Resolver: res, SandboxMetrics: ctx.ServerState.SandboxMetrics, IsCoordinator: true, - WorkloadIssuer: workloadIssuer, + } + + // Assign only when non-nil: storing a typed-nil *Issuer into the + // TokenIssuer interface field would make it compare != nil, defeating the + // nil guards in the sandbox controller and panicking on first use. + if workloadIssuer != nil { + deps.WorkloadIssuer = workloadIssuer } rc.DataPath = cfg.Server.GetDataPath() diff --git a/components/coordinate/coordinate.go b/components/coordinate/coordinate.go index 0ddbd1377..67003887a 100644 --- a/components/coordinate/coordinate.go +++ b/components/coordinate/coordinate.go @@ -1163,6 +1163,7 @@ func (c *Coordinator) Start(ctx context.Context) error { NetworkBackend: c.NetworkBackend, VictoriametricsAddress: c.VictoriametricsAddress, VictorialogsAddress: c.VictorialogsAddress, + WorkloadIssuer: c.WorkloadIssuer, }) server.ExposeValue(rpc.ServiceRunner, runner_v1alpha.AdaptRunnerRegistration(runnerReg)) diff --git a/components/runner/remote_issuer.go b/components/runner/remote_issuer.go new file mode 100644 index 000000000..e65d485e4 --- /dev/null +++ b/components/runner/remote_issuer.go @@ -0,0 +1,69 @@ +package runner + +import ( + "context" + "fmt" + "time" + + "miren.dev/runtime/api/runner/runner_v1alpha" + "miren.dev/runtime/pkg/workloadidentity" +) + +// remoteTokenTimeout bounds a single token-minting RPC to the coordinator. +const remoteTokenTimeout = 30 * time.Second + +// remoteIssuer satisfies workloadidentity.TokenIssuer by proxying token minting +// to the coordinator over RPC. Distributed runners do not hold the cluster +// signing key, so they cannot mint tokens locally and instead ask the +// coordinator, which holds the key. +// +// The issuer URL is captured once at startup. It is the cluster's OIDC issuer +// anchor, derived from the coordinator's configuration at boot, so it is stable +// for a coordinator's process lifetime; a change (e.g. the cluster gaining a DNS +// hostname via re-registration) restarts the coordinator and is picked up when +// the runner reconnects/restarts. +type remoteIssuer struct { + ctx context.Context + client *runner_v1alpha.RunnerRegistrationClient + issuerURL string +} + +var _ workloadidentity.TokenIssuer = (*remoteIssuer)(nil) + +func newRemoteIssuer(ctx context.Context, client *runner_v1alpha.RunnerRegistrationClient, issuerURL string) *remoteIssuer { + return &remoteIssuer{ + ctx: ctx, + client: client, + issuerURL: issuerURL, + } +} + +func (r *remoteIssuer) IssuerURL() string { + return r.issuerURL +} + +func (r *remoteIssuer) IssueToken(app, sandboxID string) (string, error) { + return r.IssueTokenWithOptions(app, sandboxID, workloadidentity.TokenOptions{}) +} + +// IssueTokenWithOptions mints a token via the coordinator. The app argument is +// ignored: the coordinator derives the app identity from the sandbox itself so +// a runner cannot forge it. +func (r *remoteIssuer) IssueTokenWithOptions(_, sandboxID string, opts workloadidentity.TokenOptions) (string, error) { + ctx, cancel := context.WithTimeout(r.ctx, remoteTokenTimeout) + defer cancel() + + var ttlSeconds int64 + if opts.TTL > 0 { + ttlSeconds = int64(opts.TTL / time.Second) + } + + res, err := r.client.IssueWorkloadToken(ctx, sandboxID, opts.Audience, ttlSeconds) + if err != nil { + return "", fmt.Errorf("requesting workload token from coordinator: %w", err) + } + if res.Error() != "" { + return "", fmt.Errorf("coordinator refused workload token: %s", res.Error()) + } + return res.Token(), nil +} diff --git a/components/runner/remote_issuer_test.go b/components/runner/remote_issuer_test.go new file mode 100644 index 000000000..facbb0759 --- /dev/null +++ b/components/runner/remote_issuer_test.go @@ -0,0 +1,13 @@ +package runner + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRemoteIssuerCachesURL(t *testing.T) { + ri := newRemoteIssuer(context.Background(), nil, "https://issuer.example") + require.Equal(t, "https://issuer.example", ri.IssuerURL()) +} diff --git a/components/runner/runner.go b/components/runner/runner.go index b97eb87d7..ac0290f6b 100644 --- a/components/runner/runner.go +++ b/components/runner/runner.go @@ -22,6 +22,7 @@ import ( "miren.dev/runtime/api/ingress/ingress_v1alpha" "miren.dev/runtime/api/metric/metric_v1alpha" "miren.dev/runtime/api/network/network_v1alpha" + "miren.dev/runtime/api/runner/runner_v1alpha" "miren.dev/runtime/api/storage/storage_v1alpha" "miren.dev/runtime/clientconfig" "miren.dev/runtime/components/coordinate" @@ -109,12 +110,19 @@ type RunnerDeps struct { EtcdTLSKeyFile string // Client private key file path EtcdTLSCAFile string // CA certificate file path - // WorkloadIssuer signs workload identity tokens for sandbox containers - WorkloadIssuer *workloadidentity.Issuer + // WorkloadIssuer mints workload identity tokens for sandbox containers. On + // the coordinator this is the concrete *workloadidentity.Issuer; on a + // distributed runner it is a remote issuer that proxies minting to the + // coordinator over RPC. + WorkloadIssuer workloadidentity.TokenIssuer } const ( DefaulWorkers = 3 + + // Bounded retry for the coordinator's workload-issuer-info query at startup. + issuerInfoMaxAttempts = 3 + issuerInfoRetryDelay = 2 * time.Second ) type shutdownCloser struct{ s interface{ Shutdown() } } @@ -431,6 +439,13 @@ func (r *Runner) Start(ctx context.Context, eg ...*errgroup.Group) error { ec := entityserver.NewClient(r.Log, eas) + // Distributed runners mint workload identity tokens via the coordinator, + // since they do not hold the cluster signing key. A failure here degrades + // to no sandbox tokens rather than blocking runner startup. + if err := r.setupRemoteWorkloadIssuer(ctx, rs); err != nil { + r.Log.Warn("failed to set up workload identity issuer", "error", err) + } + cm, err := r.SetupControllers(ctx, eas, rs.Server()) if err != nil { return err @@ -460,6 +475,64 @@ func (r *Runner) Start(ctx context.Context, eg ...*errgroup.Group) error { return nil } +// setupRemoteWorkloadIssuer wires a remote workload identity issuer for +// distributed runners. Runners do not hold the cluster signing key, so they +// mint tokens by calling the coordinator's RunnerRegistration service. When the +// coordinator reports no issuer is configured, token issuance stays disabled +// (deps.WorkloadIssuer remains nil). The coordinator's embedded runner +// (r.Config == nil) keeps the concrete issuer it was constructed with. +func (r *Runner) setupRemoteWorkloadIssuer(ctx context.Context, rs *rpc.State) error { + if r.Config == nil || r.deps.WorkloadIssuer != nil { + return nil + } + + client, err := rs.Client(string(rpc.ServiceRunner)) + if err != nil { + return fmt.Errorf("connecting to coordinator runner service: %w", err) + } + + regClient := runner_v1alpha.NewRunnerRegistrationClient(client) + + // Retry transient failures: the entities connection was just established, so + // a failure here is usually a brief blip. Giving up immediately would leave + // the runner with no token issuance until it is restarted. + var info *runner_v1alpha.RunnerRegistrationClientWorkloadIssuerInfoResults + for attempt := 1; ; attempt++ { + info, err = queryWorkloadIssuerInfo(ctx, regClient) + if err == nil { + break + } + if attempt >= issuerInfoMaxAttempts { + return fmt.Errorf("querying workload issuer info after %d attempts: %w", attempt, err) + } + r.Log.Warn("workload issuer info query failed; retrying", + "attempt", attempt, "max", issuerInfoMaxAttempts, "error", err) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(issuerInfoRetryDelay): + } + } + + if !info.Enabled() { + r.Log.Info("coordinator has no workload identity issuer; sandbox tokens disabled") + return nil + } + + r.deps.WorkloadIssuer = newRemoteIssuer(ctx, regClient, info.IssuerUrl()) + r.Log.Info("workload identity issuer enabled via coordinator", "issuer", info.IssuerUrl()) + return nil +} + +// queryWorkloadIssuerInfo performs a single WorkloadIssuerInfo call bounded by a +// per-attempt timeout, so a hung coordinator RPC cannot stall runner startup +// indefinitely and the retry budget is allowed to expire. +func queryWorkloadIssuerInfo(ctx context.Context, regClient *runner_v1alpha.RunnerRegistrationClient) (*runner_v1alpha.RunnerRegistrationClientWorkloadIssuerInfoResults, error) { + ctx, cancel := context.WithTimeout(ctx, remoteTokenTimeout) + defer cancel() + return regClient.WorkloadIssuerInfo(ctx) +} + // initializeNetwork sets up the Flannel network for distributed runners. // This is only called when EtcdEndpoints are configured (distributed runner mode). func (r *Runner) initializeNetwork(ctx context.Context, eg ...*errgroup.Group) error { diff --git a/controllers/sandbox/sandbox.go b/controllers/sandbox/sandbox.go index 2d520dfbd..7ae491a5b 100644 --- a/controllers/sandbox/sandbox.go +++ b/controllers/sandbox/sandbox.go @@ -83,7 +83,7 @@ type SandboxControllerDeps struct { StatusMon *observability.StatusMonitor Resolver netresolve.Resolver Metrics *Metrics - WorkloadIssuer *workloadidentity.Issuer + WorkloadIssuer workloadidentity.TokenIssuer } type SandboxController struct { @@ -110,7 +110,7 @@ type SandboxController struct { Resolver netresolve.Resolver Metrics *Metrics - WorkloadIssuer *workloadidentity.Issuer + WorkloadIssuer workloadidentity.TokenIssuer tokenRefresher *tokenRefresher tokenSecrets *tokenSecretRegistry diff --git a/controllers/sandbox/sandbox_frozen_test.go b/controllers/sandbox/sandbox_frozen_test.go index ef9782c85..6cf98595a 100644 --- a/controllers/sandbox/sandbox_frozen_test.go +++ b/controllers/sandbox/sandbox_frozen_test.go @@ -24,7 +24,7 @@ import ( // sha256sum controllers/sandbox/sandbox.go controllers/sandbox/volume.go controllers/sandbox/firewall.go func TestSandboxControllerFrozen(t *testing.T) { frozen := map[string]string{ - "sandbox.go": "65e398f232a887de37250abd8da5f27bf66e9ace6625331145d4e788a08f1707", + "sandbox.go": "bc84259d85ac797bbf7d031cef36b32294183ac082ea4a0fd76b3da730af82bf", "volume.go": "292dbc050cd94901ab704a23605f5537c944787c9e06077a3fc004f40e9c0b6c", "firewall.go": "648cb5d91091d5eb7400152b19695a8045585feae59c5dd36c12d663a27bb91f", } diff --git a/controllers/sandbox/token_server_test.go b/controllers/sandbox/token_server_test.go index 8c4fb8d96..7ae56286e 100644 --- a/controllers/sandbox/token_server_test.go +++ b/controllers/sandbox/token_server_test.go @@ -71,7 +71,7 @@ func TestTokenServer_DefaultToken(t *testing.T) { require.NotEmpty(t, resp.Value) token, err := jwt.ParseWithClaims(resp.Value, &workloadidentity.WorkloadClaims{}, func(tok *jwt.Token) (interface{}, error) { - return c.WorkloadIssuer.PublicKey(), nil + return c.WorkloadIssuer.(*workloadidentity.Issuer).PublicKey(), nil }) require.NoError(t, err) @@ -95,7 +95,7 @@ func TestTokenServer_CustomAudience(t *testing.T) { require.NoError(t, err) token, err := jwt.ParseWithClaims(resp.Value, &workloadidentity.WorkloadClaims{}, func(tok *jwt.Token) (interface{}, error) { - return c.WorkloadIssuer.PublicKey(), nil + return c.WorkloadIssuer.(*workloadidentity.Issuer).PublicKey(), nil }, jwt.WithAudience("sts.amazonaws.com")) require.NoError(t, err) @@ -116,7 +116,7 @@ func TestTokenServer_CustomTTL(t *testing.T) { require.NoError(t, err) token, err := jwt.ParseWithClaims(resp.Value, &workloadidentity.WorkloadClaims{}, func(tok *jwt.Token) (interface{}, error) { - return c.WorkloadIssuer.PublicKey(), nil + return c.WorkloadIssuer.(*workloadidentity.Issuer).PublicKey(), nil }) require.NoError(t, err) diff --git a/pkg/workloadidentity/issuer.go b/pkg/workloadidentity/issuer.go index 550c3fd46..4e1f9d3d1 100644 --- a/pkg/workloadidentity/issuer.go +++ b/pkg/workloadidentity/issuer.go @@ -60,6 +60,18 @@ type Issuer struct { previousKey *jose.JSONWebKey } +// TokenIssuer is the minting surface the sandbox controller depends on. The +// concrete *Issuer satisfies it directly (the coordinator holds the signing +// key). Distributed runners have no signing key, so they supply an +// implementation that proxies minting to the coordinator over RPC. +type TokenIssuer interface { + IssueToken(app, sandboxID string) (string, error) + IssueTokenWithOptions(app, sandboxID string, opts TokenOptions) (string, error) + IssuerURL() string +} + +var _ TokenIssuer = (*Issuer)(nil) + type WorkloadClaims struct { jwt.RegisteredClaims OrganizationID string `json:"organization_id,omitempty"` diff --git a/servers/runner/registration.go b/servers/runner/registration.go index 28d803050..a4f4ced0b 100644 --- a/servers/runner/registration.go +++ b/servers/runner/registration.go @@ -20,7 +20,9 @@ import ( "miren.dev/runtime/pkg/entity" "miren.dev/runtime/pkg/entity/types" "miren.dev/runtime/pkg/joincode" + "miren.dev/runtime/pkg/rpc" "miren.dev/runtime/pkg/rpc/standard" + "miren.dev/runtime/pkg/workloadidentity" ) const ( @@ -42,6 +44,11 @@ type RegistrationServerConfig struct { // Observability endpoints provided to runners at join time VictoriametricsAddress string VictorialogsAddress string + + // WorkloadIssuer mints workload identity tokens. Distributed runners, which + // do not hold the cluster signing key, request tokens from the coordinator + // through this server. May be nil when no issuer is configured. + WorkloadIssuer *workloadidentity.Issuer } type RegistrationServer struct { @@ -240,11 +247,7 @@ func (s *RegistrationServer) Join(ctx context.Context, req *runner_v1alpha.Runne // Now that invite is claimed, issue the certificate with proper SANs // so the coordinator can connect to the runner's API by IP. - runnerIDPrefix := runnerID - if len(runnerIDPrefix) > 8 { - runnerIDPrefix = runnerIDPrefix[:8] - } - certName := fmt.Sprintf("runner-%s", runnerIDPrefix) + certName := runnerCertName(runnerID) ips := []net.IP{ net.ParseIP("127.0.0.1"), @@ -604,6 +607,175 @@ func (s *RegistrationServer) RemoveRunner(ctx context.Context, req *runner_v1alp return nil } +// WorkloadIssuerInfo reports whether the coordinator has a workload identity +// issuer configured and, if so, its issuer URL. Distributed runners call this +// once at startup to decide whether to mint workload identity tokens via the +// coordinator. +func (s *RegistrationServer) WorkloadIssuerInfo(ctx context.Context, req *runner_v1alpha.RunnerRegistrationWorkloadIssuerInfo) error { + results := req.Results() + + if s.WorkloadIssuer == nil { + results.SetEnabled(false) + return nil + } + + results.SetEnabled(true) + results.SetIssuerUrl(s.WorkloadIssuer.IssuerURL()) + return nil +} + +// IssueWorkloadToken mints a workload identity token for a sandbox on behalf of +// a distributed runner, which does not hold the cluster signing key. The caller +// is an mTLS-authenticated runner. +func (s *RegistrationServer) IssueWorkloadToken(ctx context.Context, req *runner_v1alpha.RunnerRegistrationIssueWorkloadToken) error { + args := req.Args() + results := req.Results() + + if s.WorkloadIssuer == nil { + results.SetError("workload identity issuer is not configured") + return nil + } + + if !args.HasSandboxId() || args.SandboxId() == "" { + results.SetError("sandbox_id is required") + return nil + } + sandboxID := args.SandboxId() + + // A runner may only mint tokens for sandboxes scheduled to it. This prevents + // a compromised or buggy runner from obtaining identities for workloads + // running on other runners. + if err := s.authorizeSandboxOwnership(ctx, sandboxID); err != nil { + s.Log.Warn("workload token request denied", "sandbox", sandboxID, "error", err) + results.SetError("not authorized to issue a token for this sandbox") + return nil + } + + // Derive the app identity from the sandbox itself rather than trusting the + // caller. The app is part of the token subject that external verifiers + // federate on, so a runner must not be able to forge it. + appName := s.resolveSandboxApp(ctx, sandboxID) + + opts := workloadidentity.TokenOptions{} + if args.HasAudience() { + opts.Audience = args.Audience() + } + if args.HasTtlSeconds() && args.TtlSeconds() > 0 { + opts.TTL = time.Duration(args.TtlSeconds()) * time.Second + } + + token, err := s.WorkloadIssuer.IssueTokenWithOptions(appName, sandboxID, opts) + if err != nil { + s.Log.Error("failed to issue workload identity token", + "sandbox", sandboxID, "app", appName, "error", err) + results.SetError("failed to issue token") + return nil + } + + results.SetToken(token) + return nil +} + +// runnerCertName is the client-certificate CommonName issued to a runner during +// Join. It embeds the full runner ID so the coordinator can attribute an mTLS +// connection back to a specific runner and authorize per-runner actions. +// +// The full ID (rather than a prefix) is required for authorization: a runner +// chooses its own runner ID at Join, so a short prefix would let a malicious +// runner pick an ID whose prefix collides with a victim's cert name and mint +// tokens for the victim's sandboxes. A runner ID is a UUID, so "runner-" plus +// the ID stays within the certificate CommonName length limit. +func runnerCertName(runnerID string) string { + return fmt.Sprintf("runner-%s", runnerID) +} + +// authorizeSandboxOwnership verifies the authenticated caller is the runner the +// given sandbox is scheduled to. Callers authenticate with the mTLS client +// certificate issued during Join, whose CommonName maps back to the runner via +// runnerCertName. When authentication is disabled (anonymous), there is nothing +// to verify against and the check is skipped. +func (s *RegistrationServer) authorizeSandboxOwnership(ctx context.Context, sandboxID string) error { + identity := rpc.IdentityFromContext(ctx) + if identity == nil { + return fmt.Errorf("no caller identity") + } + if identity.Method == rpc.AuthMethodAnonymous { + // Authentication is disabled on this coordinator; ownership cannot be + // established, so don't block issuance. + return nil + } + if identity.Method != rpc.AuthMethodCert { + return fmt.Errorf("caller must authenticate with a runner certificate, got %q", identity.Method) + } + + sbResp, err := s.EAC.Get(ctx, sandboxID) + if err != nil { + return fmt.Errorf("looking up sandbox %s: %w", sandboxID, err) + } + + var sch compute_v1alpha.Schedule + sch.Decode(sbResp.Entity().Entity()) + if sch.Key.Node == "" { + return fmt.Errorf("sandbox %s is not scheduled to a node", sandboxID) + } + + nodeResp, err := s.EAC.Get(ctx, string(sch.Key.Node)) + if err != nil { + return fmt.Errorf("looking up node %s: %w", sch.Key.Node, err) + } + + var node compute_v1alpha.Node + node.Decode(nodeResp.Entity().Entity()) + if node.RunnerId == "" { + return fmt.Errorf("node %s has no runner id", sch.Key.Node) + } + + if expected := runnerCertName(node.RunnerId); identity.Subject != expected { + return fmt.Errorf("caller %q is not the owner of sandbox %s (owned by %q)", + identity.Subject, sandboxID, expected) + } + + return nil +} + +// resolveSandboxApp derives the application name for a sandbox from the entity +// store (sandbox -> app version -> app metadata name), mirroring the sandbox +// controller's local resolution. The app name is part of the workload identity +// token subject, so it must be derived server-side rather than trusted from the +// calling runner. Returns "" when the app cannot be resolved. +func (s *RegistrationServer) resolveSandboxApp(ctx context.Context, sandboxID string) string { + sbResp, err := s.EAC.Get(ctx, sandboxID) + if err != nil { + return "" + } + + var sb compute_v1alpha.Sandbox + sb.Decode(sbResp.Entity().Entity()) + if sb.Spec.Version == "" { + return "" + } + + versionResp, err := s.EAC.Get(ctx, sb.Spec.Version.String()) + if err != nil { + return "" + } + + var version core_v1alpha.AppVersion + version.Decode(versionResp.Entity().Entity()) + if version.App == "" { + return "" + } + + appResp, err := s.EAC.Get(ctx, version.App.String()) + if err != nil { + return "" + } + + var appMeta core_v1alpha.Metadata + appMeta.Decode(appResp.Entity().Entity()) + return appMeta.Name +} + // findNodeByQuery looks up a node entity by name, runner ID, entity ID, or short ID prefix. // Exact matches (name, runner ID, entity ID) are returned immediately. Prefix // matches are collected and only returned when unambiguous. diff --git a/servers/runner/registration_authz_test.go b/servers/runner/registration_authz_test.go new file mode 100644 index 000000000..af3710e20 --- /dev/null +++ b/servers/runner/registration_authz_test.go @@ -0,0 +1,212 @@ +package runner + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "miren.dev/runtime/api/compute/compute_v1alpha" + "miren.dev/runtime/api/core/core_v1alpha" + "miren.dev/runtime/pkg/caauth" + "miren.dev/runtime/pkg/entity" + "miren.dev/runtime/pkg/entity/testutils" + "miren.dev/runtime/pkg/rpc" +) + +const ( + authzRunnerID = "abcd1234-5678-90ab-cdef-1234567890ab" + authzSandboxID = "sandbox/myapp-web-aaa111" +) + +// newOwnershipTestServer builds a RegistrationServer backed by an in-memory +// entity store containing a node owned by authzRunnerID and a sandbox scheduled +// to it. +func newOwnershipTestServer(t *testing.T) (*RegistrationServer, func()) { + t.Helper() + + es, cleanup := testutils.NewInMemEntityServer(t) + + ca, err := caauth.New(caauth.Options{CommonName: "test-ca", Organization: "test"}) + require.NoError(t, err) + + srv := NewRegistrationServer(RegistrationServerConfig{ + Log: testutils.TestLogger(t), + Authority: ca, + EAC: es.EAC, + }) + + ctx := context.Background() + nodeID := entity.Id("node/" + authzRunnerID) + + _, err = es.EAC.Create(ctx, entity.New( + entity.DBId, nodeID, + (&compute_v1alpha.Node{RunnerId: authzRunnerID}).Encode, + ).Attrs()) + require.NoError(t, err) + + schedule := compute_v1alpha.Schedule{ + Key: compute_v1alpha.Key{Kind: compute_v1alpha.KindSandbox, Node: nodeID}, + } + _, err = es.EAC.Create(ctx, entity.New( + entity.DBId, entity.Id(authzSandboxID), + (&compute_v1alpha.Sandbox{}).Encode, + schedule.Encode, + ).Attrs()) + require.NoError(t, err) + + return srv, cleanup +} + +func ctxWithCert(subject string) context.Context { + return rpc.ContextWithIdentity(context.Background(), &rpc.Identity{ + Subject: subject, + Method: rpc.AuthMethodCert, + }) +} + +func TestAuthorizeSandboxOwnership_Owner(t *testing.T) { + srv, cleanup := newOwnershipTestServer(t) + defer cleanup() + + ctx := ctxWithCert(runnerCertName(authzRunnerID)) + require.NoError(t, srv.authorizeSandboxOwnership(ctx, authzSandboxID)) +} + +func TestAuthorizeSandboxOwnership_OtherRunnerDenied(t *testing.T) { + srv, cleanup := newOwnershipTestServer(t) + defer cleanup() + + ctx := ctxWithCert(runnerCertName("99999999-0000-0000-0000-000000000000")) + require.Error(t, srv.authorizeSandboxOwnership(ctx, authzSandboxID)) +} + +func TestAuthorizeSandboxOwnership_AnonymousSkipped(t *testing.T) { + srv, cleanup := newOwnershipTestServer(t) + defer cleanup() + + ctx := rpc.ContextWithIdentity(context.Background(), &rpc.Identity{ + Subject: "anonymous", + Method: rpc.AuthMethodAnonymous, + }) + require.NoError(t, srv.authorizeSandboxOwnership(ctx, authzSandboxID)) +} + +func TestAuthorizeSandboxOwnership_NonCertDenied(t *testing.T) { + srv, cleanup := newOwnershipTestServer(t) + defer cleanup() + + ctx := rpc.ContextWithIdentity(context.Background(), &rpc.Identity{ + Subject: "user@example.com", + Method: rpc.AuthMethodJWT, + }) + require.Error(t, srv.authorizeSandboxOwnership(ctx, authzSandboxID)) +} + +func TestAuthorizeSandboxOwnership_NoIdentityDenied(t *testing.T) { + srv, cleanup := newOwnershipTestServer(t) + defer cleanup() + + require.Error(t, srv.authorizeSandboxOwnership(context.Background(), authzSandboxID)) +} + +func TestAuthorizeSandboxOwnership_MissingSandboxDenied(t *testing.T) { + srv, cleanup := newOwnershipTestServer(t) + defer cleanup() + + ctx := ctxWithCert(runnerCertName(authzRunnerID)) + require.Error(t, srv.authorizeSandboxOwnership(ctx, "sandbox/does-not-exist")) +} + +func TestAuthorizeSandboxOwnership_UnscheduledDenied(t *testing.T) { + ctx := context.Background() + es, cleanup := testutils.NewInMemEntityServer(t) + defer cleanup() + + ca, err := caauth.New(caauth.Options{CommonName: "test-ca", Organization: "test"}) + require.NoError(t, err) + + srv := NewRegistrationServer(RegistrationServerConfig{ + Log: testutils.TestLogger(t), + Authority: ca, + EAC: es.EAC, + }) + + // A sandbox that exists but has no scheduling node, exercising the + // sch.Key.Node == "" branch rather than the missing-entity path. + _, err = es.EAC.Create(ctx, entity.New( + entity.DBId, entity.Id("sandbox/unscheduled"), + (&compute_v1alpha.Sandbox{}).Encode, + ).Attrs()) + require.NoError(t, err) + + authCtx := ctxWithCert(runnerCertName(authzRunnerID)) + err = srv.authorizeSandboxOwnership(authCtx, "sandbox/unscheduled") + require.ErrorContains(t, err, "not scheduled to a node") +} + +func TestRunnerCertNameUsesFullID(t *testing.T) { + // Two runner IDs sharing an 8-char prefix must map to distinct cert names, + // otherwise a runner could choose a colliding ID and impersonate another. + a := "abcd1234-1111-1111-1111-111111111111" + b := "abcd1234-2222-2222-2222-222222222222" + require.NotEqual(t, runnerCertName(a), runnerCertName(b)) + require.Equal(t, "runner-"+a, runnerCertName(a)) +} + +func TestResolveSandboxApp(t *testing.T) { + ctx := context.Background() + es, cleanup := testutils.NewInMemEntityServer(t) + defer cleanup() + + ca, err := caauth.New(caauth.Options{CommonName: "test-ca", Organization: "test"}) + require.NoError(t, err) + + srv := NewRegistrationServer(RegistrationServerConfig{ + Log: testutils.TestLogger(t), + Authority: ca, + EAC: es.EAC, + }) + + _, err = es.EAC.Create(ctx, entity.New( + entity.DBId, entity.Id("app/myapp"), + (&core_v1alpha.Metadata{Name: "myapp"}).Encode, + ).Attrs()) + require.NoError(t, err) + + _, err = es.EAC.Create(ctx, entity.New( + entity.DBId, entity.Id("version/v1"), + (&core_v1alpha.AppVersion{App: "app/myapp"}).Encode, + ).Attrs()) + require.NoError(t, err) + + _, err = es.EAC.Create(ctx, entity.New( + entity.DBId, entity.Id("sandbox/s1"), + (&compute_v1alpha.Sandbox{Spec: compute_v1alpha.SandboxSpec{Version: "version/v1"}}).Encode, + ).Attrs()) + require.NoError(t, err) + + require.Equal(t, "myapp", srv.resolveSandboxApp(ctx, "sandbox/s1")) +} + +func TestResolveSandboxApp_NoVersionReturnsEmpty(t *testing.T) { + ctx := context.Background() + es, cleanup := testutils.NewInMemEntityServer(t) + defer cleanup() + + ca, err := caauth.New(caauth.Options{CommonName: "test-ca", Organization: "test"}) + require.NoError(t, err) + + srv := NewRegistrationServer(RegistrationServerConfig{ + Log: testutils.TestLogger(t), + Authority: ca, + EAC: es.EAC, + }) + + _, err = es.EAC.Create(ctx, entity.New( + entity.DBId, entity.Id("sandbox/no-version"), + (&compute_v1alpha.Sandbox{}).Encode, + ).Attrs()) + require.NoError(t, err) + + require.Equal(t, "", srv.resolveSandboxApp(ctx, "sandbox/no-version")) +}