From a0d23d462f1ba0d60158873ce65574ce653dd5ea Mon Sep 17 00:00:00 2001 From: Paul Hinze Date: Thu, 28 May 2026 11:25:05 -0500 Subject: [PATCH 1/2] servers/build: convert build process to saga framework The build pipeline (BuildFromTar / BuildFromPrepared / buildFromDir) ran as a 450-line synchronous function whose only crash recovery was defer-based cleanup that died with the process. A server restart mid-build could leave orphaned ConfigVersion or AppVersion entities, with no way to either resume the build or compensate the partial state. This change converts the pipeline to the saga framework from MIR-439, following the sandbox saga pattern from MIR-440. Each phase of the build becomes a saga action with an undo; a new StreamRegistry stages incoming tar data to disk so recovery survives the loss of the original io.Reader, and a symmetric StatusRegistry handles outbound progress streaming without coupling actions to the per-request RPC stream. SagaBuilder implements the Builder RPC interface and coordinate.go selects it behind the existing labs.Sagas() flag. 
Closes MIR-441 --- components/coordinate/coordinate.go | 13 +- pkg/labs/features.yaml | 2 +- pkg/labs/labs.gen.go | 4 +- servers/build/build.go | 238 +-------- servers/build/build_saga.go | 733 ++++++++++++++++++++++++++ servers/build/build_saga_buildkit.go | 243 +++++++++ servers/build/build_saga_test.go | 366 +++++++++++++ servers/build/saga_builder.go | 311 +++++++++++ servers/build/status_registry.go | 174 ++++++ servers/build/status_registry_test.go | 131 +++++ servers/build/stream_registry.go | 183 +++++++ servers/build/stream_registry_test.go | 261 +++++++++ 12 files changed, 2438 insertions(+), 221 deletions(-) create mode 100644 servers/build/build_saga.go create mode 100644 servers/build/build_saga_buildkit.go create mode 100644 servers/build/build_saga_test.go create mode 100644 servers/build/saga_builder.go create mode 100644 servers/build/status_registry.go create mode 100644 servers/build/status_registry_test.go create mode 100644 servers/build/stream_registry.go create mode 100644 servers/build/stream_registry_test.go diff --git a/components/coordinate/coordinate.go b/components/coordinate/coordinate.go index 0ddbd1377..4e9f20fc8 100644 --- a/components/coordinate/coordinate.go +++ b/components/coordinate/coordinate.go @@ -1118,7 +1118,18 @@ func (c *Coordinator) Start(ctx context.Context) error { appClient := appclient.NewClient(c.Log, loopback) bs := build.NewBuilder(c.Log, eac, appClient, addonsClient, c.Resolver, c.TempDir, c.LogWriter, c.CloudAuth.DNSHostname, c.BuildKit, c.DataPath) - server.ExposeValue("dev.miren.runtime/build", build_v1alpha.AdaptBuilder(bs)) + + var buildHandler build_v1alpha.Builder = bs + if labs.Sagas() { + sagaStorage := saga.NewEntityStorage(etcdStore, c.Log) + sagaBuilder := build.NewSagaBuilder(bs, sagaStorage, c.Log) + if err := sagaBuilder.Init(ctx); err != nil { + c.Log.Error("failed to initialize saga builder", "error", err) + return err + } + buildHandler = sagaBuilder + } + 
server.ExposeValue("dev.miren.runtime/build", build_v1alpha.AdaptBuilder(buildHandler)) ls := logs.NewServer(c.Log, ec, c.Logs) server.ExposeValue("dev.miren.runtime/logs", app_v1alpha.AdaptLogs(ls)) diff --git a/pkg/labs/features.yaml b/pkg/labs/features.yaml index 424e19627..a40e92823 100644 --- a/pkg/labs/features.yaml +++ b/pkg/labs/features.yaml @@ -16,5 +16,5 @@ features: - name: sagas predicate: Sagas - description: Use saga-based crash-recoverable workflows for sandbox lifecycle + description: Use saga-based crash-recoverable workflows default: false diff --git a/pkg/labs/labs.gen.go b/pkg/labs/labs.gen.go index 0fbfe6ce0..ec224f381 100644 --- a/pkg/labs/labs.gen.go +++ b/pkg/labs/labs.gen.go @@ -32,7 +32,7 @@ func FeatureDescriptions() map[string]string { FeatureGlobalRouter: "Use global NAT traversal router for connectivity", FeatureDistributedRunners: "Schedule jobs across multiple runner nodes", FeatureAdminAPI: "Enable the admin API for application management functions", - FeatureSagas: "Use saga-based crash-recoverable workflows for sandbox lifecycle", + FeatureSagas: "Use saga-based crash-recoverable workflows", } } @@ -153,7 +153,7 @@ func AdminAPI() bool { } // Sagas returns whether the sagas feature is enabled. 
-// Use saga-based crash-recoverable workflows for sandbox lifecycle +// Use saga-based crash-recoverable workflows func Sagas() bool { return IsEnabled(FeatureSagas) } diff --git a/servers/build/build.go b/servers/build/build.go index d113b0f46..71db61c89 100644 --- a/servers/build/build.go +++ b/servers/build/build.go @@ -14,7 +14,6 @@ import ( "sync" "time" - "github.com/moby/buildkit/client" "github.com/tonistiigi/fsutil" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -1124,8 +1123,6 @@ func (b *Builder) buildFromDir(ctx context.Context, name string, path string, envVars []*build_v1alpha.EnvironmentVariable, ephemeral *ephemeralOpts) (*buildResult, error) { - so := new(build_v1alpha.Status) - // -- build.setup span: app config, stack detection, buildkit connect ctx, setupSpan := buildTracer.Start(ctx, "build.setup") @@ -1149,94 +1146,18 @@ func (b *Builder) buildFromDir(ctx context.Context, name string, path string, b.Log.Info("loaded app config", "name", ac.Name, "envVarCount", len(ac.EnvVars), "serviceCount", len(ac.Services)) } - var buildStack BuildStack - buildStack.CodeDir = path - - if ac != nil && ac.Build != nil { - buildStack.OnBuild = ac.Build.OnBuild - buildStack.Version = ac.Build.Version - buildStack.AlpineImage = ac.Build.AlpineImage - - if ac.Build.Dockerfile != "" { - buildStack.Stack = "dockerfile" - buildStack.Input = ac.Build.Dockerfile - - b.Log.Info("using dockerfile from app config", "dockerfile", ac.Build.Dockerfile) - } - } - - if buildStack.Stack == "" { - dr, err := tr.Open("Dockerfile.miren") - if err == nil { - buildStack.Stack = "dockerfile" - buildStack.Input = "Dockerfile.miren" - dr.Close() - } else { - buildStack.Stack = "auto" - } - } - - // Check if stack is supported before launching buildkit - if buildStack.Stack == "auto" { - detectOpts := stackbuild.BuildOptions{ - Log: b.Log, - Name: name, - OnBuild: buildStack.OnBuild, - Version: buildStack.Version, - AlpineImage: buildStack.AlpineImage, - } - _, err 
:= stackbuild.DetectStack(buildStack.CodeDir, detectOpts) - if err != nil { - setupSpan.RecordError(err) - setupSpan.SetStatus(codes.Error, err.Error()) - setupSpan.End() - b.Log.Error("stack detection failed", "error", err, "app", name, "codeDir", buildStack.CodeDir) - b.sendErrorStatus(ctx, status, "No supported stack detected for app %s: %v", name, err) - return nil, fmt.Errorf("no supported stack detected for app %s: %w", name, err) - } - b.Log.Debug("stack detection successful, proceeding with build") - } - - // Now we know the stack is valid, proceed with buildkit setup - b.Log.Debug("setting up buildkit") - - if b.BuildKit == nil { - setupSpan.RecordError(fmt.Errorf("buildkit component not configured")) - setupSpan.SetStatus(codes.Error, "buildkit component not configured") - setupSpan.End() - b.Log.Error("buildkit component not configured") - b.sendErrorStatus(ctx, status, "BuildKit not configured - ensure server is running with BuildKit enabled") - return nil, fmt.Errorf("buildkit component not configured") - } - - b.Log.Info("connecting to buildkit daemon") - bkc, err := b.BuildKit.Client(ctx) + buildStack, err := b.detectBuildStack(path, ac, name, tr) if err != nil { setupSpan.RecordError(err) setupSpan.SetStatus(codes.Error, err.Error()) setupSpan.End() - b.Log.Error("failed to get buildkit client", "error", err) - b.sendErrorStatus(ctx, status, "Failed to connect to BuildKit: %v", err) + b.sendErrorStatus(ctx, status, "No supported stack detected for app %s: %v", name, err) return nil, err } - defer bkc.Close() - - b.Log.Debug("getting buildkit daemon info") - ci, err := bkc.Info(ctx) - if err != nil { - b.Log.Error("error getting buildkitd info", "error", err) - } else { - b.Log.Debug("buildkitd info", "version", ci.BuildkitVersion.Version, "rev", ci.BuildkitVersion.Revision) - } setupSpan.SetAttributes(attribute.String("miren.build.stack", buildStack.Stack)) setupSpan.End() - bk := &Buildkit{ - Client: bkc, - Log: b.Log, - } - appRec, mrv, 
existingCfg, _, err := b.nextVersion(ctx, name) if err != nil { b.Log.Error("error getting next version", "error", err) @@ -1244,7 +1165,6 @@ func (b *Builder) buildFromDir(ctx context.Context, name string, path string, return nil, err } - // Initialize build log writer for persisting build output to VictoriaLogs buildLog := &buildLogWriter{ log: b.Log, writer: b.LogWriter, @@ -1252,150 +1172,34 @@ func (b *Builder) buildFromDir(ctx context.Context, name string, path string, version: mrv.Version, } - // Compute env vars to inject into the build process - buildEnvVars := computeBuildEnvVars(existingCfg.Variables, ac, envVars) - if len(buildEnvVars) > 0 { - b.Log.Info("injecting env vars into build", "count", len(buildEnvVars)) - } - - var tos []TransformOptions - - tos = append(tos, - WithBuildArg("MIREN_VERSION", mrv.Version), - ) - - // Inject user env vars as build args (for Dockerfile builds) - if len(buildEnvVars) > 0 { - tos = append(tos, WithBuildArgs(buildEnvVars)) - } - - // Pass env vars for auto-stack builds - buildStack.EnvVars = buildEnvVars - - // Track vertices we've already logged to avoid duplicates - vertexStarted := make(map[string]bool) - vertexCompleted := make(map[string]bool) - - if status != nil { - tos = append(tos, WithPhaseUpdates(func(phase string) { - switch phase { - case "export": - so.Update().SetMessage("Registering image") - _, _ = status.Send(ctx, so) - case "solving": - so.Update().SetMessage("Calculating build") - _, _ = status.Send(ctx, so) - case "solved": - so.Update().SetMessage("Building image") - _, _ = status.Send(ctx, so) - default: - so.Update().SetMessage(phase) - _, _ = status.Send(ctx, so) - } - })) - } - - // Single status callback that both persists logs and sends to client - tos = append(tos, WithStatusUpdates(func(ss *client.SolveStatus, sj []byte) { - // Log vertex status (build steps starting/completing/cached) - for _, v := range ss.Vertexes { - digestStr := v.Digest.String() - - // Log when a vertex starts - 
if v.Started != nil && !vertexStarted[digestStr] { - vertexStarted[digestStr] = true - buildLog.write(fmt.Sprintf("[buildkit] %s", v.Name)) - } - - // Log when a vertex completes with cache status - if v.Completed != nil && !vertexCompleted[digestStr] { - vertexCompleted[digestStr] = true - if v.Cached { - buildLog.write(fmt.Sprintf("[buildkit] %s CACHED", v.Name)) - } - } - } - - // Log command output from build steps - for _, log := range ss.Logs { - if log.Data != nil { - lines := strings.Split(string(log.Data), "\n") - for _, line := range lines { - line = strings.TrimRight(line, " \t\r\n") - if strings.TrimSpace(line) != "" { - buildLog.write(line) - } - } - } - } - - // Send raw status to client if connected - if status != nil { - so := new(build_v1alpha.Status) - so.Update().SetBuildkit(sj) - _, err := status.Send(ctx, so) - if err != nil { - b.Log.Warn("error sending status update", "error", err) - } - } - })) - - if status != nil { - so.Update().SetMessage("Calculating build") - _, _ = status.Send(ctx, so) - } - - imgName := mrv.ImageUrl - - // -- build.buildkit span + // runBuildkitBuild is also the saga action's implementation, so the + // span structure (buildkit + locate_artifact bundled together) is + // slightly coarser than the pre-saga code's two separate spans. + // Trace continuity matters more than the extra granularity. bkCtx, bkSpan := buildTracer.Start(ctx, "build.buildkit", - trace.WithAttributes(attribute.String("miren.build.image", imgName))) - res, err := bk.BuildImage(bkCtx, tr, buildStack, name, imgName, tos...) 
+ trace.WithAttributes(attribute.String("miren.build.image", mrv.ImageUrl))) + statusSender := NewRPCStatusSender(status, b.Log) + res, artifactID, finalURL, err := b.runBuildkitBuild(bkCtx, runBuildkitBuildInputs{ + SourceDir: path, + AppName: name, + VersionName: mrv.Version, + ImageURL: mrv.ImageUrl, + BuildStack: buildStack, + AppConfig: ac, + ExistingConfig: existingCfg, + CLIEnvVars: envVars, + }, statusSender, buildLog) if err != nil { bkSpan.RecordError(err) bkSpan.SetStatus(codes.Error, err.Error()) bkSpan.End() - b.Log.Error("error building image", "error", err) - b.sendErrorStatus(ctx, status, "Error building image: %v", err) return nil, err } bkSpan.End() - // Log detection events from stack analysis - for _, event := range res.DetectionEvents { - buildLog.write(fmt.Sprintf("[detect] %s: %s", event.Name, event.Message)) - } - - if res.ManifestDigest == "" { - b.Log.Error("build did not return manifest digest") - b.sendErrorStatus(ctx, status, "Build did not return manifest digest") - return nil, fmt.Errorf("build did not return manifest digest") - } - - // -- build.locate_artifact span - locateCtx, locateSpan := buildTracer.Start(ctx, "build.locate_artifact", - trace.WithAttributes(attribute.String("miren.build.manifest_digest", res.ManifestDigest))) - - var artifact core_v1alpha.Artifact - - err = b.ec.OneAtIndex(locateCtx, entity.String(core_v1alpha.ArtifactManifestDigestId, res.ManifestDigest), &artifact) - if err != nil { - locateSpan.RecordError(err) - locateSpan.SetStatus(codes.Error, err.Error()) - locateSpan.End() - b.Log.Error("error locating artifact by digest", "digest", res.ManifestDigest, "error", err) - return nil, fmt.Errorf("error locating artifact by digest %s: %w", res.ManifestDigest, err) - } - locateSpan.End() - - b.Log.Debug("located stored artifact", "artifact", artifact.ID, "digest", res.ManifestDigest) - - mrv.Artifact = artifact.ID - - // Update ImageUrl to match the artifact we found (which may be reused due to deduplication) - 
artifactName := strings.TrimPrefix(string(artifact.ID), "artifact/") - mrv.ImageUrl = "cluster.local:5000/" + name + ":" + artifactName - + mrv.Artifact = entity.Id(artifactID) + mrv.ImageUrl = finalURL + artifactName := strings.TrimPrefix(artifactID, "artifact/") b.Log.Debug("build complete", "image", mrv.ImageUrl) procfileServices, err := b.readProcFile(tr) diff --git a/servers/build/build_saga.go b/servers/build/build_saga.go new file mode 100644 index 000000000..0200e1950 --- /dev/null +++ b/servers/build/build_saga.go @@ -0,0 +1,733 @@ +package build + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "os" + "path/filepath" + "strings" + "time" + + "github.com/tonistiigi/fsutil" + + "miren.dev/runtime/api/build/build_v1alpha" + "miren.dev/runtime/api/core/core_v1alpha" + "miren.dev/runtime/appconfig" + "miren.dev/runtime/pkg/entity" + ephemeralx "miren.dev/runtime/pkg/ephemeral" + "miren.dev/runtime/pkg/saga" + "miren.dev/runtime/pkg/stackbuild" +) + +// Saga definition + action names. Keeping them as constants makes them +// easy to search for, and lets recovery logs reference stable action +// identifiers across restarts. +const ( + sagaBuildFromTar = "build-from-tar" + actionReceiveTar = "receive-tar" + actionLoadSource = "load-source" + actionGetNextVer = "get-next-version" + actionBuildImage = "build-image" + actionPrepareConfig = "prepare-config" + actionHandleEphemera = "handle-ephemeral" + actionCreateConfigVer = "create-config-version" + actionCreateVersion = "create-version" + actionProvisionAddons = "provision-addons" + actionSetActiveVer = "set-active-version" + actionFinalize = "finalize" +) + +// buildSagaDeps holds the collaborators injected into the saga context. 
+// Actions retrieve it via saga.Get[*buildSagaDeps](ctx) and call through to
+// the inner Builder for real operations (entity writes, buildkit calls,
+// log writes, etc.), the StreamRegistry for tar staging, or the
+// StatusRegistry to emit live progress/log/error updates to the deploy
+// CLI for the duration of one saga execution.
+type buildSagaDeps struct {
+	builder  *Builder        // inner Builder that actions call for real operations
+	streams  *StreamRegistry // stages incoming tar data by stream ID and cleans it up
+	statuses *StatusRegistry // hands out per-stream senders for progress/error updates
+}
+
+// receiveTarIn carries the initial inputs the entry point seeds the saga
+// with. AppName is mostly used for logging and cache writes; the StreamID
+// is the handle that lets us pull bytes out of StreamRegistry.
+type receiveTarIn struct {
+	AppName  string `json:"app_name" saga:"app_name"`
+	StreamID string `json:"stream_id" saga:"stream_id"`
+}
+
+// receiveTarOut publishes the staged source directory so downstream
+// actions can read app.toml, run buildkit against it, etc.
+type receiveTarOut struct {
+	SourceDir string `json:"source_dir" saga:"source_dir"`
+}
+
+// receiveTar stages the incoming tar stream to disk and (best-effort)
+// updates the source code cache so the next delta upload can skip
+// unchanged files. The cache write matches existing BuildFromTar
+// behavior — failure logs a warning rather than failing the build,
+// since cache is a perf optimization, not correctness.
+func receiveTar(ctx context.Context, in receiveTarIn) (receiveTarOut, error) {
+	sagaDeps := saga.Get[*buildSagaDeps](ctx)
+	progress := sagaDeps.statuses.SenderFor(in.StreamID)
+
+	progress.SendMessage("Reading application data")
+
+	// Materialize the uploaded tar on disk; every later action works
+	// from this directory rather than from the request stream.
+	stagedDir, stageErr := sagaDeps.streams.Stage(in.StreamID)
+	if stageErr != nil {
+		progress.SendError("Error untaring data: %v", stageErr)
+		return receiveTarOut{}, fmt.Errorf("staging tar stream %s: %w", in.StreamID, stageErr)
+	}
+
+	// The source cache is only maintained when a data path is configured,
+	// and a failed cache write is logged, never returned.
+	if b := sagaDeps.builder; b.DataPath != "" {
+		srcCache := &sourceCache{
+			dataPath: b.DataPath,
+			log:      b.Log,
+			locks:    b.cacheLocks,
+		}
+		if cacheErr := srcCache.saveSourceImage(in.AppName, stagedDir); cacheErr != nil {
+			b.Log.Warn("failed to save source code cache", "app", in.AppName, "error", cacheErr)
+		}
+	}
+
+	progress.SendMessage("Launching builder")
+	return receiveTarOut{SourceDir: stagedDir}, nil
+}
+
+// undoReceiveTar compensates receiveTar by asking the StreamRegistry to
+// clean up whatever it staged for this stream ID.
+func undoReceiveTar(ctx context.Context, in receiveTarIn, _ receiveTarOut) error {
+	return saga.Get[*buildSagaDeps](ctx).streams.Cleanup(in.StreamID)
+}
+
+// loadSource reads the staged tree to produce the inputs every later
+// action needs: the parsed app.toml (if present), the detected/declared
+// BuildStack (with stack detection actually performed when stack=auto so
+// we fail fast before launching buildkit), and the Procfile services.
+//
+// Pure file IO + parsing, no entity writes, so the undo is a no-op.
+// Recovery just re-runs Execute against the same staged dir.
+
+// loadSourceIn names the app and the staged directory to inspect.
+type loadSourceIn struct {
+	AppName   string `json:"app_name" saga:"app_name"`
+	SourceDir string `json:"source_dir" saga:"source_dir"`
+}
+
+// loadSourceOut carries everything parsed out of the staged tree.
+type loadSourceOut struct {
+	AppConfig        *appconfig.AppConfig `json:"app_config,omitempty" saga:"app_config"`
+	BuildStack       BuildStack           `json:"build_stack" saga:"build_stack"`
+	ProcfileServices map[string]string    `json:"procfile_services,omitempty" saga:"procfile_services"`
+}
+
+// loadSource opens the staged directory and, in order, loads the app
+// config, resolves the build stack, and reads the Procfile. The first
+// failure aborts the action with a zero-valued output.
+func loadSource(ctx context.Context, in loadSourceIn) (loadSourceOut, error) {
+	b := saga.Get[*buildSagaDeps](ctx).builder
+
+	srcFS, err := fsutil.NewFS(in.SourceDir)
+	if err != nil {
+		return loadSourceOut{}, fmt.Errorf("opening source dir %s: %w", in.SourceDir, err)
+	}
+
+	appCfg, err := b.loadAppConfig(srcFS)
+	if err != nil {
+		return loadSourceOut{}, fmt.Errorf("loading app config: %w", err)
+	}
+
+	// Stack detection errors are returned unwrapped, as-is.
+	buildStack, err := b.detectBuildStack(in.SourceDir, appCfg, in.AppName, srcFS)
+	if err != nil {
+		return loadSourceOut{}, err
+	}
+
+	services, err := b.readProcFile(srcFS)
+	if err != nil {
+		return loadSourceOut{}, fmt.Errorf("reading procfile: %w", err)
+	}
+
+	var out loadSourceOut
+	out.AppConfig = appCfg
+	out.BuildStack = buildStack
+	out.ProcfileServices = services
+	return out, nil
+}
+
+// undoLoadSource has nothing to compensate: loadSource only reads.
+func undoLoadSource(_ context.Context, _ loadSourceIn, _ loadSourceOut) error {
+	return nil
+}
+
+// getNextVersion allocates a fresh version id + artifact suffix for the
+// build and (idempotently) creates the App entity if this is the very
+// first deploy. Mirrors the pre-saga Builder.nextVersion semantics:
+// existing App is left alone, current config is loaded for env-var
+// carryover, new ids are generated locally and only persisted by later
+// actions (createConfigVersion / createVersion).
+//
+// The undo is a no-op. The generated ids are not durable state worth
+// freeing, and the App entity might already exist (created by a prior
+// deploy) or be needed by other concurrent operations — deleting it on
+// build failure is the wrong instinct.
+
+// getNextVersionIn only needs the app name to allocate a version.
+type getNextVersionIn struct {
+	AppName string `json:"app_name" saga:"app_name"`
+}
+
+// getNextVersionOut publishes the allocated identifiers plus the app's
+// current config, serialized to JSON so it can travel through saga state.
+type getNextVersionOut struct {
+	AppID          string `json:"app_id" saga:"app_id"`
+	VersionName    string `json:"version_name" saga:"version_name"`
+	ArtifactSuffix string `json:"artifact_suffix" saga:"artifact_suffix"`
+	ImageURL       string `json:"image_url" saga:"image_url"`
+	AdminToken     string `json:"admin_token" saga:"admin_token"`
+	ExistingConfig string `json:"existing_config_json" saga:"existing_config_json"`
+}
+
+// getNextVersion delegates to Builder.nextVersion and flattens its
+// results into plain strings for the saga's data flow.
+func getNextVersion(ctx context.Context, in getNextVersionIn) (getNextVersionOut, error) {
+	b := saga.Get[*buildSagaDeps](ctx).builder
+
+	app, version, currentCfg, suffix, err := b.nextVersion(ctx, in.AppName)
+	if err != nil {
+		return getNextVersionOut{}, fmt.Errorf("allocating next version for %s: %w", in.AppName, err)
+	}
+
+	currentCfgJSON, err := marshalConfigSpec(currentCfg)
+	if err != nil {
+		return getNextVersionOut{}, fmt.Errorf("serializing existing config: %w", err)
+	}
+
+	var out getNextVersionOut
+	out.AppID = string(app.ID)
+	out.VersionName = version.Version
+	out.ArtifactSuffix = suffix
+	out.ImageURL = version.ImageUrl
+	out.AdminToken = version.AdminToken
+	out.ExistingConfig = currentCfgJSON
+	return out, nil
+}
+
+// undoGetNextVersion is deliberately empty; see the rationale above.
+func undoGetNextVersion(_ context.Context, _ getNextVersionIn, _ getNextVersionOut) error {
+	return nil
+}
+
+// prepareConfig assembles the final ConfigSpec for the new version by
+// merging build outputs, app.toml, Procfile, and existing app config,
+// then runs every blocking validation (services exist, required vars
+// have values, node ports are free, disk references resolve). Pure
+// computation + entity reads — no side effects, no undo needed.
+//
+// Validation failures surface as the saga error and a user-facing
+// status update. The pre-saga path called validateNodePorts and
+// validateDiskConfigs separately; bundling them with the rest in one
+// action keeps the saga DAG simple and matches what the user
+// experiences as one logical "config prep" step.
+
+// prepareConfigIn gathers every input that feeds the merged ConfigSpec.
+// The optional fields may be absent from saga state.
+type prepareConfigIn struct {
+	AppName          string                               `json:"app_name" saga:"app_name"`
+	StreamID         string                               `json:"stream_id" saga:"stream_id"`
+	AppID            string                               `json:"app_id" saga:"app_id"`
+	BuildResult      *BuildResult                         `json:"build_result,omitempty" saga:"build_result,optional"`
+	AppConfig        *appconfig.AppConfig                 `json:"app_config,omitempty" saga:"app_config,optional"`
+	ProcfileServices map[string]string                    `json:"procfile_services,omitempty" saga:"procfile_services,optional"`
+	ExistingConfig   string                               `json:"existing_config_json" saga:"existing_config_json"`
+	CLIEnvVars       []*build_v1alpha.EnvironmentVariable `json:"cli_env_vars,omitempty" saga:"cli_env_vars,optional"`
+}
+
+// prepareConfigOut holds the validated ConfigSpec serialized as JSON.
+type prepareConfigOut struct {
+	ConfigSpec string `json:"config_spec_json" saga:"config_spec_json"`
+}
+
+// prepareConfig merges the inputs into a ConfigSpec, runs the four
+// validations in a fixed order, and serializes the result. The first
+// failing validation is reported on the status stream and returned
+// unwrapped.
+func prepareConfig(ctx context.Context, in prepareConfigIn) (prepareConfigOut, error) {
+	deps := saga.Get[*buildSagaDeps](ctx)
+	b := deps.builder
+	status := deps.statuses.SenderFor(in.StreamID)
+
+	// reject reports a validation failure to the client using the given
+	// message format and aborts the action with the same error.
+	reject := func(format string, cause error) (prepareConfigOut, error) {
+		status.SendError(format, cause)
+		return prepareConfigOut{}, cause
+	}
+
+	previous, err := unmarshalConfigSpec(in.ExistingConfig)
+	if err != nil {
+		return prepareConfigOut{}, fmt.Errorf("deserializing existing config: %w", err)
+	}
+
+	inputs := ConfigInputs{
+		BuildResult:      in.BuildResult,
+		AppConfig:        in.AppConfig,
+		ProcfileServices: in.ProcfileServices,
+		ExistingConfig:   previous,
+		CliEnvVars:       in.CLIEnvVars,
+	}
+	spec := buildVersionConfig(inputs)
+
+	if err := validateServicesExist(spec); err != nil {
+		return reject("%s. See https://miren.md/services", err)
+	}
+	if err := validateRequiredVars(spec); err != nil {
+		return reject("%s", err)
+	}
+	if err := validateNodePorts(ctx, b.ec.EAC(), entity.Id(in.AppID), spec); err != nil {
+		return reject("Deploy failed: %v", err)
+	}
+	if err := validateDiskConfigs(ctx, b.ec.EAC(), spec); err != nil {
+		return reject("Deploy failed: %v", err)
+	}
+
+	encoded, err := json.Marshal(spec)
+	if err != nil {
+		return prepareConfigOut{}, fmt.Errorf("serializing config spec: %w", err)
+	}
+	return prepareConfigOut{ConfigSpec: string(encoded)}, nil
+}
+
+// undoPrepareConfig has nothing to compensate: prepareConfig writes nothing.
+func undoPrepareConfig(_ context.Context, _ prepareConfigIn, _ prepareConfigOut) error {
+	return nil
+}
+
+// handleEphemeral covers the ephemeral-version-specific bookkeeping:
+// validating the label, parsing the TTL, deleting any existing version
+// with the same label (replace-on-same-label), and enforcing the per-
+// app ephemeral version limit. No-op for non-ephemeral deploys.
+//
+// The undo is intentionally a no-op even though replace-existing
+// deletes versions. Those versions were going to be replaced by the
+// new build, and an aborted build doesn't make the user want them
+// back — they want to retry with new code. Re-creating deleted
+// entities would also race with concurrent reconciliation.
+
+// handleEphemeralIn identifies the app and the requested ephemeral
+// label/TTL. An empty label marks a regular deploy.
+type handleEphemeralIn struct {
+	AppName        string `json:"app_name" saga:"app_name"`
+	StreamID       string `json:"stream_id" saga:"stream_id"`
+	AppID          string `json:"app_id" saga:"app_id"`
+	EphemeralLabel string `json:"ephemeral_label,omitempty" saga:"ephemeral_label,optional"`
+	EphemeralTTL   string `json:"ephemeral_ttl,omitempty" saga:"ephemeral_ttl,optional"`
+}
+
+// handleEphemeralOut carries the computed expiry as an RFC 3339 string;
+// it stays empty for non-ephemeral deploys.
+type handleEphemeralOut struct {
+	ExpiresAt string `json:"ephemeral_expires_at,omitempty" saga:"ephemeral_expires_at"`
+}
+
+// handleEphemeral validates the label and TTL (defaulting an empty TTL
+// to "24h"), replaces any version already using the label, enforces the
+// ephemeral limit, and returns the expiry computed from the current time.
+func handleEphemeral(ctx context.Context, in handleEphemeralIn) (handleEphemeralOut, error) {
+	label := in.EphemeralLabel
+	if label == "" {
+		return handleEphemeralOut{}, nil
+	}
+
+	deps := saga.Get[*buildSagaDeps](ctx)
+	b := deps.builder
+	status := deps.statuses.SenderFor(in.StreamID)
+
+	if err := ephemeralx.ValidateLabel(label); err != nil {
+		status.SendError("invalid ephemeral label: %v", err)
+		return handleEphemeralOut{}, fmt.Errorf("invalid ephemeral label: %w", err)
+	}
+
+	rawTTL := in.EphemeralTTL
+	if rawTTL == "" {
+		rawTTL = "24h"
+	}
+	lifetime, err := time.ParseDuration(rawTTL)
+	switch {
+	case err != nil:
+		status.SendError("invalid ephemeral TTL %q: %v", rawTTL, err)
+		return handleEphemeralOut{}, fmt.Errorf("invalid ephemeral TTL %q: %w", rawTTL, err)
+	case lifetime <= 0:
+		status.SendError("invalid ephemeral TTL %q: must be greater than 0", rawTTL)
+		return handleEphemeralOut{}, fmt.Errorf("invalid ephemeral TTL %q: must be greater than 0", rawTTL)
+	}
+
+	// Failures from the two entity operations below are returned to the
+	// saga but are not pushed to the status stream.
+	appID := entity.Id(in.AppID)
+	if err := ephemeralx.ReplaceExisting(ctx, b.ec.EAC(), appID, label, b.Log); err != nil {
+		return handleEphemeralOut{}, fmt.Errorf("failed to replace existing ephemeral version %q: %w", label, err)
+	}
+	if err := ephemeralx.EnforceLimit(ctx, b.ec.EAC(), appID, ephemeralx.DefaultMaxEphemeral, b.Log); err != nil {
+		return handleEphemeralOut{}, fmt.Errorf("failed to enforce ephemeral limit: %w", err)
+	}
+
+	return handleEphemeralOut{
+		ExpiresAt: time.Now().Add(lifetime).Format(time.RFC3339),
+	}, nil
+}
+
+// undoHandleEphemeral is deliberately empty; see the rationale above.
+func undoHandleEphemeral(_ context.Context, _ handleEphemeralIn, _ handleEphemeralOut) error {
+	return nil
+}
+
+// createConfigVersion creates the ConfigVersion entity that holds the
+// new app's full ConfigSpec. AppVersion references it by ID, so it
+// must exist before createVersion runs. Failures here surface as
+// the saga error; recovery would retry creation with the same name
+// and a deterministic-enough ConfigSpec to be idempotent in practice,
+// though the entity store's create-if-missing semantics carry the
+// guarantee.
+
+// createConfigVersionIn supplies the owning app, the version name the
+// entity name is derived from, and the JSON-encoded ConfigSpec.
+type createConfigVersionIn struct {
+	AppID       string `json:"app_id" saga:"app_id"`
+	VersionName string `json:"version_name" saga:"version_name"`
+	ConfigSpec  string `json:"config_spec_json" saga:"config_spec_json"`
+}
+
+// createConfigVersionOut publishes the ID of the created entity.
+type createConfigVersionOut struct {
+	ConfigVersionID string `json:"config_version_id" saga:"config_version_id"`
+}
+
+// createConfigVersion decodes the spec and creates the entity under
+// the name "<VersionName>-cfg".
+func createConfigVersion(ctx context.Context, in createConfigVersionIn) (createConfigVersionOut, error) {
+	b := saga.Get[*buildSagaDeps](ctx).builder
+
+	spec, err := unmarshalConfigSpec(in.ConfigSpec)
+	if err != nil {
+		return createConfigVersionOut{}, fmt.Errorf("deserializing config spec: %w", err)
+	}
+
+	entityName := in.VersionName + "-cfg"
+	record := &core_v1alpha.ConfigVersion{
+		App:  entity.Id(in.AppID),
+		Spec: spec,
+	}
+	createdID, err := b.ec.Create(ctx, entityName, record)
+	if err != nil {
+		return createConfigVersionOut{}, fmt.Errorf("creating config version %s: %w", entityName, err)
+	}
+	return createConfigVersionOut{ConfigVersionID: string(createdID)}, nil
+}
+
+// undoCreateConfigVersion deletes the entity recorded in the output;
+// an empty ID means there is nothing to delete.
+func undoCreateConfigVersion(ctx context.Context, _ createConfigVersionIn, out createConfigVersionOut) error {
+	id := out.ConfigVersionID
+	if id == "" {
+		return nil
+	}
+	b := saga.Get[*buildSagaDeps](ctx).builder
+	if err := b.ec.Delete(ctx, entity.Id(id)); err != nil {
+		return fmt.Errorf("deleting config version %s: %w", id, err)
+	}
+	return nil
+}
+
+// createVersion creates the AppVersion entity that pins together the
+// artifact, image URL, config version, and ephemeral metadata. This is
+// the durable "the build succeeded" record — once it exists, the
+// downstream activate / addon-provisioning steps have something to
+// reference. Failures compensate by deleting it.
+//
+// For ephemeral deploys a non-empty expiry must parse as RFC 3339. An
+// unparseable value fails the action instead of persisting a version
+// whose expiry is the zero time.
+
+// createVersionIn collects the identifiers produced by earlier actions.
+// The ephemeral fields are optional and empty for regular deploys.
+type createVersionIn struct {
+	AppID              string `json:"app_id" saga:"app_id"`
+	VersionName        string `json:"version_name" saga:"version_name"`
+	FinalImageURL      string `json:"final_image_url" saga:"final_image_url"`
+	ArtifactID         string `json:"artifact_id" saga:"artifact_id"`
+	AdminToken         string `json:"admin_token" saga:"admin_token"`
+	ConfigVersionID    string `json:"config_version_id" saga:"config_version_id"`
+	EphemeralLabel     string `json:"ephemeral_label,omitempty" saga:"ephemeral_label,optional"`
+	EphemeralTTL       string `json:"ephemeral_ttl,omitempty" saga:"ephemeral_ttl,optional"`
+	EphemeralExpiresAt string `json:"ephemeral_expires_at,omitempty" saga:"ephemeral_expires_at,optional"`
+}
+
+// createVersionOut publishes the ID of the created AppVersion.
+type createVersionOut struct {
+	AppVersionID string `json:"app_version_id" saga:"app_version_id"`
+}
+
+// createVersion builds the AppVersion from its inputs and creates it
+// under the version name. It returns an error if the ephemeral expiry
+// cannot be parsed or if the entity store rejects the create.
+func createVersion(ctx context.Context, in createVersionIn) (createVersionOut, error) {
+	deps := saga.Get[*buildSagaDeps](ctx)
+	b := deps.builder
+
+	av := &core_v1alpha.AppVersion{
+		App:           entity.Id(in.AppID),
+		Version:       in.VersionName,
+		ImageUrl:      in.FinalImageURL,
+		AdminToken:    in.AdminToken,
+		Artifact:      entity.Id(in.ArtifactID),
+		ConfigVersion: entity.Id(in.ConfigVersionID),
+		Config:        core_v1alpha.Config{},
+	}
+	if in.EphemeralLabel != "" {
+		av.EphemeralLabel = in.EphemeralLabel
+		av.EphemeralTtl = in.EphemeralTTL
+		if in.EphemeralExpiresAt != "" {
+			// Fail before creating anything: a silently dropped parse
+			// error would leave the version with a zero expiry.
+			expiresAt, err := time.Parse(time.RFC3339, in.EphemeralExpiresAt)
+			if err != nil {
+				return createVersionOut{}, fmt.Errorf("parsing ephemeral expiry %q for version %s: %w", in.EphemeralExpiresAt, in.VersionName, err)
+			}
+			av.EphemeralExpiresAt = expiresAt
+		}
+	}
+
+	id, err := b.ec.Create(ctx, in.VersionName, av)
+	if err != nil {
+		return createVersionOut{}, fmt.Errorf("creating app version %s: %w", in.VersionName, err)
+	}
+	return createVersionOut{AppVersionID: string(id)}, nil
+}
+
+// undoCreateVersion deletes the AppVersion recorded in the output; an
+// empty ID means there is nothing to delete.
+func undoCreateVersion(ctx context.Context, _ createVersionIn, out createVersionOut) error {
+	if out.AppVersionID == "" {
+		return nil
+	}
+	deps := saga.Get[*buildSagaDeps](ctx)
+	if err := deps.builder.ec.Delete(ctx, entity.Id(out.AppVersionID)); err != nil {
+		return fmt.Errorf("deleting app version %s: %w", out.AppVersionID, err)
+	}
+	return nil
+}
+
+// provisionAddons calls into the addons client to materialize the
+// addons declared in app.toml. Skipped for ephemeral deploys (which
+// don't get addons) and when there's no app config at all. The undo
+// is a no-op: provisionAddons handles "already attached" gracefully
+// on retry, and removing addons created during a build would surprise
+// users running concurrent ops against the same app.
+
+type provisionAddonsIn struct {
+	AppName        string               `json:"app_name" saga:"app_name"`
+	AppConfig      *appconfig.AppConfig `json:"app_config,omitempty" saga:"app_config,optional"`
+	EphemeralLabel string               `json:"ephemeral_label,omitempty" saga:"ephemeral_label,optional"`
+	// AppVersionID is consumed only to anchor this action after
+	// createVersion in the saga DAG; addons are scoped to the app,
+	// not the version, so we don't actually need the ID at runtime.
+	AppVersionID   string               `json:"app_version_id" saga:"app_version_id"`
+}
+
+// provisionAddonsOut carries no data, only an ordering edge that
+// setActiveVersion consumes.
+type provisionAddonsOut struct {
+	Done saga.Edge `saga:"addons_provisioned"`
+}
+
+// provisionAddons returns immediately for ephemeral deploys, when there
+// is no app config, or when no addons client is configured; otherwise
+// it delegates to Builder.provisionAddons.
+func provisionAddons(ctx context.Context, in provisionAddonsIn) (provisionAddonsOut, error) {
+	if in.EphemeralLabel != "" || in.AppConfig == nil {
+		return provisionAddonsOut{}, nil
+	}
+	deps := saga.Get[*buildSagaDeps](ctx)
+	if deps.builder.addonsClient == nil {
+		return provisionAddonsOut{}, nil
+	}
+	if err := deps.builder.provisionAddons(ctx, in.AppName, in.AppConfig); err != nil {
+		return provisionAddonsOut{}, fmt.Errorf("addon provisioning failed: %w", err)
+	}
+	return provisionAddonsOut{}, nil
+}
+
+// undoProvisionAddons is deliberately empty; see the rationale above.
+func undoProvisionAddons(_ context.Context, _ provisionAddonsIn, _ provisionAddonsOut) error {
+	return nil
+}
+
+// setActiveVersion makes the newly-created AppVersion the app's active
+// one. Skipped for ephemeral deploys (their whole point is to coexist
+// with the active version, not replace it). Records the previous
+// active version so undo can restore it on failure.
+
+// setActiveVersionIn is the saga input for setActiveVersion. AddonsReady
+// carries no data; it consumes the addons_provisioned edge so this
+// action is ordered after provisionAddons.
+type setActiveVersionIn struct {
+	AppName        string    `json:"app_name" saga:"app_name"`
+	AppVersionID   string    `json:"app_version_id" saga:"app_version_id"`
+	EphemeralLabel string    `json:"ephemeral_label,omitempty" saga:"ephemeral_label,optional"`
+	AddonsReady    saga.Edge `saga:"addons_provisioned"`
+}
+
+// setActiveVersionOut records what undoSetActiveVersion needs: the
+// version that was active before the switch, and whether the switch
+// was skipped altogether.
+type setActiveVersionOut struct {
+	PreviousVersionID string `json:"previous_version_id,omitempty" saga:"previous_version_id"`
+	Skipped           bool   `json:"skipped" saga:"set_active_skipped"`
+}
+
+// setActiveVersion points the app at in.AppVersionID and reports the
+// version that was active beforehand. Ephemeral deploys (non-empty
+// EphemeralLabel) touch nothing and come back with Skipped set.
+//
+// NOTE(review): if this action is re-run after the switch already took
+// effect, the "previous" version read here would be in.AppVersionID
+// itself — confirm how the saga framework replays a partially applied
+// action.
+func setActiveVersion(ctx context.Context, in setActiveVersionIn) (setActiveVersionOut, error) {
+	var out setActiveVersionOut
+	if in.EphemeralLabel != "" {
+		out.Skipped = true
+		return out, nil
+	}
+
+	apps := saga.Get[*buildSagaDeps](ctx).builder.appClient
+
+	current, err := apps.GetByName(ctx, in.AppName)
+	if err != nil {
+		return setActiveVersionOut{}, fmt.Errorf("looking up app %s: %w", in.AppName, err)
+	}
+	// Capture the old value before switching so undo has something to
+	// restore.
+	out.PreviousVersionID = string(current.ActiveVersion)
+
+	err = apps.SetActiveVersion(ctx, in.AppName, in.AppVersionID)
+	if err != nil {
+		return setActiveVersionOut{}, fmt.Errorf("setting active version on %s: %w", in.AppName, err)
+	}
+	return out, nil
+}
+
+// undoSetActiveVersion re-activates the version recorded in
+// out.PreviousVersionID, unless the forward action was skipped.
+//
+// NOTE(review): PreviousVersionID is empty when the app had no active
+// version before this build; confirm that SetActiveVersion accepts an
+// empty version ID.
+func undoSetActiveVersion(ctx context.Context, in setActiveVersionIn, out setActiveVersionOut) error {
+	if !out.Skipped {
+		apps := saga.Get[*buildSagaDeps](ctx).builder.appClient
+		if err := apps.SetActiveVersion(ctx, in.AppName, out.PreviousVersionID); err != nil {
+			return fmt.Errorf("restoring previous active version on %s: %w", in.AppName, err)
+		}
+	}
+	return nil
+}
+
+// finalize is the saga's terminal step. It writes the deployment log
+// entry, runs the local-storage-migration check (non-ephemeral only),
+// and tells the StreamRegistry it's safe to remove the staged source.
+// Everything here is best-effort or idempotent; the undo is a no-op
+// because once we got this far the build succeeded — there's nothing
+// to roll back.
+
+// finalizeIn is the saga input for finalize. ActiveReady carries no
+// data; it consumes the set_active_skipped output so finalize is
+// ordered after setActiveVersion.
+type finalizeIn struct {
+	AppName        string    `json:"app_name" saga:"app_name"`
+	StreamID       string    `json:"stream_id" saga:"stream_id"`
+	AppID          string    `json:"app_id" saga:"app_id"`
+	VersionName    string    `json:"version_name" saga:"version_name"`
+	ArtifactID     string    `json:"artifact_id" saga:"artifact_id"`
+	ConfigSpec     string    `json:"config_spec_json" saga:"config_spec_json"`
+	EphemeralLabel string    `json:"ephemeral_label,omitempty" saga:"ephemeral_label,optional"`
+	ActiveReady    saga.Edge `saga:"set_active_skipped"`
+}
+
+// finalizeOut is empty: finalize produces nothing for later actions.
+type finalizeOut struct{}
+
+// finalize runs the local-storage-migration check (non-ephemeral
+// deploys only), records the deployment, and removes the staged source
+// for in.StreamID. It never fails the saga: a config spec that cannot
+// be decoded and a cleanup error are both logged as warnings and the
+// action still returns nil.
+func finalize(ctx context.Context, in finalizeIn) (finalizeOut, error) {
+	deps := saga.Get[*buildSagaDeps](ctx)
+	b := deps.builder
+
+	if in.EphemeralLabel == "" {
+		spec, err := unmarshalConfigSpec(in.ConfigSpec)
+		if err != nil {
+			// Leave a trace instead of skipping silently, so a
+			// corrupted config_spec_json input is diagnosable.
+			b.Log.Warn("skipping local storage migration check: config spec did not decode",
+				"app", in.AppName, "version", in.VersionName, "error", err)
+		} else {
+			// checkLocalStorageMigration takes a *SendStreamClient; its
+			// signature predates StatusSender. Pass nil for now and
+			// route the migration-warning UX through the sender once
+			// the function grows a StatusSender-shaped overload. Saga
+			// deploys skip that warning until then — accepted as a
+			// temporary regression on the flagged path.
+			b.checkLocalStorageMigration(ctx, entity.Id(in.AppID), spec, nil)
+		}
+	}
+
+	// The deployment log takes the bare artifact name, without the
+	// "artifact/" ID prefix.
+	artifactName := strings.TrimPrefix(in.ArtifactID, "artifact/")
+	b.logDeployment(ctx, in.AppName, in.VersionName, artifactName)
+
+	if err := deps.streams.Cleanup(in.StreamID); err != nil {
+		b.Log.Warn("cleanup staged tar", "stream", in.StreamID, "error", err)
+	}
+	return finalizeOut{}, nil
+}
+
+// undoFinalize is a no-op; finalize leaves nothing to roll back.
+func undoFinalize(_ context.Context, _ finalizeIn, _ finalizeOut) error {
+	return nil
+}
+
+// detectBuildStack assembles the BuildStack the same way buildFromDir
+// does (app.toml.build > Dockerfile.miren > auto) and performs the
+// supported-stack check for auto so the saga can fail fast rather than
+// waste a buildkit launch. Extracted here so the saga action and the
+// pre-saga path share one source of truth.
+func (b *Builder) detectBuildStack(path string, ac *appconfig.AppConfig, name string, _ fsutil.FS) (BuildStack, error) {
+	stack := BuildStack{CodeDir: path}
+
+	// Settings from the app config's build section win over anything
+	// found on disk.
+	if ac != nil && ac.Build != nil {
+		cfg := ac.Build
+		stack.OnBuild = cfg.OnBuild
+		stack.Version = cfg.Version
+		stack.AlpineImage = cfg.AlpineImage
+
+		if cfg.Dockerfile != "" {
+			stack.Stack = "dockerfile"
+			stack.Input = cfg.Dockerfile
+			b.Log.Info("using dockerfile from app config", "dockerfile", cfg.Dockerfile)
+		}
+	}
+
+	if stack.Stack == "" {
+		// Default to auto-detection, then upgrade to the dockerfile
+		// stack when Dockerfile.miren can be stat'ed. We look on disk
+		// rather than through fsutil so test/error paths don't have to
+		// fabricate an fsutil.FS just to peek for a file.
+		stack.Stack = "auto"
+		if _, err := osStat(path, "Dockerfile.miren"); err == nil {
+			stack.Stack = "dockerfile"
+			stack.Input = "Dockerfile.miren"
+		}
+	}
+
+	// Only the auto stack needs the up-front supported-stack probe.
+	if stack.Stack != "auto" {
+		return stack, nil
+	}
+
+	probe := stackbuild.BuildOptions{
+		Log:         b.Log,
+		Name:        name,
+		OnBuild:     stack.OnBuild,
+		Version:     stack.Version,
+		AlpineImage: stack.AlpineImage,
+	}
+	if _, err := stackbuild.DetectStack(stack.CodeDir, probe); err != nil {
+		b.Log.Error("stack detection failed", "error", err, "app", name, "codeDir", stack.CodeDir)
+		return stack, fmt.Errorf("no supported stack detected for app %s: %w", name, err)
+	}
+	b.Log.Debug("stack detection successful")
+	return stack, nil
+}
+
+// registerBuildSaga wires every build-from-tar action and its undo into
+// one saga definition and registers that definition with registry. The
+// builder and the two registries are bundled into a buildSagaDeps and
+// attached to the definition, together with log, via Using. The shape
+// mirrors registerCreateSandboxSaga in controllers/sandbox/create_saga.go
+// so both sagas register the same way at server startup.
+func registerBuildSaga(
+	registry *saga.Registry,
+	builder *Builder,
+	streams *StreamRegistry,
+	statuses *StatusRegistry,
+	log *slog.Logger,
+) error {
+	return saga.Define(sagaBuildFromTar).
+		Using(&buildSagaDeps{
+			builder:  builder,
+			streams:  streams,
+			statuses: statuses,
+		}).
+		Using(log).
+		Action(actionReceiveTar, receiveTar).Undo(undoReceiveTar).
+		Action(actionLoadSource, loadSource).Undo(undoLoadSource).
+		Action(actionGetNextVer, getNextVersion).Undo(undoGetNextVersion).
+		Action(actionBuildImage, buildImage).Undo(undoBuildImage).
+		Action(actionPrepareConfig, prepareConfig).Undo(undoPrepareConfig).
+		Action(actionHandleEphemera, handleEphemeral).Undo(undoHandleEphemeral).
+		Action(actionCreateConfigVer, createConfigVersion).Undo(undoCreateConfigVersion).
+		Action(actionCreateVersion, createVersion).Undo(undoCreateVersion).
+		Action(actionProvisionAddons, provisionAddons).Undo(undoProvisionAddons).
+		Action(actionSetActiveVer, setActiveVersion).Undo(undoSetActiveVersion).
+		Action(actionFinalize, finalize).Undo(undoFinalize).
+		RegisterTo(registry)
+}
+
+// osStat stats the entry called name inside directory base. It exists
+// only to keep the stack detection logic readable.
+func osStat(base, name string) (os.FileInfo, error) {
+	target := filepath.Join(base, name)
+	return os.Stat(target)
+}
+
+// marshalConfigSpec encodes a ConfigSpec as JSON so it can travel
+// through the saga's input/output map. A spec whose JSON form matches
+// the zero value's is encoded as "" — that keeps the saga log compact
+// for first deploys and lets downstream actions branch on emptiness
+// without unmarshaling.
+func marshalConfigSpec(spec core_v1alpha.ConfigSpec) (string, error) {
+	if isEmptyConfigSpec(spec, core_v1alpha.ConfigSpec{}) {
+		return "", nil
+	}
+	encoded, err := json.Marshal(spec)
+	if err != nil {
+		return "", err
+	}
+	return string(encoded), nil
+}
+
+// unmarshalConfigSpec is the round-trip partner of marshalConfigSpec.
+// Empty string means the caller passed a zero ConfigSpec.
+// Any other input is decoded with json.Unmarshal; on a decode error the
+// zero ConfigSpec is returned together with that error.
+func unmarshalConfigSpec(s string) (core_v1alpha.ConfigSpec, error) {
+	if s == "" {
+		return core_v1alpha.ConfigSpec{}, nil
+	}
+	var spec core_v1alpha.ConfigSpec
+	if err := json.Unmarshal([]byte(s), &spec); err != nil {
+		return core_v1alpha.ConfigSpec{}, err
+	}
+	return spec, nil
+}
+
+// isEmptyConfigSpec checks whether spec equals zero by JSON shape. Using
+// JSON comparison sidesteps the unexported-field issues reflect.DeepEqual
+// hits with generated types. If either value fails to marshal the
+// function reports false, i.e. the spec is treated as non-empty.
+func isEmptyConfigSpec(spec, zero core_v1alpha.ConfigSpec) bool {
+	a, err := json.Marshal(spec)
+	if err != nil {
+		return false
+	}
+	b, err := json.Marshal(zero)
+	if err != nil {
+		return false
+	}
+	return string(a) == string(b)
+}
diff --git a/servers/build/build_saga_buildkit.go b/servers/build/build_saga_buildkit.go
new file mode 100644
index 000000000..b09dead66
--- /dev/null
+++ b/servers/build/build_saga_buildkit.go
@@ -0,0 +1,243 @@
+package build
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"github.com/moby/buildkit/client"
+	"github.com/tonistiigi/fsutil"
+
+	"miren.dev/runtime/api/build/build_v1alpha"
+	"miren.dev/runtime/api/core/core_v1alpha"
+	"miren.dev/runtime/appconfig"
+	"miren.dev/runtime/pkg/entity"
+	"miren.dev/runtime/pkg/saga"
+)
+
+// buildImage runs the actual container build via BuildKit and locates
+// the resulting artifact entity. This is the long-running step — for
+// real apps it dominates total saga wall time. The undo is a no-op:
+// images produced by a failed build linger in the registry but they're
+// cheap, idempotent, and the next build either reuses them by digest
+// or supersedes them.
+//
+// Recovery resumes here by re-running the build. That's safe because
+// buildkit deduplicates by content digest, so a repeated build of the
+// same source produces the same artifact entity (with the same ID).
+// The action's output (manifest_digest, artifact_id) round-trips
+// through the saga log so prepareConfig and downstream actions see
+// stable values across crashes.
+
+// buildImageIn is the saga input for buildImage. Keys tagged optional
+// may be absent from the saga's input map.
+type buildImageIn struct {
+	AppName     string     `json:"app_name" saga:"app_name"`
+	StreamID    string     `json:"stream_id" saga:"stream_id"`
+	SourceDir   string     `json:"source_dir" saga:"source_dir"`
+	BuildStack  BuildStack `json:"build_stack" saga:"build_stack"`
+	VersionName string     `json:"version_name" saga:"version_name"`
+	ImageURL    string     `json:"image_url" saga:"image_url"`
+	// AppConfig is nil when the app has no app.toml; the loadSource
+	// output omits the key in that case, so the saga input is optional.
+	AppConfig *appconfig.AppConfig `json:"app_config,omitempty" saga:"app_config,optional"`
+	// ExistingConfig comes from getNextVersion; empty string is
+	// the marshaled form of a zero ConfigSpec (first deploy).
+	ExistingConfig string `json:"existing_config_json" saga:"existing_config_json"`
+	// CLIEnvVars is an initial input from the deploy CLI's -e flags.
+	// Empty when the user didn't pass any, so optional.
+	CLIEnvVars []*build_v1alpha.EnvironmentVariable `json:"cli_env_vars,omitempty" saga:"cli_env_vars,optional"`
+	AppID      string                               `json:"app_id" saga:"app_id"`
+}
+
+// buildImageOut is what buildImage publishes for downstream actions:
+// the manifest digest, the located artifact's ID, the image URL derived
+// from that artifact, and the full BuildResult.
+type buildImageOut struct {
+	ManifestDigest string       `json:"manifest_digest" saga:"manifest_digest"`
+	ArtifactID     string       `json:"artifact_id" saga:"artifact_id"`
+	FinalImageURL  string       `json:"final_image_url" saga:"final_image_url"`
+	BuildResult    *BuildResult `json:"build_result,omitempty" saga:"build_result"`
+}
+
+// buildImage is the saga action wrapper around Builder.runBuildkitBuild.
+// It looks up the status sender registered for in.StreamID, decodes the
+// existing config, and forwards everything to runBuildkitBuild. It
+// fails up front when the builder has no BuildKit component or when
+// ExistingConfig cannot be decoded.
+func buildImage(ctx context.Context, in buildImageIn) (buildImageOut, error) {
+	deps := saga.Get[*buildSagaDeps](ctx)
+	b := deps.builder
+	status := deps.statuses.SenderFor(in.StreamID)
+
+	if b.BuildKit == nil {
+		status.SendError("BuildKit not configured - ensure server is running with BuildKit enabled")
+		return buildImageOut{}, fmt.Errorf("buildkit component not configured")
+	}
+
+	existing, err := unmarshalConfigSpec(in.ExistingConfig)
+	if err != nil {
+		return buildImageOut{}, fmt.Errorf("deserializing existing config: %w", err)
+	}
+
+	// Build output is written through the builder's LogWriter, tagged
+	// with the app ID and version name.
+	buildLog := &buildLogWriter{
+		log:      b.Log,
+		writer:   b.LogWriter,
+		entityID: in.AppID,
+		version:  in.VersionName,
+	}
+
+	res, artifactID, finalURL, err := b.runBuildkitBuild(ctx, runBuildkitBuildInputs{
+		SourceDir:      in.SourceDir,
+		AppName:        in.AppName,
+		VersionName:    in.VersionName,
+		ImageURL:       in.ImageURL,
+		BuildStack:     in.BuildStack,
+		AppConfig:      in.AppConfig,
+		ExistingConfig: existing,
+		CLIEnvVars:     in.CLIEnvVars,
+	}, status, buildLog)
+	if err != nil {
+		return buildImageOut{}, err
+	}
+
+	return buildImageOut{
+		ManifestDigest: res.ManifestDigest,
+		ArtifactID:     artifactID,
+		FinalImageURL:  finalURL,
+		BuildResult:    res,
+	}, nil
+}
+
+// undoBuildImage is a no-op; nothing produced by the build is removed.
+func undoBuildImage(_ context.Context, _ buildImageIn, _ buildImageOut) error {
+	return nil
+}
+
+// runBuildkitBuildInputs bundles everything runBuildkitBuild needs.
+// Keeping this as a struct (vs.
+// a long parameter list) makes the
+// caller readable and lets future arguments slot in without touching
+// every call site.
+type runBuildkitBuildInputs struct {
+	SourceDir      string
+	AppName        string
+	VersionName    string
+	ImageURL       string
+	BuildStack     BuildStack
+	AppConfig      *appconfig.AppConfig
+	ExistingConfig core_v1alpha.ConfigSpec
+	CLIEnvVars     []*build_v1alpha.EnvironmentVariable
+}
+
+// runBuildkitBuild connects to BuildKit, runs the actual image build
+// with the right transform options + status callbacks, and locates the
+// resulting Artifact entity. Returns the build result, artifact ID,
+// and the registry image URL adjusted to match the artifact (which
+// may have been reused via content-digest deduplication).
+//
+// Extracted here so both the pre-saga buildFromDir path and the
+// buildImage saga action share a single implementation. status may be
+// noop and buildLog may have a noop writer.
+//
+// On any failure the three non-error results are nil / "" / "".
+func (b *Builder) runBuildkitBuild(
+	ctx context.Context,
+	in runBuildkitBuildInputs,
+	status StatusSender,
+	buildLog *buildLogWriter,
+) (*BuildResult, string, string, error) {
+	tr, err := fsutil.NewFS(in.SourceDir)
+	if err != nil {
+		return nil, "", "", fmt.Errorf("opening source dir %s: %w", in.SourceDir, err)
+	}
+
+	b.Log.Info("connecting to buildkit daemon")
+	bkc, err := b.BuildKit.Client(ctx)
+	if err != nil {
+		b.Log.Error("failed to get buildkit client", "error", err)
+		status.SendError("Failed to connect to BuildKit: %v", err)
+		return nil, "", "", err
+	}
+	// The error from Close is not checked.
+	defer bkc.Close()
+
+	// Daemon info is diagnostic only: a failure here is logged and the
+	// build still proceeds.
+	if ci, err := bkc.Info(ctx); err != nil {
+		b.Log.Error("error getting buildkitd info", "error", err)
+	} else {
+		b.Log.Debug("buildkitd info", "version", ci.BuildkitVersion.Version, "rev", ci.BuildkitVersion.Revision)
+	}
+
+	bk := &Buildkit{Client: bkc, Log: b.Log}
+
+	buildEnvVars := computeBuildEnvVars(in.ExistingConfig.Variables, in.AppConfig, in.CLIEnvVars)
+	if len(buildEnvVars) > 0 {
+		b.Log.Info("injecting env vars into build", "count", len(buildEnvVars))
+	}
+
+	// MIREN_VERSION is always passed as a build arg; the computed env
+	// vars are added as build args only when there are any.
+	tos := []TransformOptions{
+		WithBuildArg("MIREN_VERSION", in.VersionName),
+	}
+	if len(buildEnvVars) > 0 {
+		tos = append(tos, WithBuildArgs(buildEnvVars))
+	}
+
+	// Pass env vars for auto-stack builds. We have to copy the stack so
+	// we don't mutate the saga input.
+	stack := in.BuildStack
+	stack.EnvVars = buildEnvVars
+
+	tos = append(tos, WithPhaseUpdates(func(phase string) {
+		status.SendPhase(phase)
+	}))
+
+	// Track which vertex digests have already been reported so each
+	// start / completion is written to the build log at most once.
+	// NOTE(review): both maps are mutated inside the status callback
+	// without locking — confirm the callback is never invoked
+	// concurrently.
+	vertexStarted := map[string]bool{}
+	vertexCompleted := map[string]bool{}
+	tos = append(tos, WithStatusUpdates(func(ss *client.SolveStatus, sj []byte) {
+		for _, v := range ss.Vertexes {
+			digestStr := v.Digest.String()
+			if v.Started != nil && !vertexStarted[digestStr] {
+				vertexStarted[digestStr] = true
+				buildLog.write(fmt.Sprintf("[buildkit] %s", v.Name))
+			}
+			if v.Completed != nil && !vertexCompleted[digestStr] {
+				vertexCompleted[digestStr] = true
+				// Only cached completions get their own log line.
+				if v.Cached {
+					buildLog.write(fmt.Sprintf("[buildkit] %s CACHED", v.Name))
+				}
+			}
+		}
+		// Forward build output line by line, trimming trailing
+		// whitespace and dropping blank lines.
+		for _, log := range ss.Logs {
+			if log.Data != nil {
+				lines := strings.Split(string(log.Data), "\n")
+				for _, line := range lines {
+					line = strings.TrimRight(line, " \t\r\n")
+					if strings.TrimSpace(line) != "" {
+						buildLog.write(line)
+					}
+				}
+			}
+		}
+		// The raw status payload always goes to the status sender.
+		status.SendBuildkit(sj)
+	}))
+
+	status.SendMessage("Calculating build")
+
+	res, err := bk.BuildImage(ctx, tr, stack, in.AppName, in.ImageURL, tos...)
+	if err != nil {
+		b.Log.Error("error building image", "error", err)
+		status.SendError("Error building image: %v", err)
+		return nil, "", "", err
+	}
+
+	for _, event := range res.DetectionEvents {
+		buildLog.write(fmt.Sprintf("[detect] %s: %s", event.Name, event.Message))
+	}
+
+	// The artifact lookup below is keyed on the digest, so a build that
+	// reports none is treated as a failure.
+	if res.ManifestDigest == "" {
+		b.Log.Error("build did not return manifest digest")
+		status.SendError("Build did not return manifest digest")
+		return nil, "", "", fmt.Errorf("build did not return manifest digest")
+	}
+
+	var artifact core_v1alpha.Artifact
+	if err := b.ec.OneAtIndex(ctx,
+		entity.String(core_v1alpha.ArtifactManifestDigestId, res.ManifestDigest),
+		&artifact); err != nil {
+		b.Log.Error("error locating artifact by digest", "digest", res.ManifestDigest, "error", err)
+		return nil, "", "", fmt.Errorf("error locating artifact by digest %s: %w", res.ManifestDigest, err)
+	}
+	b.Log.Debug("located stored artifact", "artifact", artifact.ID, "digest", res.ManifestDigest)
+
+	// The artifact may have been reused by digest, so adjust the image
+	// URL to point at the canonical artifact name in the registry.
+	// The registry host "cluster.local:5000" is hard-coded here.
+	artifactName := strings.TrimPrefix(string(artifact.ID), "artifact/")
+	finalURL := "cluster.local:5000/" + in.AppName + ":" + artifactName
+
+	return res, string(artifact.ID), finalURL, nil
+}
diff --git a/servers/build/build_saga_test.go b/servers/build/build_saga_test.go
new file mode 100644
index 000000000..6b421cc83
--- /dev/null
+++ b/servers/build/build_saga_test.go
@@ -0,0 +1,366 @@
+package build
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"log/slog"
+	"os"
+	"strings"
+	"testing"
+
+	"miren.dev/runtime/api/app"
+	"miren.dev/runtime/api/core/core_v1alpha"
+	"miren.dev/runtime/api/entityserver"
+	"miren.dev/runtime/api/entityserver/entityserver_v1alpha"
+	"miren.dev/runtime/pkg/entity"
+	"miren.dev/runtime/pkg/entity/testutils"
+	"miren.dev/runtime/pkg/rpc"
+	"miren.dev/runtime/pkg/saga"
+)
+
+// sagaTestHarness bundles the infrastructure each saga test needs:
+// in-memory entity server, a Builder configured against it, a fresh
+// StreamRegistry, and a registry+executor wired to the build-from-tar
+// definition. Keeps each test self-contained and avoids global state.
+type sagaTestHarness struct {
+	t        *testing.T
+	inmem    *testutils.InMemEntityServer
+	builder  *Builder
+	streams  *StreamRegistry
+	statuses *StatusRegistry
+	registry *saga.Registry
+	executor *saga.Executor
+}
+
+// newSagaTestHarness builds a harness backed by an in-memory entity
+// server and a per-test temp directory, and registers the build-from-tar
+// saga with stubBuildImage standing in for the real buildImage action.
+// The entity server's cleanup function is registered with t.Cleanup.
+func newSagaTestHarness(t *testing.T) *sagaTestHarness {
+	t.Helper()
+
+	// Only errors are logged.
+	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+
+	entities, stop := testutils.NewInMemEntityServer(t)
+	t.Cleanup(stop)
+
+	workDir := t.TempDir()
+
+	loopback := rpc.LocalClient(entityserver_v1alpha.AdaptEntityAccess(entities.Server))
+	h := &sagaTestHarness{
+		t:     t,
+		inmem: entities,
+		builder: &Builder{
+			Log:        logger,
+			EAS:        entities.EAC,
+			ec:         entityserver.NewClient(logger, entities.EAC),
+			appClient:  app.NewClient(logger, loopback),
+			TempDir:    workDir,
+			cacheLocks: newAppLocks(),
+		},
+		streams:  NewStreamRegistry(workDir, logger),
+		statuses: NewStatusRegistry(),
+		registry: saga.NewRegistry(),
+	}
+
+	// Use a test-mode registration that swaps in a stub buildImage so
+	// we don't need a real BuildKit component for unit tests. The
+	// stub returns a synthetic digest/artifact ID so downstream actions
+	// have something deterministic to work with. The real buildImage
+	// path is exercised by blackbox tests under iso.
+	definition := saga.Define(sagaBuildFromTar).
+		Using(&buildSagaDeps{builder: h.builder, streams: h.streams, statuses: h.statuses}).
+		Using(logger).
+		Action(actionReceiveTar, receiveTar).Undo(undoReceiveTar).
+		Action(actionLoadSource, loadSource).Undo(undoLoadSource).
+		Action(actionGetNextVer, getNextVersion).Undo(undoGetNextVersion).
+		Action(actionBuildImage, stubBuildImage).Undo(undoBuildImage).
+		Action(actionPrepareConfig, prepareConfig).Undo(undoPrepareConfig).
+		Action(actionHandleEphemera, handleEphemeral).Undo(undoHandleEphemeral).
+		Action(actionCreateConfigVer, createConfigVersion).Undo(undoCreateConfigVersion).
+		Action(actionCreateVersion, createVersion).Undo(undoCreateVersion).
+		Action(actionProvisionAddons, provisionAddons).Undo(undoProvisionAddons).
+		Action(actionSetActiveVer, setActiveVersion).Undo(undoSetActiveVersion).
+		Action(actionFinalize, finalize).Undo(undoFinalize)
+	if err := definition.RegisterTo(h.registry); err != nil {
+		t.Fatalf("registering build saga: %v", err)
+	}
+
+	h.executor = saga.NewExecutor(
+		saga.NewMemoryStorage(),
+		saga.WithRegistry(h.registry),
+		saga.WithLogger(logger),
+	)
+	return h
+}
+
+// stubBuildImage replaces the real buildImage action in unit tests so
+// we don't need a live BuildKit daemon. It returns a deterministic
+// digest/artifact ID derived from the version name so the test can
+// assert on what downstream actions saw, and it pre-creates the
+// matching Artifact entity so any "locate artifact" code path the
+// real action would have walked finds something. Real buildImage is
+// exercised by blackbox tests under iso.
+func stubBuildImage(ctx context.Context, in buildImageIn) (buildImageOut, error) {
+	ec := saga.Get[*buildSagaDeps](ctx).builder.ec
+
+	manifest := "sha256:" + in.VersionName + "-digest"
+	// Including VersionName (which itself carries a random suffix from
+	// nextVersion) keeps the entity name unique when a test runs the
+	// saga twice against the same in-memory store.
+	stubName := in.AppName + "-" + in.VersionName + "-stub"
+
+	artifactID, err := ec.Create(ctx, stubName, &core_v1alpha.Artifact{
+		ManifestDigest: manifest,
+		App:            entity.Id(in.AppID),
+		Status:         core_v1alpha.ACTIVE,
+	})
+	if err != nil {
+		return buildImageOut{}, err
+	}
+
+	out := buildImageOut{
+		ManifestDigest: manifest,
+		ArtifactID:     string(artifactID),
+		FinalImageURL:  "cluster.local:5000/" + in.AppName + ":" + stubName,
+		BuildResult: &BuildResult{
+			ManifestDigest: manifest,
+			Entrypoint:     "echo hi",
+			Command:        "",
+			WorkingDir:     "/app",
+		},
+	}
+	return out, nil
+}
+
+// dockerfileTarball returns the file set (path -> contents) that the
+// tests hand to makeTar. It holds the bare minimum to satisfy every
+// saga action up through getNextVersion: a .miren/app.toml at the path
+// appconfig.AppConfigPath expects, and a Dockerfile.miren so stack
+// detection short-circuits on the dockerfile path without invoking the
+// auto-detector (which would fail without a recognized stack).
+func dockerfileTarball(t *testing.T) map[string]string {
+	t.Helper()
+	files := make(map[string]string, 3)
+	files[".miren/app.toml"] = "name = 'demo'\n"
+	files["Dockerfile.miren"] = "FROM alpine\nCMD echo hi\n"
+	files["Procfile"] = "web: echo hi\n"
+	return files
+}
+
+// TestBuildSaga_HappyPath_RunsFullPipeline runs the whole saga against
+// a minimal source tree and checks the three visible outcomes: the
+// staged source is gone, the App entity exists, and it has an active
+// version.
+func TestBuildSaga_HappyPath_RunsFullPipeline(t *testing.T) {
+	ctx := context.Background()
+
+	h := newSagaTestHarness(t)
+	h.streams.Register("stream-1", makeTar(t, dockerfileTarball(t)))
+
+	if err := h.executor.Start(sagaBuildFromTar).
+		Input("app_name", "demo").
+		Input("stream_id", "stream-1").
+		WithID("test-happy-path").
+		Execute(ctx); err != nil {
+		t.Fatalf("saga: %v", err)
+	}
+
+	// finalize cleans up the staged tar on success — the registry
+	// should no longer have a path for this ID.
+	if _, staged := h.streams.StagedPath("stream-1"); staged {
+		t.Error("expected staged source to be cleaned up by finalize")
+	}
+
+	// get-next-version creates the App entity on first deploy.
+	var demo core_v1alpha.App
+	if err := h.builder.ec.Get(ctx, "demo", &demo); err != nil {
+		t.Fatalf("expected app entity 'demo' to exist: %v", err)
+	}
+	// set-active-version should have populated the new version.
+	if demo.ActiveVersion == "" {
+		t.Error("expected app to have an active version after build saga")
+	}
+}
+
+// TestBuildSaga_ReceiveTar_EmitsStatusUpdates registers a recording
+// status sender for the stream and checks that the two receive-tar
+// progress messages were emitted, in order.
+func TestBuildSaga_ReceiveTar_EmitsStatusUpdates(t *testing.T) {
+	ctx := context.Background()
+
+	h := newSagaTestHarness(t)
+	h.streams.Register("stream-status", makeTar(t, dockerfileTarball(t)))
+
+	recorder := &recordingSender{}
+	h.statuses.Register("stream-status", recorder)
+	t.Cleanup(func() { h.statuses.Unregister("stream-status") })
+
+	if err := h.executor.Start(sagaBuildFromTar).
+		Input("app_name", "demo").
+		Input("stream_id", "stream-status").
+		WithID("test-status-emit").
+		Execute(ctx); err != nil {
+		t.Fatalf("saga: %v", err)
+	}
+
+	// receive-tar should have emitted both progress messages bracketing
+	// the staging work. We don't pin exact equality on the entire slice
+	// (later actions might emit too as the saga grows) — just verify
+	// these two appeared in order.
+	wantInOrder := []string{"Reading application data", "Launching builder"}
+	if !containsInOrder(recorder.Messages, wantInOrder) {
+		t.Errorf("missing expected messages in order: got %v, want subsequence %v", recorder.Messages, wantInOrder)
+	}
+}
+
+// containsInOrder reports whether every element of want occurs in got
+// in the given order, with any number of other entries in between.
+// Tests for individual actions use it so they tolerate extra status
+// messages emitted by later actions in the same saga.
+func containsInOrder(got, want []string) bool {
+	pending := want
+	for _, entry := range got {
+		if len(pending) == 0 {
+			break
+		}
+		if entry == pending[0] {
+			pending = pending[1:]
+		}
+	}
+	return len(pending) == 0
+}
+
+// TestBuildSaga_FailedActivate_CompensatesEntities verifies the core
+// saga guarantee: when a late action fails, prior entity-creating
+// actions undo themselves in reverse.
+// set-active-version is the last
+// real side-effecting step; swapping it with a failing version should
+// trigger create-version and create-config-version compensations,
+// leaving the entity store free of orphaned ConfigVersion / AppVersion
+// rows.
+func TestBuildSaga_FailedActivate_CompensatesEntities(t *testing.T) {
+	ctx := context.Background()
+
+	// This test wires its own registry instead of using
+	// newSagaTestHarness because it needs a failing set-active action
+	// and direct access to the executor's storage.
+	log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+	inmem, cleanup := testutils.NewInMemEntityServer(t)
+	t.Cleanup(cleanup)
+
+	rpcClient := rpc.LocalClient(entityserver_v1alpha.AdaptEntityAccess(inmem.Server))
+	tempDir := t.TempDir()
+	builder := &Builder{
+		Log:        log,
+		EAS:        inmem.EAC,
+		ec:         entityserver.NewClient(log, inmem.EAC),
+		appClient:  app.NewClient(log, rpcClient),
+		TempDir:    tempDir,
+		cacheLocks: newAppLocks(),
+	}
+	streams := NewStreamRegistry(tempDir, log)
+	statuses := NewStatusRegistry()
+	streams.Register("stream-fail", makeTar(t, dockerfileTarball(t)))
+
+	// Swap setActiveVersion with a deterministic failure. Same signature,
+	// same input/output keys — the framework can't tell the difference,
+	// but the saga compensates everything after createConfigVersion.
+	failingSetActive := func(ctx context.Context, in setActiveVersionIn) (setActiveVersionOut, error) {
+		return setActiveVersionOut{}, errSimulatedActivate
+	}
+
+	registry := saga.NewRegistry()
+	deps := &buildSagaDeps{builder: builder, streams: streams, statuses: statuses}
+	if err := saga.Define(sagaBuildFromTar).
+		Using(deps).
+		Using(log).
+		Action(actionReceiveTar, receiveTar).Undo(undoReceiveTar).
+		Action(actionLoadSource, loadSource).Undo(undoLoadSource).
+		Action(actionGetNextVer, getNextVersion).Undo(undoGetNextVersion).
+		Action(actionBuildImage, stubBuildImage).Undo(undoBuildImage).
+		Action(actionPrepareConfig, prepareConfig).Undo(undoPrepareConfig).
+		Action(actionHandleEphemera, handleEphemeral).Undo(undoHandleEphemeral).
+		Action(actionCreateConfigVer, createConfigVersion).Undo(undoCreateConfigVersion).
+		Action(actionCreateVersion, createVersion).Undo(undoCreateVersion).
+		Action(actionProvisionAddons, provisionAddons).Undo(undoProvisionAddons).
+		Action(actionSetActiveVer, failingSetActive).Undo(undoSetActiveVersion).
+		Action(actionFinalize, finalize).Undo(undoFinalize).
+		RegisterTo(registry); err != nil {
+		t.Fatalf("registering build saga: %v", err)
+	}
+
+	// Keep a handle on the storage so the execution record can be read
+	// back after the saga fails.
+	storage := saga.NewMemoryStorage()
+	executor := saga.NewExecutor(
+		storage,
+		saga.WithRegistry(registry),
+		saga.WithLogger(log),
+	)
+
+	err := executor.Start(sagaBuildFromTar).
+		Input("app_name", "demo").
+		Input("stream_id", "stream-fail").
+		WithID("test-failed-activate").
+		Execute(ctx)
+	if err == nil {
+		t.Fatal("saga should have failed when set-active-version errors")
+	}
+
+	// Pull the saga execution back and verify both create-config-version
+	// and create-version recorded UndoneAt timestamps. Then pull the
+	// created entity IDs out of each action's output and confirm the
+	// entities themselves are actually gone — UndoneAt only says we
+	// called the undo, not that it succeeded.
+	exec, err := storage.Get(ctx, "test-failed-activate")
+	if err != nil {
+		t.Fatalf("loading saga execution: %v", err)
+	}
+	for _, action := range []string{actionCreateConfigVer, actionCreateVersion} {
+		r := exec.ExecutedActions[action]
+		if r == nil {
+			t.Errorf("expected %s to have run before failure", action)
+			continue
+		}
+		if r.UndoneAt == nil {
+			t.Errorf("expected %s to be undone, but UndoneAt is nil", action)
+		}
+	}
+
+	// NOTE(review): the two entity checks below pass vacuously when the
+	// recorded output is empty, fails to decode, or decodes to an empty
+	// ID — in those cases nothing is looked up. Consider failing the
+	// test instead, once the stored output format is confirmed.
+	var cvOut createConfigVersionOut
+	if r := exec.ExecutedActions[actionCreateConfigVer]; r != nil && len(r.Output) > 0 {
+		if err := json.Unmarshal(r.Output, &cvOut); err == nil && cvOut.ConfigVersionID != "" {
+			var dummy core_v1alpha.ConfigVersion
+			if err := builder.ec.GetById(ctx, entity.Id(cvOut.ConfigVersionID), &dummy); err == nil {
+				t.Errorf("expected ConfigVersion %s to be deleted, but Get succeeded", cvOut.ConfigVersionID)
+			}
+		}
+	}
+	var avOut createVersionOut
+	if r := exec.ExecutedActions[actionCreateVersion]; r != nil && len(r.Output) > 0 {
+		if err := json.Unmarshal(r.Output, &avOut); err == nil && avOut.AppVersionID != "" {
+			var dummy core_v1alpha.AppVersion
+			if err := builder.ec.GetById(ctx, entity.Id(avOut.AppVersionID), &dummy); err == nil {
+				t.Errorf("expected AppVersion %s to be deleted, but Get succeeded", avOut.AppVersionID)
+			}
+		}
+	}
+
+	// Staged source is cleaned up via undoReceiveTar.
+	if _, ok := streams.StagedPath("stream-fail"); ok {
+		t.Error("expected staged source to be cleaned up after compensation")
+	}
+}
+
+// errSimulatedActivate is the error the failing set-active-version
+// action returns in TestBuildSaga_FailedActivate_CompensatesEntities.
+var errSimulatedActivate = errors.New("simulated activate failure")
+
+// TestBuildSaga_FailsWhenStreamUnavailable starts the saga with a
+// stream ID that was never registered and expects the failure to
+// mention ErrStreamUnavailable.
+func TestBuildSaga_FailsWhenStreamUnavailable(t *testing.T) {
+	ctx := context.Background()
+
+	h := newSagaTestHarness(t)
+	// Note: no Register call — simulates a crash before the stream arrived,
+	// or a saga revived after the in-process stream was lost.
+
+	err := h.executor.Start(sagaBuildFromTar).
+		Input("app_name", "demo").
+		Input("stream_id", "stream-gone").
+		WithID("test-stream-gone").
+		Execute(ctx)
+	if err == nil {
+		t.Fatal("saga should have failed when stream is unavailable")
+	}
+	// The framework serializes action errors to strings and reconstructs
+	// them on failure, which breaks errors.Is chains. Match the message
+	// so we still confirm the stream-unavailable signal made it through.
+	if !strings.Contains(err.Error(), ErrStreamUnavailable.Error()) {
+		t.Errorf("expected error to mention %q, got %v", ErrStreamUnavailable.Error(), err)
+	}
+
+}
diff --git a/servers/build/saga_builder.go b/servers/build/saga_builder.go
new file mode 100644
index 000000000..90d1111ca
--- /dev/null
+++ b/servers/build/saga_builder.go
@@ -0,0 +1,311 @@
+package build
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+
+	"miren.dev/runtime/api/build/build_v1alpha"
+	"miren.dev/runtime/api/core/core_v1alpha"
+	"miren.dev/runtime/pkg/entity"
+	"miren.dev/runtime/pkg/idgen"
+	"miren.dev/runtime/pkg/rpc"
+	"miren.dev/runtime/pkg/rpc/stream"
+	"miren.dev/runtime/pkg/saga"
+	"miren.dev/runtime/pkg/tarx"
+)
+
+// buildArgs abstracts the shape both BuildFromTarArgs and
+// BuildFromPreparedArgs share. Lets the saga entry points share the
+// "pull initial inputs from args" logic.
+type buildArgs interface {
+	EnvVars() []*build_v1alpha.EnvironmentVariable
+	HasEphemeralLabel() bool
+	EphemeralLabel() string
+	HasEphemeralTtl() bool
+	EphemeralTtl() string
+}
+
+// buildResults is the shared shape of both Results types we set on
+// completion. Generated types don't share an interface, so we declare
+// the minimum surface here.
+type buildResults interface {
+	SetVersion(string)
+	SetVersionShortId(string)
+	SetAccessInfo(**build_v1alpha.AccessInfo)
+}
+
+// SagaBuilder is the saga-backed implementation of the build_v1alpha
+// Builder RPC interface.
It wraps the existing *Builder (which keeps +// shared collaborators like the entity client, app client, BuildKit +// component, and source cache locks), adds a per-instance saga +// executor + registry, and exposes BuildFromTar / BuildFromPrepared +// implementations that drive the build-from-tar saga end to end. +// +// PrepareUpload and AnalyzeApp delegate straight through to the inner +// Builder. Prepare is just session management, not a build, and +// AnalyzeApp is read-only — neither benefits from saga durability. +type SagaBuilder struct { + inner *Builder + executor *saga.Executor + registry *saga.Registry + streams *StreamRegistry + statuses *StatusRegistry + log *slog.Logger +} + +// NewSagaBuilder constructs a SagaBuilder over an existing Builder. The +// caller still owns the Builder's collaborators (clients, log writer, +// BuildKit component); SagaBuilder layers saga execution + crash +// recovery on top without forking the underlying state. +func NewSagaBuilder(inner *Builder, sagaStorage saga.Storage, log *slog.Logger) *SagaBuilder { + registry := saga.NewRegistry() + executor := saga.NewExecutor( + sagaStorage, + saga.WithRegistry(registry), + saga.WithLogger(log.With("module", "saga-build")), + ) + streams := NewStreamRegistry(inner.TempDir, log) + statuses := NewStatusRegistry() + return &SagaBuilder{ + inner: inner, + executor: executor, + registry: registry, + streams: streams, + statuses: statuses, + log: log.With("module", "saga-builder"), + } +} + +// Init registers the build-from-tar saga definition with the executor +// and recovers any incomplete sagas left over from a previous process. +// Should be called once at server startup before serving RPC requests. 
+func (s *SagaBuilder) Init(ctx context.Context) error {
+	if err := registerBuildSaga(s.registry, s.inner, s.streams, s.statuses, s.log); err != nil {
+		return fmt.Errorf("registering build-from-tar saga: %w", err)
+	}
+	if err := s.executor.Recover(ctx); err != nil {
+		// Recovery failures don't block startup — they're already
+		// logged inside Recover. New requests can still come in.
+		s.log.Error("saga recovery completed with errors", "error", err)
+	}
+	return nil
+}
+
+// BuildFromTar is the saga-backed implementation of the RPC method.
+// Stream registration, status sender registration, and saga start all
+// key off the same generated stream ID so the saga's actions can find
+// the live reader and status sink by ID alone.
+//
+// Returns an access error when the caller may not act on the named
+// application, the saga's error (wrapped by startBuild) when the build
+// fails, or an error from populateResults when the saga's outputs
+// cannot be read back.
+func (s *SagaBuilder) BuildFromTar(ctx context.Context, state *build_v1alpha.BuilderBuildFromTar) error {
+	args := state.Args()
+	name := args.Application()
+	if !rpc.AllowApp(ctx, name) {
+		return rpc.AppAccessError(ctx, name)
+	}
+
+	streamID := idgen.Gen("bt")
+	status := args.Status()
+	sender := NewRPCStatusSender(status, s.inner.Log)
+	s.statuses.Register(streamID, sender)
+	defer s.statuses.Unregister(streamID)
+
+	tardata := args.Tardata()
+	s.streams.Register(streamID, stream.ToReader(ctx, tardata))
+	// streams.Cleanup is idempotent and finalize calls it on success;
+	// the deferred call covers crash paths between Execute return and
+	// the next request.
+	defer func() {
+		if err := s.streams.Cleanup(streamID); err != nil {
+			s.log.Warn("post-saga stream cleanup", "stream", streamID, "error", err)
+		}
+	}()
+
+	executionID := "build-from-tar-" + streamID
+	if err := s.startBuild(ctx, executionID, name, streamID, args.EnvVars(), ephemeralFromArgs(args)); err != nil {
+		return err
+	}
+
+	return s.populateResults(ctx, executionID, name, state.Results(), ephemeralLabelFromArgs(args))
+}
+
+// BuildFromPrepared mirrors BuildFromTar but starts from an upload
+// session whose source tree is already on disk. The session lookup
+// stays out of the saga (it's a pre-flight check, not a step we'd
+// want to retry on recovery), then MarkStaged tells the registry the
+// path is ready so the saga's receive-tar action returns it directly.
+//
+// The session directory is handed to the stream registry as soon as
+// the session has been claimed, so every return path — access denied,
+// extraction failure, saga failure, success — runs the same deferred
+// Cleanup and the directory is never left behind.
+func (s *SagaBuilder) BuildFromPrepared(ctx context.Context, state *build_v1alpha.BuilderBuildFromPrepared) error {
+	args := state.Args()
+	sessionID := args.SessionId()
+
+	val, ok := s.inner.sessions.LoadAndDelete(sessionID)
+	if !ok {
+		return fmt.Errorf("unknown or expired upload session: %s", sessionID)
+	}
+	sess := val.(*buildSession)
+	sess.cancelFunc()
+
+	// LoadAndDelete removed the session from the map, so no later
+	// request can reach sess.dir through it. Register the directory
+	// for cleanup now, before any early return, instead of after the
+	// access check and extraction as before — those paths used to
+	// leak the directory on disk.
+	s.streams.MarkStaged(sessionID, sess.dir)
+	defer func() {
+		if err := s.streams.Cleanup(sessionID); err != nil {
+			s.log.Warn("post-saga stream cleanup", "stream", sessionID, "error", err)
+		}
+	}()
+
+	name := sess.appName
+	if !rpc.AllowApp(ctx, name) {
+		return rpc.AppAccessError(ctx, name)
+	}
+
+	status := args.Status()
+	sender := NewRPCStatusSender(status, s.inner.Log)
+	s.statuses.Register(sessionID, sender)
+	defer s.statuses.Unregister(sessionID)
+
+	// If the client is sending an incremental tar of changed files,
+	// extract it into the session dir before the saga starts. This
+	// matches the pre-saga flow and keeps the receive-tar action
+	// uniform (it just sees a populated directory). Nothing reads the
+	// staged path until startBuild runs, so extracting after
+	// MarkStaged is safe.
+	if td := args.Tardata(); td != nil {
+		sender.SendMessage("Receiving changed files")
+		if err := extractTarIntoDir(ctx, td, sess.dir); err != nil {
+			sender.SendError("Error extracting changed files: %v", err)
+			return fmt.Errorf("error extracting changed files: %w", err)
+		}
+	}
+
+	if s.inner.DataPath != "" {
+		cache := &sourceCache{dataPath: s.inner.DataPath, log: s.inner.Log, locks: s.inner.cacheLocks}
+		// Best-effort: a failed cache save is logged and the build
+		// continues.
+		if err := cache.saveSourceImage(name, sess.dir); err != nil {
+			s.inner.Log.Warn("failed to save source code cache", "error", err, "app", name)
+		}
+	}
+
+	executionID := "build-from-prepared-" + sessionID
+	if err := s.startBuild(ctx, executionID, name, sessionID, args.EnvVars(), ephemeralFromArgs(args)); err != nil {
+		return err
+	}
+
+	return s.populateResults(ctx, executionID, name, state.Results(), ephemeralLabelFromArgs(args))
+}
+
+// PrepareUpload is a session-management op, not a build, so it routes
+// straight to the inner Builder.
+func (s *SagaBuilder) PrepareUpload(ctx context.Context, state *build_v1alpha.BuilderPrepareUpload) error {
+	return s.inner.PrepareUpload(ctx, state)
+}
+
+// AnalyzeApp is read-only.
+func (s *SagaBuilder) AnalyzeApp(ctx context.Context, state *build_v1alpha.BuilderAnalyzeApp) error {
+	return s.inner.AnalyzeApp(ctx, state)
+}
+
+// startBuild fans out the initial inputs and runs the saga.
+//
+// app_name and stream_id are always set. cli_env_vars is set only when
+// the caller supplied variables; ephemeral_label (and ephemeral_ttl,
+// when non-empty) only when eph is non-nil. Any Execute error is
+// wrapped with a "build saga" prefix.
+func (s *SagaBuilder) startBuild(
+	ctx context.Context,
+	executionID, appName, streamID string,
+	cliEnvVars []*build_v1alpha.EnvironmentVariable,
+	eph *ephemeralOpts,
+) error {
+	sb := s.executor.Start(sagaBuildFromTar).
+		Input("app_name", appName).
+		Input("stream_id", streamID).
+		WithID(executionID)
+
+	if len(cliEnvVars) > 0 {
+		sb = sb.Input("cli_env_vars", cliEnvVars)
+	}
+	if eph != nil {
+		sb = sb.Input("ephemeral_label", eph.label)
+		if eph.ttl != "" {
+			sb = sb.Input("ephemeral_ttl", eph.ttl)
+		}
+	}
+
+	if err := sb.Execute(ctx); err != nil {
+		return fmt.Errorf("build saga: %w", err)
+	}
+	return nil
+}
+
+// populateResults fishes the saga's outputs back out of storage to
+// populate the RPC response. The version short ID and access info
+// stay outside the saga since they're cheap reads against entities
+// the saga already created.
+//
+// version_name is required: failing to read it is an error. The short
+// ID is optional and is simply left unset when it can't be resolved.
+func (s *SagaBuilder) populateResults(
+	ctx context.Context,
+	executionID, appName string,
+	results buildResults,
+	ephemeralLabel string,
+) error {
+	out, err := s.executor.ExecutionOutputs(ctx, executionID)
+	if err != nil {
+		return fmt.Errorf("loading saga outputs for %s: %w", executionID, err)
+	}
+
+	var versionName string
+	if err := out.Get("version_name", &versionName); err != nil {
+		return fmt.Errorf("reading version_name from saga: %w", err)
+	}
+	results.SetVersion(versionName)
+
+	// The error is ignored on purpose: app_version_id only feeds the
+	// optional short-ID field, and an empty value skips the lookup
+	// below rather than failing a build that already succeeded.
+	var appVersionID string
+	_ = out.Get("app_version_id", &appVersionID)
+	if appVersionID != "" {
+		if shortID, ok := s.lookupVersionShortID(ctx, appVersionID); ok {
+			results.SetVersionShortId(shortID)
+		}
+	}
+
+	accessInfo := s.inner.getAccessInfo(ctx, appName, ephemeralLabel)
+	results.SetAccessInfo(&accessInfo)
+	return nil
+}
+
+// lookupVersionShortID fetches the short ID for a created AppVersion
+// entity. Returns false if the entity isn't found or doesn't have a
+// short ID attribute — callers should fall back to omitting the field.
+func (s *SagaBuilder) lookupVersionShortID(ctx context.Context, appVersionID string) (string, bool) {
+	var version core_v1alpha.AppVersion
+	ent, err := s.inner.ec.GetByIdWithEntity(ctx, entity.Id(appVersionID), &version)
+	if err != nil {
+		return "", false
+	}
+	// Scan the entity's attributes for the short-ID attribute; the
+	// first match wins.
+	for _, attr := range ent.Attrs() {
+		if entity.Id(attr.ID) != entity.DBShortId {
+			continue
+		}
+		return attr.Value.String(), true
+	}
+	return "", false
+}
+
+// ephemeralFromArgs derives the saga's ephemeralOpts from the args of
+// either build entry point. A missing or empty label yields nil, which
+// the saga reads as a regular (non-ephemeral) deploy. The TTL falls
+// back to "24h" unless the caller supplied a non-empty one.
+func ephemeralFromArgs(args buildArgs) *ephemeralOpts {
+	var label string
+	if args.HasEphemeralLabel() {
+		label = args.EphemeralLabel()
+	}
+	if label == "" {
+		return nil
+	}
+
+	opts := &ephemeralOpts{label: label, ttl: "24h"}
+	if args.HasEphemeralTtl() {
+		if ttl := args.EphemeralTtl(); ttl != "" {
+			opts.ttl = ttl
+		}
+	}
+	return opts
+}
+
+// ephemeralLabelFromArgs extracts only the label, yielding "" when the
+// args carry none. The post-saga access-info lookup uses it, since the
+// label affects route resolution.
+func ephemeralLabelFromArgs(args buildArgs) string {
+	if args.HasEphemeralLabel() {
+		return args.EphemeralLabel()
+	}
+	return ""
+}
+
+// extractTarIntoDir reads the RPC tar stream into dir, the same shape
+// BuildFromPrepared's pre-saga path uses for incremental uploads.
+func extractTarIntoDir(ctx context.Context, td *stream.RecvStreamClient[[]byte], dir string) error { + r := stream.ToReader(ctx, td) + _, err := tarx.TarFS(r, dir) + return err +} diff --git a/servers/build/status_registry.go b/servers/build/status_registry.go new file mode 100644 index 000000000..89e7af47e --- /dev/null +++ b/servers/build/status_registry.go @@ -0,0 +1,174 @@ +package build + +import ( + "context" + "fmt" + "log/slog" + "sync" + + "miren.dev/runtime/api/build/build_v1alpha" + "miren.dev/runtime/pkg/rpc/stream" +) + +// StatusSender lets saga actions emit progress, log lines, and errors +// back to the client that started the build without holding a direct +// reference to the per-request RPC stream. The SagaBuilder wraps a +// concrete stream into a sender and registers it under the stream ID +// for the duration of a saga; actions look it up by the same ID they +// already carry through saga inputs. +// +// During recovery the sender returned by StatusRegistry.SenderFor is a +// noop, which is the right default — nobody's listening for live +// progress when the original CLI invocation is long gone. +type StatusSender interface { + // SendMessage emits a free-form progress message ("Reading + // application data", "Launching builder", etc.). + SendMessage(msg string) + + // SendPhase translates a buildkit phase name into a user-facing + // progress message, matching the mapping the pre-saga path used. + SendPhase(phase string) + + // SendBuildkit emits the raw buildkit JSON status payload so the + // CLI can render live vertex/log output. + SendBuildkit(payload []byte) + + // SendError emits a user-facing error message. Returning the error + // from the action is what surfaces failure through the saga; this + // is the channel for the human-readable explanation. + SendError(format string, args ...any) + + // SendLog emits a structured log entry. Currently only the + // pre-saga "warn" path for local storage migration uses this. 
+ SendLog(level, text string, fields ...*build_v1alpha.LogField) +} + +// noopStatusSender is the zero-cost StatusSender used during recovery +// or any other time no live RPC stream is registered for a given ID. +type noopStatusSender struct{} + +func (noopStatusSender) SendMessage(string) {} +func (noopStatusSender) SendPhase(string) {} +func (noopStatusSender) SendBuildkit([]byte) {} +func (noopStatusSender) SendError(string, ...any) {} +func (noopStatusSender) SendLog(string, string, ...*build_v1alpha.LogField) {} + +// rpcStatusSender adapts the existing per-request SendStreamClient into +// the StatusSender interface so saga actions don't have to know about +// the RPC wiring. Errors on Send are logged and swallowed; a dropped +// client stream shouldn't fail the saga. +type rpcStatusSender struct { + stream *stream.SendStreamClient[*build_v1alpha.Status] + log *slog.Logger +} + +// NewRPCStatusSender wraps an RPC status stream. Passing a nil stream +// returns a sender that no-ops cleanly so callers don't have to special- +// case the "client didn't request status updates" path. +func NewRPCStatusSender(s *stream.SendStreamClient[*build_v1alpha.Status], log *slog.Logger) StatusSender { + if s == nil { + return noopStatusSender{} + } + if log == nil { + log = slog.Default() + } + return &rpcStatusSender{stream: s, log: log} +} + +func (r *rpcStatusSender) send(so *build_v1alpha.Status) { + if _, err := r.stream.Send(context.Background(), so); err != nil { + r.log.Warn("status send", "error", err) + } +} + +func (r *rpcStatusSender) SendMessage(msg string) { + so := new(build_v1alpha.Status) + so.Update().SetMessage(msg) + r.send(so) +} + +func (r *rpcStatusSender) SendPhase(phase string) { + // Mapping matches the pre-saga path's WithPhaseUpdates callback so + // the CLI's progress display behaves identically on both code paths. 
+ var msg string + switch phase { + case "export": + msg = "Registering image" + case "solving": + msg = "Calculating build" + case "solved": + msg = "Building image" + default: + msg = phase + } + r.SendMessage(msg) +} + +func (r *rpcStatusSender) SendBuildkit(payload []byte) { + so := new(build_v1alpha.Status) + so.Update().SetBuildkit(payload) + r.send(so) +} + +func (r *rpcStatusSender) SendError(format string, args ...any) { + so := new(build_v1alpha.Status) + so.Update().SetError(fmt.Sprintf(format, args...)) + r.send(so) +} + +func (r *rpcStatusSender) SendLog(level, text string, fields ...*build_v1alpha.LogField) { + so := new(build_v1alpha.Status) + entry := &build_v1alpha.LogEntry{} + entry.SetLevel(level) + entry.SetText(text) + if len(fields) > 0 { + entry.SetFields(fields) + } + so.Update().SetLog(entry) + r.send(so) +} + +// StatusRegistry maps stream IDs (the same handles StreamRegistry uses) +// to live StatusSenders. Saga actions retrieve a sender by ID without +// caring whether anyone's currently listening — SenderFor returns noop +// when nothing's registered, which gives recovery the right behavior +// for free. +type StatusRegistry struct { + mu sync.Mutex + senders map[string]StatusSender +} + +func NewStatusRegistry() *StatusRegistry { + return &StatusRegistry{senders: make(map[string]StatusSender)} +} + +// Register associates a sender with a stream ID. The SagaBuilder calls +// this before saga.Start and Unregister after, so a sender's lifetime +// is bounded by one saga execution. +func (s *StatusRegistry) Register(streamID string, sender StatusSender) { + if sender == nil { + sender = noopStatusSender{} + } + s.mu.Lock() + defer s.mu.Unlock() + s.senders[streamID] = sender +} + +// Unregister drops the live sender for an ID. Idempotent — calling +// twice (e.g., from both a defer and an error path) is fine. 
+func (s *StatusRegistry) Unregister(streamID string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.senders, streamID) +} + +// SenderFor returns the registered sender or a noop. Always returns a +// usable StatusSender so callers can avoid nil checks. +func (s *StatusRegistry) SenderFor(streamID string) StatusSender { + s.mu.Lock() + defer s.mu.Unlock() + if sender, ok := s.senders[streamID]; ok { + return sender + } + return noopStatusSender{} +} diff --git a/servers/build/status_registry_test.go b/servers/build/status_registry_test.go new file mode 100644 index 000000000..0c199241a --- /dev/null +++ b/servers/build/status_registry_test.go @@ -0,0 +1,131 @@ +package build + +import ( + "sync" + "testing" + + "miren.dev/runtime/api/build/build_v1alpha" +) + +// recordingSender captures every Send* call so tests can assert on the +// sequence of progress messages a saga action emitted. Thread-safe so +// it works regardless of which goroutine the action runs on. +type recordingSender struct { + mu sync.Mutex + Messages []string + Phases []string + Buildkit [][]byte + Errors []string + Logs []recordedLog +} + +type recordedLog struct { + Level string + Text string + Fields []*build_v1alpha.LogField +} + +func (r *recordingSender) SendMessage(msg string) { + r.mu.Lock() + defer r.mu.Unlock() + r.Messages = append(r.Messages, msg) +} + +func (r *recordingSender) SendPhase(phase string) { + r.mu.Lock() + defer r.mu.Unlock() + r.Phases = append(r.Phases, phase) +} + +func (r *recordingSender) SendBuildkit(payload []byte) { + r.mu.Lock() + defer r.mu.Unlock() + r.Buildkit = append(r.Buildkit, append([]byte(nil), payload...)) +} + +func (r *recordingSender) SendError(format string, args ...any) { + r.mu.Lock() + defer r.mu.Unlock() + r.Errors = append(r.Errors, format) // tests don't need full Sprintf +} + +func (r *recordingSender) SendLog(level, text string, fields ...*build_v1alpha.LogField) { + r.mu.Lock() + defer r.mu.Unlock() + r.Logs = append(r.Logs, 
recordedLog{Level: level, Text: text, Fields: fields}) +} + +func TestStatusRegistry_UnregisteredIDReturnsNoop(t *testing.T) { + reg := NewStatusRegistry() + // SenderFor must always return a usable sender; the noop is what + // makes the recovery path work without special-casing. + sender := reg.SenderFor("not-registered") + // Calling every method should not panic. + sender.SendMessage("hi") + sender.SendPhase("solving") + sender.SendBuildkit([]byte("x")) + sender.SendError("oops") + sender.SendLog("info", "text") +} + +func TestStatusRegistry_RegisteredSenderReceives(t *testing.T) { + reg := NewStatusRegistry() + rec := &recordingSender{} + reg.Register("s1", rec) + + sender := reg.SenderFor("s1") + sender.SendMessage("hello") + sender.SendPhase("solving") + sender.SendBuildkit([]byte("payload")) + sender.SendError("bad %s", "thing") + + if got, want := rec.Messages, []string{"hello"}; !equalStringSlice(got, want) { + t.Errorf("Messages = %v, want %v", got, want) + } + if got, want := rec.Phases, []string{"solving"}; !equalStringSlice(got, want) { + t.Errorf("Phases = %v, want %v", got, want) + } + if len(rec.Buildkit) != 1 || string(rec.Buildkit[0]) != "payload" { + t.Errorf("Buildkit = %v, want one entry with payload bytes", rec.Buildkit) + } + if len(rec.Errors) != 1 { + t.Errorf("Errors len = %d, want 1", len(rec.Errors)) + } +} + +func TestStatusRegistry_UnregisterReturnsNoop(t *testing.T) { + reg := NewStatusRegistry() + rec := &recordingSender{} + reg.Register("s1", rec) + reg.Unregister("s1") + // Double unregister is fine. 
+ reg.Unregister("s1") + + sender := reg.SenderFor("s1") + sender.SendMessage("after-unregister") + + if len(rec.Messages) != 0 { + t.Errorf("recorder should not have received post-unregister messages, got %v", rec.Messages) + } +} + +func TestStatusRegistry_NilSenderRegistersAsNoop(t *testing.T) { + reg := NewStatusRegistry() + // Passing nil shouldn't blow up callers who didn't construct an + // rpcStatusSender — Register normalizes to noop. + reg.Register("s1", nil) + sender := reg.SenderFor("s1") + sender.SendMessage("should be safe") // no panic +} + +func equalStringSlice(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} diff --git a/servers/build/stream_registry.go b/servers/build/stream_registry.go new file mode 100644 index 000000000..25d0523b8 --- /dev/null +++ b/servers/build/stream_registry.go @@ -0,0 +1,183 @@ +package build + +import ( + "errors" + "fmt" + "io" + "log/slog" + "os" + "path/filepath" + "sync" + + "miren.dev/runtime/pkg/tarx" +) + +// ErrStreamUnavailable is returned by Stage when neither an active reader +// nor a previously staged path exists for the given stream ID. This is the +// expected failure when recovering a saga whose tar stream was lost in a +// crash before the first stage attempt. +var ErrStreamUnavailable = errors.New("stream unavailable") + +// StreamRegistry bridges non-serializable tar streams into the saga world by +// staging them to durable filesystem paths. The pattern, per RFD-35: +// +// 1. Entry point registers an io.Reader with a generated stream ID and +// starts a saga carrying only the ID. +// 2. The first saga action calls Stage to read the registered stream and +// extract its tar contents to a fresh subdirectory. +// 3. If the saga later resumes after a crash, the stream is gone but the +// staged path is durable — Stage returns the existing path without +// touching any reader. 
+// +// Stage and Cleanup are idempotent. Cleanup is safe to call from action +// compensation (on failure) and from a terminal saga step (on success); +// running it twice is a no-op. +type StreamRegistry struct { + baseDir string + log *slog.Logger + + mu sync.Mutex + streams map[string]io.Reader + staged map[string]string +} + +// NewStreamRegistry creates a registry that stages streams under baseDir. +// Each registered stream gets its own subdirectory so Cleanup is a single +// RemoveAll. +func NewStreamRegistry(baseDir string, log *slog.Logger) *StreamRegistry { + if log == nil { + log = slog.Default() + } + return &StreamRegistry{ + baseDir: baseDir, + log: log.With("component", "stream-registry"), + streams: make(map[string]io.Reader), + staged: make(map[string]string), + } +} + +// Register associates an io.Reader with a stream ID. The reader is held +// in memory until Stage consumes it or Cleanup drops it. Re-registering an +// ID with an active reader replaces it; re-registering after staging is a +// no-op so recovery flows don't accidentally restart staging. +func (s *StreamRegistry) Register(streamID string, r io.Reader) { + s.mu.Lock() + defer s.mu.Unlock() + + if _, alreadyStaged := s.staged[streamID]; alreadyStaged { + return + } + s.streams[streamID] = r +} + +// Stage extracts the registered stream's tar contents to a per-stream +// subdirectory and returns the path. On recovery, if the original reader +// is gone but a staged path exists, returns that path unchanged. Returns +// ErrStreamUnavailable when neither is available. 
+func (s *StreamRegistry) Stage(streamID string) (string, error) {
+	// Take one snapshot of both maps under the lock. When the ID is not
+	// yet staged but has a reader, claim the reader by removing it so a
+	// concurrent Stage call for the same ID cannot consume it too.
+	s.mu.Lock()
+	existing, staged := s.staged[streamID]
+	src, registered := s.streams[streamID]
+	if !staged && registered {
+		delete(s.streams, streamID)
+	}
+	s.mu.Unlock()
+
+	switch {
+	case staged:
+		s.log.Debug("stream already staged", "id", streamID, "path", existing)
+		return existing, nil
+	case !registered:
+		return "", fmt.Errorf("%w: %s", ErrStreamUnavailable, streamID)
+	}
+
+	dir, err := os.MkdirTemp(s.baseDir, "stream-"+streamID+"-")
+	if err != nil {
+		// No bytes have been read yet, so hand the reader back and let
+		// a later Stage call retry.
+		s.mu.Lock()
+		s.streams[streamID] = src
+		s.mu.Unlock()
+		return "", fmt.Errorf("creating staging dir for %s: %w", streamID, err)
+	}
+
+	if _, err := tarx.TarFS(src, dir); err != nil {
+		// The reader has been (partly) consumed and is not restored.
+		// Removing the partial directory is best-effort; the saga
+		// fails this action either way.
+		_ = os.RemoveAll(dir)
+		return "", fmt.Errorf("extracting tar for %s: %w", streamID, err)
+	}
+
+	s.mu.Lock()
+	s.staged[streamID] = dir
+	s.mu.Unlock()
+
+	s.log.Debug("staged stream", "id", streamID, "path", dir)
+	return dir, nil
+}
+
+// StagedPath reports the directory recorded for streamID, and whether
+// one is recorded at all. Intended for diagnostics and tests; the saga
+// itself uses the path Stage returns.
+func (s *StreamRegistry) StagedPath(streamID string) (string, bool) {
+	s.mu.Lock()
+	dir, found := s.staged[streamID]
+	s.mu.Unlock()
+	return dir, found
+}
+
+// MarkStaged records a directory prepared elsewhere as the staged path
+// for streamID. BuildFromPrepared uses it because its source tree is
+// already on disk from a PrepareUpload session, so the receive-tar
+// action only has to find the path. Any reader registered under the ID
+// is dropped. An existing staged entry is never overwritten: the call
+// logs a warning and leaves it alone, so a stale session ID cannot
+// replace a legitimately staged directory and leak it on disk.
+func (s *StreamRegistry) MarkStaged(streamID, path string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	delete(s.streams, streamID)
+
+	existing, taken := s.staged[streamID]
+	if !taken {
+		s.staged[streamID] = path
+		return
+	}
+	s.log.Warn("MarkStaged called for already-staged id",
+		"id", streamID, "existing", existing, "new", path)
+}
+
+// Cleanup deletes the staged directory for streamID, if there is one,
+// and forgets the ID. It is idempotent: compensation on failure and a
+// terminal step on success may both call it, and calling it for an ID
+// that was never staged is fine. The staged entry is dropped only
+// after RemoveAll succeeds, so a failed removal can be retried with
+// the same ID instead of leaking the path.
+func (s *StreamRegistry) Cleanup(streamID string) error {
+	s.mu.Lock()
+	delete(s.streams, streamID)
+	dir, staged := s.staged[streamID]
+	s.mu.Unlock()
+
+	if !staged {
+		return nil
+	}
+
+	if err := os.RemoveAll(dir); err != nil {
+		return fmt.Errorf("removing staged dir %s: %w", dir, err)
+	}
+
+	s.mu.Lock()
+	delete(s.staged, streamID)
+	s.mu.Unlock()
+
+	s.log.Debug("cleaned up staged stream", "id", streamID, "path", dir)
+	return nil
+}
+
+// stagedFileExists is used by tests to confirm the staging directory was
+// actually written and later removed. Kept package-private; saga code uses
+// the path returned from Stage directly.
+func stagedFileExists(dir, name string) bool { + _, err := os.Stat(filepath.Join(dir, name)) + return err == nil +} diff --git a/servers/build/stream_registry_test.go b/servers/build/stream_registry_test.go new file mode 100644 index 000000000..c9be484bd --- /dev/null +++ b/servers/build/stream_registry_test.go @@ -0,0 +1,261 @@ +package build + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "errors" + "io" + "strings" + "sync" + "testing" +) + +// makeTar returns an io.Reader producing a gzip'd tar archive (tarx.TarFS +// always decompresses) containing the given files. Parent directories are +// emitted automatically for nested paths so the receiver doesn't need to +// pre-create them. Same shape of data BuildFromTar receives over the wire. +func makeTar(t *testing.T, files map[string]string) io.Reader { + t.Helper() + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + tw := tar.NewWriter(gw) + + seenDirs := map[string]bool{} + emitDir := func(dir string) { + if dir == "" || dir == "." || seenDirs[dir] { + return + } + seenDirs[dir] = true + hdr := &tar.Header{ + Name: dir + "/", + Mode: 0o755, + Typeflag: tar.TypeDir, + } + if err := tw.WriteHeader(hdr); err != nil { + t.Fatalf("tar header for dir %q: %v", dir, err) + } + } + + for name, body := range files { + // Emit each parent directory once so nested file entries can be + // created without ENOENT on os.Create. 
+ if i := strings.LastIndex(name, "/"); i > 0 { + emitDir(name[:i]) + } + hdr := &tar.Header{ + Name: name, + Mode: 0o644, + Size: int64(len(body)), + } + if err := tw.WriteHeader(hdr); err != nil { + t.Fatalf("tar header for %q: %v", name, err) + } + if _, err := tw.Write([]byte(body)); err != nil { + t.Fatalf("tar body for %q: %v", name, err) + } + } + if err := tw.Close(); err != nil { + t.Fatalf("closing tar: %v", err) + } + if err := gw.Close(); err != nil { + t.Fatalf("closing gzip: %v", err) + } + return &buf +} + +func TestStreamRegistry_StageExtractsAndReturnsPath(t *testing.T) { + reg := NewStreamRegistry(t.TempDir(), nil) + reg.Register("s1", makeTar(t, map[string]string{ + "app.toml": "name = 'demo'", + "main.go": "package main", + })) + + path, err := reg.Stage("s1") + if err != nil { + t.Fatalf("Stage: %v", err) + } + if path == "" { + t.Fatal("Stage returned empty path") + } + + if !stagedFileExists(path, "app.toml") { + t.Errorf("expected app.toml at %s", path) + } + if !stagedFileExists(path, "main.go") { + t.Errorf("expected main.go at %s", path) + } +} + +func TestStreamRegistry_StageIsIdempotent(t *testing.T) { + reg := NewStreamRegistry(t.TempDir(), nil) + reg.Register("s1", makeTar(t, map[string]string{"a": "x"})) + + first, err := reg.Stage("s1") + if err != nil { + t.Fatalf("first Stage: %v", err) + } + + // Second call must return the same path without consuming a reader. + // The reader is gone after the first Stage, so a non-idempotent impl + // would either re-stage (changing path) or fail with ErrStreamUnavailable. + second, err := reg.Stage("s1") + if err != nil { + t.Fatalf("second Stage: %v", err) + } + if first != second { + t.Errorf("Stage paths differ: first=%s second=%s", first, second) + } +} + +func TestStreamRegistry_StageRecoveryWithoutStream(t *testing.T) { + // Simulate: caller staged once, then "crashed" — re-staging without + // ever calling Register again must still succeed because the path is + // remembered in-process. 
(Note: cross-process recovery would require + // persisting the path; that's a future iteration.) + reg := NewStreamRegistry(t.TempDir(), nil) + reg.Register("s1", makeTar(t, map[string]string{"a": "x"})) + staged, err := reg.Stage("s1") + if err != nil { + t.Fatalf("initial Stage: %v", err) + } + + recovered, err := reg.Stage("s1") + if err != nil { + t.Fatalf("recovery Stage: %v", err) + } + if recovered != staged { + t.Errorf("recovery path mismatch: got %s want %s", recovered, staged) + } +} + +func TestStreamRegistry_StageFailsWhenStreamGoneAndUnstaged(t *testing.T) { + reg := NewStreamRegistry(t.TempDir(), nil) + // Never registered. This is the "crash before any stage attempt" case. + _, err := reg.Stage("never-seen") + if err == nil { + t.Fatal("Stage should fail when nothing registered") + } + if !errors.Is(err, ErrStreamUnavailable) { + t.Errorf("expected ErrStreamUnavailable, got %v", err) + } +} + +func TestStreamRegistry_StageFailsOnBadTar(t *testing.T) { + reg := NewStreamRegistry(t.TempDir(), nil) + reg.Register("s1", strings.NewReader("not a tar")) + + _, err := reg.Stage("s1") + if err == nil { + t.Fatal("Stage should fail on malformed tar") + } + + // Failure must not leave the ID staged; a retry should report + // ErrStreamUnavailable rather than a half-extracted directory. 
+ if _, ok := reg.StagedPath("s1"); ok { + t.Error("StagedPath should be empty after Stage failure") + } + _, err = reg.Stage("s1") + if !errors.Is(err, ErrStreamUnavailable) { + t.Errorf("retry after Stage failure: expected ErrStreamUnavailable, got %v", err) + } +} + +func TestStreamRegistry_CleanupRemovesStagedDir(t *testing.T) { + reg := NewStreamRegistry(t.TempDir(), nil) + reg.Register("s1", makeTar(t, map[string]string{"app.toml": "x"})) + path, err := reg.Stage("s1") + if err != nil { + t.Fatalf("Stage: %v", err) + } + + if err := reg.Cleanup("s1"); err != nil { + t.Fatalf("Cleanup: %v", err) + } + if stagedFileExists(path, "app.toml") { + t.Errorf("expected %s to be removed", path) + } + if _, ok := reg.StagedPath("s1"); ok { + t.Error("StagedPath should be empty after Cleanup") + } +} + +func TestStreamRegistry_CleanupIsIdempotent(t *testing.T) { + reg := NewStreamRegistry(t.TempDir(), nil) + reg.Register("s1", makeTar(t, map[string]string{"a": "x"})) + if _, err := reg.Stage("s1"); err != nil { + t.Fatalf("Stage: %v", err) + } + + // First cleanup removes the dir. + if err := reg.Cleanup("s1"); err != nil { + t.Fatalf("first Cleanup: %v", err) + } + // Second cleanup is a no-op — important because both action compensation + // and a terminal saga step might call it. + if err := reg.Cleanup("s1"); err != nil { + t.Fatalf("second Cleanup: %v", err) + } + // Cleanup on a never-staged ID is also fine. + if err := reg.Cleanup("never-seen"); err != nil { + t.Fatalf("cleanup of unknown id: %v", err) + } +} + +func TestStreamRegistry_RegisterAfterStageDoesNotResetState(t *testing.T) { + // A confused caller re-registering after staging shouldn't blow away + // the staged path. Recovery flows might re-register a reader that's + // already been consumed; we just ignore it. 
+ reg := NewStreamRegistry(t.TempDir(), nil) + reg.Register("s1", makeTar(t, map[string]string{"a": "x"})) + first, err := reg.Stage("s1") + if err != nil { + t.Fatalf("Stage: %v", err) + } + + reg.Register("s1", makeTar(t, map[string]string{"b": "y"})) + second, err := reg.Stage("s1") + if err != nil { + t.Fatalf("Stage after re-register: %v", err) + } + if first != second { + t.Errorf("path changed after Register-after-Stage: first=%s second=%s", first, second) + } +} + +func TestStreamRegistry_ConcurrentStreams(t *testing.T) { + reg := NewStreamRegistry(t.TempDir(), nil) + const n = 8 + for i := 0; i < n; i++ { + id := streamID(i) + reg.Register(id, makeTar(t, map[string]string{id: id})) + } + + var wg sync.WaitGroup + paths := make([]string, n) + errs := make([]error, n) + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + paths[i], errs[i] = reg.Stage(streamID(i)) + }(i) + } + wg.Wait() + + seen := map[string]bool{} + for i, err := range errs { + if err != nil { + t.Errorf("Stage %d: %v", i, err) + continue + } + if seen[paths[i]] { + t.Errorf("duplicate staged path %s for stream %d", paths[i], i) + } + seen[paths[i]] = true + } +} + +func streamID(i int) string { + return "s" + string(rune('a'+i)) +} From ee27cb3617167f24b309cc0bfea2acf3e44bb8a0 Mon Sep 17 00:00:00 2001 From: Paul Hinze Date: Fri, 5 Jun 2026 08:09:04 -0500 Subject: [PATCH 2/2] pkg/saga: make saga state durable and scope recovery per executor Converting the build process to sagas (the parent commit) surfaced two framework bugs that only bite a saga which reads its own state back or shares storage with another executor. The build saga does both. EntityStorage.Save and EACStorage.Save persisted through create-if-absent primitives (EnsureEntity / EAC Ensure), so every save after the first was silently dropped. A saga's record froze at its initial pending state: status never advanced to completed, no action results were recorded. 
Reading outputs back failed ("status pending, expected completed"), and on restart recovery re-ran already-finished sagas because they still looked pending. MemoryStorage overwrites, so unit tests stayed green while both production backends were broken. Save now upserts: EntityStorage ensures-then-replaces, EACStorage uses Put. Recovery treated a saga whose definition isn't in the executor's registry as an error. But storage is shared across executors (build and sandbox each run their own) and ListIncomplete returns every executor's sagas, so each tripped over the other's on startup. Recovery now skips definitions it doesn't own, leaving them to their owning executor. A storage conformance suite runs the persistence scenarios against all three Storage backends so a backend that silently drops updates can't pass again. --- pkg/saga/eac_storage.go | 11 +- pkg/saga/executor.go | 13 ++- pkg/saga/storage.go | 13 ++- pkg/saga/storage_conformance_test.go | 169 +++++++++++++++++++++++++++ 4 files changed, 198 insertions(+), 8 deletions(-) create mode 100644 pkg/saga/storage_conformance_test.go diff --git a/pkg/saga/eac_storage.go b/pkg/saga/eac_storage.go index 018b59641..5660aa6d6 100644 --- a/pkg/saga/eac_storage.go +++ b/pkg/saga/eac_storage.go @@ -65,8 +65,15 @@ func (s *EACStorage) Save(ctx context.Context, exec *Execution) error { sagaEntity.Encode(), ) - _, err = s.eac.Ensure(ctx, ent.Attrs()) - if err != nil { + // Put is an upsert (update-then-create): unlike Ensure, it applies our + // attributes even when the entity already exists. Ensure is create-if-absent + // and would silently drop every save after the first, freezing the saga at + // its initial pending state. 
+ rpcEnt := &es.Entity{} + rpcEnt.SetId(exec.ID) + rpcEnt.SetAttrs(ent.Attrs()) + + if _, err = s.eac.Put(ctx, rpcEnt); err != nil { return fmt.Errorf("saving saga entity via EAC: %w", err) } diff --git a/pkg/saga/executor.go b/pkg/saga/executor.go index 91f619161..66821bb20 100644 --- a/pkg/saga/executor.go +++ b/pkg/saga/executor.go @@ -465,15 +465,20 @@ func (e *Executor) Recover(ctx context.Context) error { continue } - e.log.Info("recovering saga", "saga", exec.DefinitionName, "execution", exec.ID, "status", exec.Status) - def, ok := e.registry.Get(exec.DefinitionName) if !ok { - e.log.Error("saga definition not found for recovery", "saga", exec.DefinitionName) - recoverErrors = append(recoverErrors, fmt.Errorf("definition %q not found", exec.DefinitionName)) + // Storage is shared across executors (e.g. build and sandbox each + // run their own), but ListIncomplete returns every executor's + // sagas. A definition we don't recognize belongs to a different + // executor, which will recover it from its own registry. Skip + // quietly rather than treating it as our recovery error. + e.log.Debug("skipping saga with unregistered definition (owned by another executor)", + "saga", exec.DefinitionName, "execution", exec.ID) continue } + e.log.Info("recovering saga", "saga", exec.DefinitionName, "execution", exec.ID, "status", exec.Status) + // Check version compatibility if def.Version != exec.DefinitionVersion { e.log.Warn("saga definition version mismatch", diff --git a/pkg/saga/storage.go b/pkg/saga/storage.go index 4d6ec28cb..fcc1183e8 100644 --- a/pkg/saga/storage.go +++ b/pkg/saga/storage.go @@ -60,16 +60,25 @@ func (s *EntityStorage) Save(ctx context.Context, exec *Execution) error { Error: exec.Error, } - // Create or update the entity + // Create or update the entity. EnsureEntity is create-if-absent and + // does NOT apply our attributes when the entity already exists, so on + // every save after the first we must explicitly replace. 
Without this, + // the saga record stays frozen at its initial pending state and later + // status/action-progress writes are silently dropped. ent := entity.New( entity.DBId, entity.Id(exec.ID), sagaEntity.Encode(), ) - _, _, err = s.store.EnsureEntity(ctx, ent) + _, created, err := s.store.EnsureEntity(ctx, ent) if err != nil { return fmt.Errorf("saving saga entity: %w", err) } + if !created { + if _, err := s.store.ReplaceEntity(ctx, ent); err != nil { + return fmt.Errorf("updating saga entity: %w", err) + } + } return nil } diff --git a/pkg/saga/storage_conformance_test.go b/pkg/saga/storage_conformance_test.go new file mode 100644 index 000000000..2625286ad --- /dev/null +++ b/pkg/saga/storage_conformance_test.go @@ -0,0 +1,169 @@ +package saga + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "miren.dev/runtime/pkg/entity/testutils" +) + +// storageFactory builds a fresh, empty Storage for one test. It registers any +// teardown via t.Cleanup so callers get a clean backend per subtest. +type storageFactory struct { + name string + make func(t *testing.T) Storage +} + +// allStorageBackends returns every production Storage implementation behind a +// uniform factory. The whole point of this suite is that every backend must +// satisfy the same contract: the bug in MIR-441 lived in two of these three +// (EntityStorage and EACStorage both used create-if-absent semantics and +// silently dropped every save after the first), while MemoryStorage was +// correct, so memory-backed unit tests stayed green while production froze +// every saga at its initial pending state. Running the same scenarios against +// all backends is what closes that gap. +// +// This suite tests a different question than pkg/entity's Store conformance +// suite, and the two are complementary. The entity suite proves MockStore +// behaves like the real EtcdStore on the underlying write primitives. 
This +// suite proves saga storage *uses* those primitives correctly (the MIR-441 bug +// was a create-only call where an upsert was needed, which no amount of +// store-level testing would catch). Because the entity suite vouches for the +// mock, the EntityStorage backend here is mock-backed on purpose and we do not +// add a real-etcd backend; the EACStorage backend additionally exercises the +// EntityAccessClient RPC path, which the entity suite does not reach. +func allStorageBackends() []storageFactory { + return []storageFactory{ + { + name: "MemoryStorage", + make: func(t *testing.T) Storage { + return NewMemoryStorage() + }, + }, + { + name: "EntityStorage", + make: func(t *testing.T) Storage { + inmem, cleanup := testutils.NewInMemEntityServer(t) + t.Cleanup(cleanup) + return NewEntityStorage(inmem.Store, testutils.TestLogger(t)) + }, + }, + { + name: "EACStorage", + make: func(t *testing.T) Storage { + inmem, cleanup := testutils.NewInMemEntityServer(t) + t.Cleanup(cleanup) + return NewEACStorage(inmem.EAC, testutils.TestLogger(t)) + }, + }, + } +} + +// TestStorageConformance_SaveUpdatesExistingExecution is the direct regression +// for the EnsureEntity/Ensure create-if-absent bug. A saga is saved repeatedly +// as it progresses (pending -> running -> completed, accumulating action +// results). Every save after the first must overwrite the stored state. The +// buggy backends dropped these updates, so Get returned a pending execution +// with no recorded actions even though the saga had completed. 
+func TestStorageConformance_SaveUpdatesExistingExecution(t *testing.T) { + for _, backend := range allStorageBackends() { + t.Run(backend.name, func(t *testing.T) { + ctx := context.Background() + storage := backend.make(t) + + exec := &Execution{ + ID: "build-from-prepared-abc123", + DefinitionName: "build-from-tar", + Status: StatusPending, + InitialInputs: map[string]any{"app_name": "demo"}, + ExecutedActions: map[string]*ActionResult{}, + ExecutionOrder: []string{}, + } + + // First save: creates the entity. + require.NoError(t, storage.Save(ctx, exec)) + + // Progress the saga and save again. Under the old create-if-absent + // behavior this save was a silent no-op. + exec.Status = StatusRunning + exec.ExecutedActions["receive-tar"] = &ActionResult{ + Output: []byte(`{"source_dir":"/tmp/build"}`), + ExecutedAt: time.Unix(0, 0), + } + exec.ExecutionOrder = []string{"receive-tar"} + require.NoError(t, storage.Save(ctx, exec)) + + // Complete the saga and save a final time. + exec.Status = StatusCompleted + exec.ExecutedActions["create-version"] = &ActionResult{ + Output: []byte(`{"version_name":"demo-v1"}`), + ExecutedAt: time.Unix(0, 0), + } + exec.ExecutionOrder = []string{"receive-tar", "create-version"} + require.NoError(t, storage.Save(ctx, exec)) + + got, err := storage.Get(ctx, exec.ID) + require.NoError(t, err) + + assert.Equal(t, StatusCompleted, got.Status, + "final status must persist; a stuck pending status means saves after the first were dropped") + assert.Len(t, got.ExecutedActions, 2, + "executed actions recorded across saves must all persist") + assert.Equal(t, []string{"receive-tar", "create-version"}, got.ExecutionOrder) + + if action, ok := got.ExecutedActions["create-version"]; assert.True(t, ok, "create-version action must persist") { + assert.JSONEq(t, `{"version_name":"demo-v1"}`, string(action.Output)) + } + }) + } +} + +// TestStorageConformance_CompletedExecutionLeavesIncompleteList verifies that a +// saga which has reached a 
terminal status no longer shows up in +// ListIncomplete. This is the operational consequence of the same bug: when the +// completed save was dropped, the execution stayed pending in storage forever, +// so every process restart re-ran an already-finished saga during recovery. +func TestStorageConformance_CompletedExecutionLeavesIncompleteList(t *testing.T) { + for _, backend := range allStorageBackends() { + t.Run(backend.name, func(t *testing.T) { + ctx := context.Background() + storage := backend.make(t) + + exec := &Execution{ + ID: "saga-incomplete-check", + DefinitionName: "build-from-tar", + Status: StatusPending, + InitialInputs: map[string]any{}, + ExecutedActions: map[string]*ActionResult{}, + ExecutionOrder: []string{}, + } + require.NoError(t, storage.Save(ctx, exec)) + + incomplete, err := storage.ListIncomplete(ctx) + require.NoError(t, err) + assert.True(t, containsExecution(incomplete, exec.ID), + "a pending saga must appear in ListIncomplete") + + exec.Status = StatusCompleted + require.NoError(t, storage.Save(ctx, exec)) + + incomplete, err = storage.ListIncomplete(ctx) + require.NoError(t, err) + assert.False(t, containsExecution(incomplete, exec.ID), + "a completed saga must NOT appear in ListIncomplete; if it does, recovery will re-run finished work") + }) + } +} + +func containsExecution(execs []*Execution, id string) bool { + for _, e := range execs { + if e != nil && e.ID == id { + return true + } + } + return false +}