diff --git a/components/coordinate/coordinate.go b/components/coordinate/coordinate.go index 0ddbd137..4e9f20fc 100644 --- a/components/coordinate/coordinate.go +++ b/components/coordinate/coordinate.go @@ -1118,7 +1118,18 @@ func (c *Coordinator) Start(ctx context.Context) error { appClient := appclient.NewClient(c.Log, loopback) bs := build.NewBuilder(c.Log, eac, appClient, addonsClient, c.Resolver, c.TempDir, c.LogWriter, c.CloudAuth.DNSHostname, c.BuildKit, c.DataPath) - server.ExposeValue("dev.miren.runtime/build", build_v1alpha.AdaptBuilder(bs)) + + var buildHandler build_v1alpha.Builder = bs + if labs.Sagas() { + sagaStorage := saga.NewEntityStorage(etcdStore, c.Log) + sagaBuilder := build.NewSagaBuilder(bs, sagaStorage, c.Log) + if err := sagaBuilder.Init(ctx); err != nil { + c.Log.Error("failed to initialize saga builder", "error", err) + return err + } + buildHandler = sagaBuilder + } + server.ExposeValue("dev.miren.runtime/build", build_v1alpha.AdaptBuilder(buildHandler)) ls := logs.NewServer(c.Log, ec, c.Logs) server.ExposeValue("dev.miren.runtime/logs", app_v1alpha.AdaptLogs(ls)) diff --git a/pkg/labs/features.yaml b/pkg/labs/features.yaml index 424e1962..a40e9282 100644 --- a/pkg/labs/features.yaml +++ b/pkg/labs/features.yaml @@ -16,5 +16,5 @@ features: - name: sagas predicate: Sagas - description: Use saga-based crash-recoverable workflows for sandbox lifecycle + description: Use saga-based crash-recoverable workflows default: false diff --git a/pkg/labs/labs.gen.go b/pkg/labs/labs.gen.go index 0fbfe6ce..ec224f38 100644 --- a/pkg/labs/labs.gen.go +++ b/pkg/labs/labs.gen.go @@ -32,7 +32,7 @@ func FeatureDescriptions() map[string]string { FeatureGlobalRouter: "Use global NAT traversal router for connectivity", FeatureDistributedRunners: "Schedule jobs across multiple runner nodes", FeatureAdminAPI: "Enable the admin API for application management functions", - FeatureSagas: "Use saga-based crash-recoverable workflows for sandbox lifecycle", + FeatureSagas: "Use saga-based crash-recoverable workflows", } } @@ -153,7 +153,7 @@ func AdminAPI() bool { } // Sagas returns whether the sagas feature is enabled. -// Use saga-based crash-recoverable workflows for sandbox lifecycle +// Use saga-based crash-recoverable workflows func Sagas() bool { return IsEnabled(FeatureSagas) } diff --git a/pkg/saga/eac_storage.go b/pkg/saga/eac_storage.go index 018b5964..5660aa6d 100644 --- a/pkg/saga/eac_storage.go +++ b/pkg/saga/eac_storage.go @@ -65,8 +65,15 @@ func (s *EACStorage) Save(ctx context.Context, exec *Execution) error { sagaEntity.Encode(), ) - _, err = s.eac.Ensure(ctx, ent.Attrs()) - if err != nil { + // Put is an upsert (update-then-create): unlike Ensure, it applies our + // attributes even when the entity already exists. Ensure is create-if-absent + // and would silently drop every save after the first, freezing the saga at + // its initial pending state. + rpcEnt := &es.Entity{} + rpcEnt.SetId(exec.ID) + rpcEnt.SetAttrs(ent.Attrs()) + + if _, err = s.eac.Put(ctx, rpcEnt); err != nil { return fmt.Errorf("saving saga entity via EAC: %w", err) } diff --git a/pkg/saga/executor.go b/pkg/saga/executor.go index 91f61916..66821bb2 100644 --- a/pkg/saga/executor.go +++ b/pkg/saga/executor.go @@ -465,15 +465,20 @@ func (e *Executor) Recover(ctx context.Context) error { continue } - e.log.Info("recovering saga", "saga", exec.DefinitionName, "execution", exec.ID, "status", exec.Status) - def, ok := e.registry.Get(exec.DefinitionName) if !ok { - e.log.Error("saga definition not found for recovery", "saga", exec.DefinitionName) - recoverErrors = append(recoverErrors, fmt.Errorf("definition %q not found", exec.DefinitionName)) + // Storage is shared across executors (e.g. build and sandbox each + // run their own), but ListIncomplete returns every executor's + // sagas. A definition we don't recognize belongs to a different + // executor, which will recover it from its own registry. Skip + // quietly rather than treating it as our recovery error. + e.log.Debug("skipping saga with unregistered definition (owned by another executor)", + "saga", exec.DefinitionName, "execution", exec.ID) continue } + e.log.Info("recovering saga", "saga", exec.DefinitionName, "execution", exec.ID, "status", exec.Status) + // Check version compatibility if def.Version != exec.DefinitionVersion { e.log.Warn("saga definition version mismatch", diff --git a/pkg/saga/storage.go b/pkg/saga/storage.go index 4d6ec28c..fcc1183e 100644 --- a/pkg/saga/storage.go +++ b/pkg/saga/storage.go @@ -60,16 +60,25 @@ func (s *EntityStorage) Save(ctx context.Context, exec *Execution) error { Error: exec.Error, } - // Create or update the entity + // Create or update the entity. EnsureEntity is create-if-absent and + // does NOT apply our attributes when the entity already exists, so on + // every save after the first we must explicitly replace. Without this, + // the saga record stays frozen at its initial pending state and later + // status/action-progress writes are silently dropped. ent := entity.New( entity.DBId, entity.Id(exec.ID), sagaEntity.Encode(), ) - _, _, err = s.store.EnsureEntity(ctx, ent) + _, created, err := s.store.EnsureEntity(ctx, ent) if err != nil { return fmt.Errorf("saving saga entity: %w", err) } + if !created { + if _, err := s.store.ReplaceEntity(ctx, ent); err != nil { + return fmt.Errorf("updating saga entity: %w", err) + } + } return nil } diff --git a/pkg/saga/storage_conformance_test.go b/pkg/saga/storage_conformance_test.go new file mode 100644 index 00000000..2625286a --- /dev/null +++ b/pkg/saga/storage_conformance_test.go @@ -0,0 +1,169 @@ +package saga + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "miren.dev/runtime/pkg/entity/testutils" +) + +// storageFactory builds a fresh, empty Storage for one test. It registers any +// teardown via t.Cleanup so callers get a clean backend per subtest. +type storageFactory struct { + name string + make func(t *testing.T) Storage +} + +// allStorageBackends returns every production Storage implementation behind a +// uniform factory. The whole point of this suite is that every backend must +// satisfy the same contract: the bug in MIR-441 lived in two of these three +// (EntityStorage and EACStorage both used create-if-absent semantics and +// silently dropped every save after the first), while MemoryStorage was +// correct, so memory-backed unit tests stayed green while production froze +// every saga at its initial pending state. Running the same scenarios against +// all backends is what closes that gap. +// +// This suite tests a different question than pkg/entity's Store conformance +// suite, and the two are complementary. The entity suite proves MockStore +// behaves like the real EtcdStore on the underlying write primitives. This +// suite proves saga storage *uses* those primitives correctly (the MIR-441 bug +// was a create-only call where an upsert was needed, which no amount of +// store-level testing would catch). Because the entity suite vouches for the +// mock, the EntityStorage backend here is mock-backed on purpose and we do not +// add a real-etcd backend; the EACStorage backend additionally exercises the +// EntityAccessClient RPC path, which the entity suite does not reach. +func allStorageBackends() []storageFactory { + return []storageFactory{ + { + name: "MemoryStorage", + make: func(t *testing.T) Storage { + return NewMemoryStorage() + }, + }, + { + name: "EntityStorage", + make: func(t *testing.T) Storage { + inmem, cleanup := testutils.NewInMemEntityServer(t) + t.Cleanup(cleanup) + return NewEntityStorage(inmem.Store, testutils.TestLogger(t)) + }, + }, + { + name: "EACStorage", + make: func(t *testing.T) Storage { + inmem, cleanup := testutils.NewInMemEntityServer(t) + t.Cleanup(cleanup) + return NewEACStorage(inmem.EAC, testutils.TestLogger(t)) + }, + }, + } +} + +// TestStorageConformance_SaveUpdatesExistingExecution is the direct regression +// for the EnsureEntity/Ensure create-if-absent bug. A saga is saved repeatedly +// as it progresses (pending -> running -> completed, accumulating action +// results). Every save after the first must overwrite the stored state. The +// buggy backends dropped these updates, so Get returned a pending execution +// with no recorded actions even though the saga had completed. +func TestStorageConformance_SaveUpdatesExistingExecution(t *testing.T) { + for _, backend := range allStorageBackends() { + t.Run(backend.name, func(t *testing.T) { + ctx := context.Background() + storage := backend.make(t) + + exec := &Execution{ + ID: "build-from-prepared-abc123", + DefinitionName: "build-from-tar", + Status: StatusPending, + InitialInputs: map[string]any{"app_name": "demo"}, + ExecutedActions: map[string]*ActionResult{}, + ExecutionOrder: []string{}, + } + + // First save: creates the entity. + require.NoError(t, storage.Save(ctx, exec)) + + // Progress the saga and save again. Under the old create-if-absent + // behavior this save was a silent no-op. + exec.Status = StatusRunning + exec.ExecutedActions["receive-tar"] = &ActionResult{ + Output: []byte(`{"source_dir":"/tmp/build"}`), + ExecutedAt: time.Unix(0, 0), + } + exec.ExecutionOrder = []string{"receive-tar"} + require.NoError(t, storage.Save(ctx, exec)) + + // Complete the saga and save a final time. + exec.Status = StatusCompleted + exec.ExecutedActions["create-version"] = &ActionResult{ + Output: []byte(`{"version_name":"demo-v1"}`), + ExecutedAt: time.Unix(0, 0), + } + exec.ExecutionOrder = []string{"receive-tar", "create-version"} + require.NoError(t, storage.Save(ctx, exec)) + + got, err := storage.Get(ctx, exec.ID) + require.NoError(t, err) + + assert.Equal(t, StatusCompleted, got.Status, + "final status must persist; a stuck pending status means saves after the first were dropped") + assert.Len(t, got.ExecutedActions, 2, + "executed actions recorded across saves must all persist") + assert.Equal(t, []string{"receive-tar", "create-version"}, got.ExecutionOrder) + + if action, ok := got.ExecutedActions["create-version"]; assert.True(t, ok, "create-version action must persist") { + assert.JSONEq(t, `{"version_name":"demo-v1"}`, string(action.Output)) + } + }) + } +} + +// TestStorageConformance_CompletedExecutionLeavesIncompleteList verifies that a +// saga which has reached a terminal status no longer shows up in +// ListIncomplete. This is the operational consequence of the same bug: when the +// completed save was dropped, the execution stayed pending in storage forever, +// so every process restart re-ran an already-finished saga during recovery. +func TestStorageConformance_CompletedExecutionLeavesIncompleteList(t *testing.T) { + for _, backend := range allStorageBackends() { + t.Run(backend.name, func(t *testing.T) { + ctx := context.Background() + storage := backend.make(t) + + exec := &Execution{ + ID: "saga-incomplete-check", + DefinitionName: "build-from-tar", + Status: StatusPending, + InitialInputs: map[string]any{}, + ExecutedActions: map[string]*ActionResult{}, + ExecutionOrder: []string{}, + } + require.NoError(t, storage.Save(ctx, exec)) + + incomplete, err := storage.ListIncomplete(ctx) + require.NoError(t, err) + assert.True(t, containsExecution(incomplete, exec.ID), + "a pending saga must appear in ListIncomplete") + + exec.Status = StatusCompleted + require.NoError(t, storage.Save(ctx, exec)) + + incomplete, err = storage.ListIncomplete(ctx) + require.NoError(t, err) + assert.False(t, containsExecution(incomplete, exec.ID), + "a completed saga must NOT appear in ListIncomplete; if it does, recovery will re-run finished work") + }) + } +} + +func containsExecution(execs []*Execution, id string) bool { + for _, e := range execs { + if e != nil && e.ID == id { + return true + } + } + return false +} diff --git a/servers/build/build.go b/servers/build/build.go index d113b0f4..71db61c8 100644 --- a/servers/build/build.go +++ b/servers/build/build.go @@ -14,7 +14,6 @@ import ( "sync" "time" - "github.com/moby/buildkit/client" "github.com/tonistiigi/fsutil" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -1124,8 +1123,6 @@ func (b *Builder) buildFromDir(ctx context.Context, name string, path string, envVars []*build_v1alpha.EnvironmentVariable, ephemeral *ephemeralOpts) (*buildResult, error) { - so := new(build_v1alpha.Status) - // -- build.setup span: app config, stack detection, buildkit connect ctx, setupSpan := buildTracer.Start(ctx, "build.setup") @@ -1149,94 +1146,18 @@ func (b *Builder) buildFromDir(ctx context.Context, name string, path string, b.Log.Info("loaded app config", "name", ac.Name, "envVarCount", len(ac.EnvVars), "serviceCount", len(ac.Services)) } - var buildStack BuildStack - buildStack.CodeDir = path - - if ac != nil && ac.Build != nil { - buildStack.OnBuild = ac.Build.OnBuild - buildStack.Version = ac.Build.Version - buildStack.AlpineImage = ac.Build.AlpineImage - - if ac.Build.Dockerfile != "" { - buildStack.Stack = "dockerfile" - buildStack.Input = ac.Build.Dockerfile - - b.Log.Info("using dockerfile from app config", "dockerfile", ac.Build.Dockerfile) - } - } - - if buildStack.Stack == "" { - dr, err := tr.Open("Dockerfile.miren") - if err == nil { - buildStack.Stack = "dockerfile" - buildStack.Input = "Dockerfile.miren" - dr.Close() - } else { - buildStack.Stack = "auto" - } - } - - // Check if stack is supported before launching buildkit - if buildStack.Stack == "auto" { - detectOpts := stackbuild.BuildOptions{ - Log: b.Log, - Name: name, - OnBuild: buildStack.OnBuild, - Version: buildStack.Version, - AlpineImage: buildStack.AlpineImage, - } - _, err := stackbuild.DetectStack(buildStack.CodeDir, detectOpts) - if err != nil { - setupSpan.RecordError(err) - setupSpan.SetStatus(codes.Error, err.Error()) - setupSpan.End() - b.Log.Error("stack detection failed", "error", err, "app", name, "codeDir", buildStack.CodeDir) - b.sendErrorStatus(ctx, status, "No supported stack detected for app %s: %v", name, err) - return nil, fmt.Errorf("no supported stack detected for app %s: %w", name, err) - } - b.Log.Debug("stack detection successful, proceeding with build") - } - - // Now we know the stack is valid, proceed with buildkit setup - b.Log.Debug("setting up buildkit") - - if b.BuildKit == nil { - setupSpan.RecordError(fmt.Errorf("buildkit component not configured")) - setupSpan.SetStatus(codes.Error, "buildkit component not configured") - setupSpan.End() - b.Log.Error("buildkit component not configured") - b.sendErrorStatus(ctx, status, "BuildKit not configured - ensure server is running with BuildKit enabled") - return nil, fmt.Errorf("buildkit component not configured") - } - - b.Log.Info("connecting to buildkit daemon") - bkc, err := b.BuildKit.Client(ctx) + buildStack, err := b.detectBuildStack(path, ac, name, tr) if err != nil { setupSpan.RecordError(err) setupSpan.SetStatus(codes.Error, err.Error()) setupSpan.End() - b.Log.Error("failed to get buildkit client", "error", err) - b.sendErrorStatus(ctx, status, "Failed to connect to BuildKit: %v", err) + b.sendErrorStatus(ctx, status, "No supported stack detected for app %s: %v", name, err) return nil, err } - defer bkc.Close() - - b.Log.Debug("getting buildkit daemon info") - ci, err := bkc.Info(ctx) - if err != nil { - b.Log.Error("error getting buildkitd info", "error", err) - } else { - b.Log.Debug("buildkitd info", "version", ci.BuildkitVersion.Version, "rev", ci.BuildkitVersion.Revision) - } setupSpan.SetAttributes(attribute.String("miren.build.stack", buildStack.Stack)) setupSpan.End() - bk := &Buildkit{ - Client: bkc, - Log: b.Log, - } - appRec, mrv, existingCfg, _, err := b.nextVersion(ctx, name) if err != nil { b.Log.Error("error getting next version", "error", err) @@ -1244,7 +1165,6 @@ func (b *Builder) buildFromDir(ctx context.Context, name string, path string, return nil, err } - // Initialize build log writer for persisting build output to VictoriaLogs buildLog := &buildLogWriter{ log: b.Log, writer: b.LogWriter, @@ -1252,150 +1172,34 @@ func (b *Builder) buildFromDir(ctx context.Context, name string, path string, version: mrv.Version, } - // Compute env vars to inject into the build process - buildEnvVars := computeBuildEnvVars(existingCfg.Variables, ac, envVars) - if len(buildEnvVars) > 0 { - b.Log.Info("injecting env vars into build", "count", len(buildEnvVars)) - } - - var tos []TransformOptions - - tos = append(tos, - WithBuildArg("MIREN_VERSION", mrv.Version), - ) - - // Inject user env vars as build args (for Dockerfile builds) - if len(buildEnvVars) > 0 { - tos = append(tos, WithBuildArgs(buildEnvVars)) - } - - // Pass env vars for auto-stack builds - buildStack.EnvVars = buildEnvVars - - // Track vertices we've already logged to avoid duplicates - vertexStarted := make(map[string]bool) - vertexCompleted := make(map[string]bool) - - if status != nil { - tos = append(tos, WithPhaseUpdates(func(phase string) { - switch phase { - case "export": - so.Update().SetMessage("Registering image") - _, _ = status.Send(ctx, so) - case "solving": - so.Update().SetMessage("Calculating build") - _, _ = status.Send(ctx, so) - case "solved": - so.Update().SetMessage("Building image") - _, _ = status.Send(ctx, so) - default: - so.Update().SetMessage(phase) - _, _ = status.Send(ctx, so) - } - })) - } - - // Single status callback that both persists logs and sends to client - tos = append(tos, WithStatusUpdates(func(ss *client.SolveStatus, sj []byte) { - // Log vertex status (build steps starting/completing/cached) - for _, v := range ss.Vertexes { - digestStr := v.Digest.String() - - // Log when a vertex starts - if v.Started != nil && !vertexStarted[digestStr] { - vertexStarted[digestStr] = true - buildLog.write(fmt.Sprintf("[buildkit] %s", v.Name)) - } - - // Log when a vertex completes with cache status - if v.Completed != nil && !vertexCompleted[digestStr] { - vertexCompleted[digestStr] = true - if v.Cached { - buildLog.write(fmt.Sprintf("[buildkit] %s CACHED", v.Name)) - } - } - } - - // Log command output from build steps - for _, log := range ss.Logs { - if log.Data != nil { - lines := strings.Split(string(log.Data), "\n") - for _, line := range lines { - line = strings.TrimRight(line, " \t\r\n") - if strings.TrimSpace(line) != "" { - buildLog.write(line) - } - } - } - } - - // Send raw status to client if connected - if status != nil { - so := new(build_v1alpha.Status) - so.Update().SetBuildkit(sj) - _, err := status.Send(ctx, so) - if err != nil { - b.Log.Warn("error sending status update", "error", err) - } - } - })) - - if status != nil { - so.Update().SetMessage("Calculating build") - _, _ = status.Send(ctx, so) - } - - imgName := mrv.ImageUrl - - // -- build.buildkit span + // runBuildkitBuild is also the saga action's implementation, so the + // span structure (buildkit + locate_artifact bundled together) is + // slightly coarser than the pre-saga code's two separate spans. + // Trace continuity matters more than the extra granularity. bkCtx, bkSpan := buildTracer.Start(ctx, "build.buildkit", - trace.WithAttributes(attribute.String("miren.build.image", imgName))) - res, err := bk.BuildImage(bkCtx, tr, buildStack, name, imgName, tos...) + trace.WithAttributes(attribute.String("miren.build.image", mrv.ImageUrl))) + statusSender := NewRPCStatusSender(status, b.Log) + res, artifactID, finalURL, err := b.runBuildkitBuild(bkCtx, runBuildkitBuildInputs{ + SourceDir: path, + AppName: name, + VersionName: mrv.Version, + ImageURL: mrv.ImageUrl, + BuildStack: buildStack, + AppConfig: ac, + ExistingConfig: existingCfg, + CLIEnvVars: envVars, + }, statusSender, buildLog) if err != nil { bkSpan.RecordError(err) bkSpan.SetStatus(codes.Error, err.Error()) bkSpan.End() - b.Log.Error("error building image", "error", err) - b.sendErrorStatus(ctx, status, "Error building image: %v", err) return nil, err } bkSpan.End() - // Log detection events from stack analysis - for _, event := range res.DetectionEvents { - buildLog.write(fmt.Sprintf("[detect] %s: %s", event.Name, event.Message)) - } - - if res.ManifestDigest == "" { - b.Log.Error("build did not return manifest digest") - b.sendErrorStatus(ctx, status, "Build did not return manifest digest") - return nil, fmt.Errorf("build did not return manifest digest") - } - - // -- build.locate_artifact span - locateCtx, locateSpan := buildTracer.Start(ctx, "build.locate_artifact", - trace.WithAttributes(attribute.String("miren.build.manifest_digest", res.ManifestDigest))) - - var artifact core_v1alpha.Artifact - - err = b.ec.OneAtIndex(locateCtx, entity.String(core_v1alpha.ArtifactManifestDigestId, res.ManifestDigest), &artifact) - if err != nil { - locateSpan.RecordError(err) - locateSpan.SetStatus(codes.Error, err.Error()) - locateSpan.End() - b.Log.Error("error locating artifact by digest", "digest", res.ManifestDigest, "error", err) - return nil, fmt.Errorf("error locating artifact by digest %s: %w", res.ManifestDigest, err) - } - locateSpan.End() - - b.Log.Debug("located stored artifact", "artifact", artifact.ID, "digest", res.ManifestDigest) - - mrv.Artifact = artifact.ID - - // Update ImageUrl to match the artifact we found (which may be reused due to deduplication) - artifactName := strings.TrimPrefix(string(artifact.ID), "artifact/") - mrv.ImageUrl = "cluster.local:5000/" + name + ":" + artifactName - + mrv.Artifact = entity.Id(artifactID) + mrv.ImageUrl = finalURL + artifactName := strings.TrimPrefix(artifactID, "artifact/") b.Log.Debug("build complete", "image", mrv.ImageUrl) procfileServices, err := b.readProcFile(tr) diff --git a/servers/build/build_saga.go b/servers/build/build_saga.go new file mode 100644 index 00000000..0200e195 --- /dev/null +++ b/servers/build/build_saga.go @@ -0,0 +1,733 @@ +package build + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "os" + "path/filepath" + "strings" + "time" + + "github.com/tonistiigi/fsutil" + + "miren.dev/runtime/api/build/build_v1alpha" + "miren.dev/runtime/api/core/core_v1alpha" + "miren.dev/runtime/appconfig" + "miren.dev/runtime/pkg/entity" + ephemeralx "miren.dev/runtime/pkg/ephemeral" + "miren.dev/runtime/pkg/saga" + "miren.dev/runtime/pkg/stackbuild" +) + +// Saga definition + action names. Keeping them as constants makes them +// easy to search for, and lets recovery logs reference stable action +// identifiers across restarts. +const ( + sagaBuildFromTar = "build-from-tar" + actionReceiveTar = "receive-tar" + actionLoadSource = "load-source" + actionGetNextVer = "get-next-version" + actionBuildImage = "build-image" + actionPrepareConfig = "prepare-config" + actionHandleEphemera = "handle-ephemeral" + actionCreateConfigVer = "create-config-version" + actionCreateVersion = "create-version" + actionProvisionAddons = "provision-addons" + actionSetActiveVer = "set-active-version" + actionFinalize = "finalize" +) + +// buildSagaDeps holds the collaborators injected into the saga context. +// Actions retrieve it via saga.Get[*buildSagaDeps](ctx) and call through to +// the inner Builder for real operations (entity writes, buildkit calls, +// log writes, etc.), the StreamRegistry for tar staging, or the +// StatusRegistry to emit live progress/log/error updates to the deploy +// CLI for the duration of one saga execution. +type buildSagaDeps struct { + builder *Builder + streams *StreamRegistry + statuses *StatusRegistry +} + +// receiveTarIn carries the initial inputs the entry point seeds the saga +// with. AppName is mostly used for logging and cache writes; the StreamID +// is the handle that lets us pull bytes out of StreamRegistry. +type receiveTarIn struct { + AppName string `json:"app_name" saga:"app_name"` + StreamID string `json:"stream_id" saga:"stream_id"` +} + +// receiveTarOut publishes the staged source directory so downstream +// actions can read app.toml, run buildkit against it, etc. +type receiveTarOut struct { + SourceDir string `json:"source_dir" saga:"source_dir"` +} + +// receiveTar stages the incoming tar stream to disk and (best-effort) +// updates the source code cache so the next delta upload can skip +// unchanged files. The cache write matches existing BuildFromTar +// behavior — failure logs a warning rather than failing the build, +// since cache is a perf optimization, not correctness. +func receiveTar(ctx context.Context, in receiveTarIn) (receiveTarOut, error) { + deps := saga.Get[*buildSagaDeps](ctx) + status := deps.statuses.SenderFor(in.StreamID) + + status.SendMessage("Reading application data") + + path, err := deps.streams.Stage(in.StreamID) + if err != nil { + status.SendError("Error untaring data: %v", err) + return receiveTarOut{}, fmt.Errorf("staging tar stream %s: %w", in.StreamID, err) + } + + if deps.builder.DataPath != "" { + cache := &sourceCache{ + dataPath: deps.builder.DataPath, + log: deps.builder.Log, + locks: deps.builder.cacheLocks, + } + if err := cache.saveSourceImage(in.AppName, path); err != nil { + deps.builder.Log.Warn("failed to save source code cache", "app", in.AppName, "error", err) + } + } + + status.SendMessage("Launching builder") + return receiveTarOut{SourceDir: path}, nil +} + +func undoReceiveTar(ctx context.Context, in receiveTarIn, _ receiveTarOut) error { + deps := saga.Get[*buildSagaDeps](ctx) + return deps.streams.Cleanup(in.StreamID) +} + +// loadSource reads the staged tree to produce the inputs every later +// action needs: the parsed app.toml (if present), the detected/declared +// BuildStack (with stack detection actually performed when stack=auto so +// we fail fast before launching buildkit), and the Procfile services. +// +// Pure file IO + parsing, no entity writes, so the undo is a no-op. +// Recovery just re-runs Execute against the same staged dir. + +type loadSourceIn struct { + AppName string `json:"app_name" saga:"app_name"` + SourceDir string `json:"source_dir" saga:"source_dir"` +} + +type loadSourceOut struct { + AppConfig *appconfig.AppConfig `json:"app_config,omitempty" saga:"app_config"` + BuildStack BuildStack `json:"build_stack" saga:"build_stack"` + ProcfileServices map[string]string `json:"procfile_services,omitempty" saga:"procfile_services"` +} + +func loadSource(ctx context.Context, in loadSourceIn) (loadSourceOut, error) { + deps := saga.Get[*buildSagaDeps](ctx) + b := deps.builder + + tr, err := fsutil.NewFS(in.SourceDir) + if err != nil { + return loadSourceOut{}, fmt.Errorf("opening source dir %s: %w", in.SourceDir, err) + } + + ac, err := b.loadAppConfig(tr) + if err != nil { + return loadSourceOut{}, fmt.Errorf("loading app config: %w", err) + } + + stack, err := b.detectBuildStack(in.SourceDir, ac, in.AppName, tr) + if err != nil { + return loadSourceOut{}, err + } + + procfile, err := b.readProcFile(tr) + if err != nil { + return loadSourceOut{}, fmt.Errorf("reading procfile: %w", err) + } + + return loadSourceOut{ + AppConfig: ac, + BuildStack: stack, + ProcfileServices: procfile, + }, nil +} + +func undoLoadSource(_ context.Context, _ loadSourceIn, _ loadSourceOut) error { + return nil +} + +// getNextVersion allocates a fresh version id + artifact suffix for the +// build and (idempotently) creates the App entity if this is the very +// first deploy. Mirrors the pre-saga Builder.nextVersion semantics: +// existing App is left alone, current config is loaded for env-var +// carryover, new ids are generated locally and only persisted by later +// actions (createConfigVersion / createAppVersion). +// +// The undo is a no-op. The generated ids are not durable state worth +// freeing, and the App entity might already exist (created by a prior +// deploy) or be needed by other concurrent operations — deleting it on +// build failure is the wrong instinct. + +type getNextVersionIn struct { + AppName string `json:"app_name" saga:"app_name"` +} + +type getNextVersionOut struct { + AppID string `json:"app_id" saga:"app_id"` + VersionName string `json:"version_name" saga:"version_name"` + ArtifactSuffix string `json:"artifact_suffix" saga:"artifact_suffix"` + ImageURL string `json:"image_url" saga:"image_url"` + AdminToken string `json:"admin_token" saga:"admin_token"` + ExistingConfig string `json:"existing_config_json" saga:"existing_config_json"` +} + +func getNextVersion(ctx context.Context, in getNextVersionIn) (getNextVersionOut, error) { + deps := saga.Get[*buildSagaDeps](ctx) + + appRec, mrv, existing, art, err := deps.builder.nextVersion(ctx, in.AppName) + if err != nil { + return getNextVersionOut{}, fmt.Errorf("allocating next version for %s: %w", in.AppName, err) + } + + existingJSON, err := marshalConfigSpec(existing) + if err != nil { + return getNextVersionOut{}, fmt.Errorf("serializing existing config: %w", err) + } + + return getNextVersionOut{ + AppID: string(appRec.ID), + VersionName: mrv.Version, + ArtifactSuffix: art, + ImageURL: mrv.ImageUrl, + AdminToken: mrv.AdminToken, + ExistingConfig: existingJSON, + }, nil +} + +func undoGetNextVersion(_ context.Context, _ getNextVersionIn, _ getNextVersionOut) error { + return nil +} + +// prepareConfig assembles the final ConfigSpec for the new version by +// merging build outputs, app.toml, Procfile, and existing app config, +// then runs every blocking validation (services exist, required vars +// have values, node ports are free, disk references resolve). Pure +// computation + entity reads — no side effects, no undo needed. +// +// Validation failures surface as the saga error and a user-facing +// status update. The pre-saga path called validateNodePorts and +// validateDiskConfigs separately; bundling them with the rest in one +// action keeps the saga DAG simple and matches what the user +// experiences as one logical "config prep" step. + +type prepareConfigIn struct { + AppName string `json:"app_name" saga:"app_name"` + StreamID string `json:"stream_id" saga:"stream_id"` + AppID string `json:"app_id" saga:"app_id"` + BuildResult *BuildResult `json:"build_result,omitempty" saga:"build_result,optional"` + AppConfig *appconfig.AppConfig `json:"app_config,omitempty" saga:"app_config,optional"` + ProcfileServices map[string]string `json:"procfile_services,omitempty" saga:"procfile_services,optional"` + ExistingConfig string `json:"existing_config_json" saga:"existing_config_json"` + CLIEnvVars []*build_v1alpha.EnvironmentVariable `json:"cli_env_vars,omitempty" saga:"cli_env_vars,optional"` +} + +type prepareConfigOut struct { + ConfigSpec string `json:"config_spec_json" saga:"config_spec_json"` +} + +func prepareConfig(ctx context.Context, in prepareConfigIn) (prepareConfigOut, error) { + deps := saga.Get[*buildSagaDeps](ctx) + b := deps.builder + status := deps.statuses.SenderFor(in.StreamID) + + existing, err := unmarshalConfigSpec(in.ExistingConfig) + if err != nil { + return prepareConfigOut{}, fmt.Errorf("deserializing existing config: %w", err) + } + + spec := buildVersionConfig(ConfigInputs{ + BuildResult: in.BuildResult, + AppConfig: in.AppConfig, + ProcfileServices: in.ProcfileServices, + ExistingConfig: existing, + CliEnvVars: in.CLIEnvVars, + }) + + if err := validateServicesExist(spec); err != nil { + status.SendError("%s. See https://miren.md/services", err) + return prepareConfigOut{}, err + } + if err := validateRequiredVars(spec); err != nil { + status.SendError("%s", err) + return prepareConfigOut{}, err + } + if err := validateNodePorts(ctx, b.ec.EAC(), entity.Id(in.AppID), spec); err != nil { + status.SendError("Deploy failed: %v", err) + return prepareConfigOut{}, err + } + if err := validateDiskConfigs(ctx, b.ec.EAC(), spec); err != nil { + status.SendError("Deploy failed: %v", err) + return prepareConfigOut{}, err + } + + specJSON, err := json.Marshal(spec) + if err != nil { + return prepareConfigOut{}, fmt.Errorf("serializing config spec: %w", err) + } + return prepareConfigOut{ConfigSpec: string(specJSON)}, nil +} + +func undoPrepareConfig(_ context.Context, _ prepareConfigIn, _ prepareConfigOut) error { + return nil +} + +// handleEphemeral covers the ephemeral-version-specific bookkeeping: +// validating the label, parsing the TTL, deleting any existing version +// with the same label (replace-on-same-label), and enforcing the per- +// app ephemeral version limit. No-op for non-ephemeral deploys. +// +// The undo is intentionally a no-op even though replace-existing +// deletes versions. Those versions were going to be replaced by the +// new build, and an aborted build doesn't make the user want them +// back — they want to retry with new code. Re-creating deleted +// entities would also race with concurrent reconciliation. + +type handleEphemeralIn struct { + AppName string `json:"app_name" saga:"app_name"` + StreamID string `json:"stream_id" saga:"stream_id"` + AppID string `json:"app_id" saga:"app_id"` + EphemeralLabel string `json:"ephemeral_label,omitempty" saga:"ephemeral_label,optional"` + EphemeralTTL string `json:"ephemeral_ttl,omitempty" saga:"ephemeral_ttl,optional"` +} + +type handleEphemeralOut struct { + ExpiresAt string `json:"ephemeral_expires_at,omitempty" saga:"ephemeral_expires_at"` +} + +func handleEphemeral(ctx context.Context, in handleEphemeralIn) (handleEphemeralOut, error) { + if in.EphemeralLabel == "" { + return handleEphemeralOut{}, nil + } + + deps := saga.Get[*buildSagaDeps](ctx) + b := deps.builder + status := deps.statuses.SenderFor(in.StreamID) + + if err := ephemeralx.ValidateLabel(in.EphemeralLabel); err != nil { + status.SendError("invalid ephemeral label: %v", err) + return handleEphemeralOut{}, fmt.Errorf("invalid ephemeral label: %w", err) + } + + ttl := in.EphemeralTTL + if ttl == "" { + ttl = "24h" + } + ttlDuration, err := time.ParseDuration(ttl) + if err != nil { + status.SendError("invalid ephemeral TTL %q: %v", ttl, err) + return handleEphemeralOut{}, fmt.Errorf("invalid ephemeral TTL %q: %w", ttl, err) + } + if ttlDuration <= 0 { + status.SendError("invalid ephemeral TTL %q: must be greater than 0", ttl) + return handleEphemeralOut{}, fmt.Errorf("invalid ephemeral TTL %q: must be greater than 0", ttl) + } + + if err := ephemeralx.ReplaceExisting(ctx, b.ec.EAC(), entity.Id(in.AppID), in.EphemeralLabel, b.Log); err != nil { + return handleEphemeralOut{}, fmt.Errorf("failed to replace existing ephemeral version %q: %w", in.EphemeralLabel, err) + } + if err := ephemeralx.EnforceLimit(ctx, b.ec.EAC(), entity.Id(in.AppID), ephemeralx.DefaultMaxEphemeral, b.Log); err != nil { + return handleEphemeralOut{}, fmt.Errorf("failed to enforce ephemeral limit: %w", err) + } + + expiresAt := time.Now().Add(ttlDuration).Format(time.RFC3339) + return handleEphemeralOut{ExpiresAt: expiresAt}, nil +} + +func undoHandleEphemeral(_ context.Context, _ handleEphemeralIn, _ handleEphemeralOut) error { + return nil +} + +// createConfigVersion creates the ConfigVersion entity that holds the +// new app's full ConfigSpec. AppVersion references it by ID, so it +// must exist before createAppVersion runs. Failures here surface as +// the saga error; recovery would retry creation with the same name +// and a deterministic-enough ConfigSpec to be idempotent in practice, +// though the entity store's create-if-missing semantics carry the +// guarantee. + +type createConfigVersionIn struct { + AppID string `json:"app_id" saga:"app_id"` + VersionName string `json:"version_name" saga:"version_name"` + ConfigSpec string `json:"config_spec_json" saga:"config_spec_json"` +} + +type createConfigVersionOut struct { + ConfigVersionID string `json:"config_version_id" saga:"config_version_id"` +} + +func createConfigVersion(ctx context.Context, in createConfigVersionIn) (createConfigVersionOut, error) { + deps := saga.Get[*buildSagaDeps](ctx) + b := deps.builder + + spec, err := unmarshalConfigSpec(in.ConfigSpec) + if err != nil { + return createConfigVersionOut{}, fmt.Errorf("deserializing config spec: %w", err) + } + + cv := &core_v1alpha.ConfigVersion{ + App: entity.Id(in.AppID), + Spec: spec, + } + name := in.VersionName + "-cfg" + id, err := b.ec.Create(ctx, name, cv) + if err != nil { + return createConfigVersionOut{}, fmt.Errorf("creating config version %s: %w", name, err) + } + return createConfigVersionOut{ConfigVersionID: string(id)}, nil +} + +func undoCreateConfigVersion(ctx context.Context, _ createConfigVersionIn, out createConfigVersionOut) error { + if out.ConfigVersionID == "" { + return nil + } + deps := saga.Get[*buildSagaDeps](ctx) + if err := deps.builder.ec.Delete(ctx, entity.Id(out.ConfigVersionID)); err != nil { + return fmt.Errorf("deleting config version %s: %w", out.ConfigVersionID, err) + } + return nil +} + +// createVersion creates the AppVersion entity that pins together the +// artifact, image URL, config version, and ephemeral metadata. This is +// the durable "the build succeeded" record — once it exists, the +// downstream activate / addon-provisioning steps have something to +// reference. Failures compensate by deleting it. + +type createVersionIn struct { + AppID string `json:"app_id" saga:"app_id"` + VersionName string `json:"version_name" saga:"version_name"` + FinalImageURL string `json:"final_image_url" saga:"final_image_url"` + ArtifactID string `json:"artifact_id" saga:"artifact_id"` + AdminToken string `json:"admin_token" saga:"admin_token"` + ConfigVersionID string `json:"config_version_id" saga:"config_version_id"` + EphemeralLabel string `json:"ephemeral_label,omitempty" saga:"ephemeral_label,optional"` + EphemeralTTL string `json:"ephemeral_ttl,omitempty" saga:"ephemeral_ttl,optional"` + EphemeralExpiresAt string `json:"ephemeral_expires_at,omitempty" saga:"ephemeral_expires_at,optional"` +} + +type createVersionOut struct { + AppVersionID string `json:"app_version_id" saga:"app_version_id"` +} + +func createVersion(ctx context.Context, in createVersionIn) (createVersionOut, error) { + deps := saga.Get[*buildSagaDeps](ctx) + b := deps.builder + + av := &core_v1alpha.AppVersion{ + App: entity.Id(in.AppID), + Version: in.VersionName, + ImageUrl: in.FinalImageURL, + AdminToken: in.AdminToken, + Artifact: entity.Id(in.ArtifactID), + ConfigVersion: entity.Id(in.ConfigVersionID), + Config: core_v1alpha.Config{}, + } + if in.EphemeralLabel != "" { + av.EphemeralLabel = in.EphemeralLabel + av.EphemeralTtl = in.EphemeralTTL + if in.EphemeralExpiresAt != "" { + if t, err := time.Parse(time.RFC3339, in.EphemeralExpiresAt); err == nil { + av.EphemeralExpiresAt = t + } + } + } + + id, err := b.ec.Create(ctx, in.VersionName, av) + if err != nil { + return createVersionOut{}, fmt.Errorf("creating app version %s: %w", in.VersionName, err) + } + return createVersionOut{AppVersionID: string(id)}, nil +} + +func undoCreateVersion(ctx context.Context, _ createVersionIn, out createVersionOut) error { + if out.AppVersionID == "" { + return nil + } + deps := saga.Get[*buildSagaDeps](ctx) + if err := deps.builder.ec.Delete(ctx, entity.Id(out.AppVersionID)); err != nil { + return fmt.Errorf("deleting app version %s: %w", out.AppVersionID, err) + } + return nil +} + +// provisionAddons calls into the addons client to materialize the +// addons declared in app.toml. Skipped for ephemeral deploys (which +// don't get addons) and when there's no app config at all. The undo +// is a no-op: provisionAddons handles "already attached" gracefully +// on retry, and removing addons created during a build would surprise +// users running concurrent ops against the same app. + +type provisionAddonsIn struct { + AppName string `json:"app_name" saga:"app_name"` + AppConfig *appconfig.AppConfig `json:"app_config,omitempty" saga:"app_config,optional"` + EphemeralLabel string `json:"ephemeral_label,omitempty" saga:"ephemeral_label,optional"` + // AppVersionID is consumed only to anchor this action after + // createVersion in the saga DAG; addons are scoped to the app, + // not the version, so we don't actually need the ID at runtime. + AppVersionID string `json:"app_version_id" saga:"app_version_id"` +} + +type provisionAddonsOut struct { + Done saga.Edge `saga:"addons_provisioned"` +} + +func provisionAddons(ctx context.Context, in provisionAddonsIn) (provisionAddonsOut, error) { + if in.EphemeralLabel != "" || in.AppConfig == nil { + return provisionAddonsOut{}, nil + } + deps := saga.Get[*buildSagaDeps](ctx) + if deps.builder.addonsClient == nil { + return provisionAddonsOut{}, nil + } + if err := deps.builder.provisionAddons(ctx, in.AppName, in.AppConfig); err != nil { + return provisionAddonsOut{}, fmt.Errorf("addon provisioning failed: %w", err) + } + return provisionAddonsOut{}, nil +} + +func undoProvisionAddons(_ context.Context, _ provisionAddonsIn, _ provisionAddonsOut) error { + return nil +} + +// setActiveVersion makes the newly-created AppVersion the app's active +// one. Skipped for ephemeral deploys (their whole point is to coexist +// with the active version, not replace it). Records the previous +// active version so undo can restore it on failure. + +type setActiveVersionIn struct { + AppName string `json:"app_name" saga:"app_name"` + AppVersionID string `json:"app_version_id" saga:"app_version_id"` + EphemeralLabel string `json:"ephemeral_label,omitempty" saga:"ephemeral_label,optional"` + AddonsReady saga.Edge `saga:"addons_provisioned"` +} + +type setActiveVersionOut struct { + PreviousVersionID string `json:"previous_version_id,omitempty" saga:"previous_version_id"` + Skipped bool `json:"skipped" saga:"set_active_skipped"` +} + +func setActiveVersion(ctx context.Context, in setActiveVersionIn) (setActiveVersionOut, error) { + if in.EphemeralLabel != "" { + return setActiveVersionOut{Skipped: true}, nil + } + + deps := saga.Get[*buildSagaDeps](ctx) + b := deps.builder + + app, err := b.appClient.GetByName(ctx, in.AppName) + if err != nil { + return setActiveVersionOut{}, fmt.Errorf("looking up app %s: %w", in.AppName, err) + } + previous := string(app.ActiveVersion) + + if err := b.appClient.SetActiveVersion(ctx, in.AppName, in.AppVersionID); err != nil { + return setActiveVersionOut{}, fmt.Errorf("setting active version on %s: %w", in.AppName, err) + } + return setActiveVersionOut{PreviousVersionID: previous}, nil +} + +func undoSetActiveVersion(ctx context.Context, in setActiveVersionIn, out setActiveVersionOut) error { + if out.Skipped { + return nil + } + deps := saga.Get[*buildSagaDeps](ctx) + if err := deps.builder.appClient.SetActiveVersion(ctx, in.AppName, out.PreviousVersionID); err != nil { + return fmt.Errorf("restoring previous active version on %s: %w", in.AppName, err) + } + return nil +} + +// finalize is the saga's terminal step. It writes the deployment log +// entry, runs the local-storage-migration check (non-ephemeral only), +// and tells the StreamRegistry it's safe to remove the staged source. +// Everything here is best-effort or idempotent; the undo is a no-op +// because once we got this far the build succeeded — there's nothing +// to roll back. + +type finalizeIn struct { + AppName string `json:"app_name" saga:"app_name"` + StreamID string `json:"stream_id" saga:"stream_id"` + AppID string `json:"app_id" saga:"app_id"` + VersionName string `json:"version_name" saga:"version_name"` + ArtifactID string `json:"artifact_id" saga:"artifact_id"` + ConfigSpec string `json:"config_spec_json" saga:"config_spec_json"` + EphemeralLabel string `json:"ephemeral_label,omitempty" saga:"ephemeral_label,optional"` + ActiveReady saga.Edge `saga:"set_active_skipped"` +} + +type finalizeOut struct{} + +func finalize(ctx context.Context, in finalizeIn) (finalizeOut, error) { + deps := saga.Get[*buildSagaDeps](ctx) + b := deps.builder + + if in.EphemeralLabel == "" { + spec, err := unmarshalConfigSpec(in.ConfigSpec) + if err == nil { + // checkLocalStorageMigration takes a *SendStreamClient; its + // signature predates StatusSender. Pass nil for now and + // route the migration-warning UX through the sender once + // the function grows a StatusSender-shaped overload. Saga + // deploys skip that warning until then — accepted as a + // temporary regression on the flagged path. + b.checkLocalStorageMigration(ctx, entity.Id(in.AppID), spec, nil) + } + } + + artifactName := strings.TrimPrefix(in.ArtifactID, "artifact/") + b.logDeployment(ctx, in.AppName, in.VersionName, artifactName) + + if err := deps.streams.Cleanup(in.StreamID); err != nil { + b.Log.Warn("cleanup staged tar", "stream", in.StreamID, "error", err) + } + return finalizeOut{}, nil +} + +func undoFinalize(_ context.Context, _ finalizeIn, _ finalizeOut) error { + return nil +} + +// detectBuildStack assembles the BuildStack the same way buildFromDir +// does (app.toml.build > Dockerfile.miren > auto) and performs the +// supported-stack check for auto so the saga can fail fast rather than +// waste a buildkit launch. Extracted here so the saga action and the +// pre-saga path share one source of truth. +func (b *Builder) detectBuildStack(path string, ac *appconfig.AppConfig, name string, _ fsutil.FS) (BuildStack, error) { + var stack BuildStack + stack.CodeDir = path + + if ac != nil && ac.Build != nil { + stack.OnBuild = ac.Build.OnBuild + stack.Version = ac.Build.Version + stack.AlpineImage = ac.Build.AlpineImage + + if ac.Build.Dockerfile != "" { + stack.Stack = "dockerfile" + stack.Input = ac.Build.Dockerfile + b.Log.Info("using dockerfile from app config", "dockerfile", ac.Build.Dockerfile) + } + } + + if stack.Stack == "" { + // Look on disk rather than through fsutil so test/error paths + // don't have to fabricate an fsutil.FS just to peek for a file. + if _, err := osStat(path, "Dockerfile.miren"); err == nil { + stack.Stack = "dockerfile" + stack.Input = "Dockerfile.miren" + } else { + stack.Stack = "auto" + } + } + + if stack.Stack == "auto" { + detectOpts := stackbuild.BuildOptions{ + Log: b.Log, + Name: name, + OnBuild: stack.OnBuild, + Version: stack.Version, + AlpineImage: stack.AlpineImage, + } + if _, err := stackbuild.DetectStack(stack.CodeDir, detectOpts); err != nil { + b.Log.Error("stack detection failed", "error", err, "app", name, "codeDir", stack.CodeDir) + return stack, fmt.Errorf("no supported stack detected for app %s: %w", name, err) + } + b.Log.Debug("stack detection successful") + } + + return stack, nil +} + +// registerBuildSaga assembles the build-from-tar saga definition with all +// actions wired into the given registry. Mirrors the registerCreateSandboxSaga +// shape from controllers/sandbox/create_saga.go so both sagas register the +// same way at server startup. +func registerBuildSaga( + registry *saga.Registry, + builder *Builder, + streams *StreamRegistry, + statuses *StatusRegistry, + log *slog.Logger, +) error { + deps := &buildSagaDeps{ + builder: builder, + streams: streams, + statuses: statuses, + } + + return saga.Define(sagaBuildFromTar). + Using(deps). + Using(log). + Action(actionReceiveTar, receiveTar).Undo(undoReceiveTar). + Action(actionLoadSource, loadSource).Undo(undoLoadSource). + Action(actionGetNextVer, getNextVersion).Undo(undoGetNextVersion). + Action(actionBuildImage, buildImage).Undo(undoBuildImage). + Action(actionPrepareConfig, prepareConfig).Undo(undoPrepareConfig). + Action(actionHandleEphemera, handleEphemeral).Undo(undoHandleEphemeral). + Action(actionCreateConfigVer, createConfigVersion).Undo(undoCreateConfigVersion). + Action(actionCreateVersion, createVersion).Undo(undoCreateVersion). + Action(actionProvisionAddons, provisionAddons).Undo(undoProvisionAddons). + Action(actionSetActiveVer, setActiveVersion).Undo(undoSetActiveVersion). + Action(actionFinalize, finalize).Undo(undoFinalize). + RegisterTo(registry) +} + +// osStat returns os.Stat for a sub-path under base. Tiny wrapper so the +// stack detection logic stays readable. +func osStat(base, name string) (os.FileInfo, error) { + return os.Stat(filepath.Join(base, name)) +} + +// marshalConfigSpec serializes a ConfigSpec to JSON for transport through +// the saga's input/output map. Returns "" for the zero value so downstream +// actions can branch on emptiness without unmarshaling. +func marshalConfigSpec(spec core_v1alpha.ConfigSpec) (string, error) { + // Treat the zero value as empty to keep the saga log compact for + // first-deploy cases. + zero := core_v1alpha.ConfigSpec{} + if isEmptyConfigSpec(spec, zero) { + return "", nil + } + data, err := json.Marshal(spec) + if err != nil { + return "", err + } + return string(data), nil +} + +// unmarshalConfigSpec is the round-trip partner of marshalConfigSpec. +// Empty string means the caller passed a zero ConfigSpec. +func unmarshalConfigSpec(s string) (core_v1alpha.ConfigSpec, error) { + if s == "" { + return core_v1alpha.ConfigSpec{}, nil + } + var spec core_v1alpha.ConfigSpec + if err := json.Unmarshal([]byte(s), &spec); err != nil { + return core_v1alpha.ConfigSpec{}, err + } + return spec, nil +} + +// isEmptyConfigSpec checks whether spec equals zero by JSON shape. Using +// JSON comparison sidesteps the unexported-field issues reflect.DeepEqual +// hits with generated types. +func isEmptyConfigSpec(spec, zero core_v1alpha.ConfigSpec) bool { + a, err := json.Marshal(spec) + if err != nil { + return false + } + b, err := json.Marshal(zero) + if err != nil { + return false + } + return string(a) == string(b) +} diff --git a/servers/build/build_saga_buildkit.go b/servers/build/build_saga_buildkit.go new file mode 100644 index 00000000..b09dead6 --- /dev/null +++ b/servers/build/build_saga_buildkit.go @@ -0,0 +1,243 @@ +package build + +import ( + "context" + "fmt" + "strings" + + "github.com/moby/buildkit/client" + "github.com/tonistiigi/fsutil" + + "miren.dev/runtime/api/build/build_v1alpha" + "miren.dev/runtime/api/core/core_v1alpha" + "miren.dev/runtime/appconfig" + "miren.dev/runtime/pkg/entity" + "miren.dev/runtime/pkg/saga" +) + +// buildImage runs the actual container build via BuildKit and locates +// the resulting artifact entity. This is the long-running step — for +// real apps it dominates total saga wall time. The undo is a no-op: +// images produced by a failed build linger in the registry but they're +// cheap, idempotent, and the next build either reuses them by digest +// or supersedes them. +// +// Recovery resumes here by re-running the build. That's safe because +// buildkit deduplicates by content digest, so a repeated build of the +// same source produces the same artifact entity (with the same ID). +// The action's output (manifest_digest, artifact_id) round-trips +// through the saga log so prepareConfig and downstream actions see +// stable values across crashes. + +type buildImageIn struct { + AppName string `json:"app_name" saga:"app_name"` + StreamID string `json:"stream_id" saga:"stream_id"` + SourceDir string `json:"source_dir" saga:"source_dir"` + BuildStack BuildStack `json:"build_stack" saga:"build_stack"` + VersionName string `json:"version_name" saga:"version_name"` + ImageURL string `json:"image_url" saga:"image_url"` + // AppConfig is nil when the app has no app.toml; the loadSource + // output omits the key in that case, so the saga input is optional. + AppConfig *appconfig.AppConfig `json:"app_config,omitempty" saga:"app_config,optional"` + // ExistingConfig comes from getNextVersion; empty string is + // the marshaled form of a zero ConfigSpec (first deploy). + ExistingConfig string `json:"existing_config_json" saga:"existing_config_json"` + // CLIEnvVars is an initial input from the deploy CLI's -e flags. + // Empty when the user didn't pass any, so optional. + CLIEnvVars []*build_v1alpha.EnvironmentVariable `json:"cli_env_vars,omitempty" saga:"cli_env_vars,optional"` + AppID string `json:"app_id" saga:"app_id"` +} + +type buildImageOut struct { + ManifestDigest string `json:"manifest_digest" saga:"manifest_digest"` + ArtifactID string `json:"artifact_id" saga:"artifact_id"` + FinalImageURL string `json:"final_image_url" saga:"final_image_url"` + BuildResult *BuildResult `json:"build_result,omitempty" saga:"build_result"` +} + +func buildImage(ctx context.Context, in buildImageIn) (buildImageOut, error) { + deps := saga.Get[*buildSagaDeps](ctx) + b := deps.builder + status := deps.statuses.SenderFor(in.StreamID) + + if b.BuildKit == nil { + status.SendError("BuildKit not configured - ensure server is running with BuildKit enabled") + return buildImageOut{}, fmt.Errorf("buildkit component not configured") + } + + existing, err := unmarshalConfigSpec(in.ExistingConfig) + if err != nil { + return buildImageOut{}, fmt.Errorf("deserializing existing config: %w", err) + } + + buildLog := &buildLogWriter{ + log: b.Log, + writer: b.LogWriter, + entityID: in.AppID, + version: in.VersionName, + } + + res, artifactID, finalURL, err := b.runBuildkitBuild(ctx, runBuildkitBuildInputs{ + SourceDir: in.SourceDir, + AppName: in.AppName, + VersionName: in.VersionName, + ImageURL: in.ImageURL, + BuildStack: in.BuildStack, + AppConfig: in.AppConfig, + ExistingConfig: existing, + CLIEnvVars: in.CLIEnvVars, + }, status, buildLog) + if err != nil { + return buildImageOut{}, err + } + + return buildImageOut{ + ManifestDigest: res.ManifestDigest, + ArtifactID: artifactID, + FinalImageURL: finalURL, + BuildResult: res, + }, nil +} + +func undoBuildImage(_ context.Context, _ buildImageIn, _ buildImageOut) error { + return nil +} + +// runBuildkitBuildInputs bundles everything runBuildkitBuild needs. +// Keeping this as a struct (vs. a long parameter list) makes the +// caller readable and lets future arguments slot in without touching +// every call site. +type runBuildkitBuildInputs struct { + SourceDir string + AppName string + VersionName string + ImageURL string + BuildStack BuildStack + AppConfig *appconfig.AppConfig + ExistingConfig core_v1alpha.ConfigSpec + CLIEnvVars []*build_v1alpha.EnvironmentVariable +} + +// runBuildkitBuild connects to BuildKit, runs the actual image build +// with the right transform options + status callbacks, and locates the +// resulting Artifact entity. Returns the build result, artifact ID, +// and the registry image URL adjusted to match the artifact (which +// may have been reused via content-digest deduplication). +// +// Extracted here so both the pre-saga buildFromDir path and the +// buildImage saga action share a single implementation. status may be +// noop and buildLog may have a noop writer. +func (b *Builder) runBuildkitBuild( + ctx context.Context, + in runBuildkitBuildInputs, + status StatusSender, + buildLog *buildLogWriter, +) (*BuildResult, string, string, error) { + tr, err := fsutil.NewFS(in.SourceDir) + if err != nil { + return nil, "", "", fmt.Errorf("opening source dir %s: %w", in.SourceDir, err) + } + + b.Log.Info("connecting to buildkit daemon") + bkc, err := b.BuildKit.Client(ctx) + if err != nil { + b.Log.Error("failed to get buildkit client", "error", err) + status.SendError("Failed to connect to BuildKit: %v", err) + return nil, "", "", err + } + defer bkc.Close() + + if ci, err := bkc.Info(ctx); err != nil { + b.Log.Error("error getting buildkitd info", "error", err) + } else { + b.Log.Debug("buildkitd info", "version", ci.BuildkitVersion.Version, "rev", ci.BuildkitVersion.Revision) + } + + bk := &Buildkit{Client: bkc, Log: b.Log} + + buildEnvVars := computeBuildEnvVars(in.ExistingConfig.Variables, in.AppConfig, in.CLIEnvVars) + if len(buildEnvVars) > 0 { + b.Log.Info("injecting env vars into build", "count", len(buildEnvVars)) + } + + tos := []TransformOptions{ + WithBuildArg("MIREN_VERSION", in.VersionName), + } + if len(buildEnvVars) > 0 { + tos = append(tos, WithBuildArgs(buildEnvVars)) + } + + // Pass env vars for auto-stack builds. We have to copy the stack so + // we don't mutate the saga input. + stack := in.BuildStack + stack.EnvVars = buildEnvVars + + tos = append(tos, WithPhaseUpdates(func(phase string) { + status.SendPhase(phase) + })) + + vertexStarted := map[string]bool{} + vertexCompleted := map[string]bool{} + tos = append(tos, WithStatusUpdates(func(ss *client.SolveStatus, sj []byte) { + for _, v := range ss.Vertexes { + digestStr := v.Digest.String() + if v.Started != nil && !vertexStarted[digestStr] { + vertexStarted[digestStr] = true + buildLog.write(fmt.Sprintf("[buildkit] %s", v.Name)) + } + if v.Completed != nil && !vertexCompleted[digestStr] { + vertexCompleted[digestStr] = true + if v.Cached { + buildLog.write(fmt.Sprintf("[buildkit] %s CACHED", v.Name)) + } + } + } + for _, log := range ss.Logs { + if log.Data != nil { + lines := strings.Split(string(log.Data), "\n") + for _, line := range lines { + line = strings.TrimRight(line, " \t\r\n") + if strings.TrimSpace(line) != "" { + buildLog.write(line) + } + } + } + } + status.SendBuildkit(sj) + })) + + status.SendMessage("Calculating build") + + res, err := bk.BuildImage(ctx, tr, stack, in.AppName, in.ImageURL, tos...) + if err != nil { + b.Log.Error("error building image", "error", err) + status.SendError("Error building image: %v", err) + return nil, "", "", err + } + + for _, event := range res.DetectionEvents { + buildLog.write(fmt.Sprintf("[detect] %s: %s", event.Name, event.Message)) + } + + if res.ManifestDigest == "" { + b.Log.Error("build did not return manifest digest") + status.SendError("Build did not return manifest digest") + return nil, "", "", fmt.Errorf("build did not return manifest digest") + } + + var artifact core_v1alpha.Artifact + if err := b.ec.OneAtIndex(ctx, + entity.String(core_v1alpha.ArtifactManifestDigestId, res.ManifestDigest), + &artifact); err != nil { + b.Log.Error("error locating artifact by digest", "digest", res.ManifestDigest, "error", err) + return nil, "", "", fmt.Errorf("error locating artifact by digest %s: %w", res.ManifestDigest, err) + } + b.Log.Debug("located stored artifact", "artifact", artifact.ID, "digest", res.ManifestDigest) + + // The artifact may have been reused by digest, so adjust the image + // URL to point at the canonical artifact name in the registry. + artifactName := strings.TrimPrefix(string(artifact.ID), "artifact/") + finalURL := "cluster.local:5000/" + in.AppName + ":" + artifactName + + return res, string(artifact.ID), finalURL, nil +} diff --git a/servers/build/build_saga_test.go b/servers/build/build_saga_test.go new file mode 100644 index 00000000..6b421cc8 --- /dev/null +++ b/servers/build/build_saga_test.go @@ -0,0 +1,366 @@ +package build + +import ( + "context" + "encoding/json" + "errors" + "log/slog" + "os" + "strings" + "testing" + + "miren.dev/runtime/api/app" + "miren.dev/runtime/api/core/core_v1alpha" + "miren.dev/runtime/api/entityserver" + "miren.dev/runtime/api/entityserver/entityserver_v1alpha" + "miren.dev/runtime/pkg/entity" + "miren.dev/runtime/pkg/entity/testutils" + "miren.dev/runtime/pkg/rpc" + "miren.dev/runtime/pkg/saga" +) + +// sagaTestHarness bundles the infrastructure each saga test needs: +// in-memory entity server, a Builder configured against it, a fresh +// StreamRegistry, and a registry+executor wired to the build-from-tar +// definition. Keeps each test self-contained and avoids global state. +type sagaTestHarness struct { + t *testing.T + inmem *testutils.InMemEntityServer + builder *Builder + streams *StreamRegistry + statuses *StatusRegistry + registry *saga.Registry + executor *saga.Executor +} + +func newSagaTestHarness(t *testing.T) *sagaTestHarness { + t.Helper() + log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + + inmem, cleanup := testutils.NewInMemEntityServer(t) + t.Cleanup(cleanup) + + tempDir := t.TempDir() + + rpcClient := rpc.LocalClient(entityserver_v1alpha.AdaptEntityAccess(inmem.Server)) + builder := &Builder{ + Log: log, + EAS: inmem.EAC, + ec: entityserver.NewClient(log, inmem.EAC), + appClient: app.NewClient(log, rpcClient), + TempDir: tempDir, + cacheLocks: newAppLocks(), + } + + streams := NewStreamRegistry(tempDir, log) + statuses := NewStatusRegistry() + + registry := saga.NewRegistry() + // Use a test-mode registration that swaps in a stub buildImage so + // we don't need a real BuildKit component for unit tests. The + // stub returns a synthetic digest/artifact ID so downstream actions + // have something deterministic to work with. The real buildImage + // path is exercised by blackbox tests under iso. + deps := &buildSagaDeps{builder: builder, streams: streams, statuses: statuses} + if err := saga.Define(sagaBuildFromTar). + Using(deps). + Using(log). + Action(actionReceiveTar, receiveTar).Undo(undoReceiveTar). + Action(actionLoadSource, loadSource).Undo(undoLoadSource). + Action(actionGetNextVer, getNextVersion).Undo(undoGetNextVersion). + Action(actionBuildImage, stubBuildImage).Undo(undoBuildImage). + Action(actionPrepareConfig, prepareConfig).Undo(undoPrepareConfig). + Action(actionHandleEphemera, handleEphemeral).Undo(undoHandleEphemeral). + Action(actionCreateConfigVer, createConfigVersion).Undo(undoCreateConfigVersion). + Action(actionCreateVersion, createVersion).Undo(undoCreateVersion). + Action(actionProvisionAddons, provisionAddons).Undo(undoProvisionAddons). + Action(actionSetActiveVer, setActiveVersion).Undo(undoSetActiveVersion). + Action(actionFinalize, finalize).Undo(undoFinalize). + RegisterTo(registry); err != nil { + t.Fatalf("registering build saga: %v", err) + } + + executor := saga.NewExecutor( + saga.NewMemoryStorage(), + saga.WithRegistry(registry), + saga.WithLogger(log), + ) + + return &sagaTestHarness{ + t: t, + inmem: inmem, + builder: builder, + streams: streams, + statuses: statuses, + registry: registry, + executor: executor, + } +} + +// stubBuildImage replaces the real buildImage action in unit tests so +// we don't need a live BuildKit daemon. It returns a deterministic +// digest/artifact ID derived from the version name so the test can +// assert on what downstream actions saw, and it pre-creates the +// matching Artifact entity so any "locate artifact" code path the +// real action would have walked finds something. Real buildImage is +// exercised by blackbox tests under iso. +func stubBuildImage(ctx context.Context, in buildImageIn) (buildImageOut, error) { + deps := saga.Get[*buildSagaDeps](ctx) + + digest := "sha256:" + in.VersionName + "-digest" + // Including VersionName (which itself carries a random suffix from + // nextVersion) keeps the entity name unique when a test runs the + // saga twice against the same in-memory store. + artifactName := in.AppName + "-" + in.VersionName + "-stub" + artifact := &core_v1alpha.Artifact{ + ManifestDigest: digest, + App: entity.Id(in.AppID), + Status: core_v1alpha.ACTIVE, + } + id, err := deps.builder.ec.Create(ctx, artifactName, artifact) + if err != nil { + return buildImageOut{}, err + } + return buildImageOut{ + ManifestDigest: digest, + ArtifactID: string(id), + FinalImageURL: "cluster.local:5000/" + in.AppName + ":" + artifactName, + BuildResult: &BuildResult{ + ManifestDigest: digest, + Entrypoint: "echo hi", + Command: "", + WorkingDir: "/app", + }, + }, nil +} + +// dockerfileTarball returns a tar containing the bare minimum to satisfy +// every saga action up through getNextVersion: a .miren/app.toml at the +// path appconfig.AppConfigPath expects, and a Dockerfile.miren so stack +// detection short-circuits on the dockerfile path without invoking the +// auto-detector (which would fail without a recognized stack). +func dockerfileTarball(t *testing.T) map[string]string { + t.Helper() + return map[string]string{ + ".miren/app.toml": "name = 'demo'\n", + "Dockerfile.miren": "FROM alpine\nCMD echo hi\n", + "Procfile": "web: echo hi\n", + } +} + +func TestBuildSaga_HappyPath_RunsFullPipeline(t *testing.T) { + ctx := context.Background() + + h := newSagaTestHarness(t) + h.streams.Register("stream-1", makeTar(t, dockerfileTarball(t))) + + err := h.executor.Start(sagaBuildFromTar). + Input("app_name", "demo"). + Input("stream_id", "stream-1"). + WithID("test-happy-path"). + Execute(ctx) + if err != nil { + t.Fatalf("saga: %v", err) + } + + // finalize cleans up the staged tar on success — the registry + // should no longer have a path for this ID. + if _, ok := h.streams.StagedPath("stream-1"); ok { + t.Error("expected staged source to be cleaned up by finalize") + } + + // get-next-version creates the App entity on first deploy. + var application core_v1alpha.App + if err := h.builder.ec.Get(ctx, "demo", &application); err != nil { + t.Fatalf("expected app entity 'demo' to exist: %v", err) + } + // set-active-version should have populated the new version. + if application.ActiveVersion == "" { + t.Error("expected app to have an active version after build saga") + } +} + +func TestBuildSaga_ReceiveTar_EmitsStatusUpdates(t *testing.T) { + ctx := context.Background() + + h := newSagaTestHarness(t) + h.streams.Register("stream-status", makeTar(t, dockerfileTarball(t))) + + rec := &recordingSender{} + h.statuses.Register("stream-status", rec) + t.Cleanup(func() { h.statuses.Unregister("stream-status") }) + + err := h.executor.Start(sagaBuildFromTar). + Input("app_name", "demo"). + Input("stream_id", "stream-status"). + WithID("test-status-emit"). + Execute(ctx) + if err != nil { + t.Fatalf("saga: %v", err) + } + + // receive-tar should have emitted both progress messages bracketing + // the staging work. We don't pin exact equality on the entire slice + // (later actions might emit too as the saga grows) — just verify + // these two appeared in order. + wantInOrder := []string{"Reading application data", "Launching builder"} + if !containsInOrder(rec.Messages, wantInOrder) { + t.Errorf("missing expected messages in order: got %v, want subsequence %v", rec.Messages, wantInOrder) + } +} + +// containsInOrder checks that all of `want` appear in `got` in the +// listed order (allowing other entries in between). Used so tests for +// individual actions tolerate additional status messages emitted by +// later actions in the same saga. +func containsInOrder(got, want []string) bool { + i := 0 + for _, g := range got { + if i < len(want) && g == want[i] { + i++ + } + } + return i == len(want) +} + +// TestBuildSaga_FailedActivate_CompensatesEntities verifies the core +// saga guarantee: when a late action fails, prior entity-creating +// actions undo themselves in reverse. set-active-version is the last +// real side-effecting step; swapping it with a failing version should +// trigger create-version and create-config-version compensations, +// leaving the entity store free of orphaned ConfigVersion / AppVersion +// rows. +func TestBuildSaga_FailedActivate_CompensatesEntities(t *testing.T) { + ctx := context.Background() + + log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + inmem, cleanup := testutils.NewInMemEntityServer(t) + t.Cleanup(cleanup) + + rpcClient := rpc.LocalClient(entityserver_v1alpha.AdaptEntityAccess(inmem.Server)) + tempDir := t.TempDir() + builder := &Builder{ + Log: log, + EAS: inmem.EAC, + ec: entityserver.NewClient(log, inmem.EAC), + appClient: app.NewClient(log, rpcClient), + TempDir: tempDir, + cacheLocks: newAppLocks(), + } + streams := NewStreamRegistry(tempDir, log) + statuses := NewStatusRegistry() + streams.Register("stream-fail", makeTar(t, dockerfileTarball(t))) + + // Swap setActiveVersion with a deterministic failure. Same signature, + // same input/output keys — the framework can't tell the difference, + // but the saga compensates everything after createConfigVersion. + failingSetActive := func(ctx context.Context, in setActiveVersionIn) (setActiveVersionOut, error) { + return setActiveVersionOut{}, errSimulatedActivate + } + + registry := saga.NewRegistry() + deps := &buildSagaDeps{builder: builder, streams: streams, statuses: statuses} + if err := saga.Define(sagaBuildFromTar). + Using(deps). + Using(log). + Action(actionReceiveTar, receiveTar).Undo(undoReceiveTar). + Action(actionLoadSource, loadSource).Undo(undoLoadSource). + Action(actionGetNextVer, getNextVersion).Undo(undoGetNextVersion). + Action(actionBuildImage, stubBuildImage).Undo(undoBuildImage). + Action(actionPrepareConfig, prepareConfig).Undo(undoPrepareConfig). + Action(actionHandleEphemera, handleEphemeral).Undo(undoHandleEphemeral). + Action(actionCreateConfigVer, createConfigVersion).Undo(undoCreateConfigVersion). + Action(actionCreateVersion, createVersion).Undo(undoCreateVersion). + Action(actionProvisionAddons, provisionAddons).Undo(undoProvisionAddons). + Action(actionSetActiveVer, failingSetActive).Undo(undoSetActiveVersion). + Action(actionFinalize, finalize).Undo(undoFinalize). + RegisterTo(registry); err != nil { + t.Fatalf("registering build saga: %v", err) + } + + storage := saga.NewMemoryStorage() + executor := saga.NewExecutor( + storage, + saga.WithRegistry(registry), + saga.WithLogger(log), + ) + + err := executor.Start(sagaBuildFromTar). + Input("app_name", "demo"). + Input("stream_id", "stream-fail"). + WithID("test-failed-activate"). + Execute(ctx) + if err == nil { + t.Fatal("saga should have failed when set-active-version errors") + } + + // Pull the saga execution back and verify both create-config-version + // and create-version recorded UndoneAt timestamps. Then pull the + // created entity IDs out of each action's output and confirm the + // entities themselves are actually gone — UndoneAt only says we + // called the undo, not that it succeeded. + exec, err := storage.Get(ctx, "test-failed-activate") + if err != nil { + t.Fatalf("loading saga execution: %v", err) + } + for _, action := range []string{actionCreateConfigVer, actionCreateVersion} { + r := exec.ExecutedActions[action] + if r == nil { + t.Errorf("expected %s to have run before failure", action) + continue + } + if r.UndoneAt == nil { + t.Errorf("expected %s to be undone, but UndoneAt is nil", action) + } + } + + var cvOut createConfigVersionOut + if r := exec.ExecutedActions[actionCreateConfigVer]; r != nil && len(r.Output) > 0 { + if err := json.Unmarshal(r.Output, &cvOut); err == nil && cvOut.ConfigVersionID != "" { + var dummy core_v1alpha.ConfigVersion + if err := builder.ec.GetById(ctx, entity.Id(cvOut.ConfigVersionID), &dummy); err == nil { + t.Errorf("expected ConfigVersion %s to be deleted, but Get succeeded", cvOut.ConfigVersionID) + } + } + } + var avOut createVersionOut + if r := exec.ExecutedActions[actionCreateVersion]; r != nil && len(r.Output) > 0 { + if err := json.Unmarshal(r.Output, &avOut); err == nil && avOut.AppVersionID != "" { + var dummy core_v1alpha.AppVersion + if err := builder.ec.GetById(ctx, entity.Id(avOut.AppVersionID), &dummy); err == nil { + t.Errorf("expected AppVersion %s to be deleted, but Get succeeded", avOut.AppVersionID) + } + } + } + + // Staged source is cleaned up via undoReceiveTar. + if _, ok := streams.StagedPath("stream-fail"); ok { + t.Error("expected staged source to be cleaned up after compensation") + } +} + +var errSimulatedActivate = errors.New("simulated activate failure") + +func TestBuildSaga_FailsWhenStreamUnavailable(t *testing.T) { + ctx := context.Background() + + h := newSagaTestHarness(t) + // Note: no Register call — simulates a crash before the stream arrived, + // or a saga revived after the in-process stream was lost. + + err := h.executor.Start(sagaBuildFromTar). + Input("app_name", "demo"). + Input("stream_id", "stream-gone"). + WithID("test-stream-gone"). + Execute(ctx) + if err == nil { + t.Fatal("saga should have failed when stream is unavailable") + } + // The framework serializes action errors to strings and reconstructs + // them on failure, which breaks errors.Is chains. Match the message + // so we still confirm the stream-unavailable signal made it through. + if !strings.Contains(err.Error(), ErrStreamUnavailable.Error()) { + t.Errorf("expected error to mention %q, got %v", ErrStreamUnavailable.Error(), err) + } + +} diff --git a/servers/build/saga_builder.go b/servers/build/saga_builder.go new file mode 100644 index 00000000..90d1111c --- /dev/null +++ b/servers/build/saga_builder.go @@ -0,0 +1,311 @@ +package build + +import ( + "context" + "fmt" + "log/slog" + + "miren.dev/runtime/api/build/build_v1alpha" + "miren.dev/runtime/api/core/core_v1alpha" + "miren.dev/runtime/pkg/entity" + "miren.dev/runtime/pkg/idgen" + "miren.dev/runtime/pkg/rpc" + "miren.dev/runtime/pkg/rpc/stream" + "miren.dev/runtime/pkg/saga" + "miren.dev/runtime/pkg/tarx" +) + +// buildArgs abstracts the shape both BuildFromTarArgs and +// BuildFromPreparedArgs share. Lets the saga entry points share the +// "pull initial inputs from args" logic. +type buildArgs interface { + EnvVars() []*build_v1alpha.EnvironmentVariable + HasEphemeralLabel() bool + EphemeralLabel() string + HasEphemeralTtl() bool + EphemeralTtl() string +} + +// buildResults is the shared shape of both Results types we set on +// completion. Generated types don't share an interface, so we declare +// the minimum surface here. +type buildResults interface { + SetVersion(string) + SetVersionShortId(string) + SetAccessInfo(**build_v1alpha.AccessInfo) +} + +// SagaBuilder is the saga-backed implementation of the build_v1alpha +// Builder RPC interface. It wraps the existing *Builder (which keeps +// shared collaborators like the entity client, app client, BuildKit +// component, and source cache locks), adds a per-instance saga +// executor + registry, and exposes BuildFromTar / BuildFromPrepared +// implementations that drive the build-from-tar saga end to end. +// +// PrepareUpload and AnalyzeApp delegate straight through to the inner +// Builder. Prepare is just session management, not a build, and +// AnalyzeApp is read-only — neither benefits from saga durability. +type SagaBuilder struct { + inner *Builder + executor *saga.Executor + registry *saga.Registry + streams *StreamRegistry + statuses *StatusRegistry + log *slog.Logger +} + +// NewSagaBuilder constructs a SagaBuilder over an existing Builder. The +// caller still owns the Builder's collaborators (clients, log writer, +// BuildKit component); SagaBuilder layers saga execution + crash +// recovery on top without forking the underlying state. +func NewSagaBuilder(inner *Builder, sagaStorage saga.Storage, log *slog.Logger) *SagaBuilder { + registry := saga.NewRegistry() + executor := saga.NewExecutor( + sagaStorage, + saga.WithRegistry(registry), + saga.WithLogger(log.With("module", "saga-build")), + ) + streams := NewStreamRegistry(inner.TempDir, log) + statuses := NewStatusRegistry() + return &SagaBuilder{ + inner: inner, + executor: executor, + registry: registry, + streams: streams, + statuses: statuses, + log: log.With("module", "saga-builder"), + } +} + +// Init registers the build-from-tar saga definition with the executor +// and recovers any incomplete sagas left over from a previous process. +// Should be called once at server startup before serving RPC requests. +func (s *SagaBuilder) Init(ctx context.Context) error { + if err := registerBuildSaga(s.registry, s.inner, s.streams, s.statuses, s.log); err != nil { + return fmt.Errorf("registering build-from-tar saga: %w", err) + } + if err := s.executor.Recover(ctx); err != nil { + // Recovery failures don't block startup — they're already + // logged inside Recover. New requests can still come in. + s.log.Error("saga recovery completed with errors", "error", err) + } + return nil +} + +// BuildFromTar is the saga-backed implementation of the RPC method. +// Stream registration, status sender registration, and saga start all +// key off the same generated stream ID so the saga's actions can find +// the live reader and status sink by ID alone. +func (s *SagaBuilder) BuildFromTar(ctx context.Context, state *build_v1alpha.BuilderBuildFromTar) error { + args := state.Args() + name := args.Application() + if !rpc.AllowApp(ctx, name) { + return rpc.AppAccessError(ctx, name) + } + + streamID := idgen.Gen("bt") + status := args.Status() + sender := NewRPCStatusSender(status, s.inner.Log) + s.statuses.Register(streamID, sender) + defer s.statuses.Unregister(streamID) + + tardata := args.Tardata() + s.streams.Register(streamID, stream.ToReader(ctx, tardata)) + // streams.Cleanup is idempotent and finalize calls it on success; + // the deferred call covers crash paths between Execute return and + // the next request. + defer func() { + if err := s.streams.Cleanup(streamID); err != nil { + s.log.Warn("post-saga stream cleanup", "stream", streamID, "error", err) + } + }() + + executionID := "build-from-tar-" + streamID + if err := s.startBuild(ctx, executionID, name, streamID, args.EnvVars(), ephemeralFromArgs(args)); err != nil { + return err + } + + return s.populateResults(ctx, executionID, name, state.Results(), ephemeralLabelFromArgs(args)) +} + +// BuildFromPrepared mirrors BuildFromTar but starts from an upload +// session whose source tree is already on disk. The session lookup +// stays out of the saga (it's a pre-flight check, not a step we'd +// want to retry on recovery), then MarkStaged tells the registry the +// path is ready so the saga's receive-tar action returns it directly. +func (s *SagaBuilder) BuildFromPrepared(ctx context.Context, state *build_v1alpha.BuilderBuildFromPrepared) error { + args := state.Args() + sessionID := args.SessionId() + + val, ok := s.inner.sessions.LoadAndDelete(sessionID) + if !ok { + return fmt.Errorf("unknown or expired upload session: %s", sessionID) + } + sess := val.(*buildSession) + sess.cancelFunc() + + name := sess.appName + if !rpc.AllowApp(ctx, name) { + return rpc.AppAccessError(ctx, name) + } + + status := args.Status() + sender := NewRPCStatusSender(status, s.inner.Log) + s.statuses.Register(sessionID, sender) + defer s.statuses.Unregister(sessionID) + + // If the client is sending an incremental tar of changed files, + // extract it into the session dir before the saga starts. This + // matches the pre-saga flow and keeps the receive-tar action + // uniform (it just sees a populated directory). + if td := args.Tardata(); td != nil { + sender.SendMessage("Receiving changed files") + if err := extractTarIntoDir(ctx, td, sess.dir); err != nil { + sender.SendError("Error extracting changed files: %v", err) + return fmt.Errorf("error extracting changed files: %w", err) + } + } + + if s.inner.DataPath != "" { + cache := &sourceCache{dataPath: s.inner.DataPath, log: s.inner.Log, locks: s.inner.cacheLocks} + if err := cache.saveSourceImage(name, sess.dir); err != nil { + s.inner.Log.Warn("failed to save source code cache", "error", err, "app", name) + } + } + + s.streams.MarkStaged(sessionID, sess.dir) + defer func() { + if err := s.streams.Cleanup(sessionID); err != nil { + s.log.Warn("post-saga stream cleanup", "stream", sessionID, "error", err) + } + }() + + executionID := "build-from-prepared-" + sessionID + if err := s.startBuild(ctx, executionID, name, sessionID, args.EnvVars(), ephemeralFromArgs(args)); err != nil { + return err + } + + return s.populateResults(ctx, executionID, name, state.Results(), ephemeralLabelFromArgs(args)) +} + +// PrepareUpload is a session-management op, not a build, so it routes +// straight to the inner Builder. +func (s *SagaBuilder) PrepareUpload(ctx context.Context, state *build_v1alpha.BuilderPrepareUpload) error { + return s.inner.PrepareUpload(ctx, state) +} + +// AnalyzeApp is read-only. +func (s *SagaBuilder) AnalyzeApp(ctx context.Context, state *build_v1alpha.BuilderAnalyzeApp) error { + return s.inner.AnalyzeApp(ctx, state) +} + +// startBuild fans out the initial inputs and runs the saga. +func (s *SagaBuilder) startBuild( + ctx context.Context, + executionID, appName, streamID string, + cliEnvVars []*build_v1alpha.EnvironmentVariable, + eph *ephemeralOpts, +) error { + sb := s.executor.Start(sagaBuildFromTar). + Input("app_name", appName). + Input("stream_id", streamID). + WithID(executionID) + + if len(cliEnvVars) > 0 { + sb = sb.Input("cli_env_vars", cliEnvVars) + } + if eph != nil { + sb = sb.Input("ephemeral_label", eph.label) + if eph.ttl != "" { + sb = sb.Input("ephemeral_ttl", eph.ttl) + } + } + + if err := sb.Execute(ctx); err != nil { + return fmt.Errorf("build saga: %w", err) + } + return nil +} + +// populateResults fishes the saga's outputs back out of storage to +// populate the RPC response. The version short ID and access info +// stay outside the saga since they're cheap reads against entities +// the saga already created. +func (s *SagaBuilder) populateResults( + ctx context.Context, + executionID, appName string, + results buildResults, + ephemeralLabel string, +) error { + out, err := s.executor.ExecutionOutputs(ctx, executionID) + if err != nil { + return fmt.Errorf("loading saga outputs for %s: %w", executionID, err) + } + + var versionName string + if err := out.Get("version_name", &versionName); err != nil { + return fmt.Errorf("reading version_name from saga: %w", err) + } + results.SetVersion(versionName) + + var appVersionID string + _ = out.Get("app_version_id", &appVersionID) + if appVersionID != "" { + if shortID, ok := s.lookupVersionShortID(ctx, appVersionID); ok { + results.SetVersionShortId(shortID) + } + } + + accessInfo := s.inner.getAccessInfo(ctx, appName, ephemeralLabel) + results.SetAccessInfo(&accessInfo) + return nil +} + +// lookupVersionShortID fetches the short ID for a created AppVersion +// entity. Returns false if the entity isn't found or doesn't have a +// short ID attribute — callers should fall back to omitting the field. +func (s *SagaBuilder) lookupVersionShortID(ctx context.Context, appVersionID string) (string, bool) { + var av core_v1alpha.AppVersion + ent, err := s.inner.ec.GetByIdWithEntity(ctx, entity.Id(appVersionID), &av) + if err != nil { + return "", false + } + for _, attr := range ent.Attrs() { + if entity.Id(attr.ID) == entity.DBShortId { + return attr.Value.String(), true + } + } + return "", false +} + +// ephemeralFromArgs builds the ephemeralOpts the saga needs from +// either BuildFromTar or BuildFromPrepared args. Returns nil when no +// label is present — the saga treats nil as "regular deploy". +func ephemeralFromArgs(args buildArgs) *ephemeralOpts { + if !args.HasEphemeralLabel() || args.EphemeralLabel() == "" { + return nil + } + ttl := "24h" + if args.HasEphemeralTtl() && args.EphemeralTtl() != "" { + ttl = args.EphemeralTtl() + } + return &ephemeralOpts{label: args.EphemeralLabel(), ttl: ttl} +} + +// ephemeralLabelFromArgs returns just the label string, "" for +// non-ephemeral deploys. Used for the post-saga access-info lookup +// where the label affects route resolution. +func ephemeralLabelFromArgs(args buildArgs) string { + if !args.HasEphemeralLabel() { + return "" + } + return args.EphemeralLabel() +} + +// extractTarIntoDir reads the RPC tar stream into dir, the same shape +// BuildFromPrepared's pre-saga path uses for incremental uploads. +func extractTarIntoDir(ctx context.Context, td *stream.RecvStreamClient[[]byte], dir string) error { + r := stream.ToReader(ctx, td) + _, err := tarx.TarFS(r, dir) + return err +} diff --git a/servers/build/status_registry.go b/servers/build/status_registry.go new file mode 100644 index 00000000..89e7af47 --- /dev/null +++ b/servers/build/status_registry.go @@ -0,0 +1,174 @@ +package build + +import ( + "context" + "fmt" + "log/slog" + "sync" + + "miren.dev/runtime/api/build/build_v1alpha" + "miren.dev/runtime/pkg/rpc/stream" +) + +// StatusSender lets saga actions emit progress, log lines, and errors +// back to the client that started the build without holding a direct +// reference to the per-request RPC stream. The SagaBuilder wraps a +// concrete stream into a sender and registers it under the stream ID +// for the duration of a saga; actions look it up by the same ID they +// already carry through saga inputs. +// +// During recovery the sender returned by StatusRegistry.SenderFor is a +// noop, which is the right default — nobody's listening for live +// progress when the original CLI invocation is long gone. +type StatusSender interface { + // SendMessage emits a free-form progress message ("Reading + // application data", "Launching builder", etc.). + SendMessage(msg string) + + // SendPhase translates a buildkit phase name into a user-facing + // progress message, matching the mapping the pre-saga path used. + SendPhase(phase string) + + // SendBuildkit emits the raw buildkit JSON status payload so the + // CLI can render live vertex/log output. + SendBuildkit(payload []byte) + + // SendError emits a user-facing error message. Returning the error + // from the action is what surfaces failure through the saga; this + // is the channel for the human-readable explanation. + SendError(format string, args ...any) + + // SendLog emits a structured log entry. Currently only the + // pre-saga "warn" path for local storage migration uses this. + SendLog(level, text string, fields ...*build_v1alpha.LogField) +} + +// noopStatusSender is the zero-cost StatusSender used during recovery +// or any other time no live RPC stream is registered for a given ID. +type noopStatusSender struct{} + +func (noopStatusSender) SendMessage(string) {} +func (noopStatusSender) SendPhase(string) {} +func (noopStatusSender) SendBuildkit([]byte) {} +func (noopStatusSender) SendError(string, ...any) {} +func (noopStatusSender) SendLog(string, string, ...*build_v1alpha.LogField) {} + +// rpcStatusSender adapts the existing per-request SendStreamClient into +// the StatusSender interface so saga actions don't have to know about +// the RPC wiring. Errors on Send are logged and swallowed; a dropped +// client stream shouldn't fail the saga. +type rpcStatusSender struct { + stream *stream.SendStreamClient[*build_v1alpha.Status] + log *slog.Logger +} + +// NewRPCStatusSender wraps an RPC status stream. Passing a nil stream +// returns a sender that no-ops cleanly so callers don't have to special- +// case the "client didn't request status updates" path. +func NewRPCStatusSender(s *stream.SendStreamClient[*build_v1alpha.Status], log *slog.Logger) StatusSender { + if s == nil { + return noopStatusSender{} + } + if log == nil { + log = slog.Default() + } + return &rpcStatusSender{stream: s, log: log} +} + +func (r *rpcStatusSender) send(so *build_v1alpha.Status) { + if _, err := r.stream.Send(context.Background(), so); err != nil { + r.log.Warn("status send", "error", err) + } +} + +func (r *rpcStatusSender) SendMessage(msg string) { + so := new(build_v1alpha.Status) + so.Update().SetMessage(msg) + r.send(so) +} + +func (r *rpcStatusSender) SendPhase(phase string) { + // Mapping matches the pre-saga path's WithPhaseUpdates callback so + // the CLI's progress display behaves identically on both code paths. + var msg string + switch phase { + case "export": + msg = "Registering image" + case "solving": + msg = "Calculating build" + case "solved": + msg = "Building image" + default: + msg = phase + } + r.SendMessage(msg) +} + +func (r *rpcStatusSender) SendBuildkit(payload []byte) { + so := new(build_v1alpha.Status) + so.Update().SetBuildkit(payload) + r.send(so) +} + +func (r *rpcStatusSender) SendError(format string, args ...any) { + so := new(build_v1alpha.Status) + so.Update().SetError(fmt.Sprintf(format, args...)) + r.send(so) +} + +func (r *rpcStatusSender) SendLog(level, text string, fields ...*build_v1alpha.LogField) { + so := new(build_v1alpha.Status) + entry := &build_v1alpha.LogEntry{} + entry.SetLevel(level) + entry.SetText(text) + if len(fields) > 0 { + entry.SetFields(fields) + } + so.Update().SetLog(entry) + r.send(so) +} + +// StatusRegistry maps stream IDs (the same handles StreamRegistry uses) +// to live StatusSenders. Saga actions retrieve a sender by ID without +// caring whether anyone's currently listening — SenderFor returns noop +// when nothing's registered, which gives recovery the right behavior +// for free. +type StatusRegistry struct { + mu sync.Mutex + senders map[string]StatusSender +} + +func NewStatusRegistry() *StatusRegistry { + return &StatusRegistry{senders: make(map[string]StatusSender)} +} + +// Register associates a sender with a stream ID. The SagaBuilder calls +// this before saga.Start and Unregister after, so a sender's lifetime +// is bounded by one saga execution. +func (s *StatusRegistry) Register(streamID string, sender StatusSender) { + if sender == nil { + sender = noopStatusSender{} + } + s.mu.Lock() + defer s.mu.Unlock() + s.senders[streamID] = sender +} + +// Unregister drops the live sender for an ID. Idempotent — calling +// twice (e.g., from both a defer and an error path) is fine. +func (s *StatusRegistry) Unregister(streamID string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.senders, streamID) +} + +// SenderFor returns the registered sender or a noop. Always returns a +// usable StatusSender so callers can avoid nil checks. +func (s *StatusRegistry) SenderFor(streamID string) StatusSender { + s.mu.Lock() + defer s.mu.Unlock() + if sender, ok := s.senders[streamID]; ok { + return sender + } + return noopStatusSender{} +} diff --git a/servers/build/status_registry_test.go b/servers/build/status_registry_test.go new file mode 100644 index 00000000..0c199241 --- /dev/null +++ b/servers/build/status_registry_test.go @@ -0,0 +1,131 @@ +package build + +import ( + "sync" + "testing" + + "miren.dev/runtime/api/build/build_v1alpha" +) + +// recordingSender captures every Send* call so tests can assert on the +// sequence of progress messages a saga action emitted. Thread-safe so +// it works regardless of which goroutine the action runs on. +type recordingSender struct { + mu sync.Mutex + Messages []string + Phases []string + Buildkit [][]byte + Errors []string + Logs []recordedLog +} + +type recordedLog struct { + Level string + Text string + Fields []*build_v1alpha.LogField +} + +func (r *recordingSender) SendMessage(msg string) { + r.mu.Lock() + defer r.mu.Unlock() + r.Messages = append(r.Messages, msg) +} + +func (r *recordingSender) SendPhase(phase string) { + r.mu.Lock() + defer r.mu.Unlock() + r.Phases = append(r.Phases, phase) +} + +func (r *recordingSender) SendBuildkit(payload []byte) { + r.mu.Lock() + defer r.mu.Unlock() + r.Buildkit = append(r.Buildkit, append([]byte(nil), payload...)) +} + +func (r *recordingSender) SendError(format string, args ...any) { + r.mu.Lock() + defer r.mu.Unlock() + r.Errors = append(r.Errors, format) // tests don't need full Sprintf +} + +func (r *recordingSender) SendLog(level, text string, fields ...*build_v1alpha.LogField) { + r.mu.Lock() + defer r.mu.Unlock() + r.Logs = append(r.Logs, recordedLog{Level: level, Text: text, Fields: fields}) +} + +func TestStatusRegistry_UnregisteredIDReturnsNoop(t *testing.T) { + reg := NewStatusRegistry() + // SenderFor must always return a usable sender; the noop is what + // makes the recovery path work without special-casing. + sender := reg.SenderFor("not-registered") + // Calling every method should not panic. + sender.SendMessage("hi") + sender.SendPhase("solving") + sender.SendBuildkit([]byte("x")) + sender.SendError("oops") + sender.SendLog("info", "text") +} + +func TestStatusRegistry_RegisteredSenderReceives(t *testing.T) { + reg := NewStatusRegistry() + rec := &recordingSender{} + reg.Register("s1", rec) + + sender := reg.SenderFor("s1") + sender.SendMessage("hello") + sender.SendPhase("solving") + sender.SendBuildkit([]byte("payload")) + sender.SendError("bad %s", "thing") + + if got, want := rec.Messages, []string{"hello"}; !equalStringSlice(got, want) { + t.Errorf("Messages = %v, want %v", got, want) + } + if got, want := rec.Phases, []string{"solving"}; !equalStringSlice(got, want) { + t.Errorf("Phases = %v, want %v", got, want) + } + if len(rec.Buildkit) != 1 || string(rec.Buildkit[0]) != "payload" { + t.Errorf("Buildkit = %v, want one entry with payload bytes", rec.Buildkit) + } + if len(rec.Errors) != 1 { + t.Errorf("Errors len = %d, want 1", len(rec.Errors)) + } +} + +func TestStatusRegistry_UnregisterReturnsNoop(t *testing.T) { + reg := NewStatusRegistry() + rec := &recordingSender{} + reg.Register("s1", rec) + reg.Unregister("s1") + // Double unregister is fine. + reg.Unregister("s1") + + sender := reg.SenderFor("s1") + sender.SendMessage("after-unregister") + + if len(rec.Messages) != 0 { + t.Errorf("recorder should not have received post-unregister messages, got %v", rec.Messages) + } +} + +func TestStatusRegistry_NilSenderRegistersAsNoop(t *testing.T) { + reg := NewStatusRegistry() + // Passing nil shouldn't blow up callers who didn't construct an + // rpcStatusSender — Register normalizes to noop. + reg.Register("s1", nil) + sender := reg.SenderFor("s1") + sender.SendMessage("should be safe") // no panic +} + +func equalStringSlice(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} diff --git a/servers/build/stream_registry.go b/servers/build/stream_registry.go new file mode 100644 index 00000000..25d0523b --- /dev/null +++ b/servers/build/stream_registry.go @@ -0,0 +1,183 @@ +package build + +import ( + "errors" + "fmt" + "io" + "log/slog" + "os" + "path/filepath" + "sync" + + "miren.dev/runtime/pkg/tarx" +) + +// ErrStreamUnavailable is returned by Stage when neither an active reader +// nor a previously staged path exists for the given stream ID. This is the +// expected failure when recovering a saga whose tar stream was lost in a +// crash before the first stage attempt. +var ErrStreamUnavailable = errors.New("stream unavailable") + +// StreamRegistry bridges non-serializable tar streams into the saga world by +// staging them to durable filesystem paths. The pattern, per RFD-35: +// +// 1. Entry point registers an io.Reader with a generated stream ID and +// starts a saga carrying only the ID. +// 2. The first saga action calls Stage to read the registered stream and +// extract its tar contents to a fresh subdirectory. +// 3. If the saga later resumes after a crash, the stream is gone but the +// staged path is durable — Stage returns the existing path without +// touching any reader. +// +// Stage and Cleanup are idempotent. Cleanup is safe to call from action +// compensation (on failure) and from a terminal saga step (on success); +// running it twice is a no-op. +type StreamRegistry struct { + baseDir string + log *slog.Logger + + mu sync.Mutex + streams map[string]io.Reader + staged map[string]string +} + +// NewStreamRegistry creates a registry that stages streams under baseDir. +// Each registered stream gets its own subdirectory so Cleanup is a single +// RemoveAll. +func NewStreamRegistry(baseDir string, log *slog.Logger) *StreamRegistry { + if log == nil { + log = slog.Default() + } + return &StreamRegistry{ + baseDir: baseDir, + log: log.With("component", "stream-registry"), + streams: make(map[string]io.Reader), + staged: make(map[string]string), + } +} + +// Register associates an io.Reader with a stream ID. The reader is held +// in memory until Stage consumes it or Cleanup drops it. Re-registering an +// ID with an active reader replaces it; re-registering after staging is a +// no-op so recovery flows don't accidentally restart staging. +func (s *StreamRegistry) Register(streamID string, r io.Reader) { + s.mu.Lock() + defer s.mu.Unlock() + + if _, alreadyStaged := s.staged[streamID]; alreadyStaged { + return + } + s.streams[streamID] = r +} + +// Stage extracts the registered stream's tar contents to a per-stream +// subdirectory and returns the path. On recovery, if the original reader +// is gone but a staged path exists, returns that path unchanged. Returns +// ErrStreamUnavailable when neither is available. +func (s *StreamRegistry) Stage(streamID string) (string, error) { + s.mu.Lock() + if path, ok := s.staged[streamID]; ok { + s.mu.Unlock() + s.log.Debug("stream already staged", "id", streamID, "path", path) + return path, nil + } + + reader, ok := s.streams[streamID] + if !ok { + s.mu.Unlock() + return "", fmt.Errorf("%w: %s", ErrStreamUnavailable, streamID) + } + // Remove the reader from the map so two concurrent Stage callers + // can't both consume it. If MkdirTemp fails the reader is + // untouched and we can safely restore it for a retry; TarFS failure + // past that point has consumed bytes and isn't recoverable. + delete(s.streams, streamID) + s.mu.Unlock() + + path, err := os.MkdirTemp(s.baseDir, "stream-"+streamID+"-") + if err != nil { + s.mu.Lock() + s.streams[streamID] = reader + s.mu.Unlock() + return "", fmt.Errorf("creating staging dir for %s: %w", streamID, err) + } + + if _, err := tarx.TarFS(reader, path); err != nil { + // Best-effort cleanup; the partial directory is small and the + // outer saga will fail the action anyway. + _ = os.RemoveAll(path) + return "", fmt.Errorf("extracting tar for %s: %w", streamID, err) + } + + s.mu.Lock() + s.staged[streamID] = path + s.mu.Unlock() + + s.log.Debug("staged stream", "id", streamID, "path", path) + return path, nil +} + +// StagedPath returns the staged directory if Stage previously succeeded +// for this ID. Useful for diagnostics and tests; the saga path itself +// flows through Stage's return value. +func (s *StreamRegistry) StagedPath(streamID string) (string, bool) { + s.mu.Lock() + defer s.mu.Unlock() + path, ok := s.staged[streamID] + return path, ok +} + +// MarkStaged records an externally-prepared path as already staged for +// streamID. Used by BuildFromPrepared, where the source is already on +// disk from a PrepareUpload session and the saga's receive-tar action +// just needs to discover that path. Refuses to overwrite an existing +// entry: re-marking is silently dropped (with a warn-log) so a stale +// session ID can't blow away a legitimately-staged directory and leak +// it on disk. +func (s *StreamRegistry) MarkStaged(streamID, path string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.streams, streamID) + if existing, ok := s.staged[streamID]; ok { + s.log.Warn("MarkStaged called for already-staged id", + "id", streamID, "existing", existing, "new", path) + return + } + s.staged[streamID] = path +} + +// Cleanup removes any staged directory and forgets the stream ID. +// Idempotent: safe to call from both action compensation (failure path) +// and a terminal cleanup action (success path), and safe to call when +// nothing was ever staged. Keeps the registry entry until RemoveAll +// succeeds so a transient filesystem error doesn't leak the path — +// the next call can retry with the same ID. +func (s *StreamRegistry) Cleanup(streamID string) error { + s.mu.Lock() + path, hadStaged := s.staged[streamID] + delete(s.streams, streamID) + s.mu.Unlock() + + if !hadStaged { + return nil + } + + if err := os.RemoveAll(path); err != nil { + return fmt.Errorf("removing staged dir %s: %w", path, err) + } + + s.mu.Lock() + delete(s.staged, streamID) + s.mu.Unlock() + + s.log.Debug("cleaned up staged stream", "id", streamID, "path", path) + return nil +} + +// stagedFileExists is used by tests to confirm the staging directory was +// actually written and later removed. Kept package-private; saga code uses +// the path returned from Stage directly. +func stagedFileExists(dir, name string) bool { + _, err := os.Stat(filepath.Join(dir, name)) + return err == nil +} diff --git a/servers/build/stream_registry_test.go b/servers/build/stream_registry_test.go new file mode 100644 index 00000000..c9be484b --- /dev/null +++ b/servers/build/stream_registry_test.go @@ -0,0 +1,261 @@ +package build + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "errors" + "io" + "strings" + "sync" + "testing" +) + +// makeTar returns an io.Reader producing a gzip'd tar archive (tarx.TarFS +// always decompresses) containing the given files. Parent directories are +// emitted automatically for nested paths so the receiver doesn't need to +// pre-create them. Same shape of data BuildFromTar receives over the wire. +func makeTar(t *testing.T, files map[string]string) io.Reader { + t.Helper() + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + tw := tar.NewWriter(gw) + + seenDirs := map[string]bool{} + emitDir := func(dir string) { + if dir == "" || dir == "." || seenDirs[dir] { + return + } + seenDirs[dir] = true + hdr := &tar.Header{ + Name: dir + "/", + Mode: 0o755, + Typeflag: tar.TypeDir, + } + if err := tw.WriteHeader(hdr); err != nil { + t.Fatalf("tar header for dir %q: %v", dir, err) + } + } + + for name, body := range files { + // Emit each parent directory once so nested file entries can be + // created without ENOENT on os.Create. + if i := strings.LastIndex(name, "/"); i > 0 { + emitDir(name[:i]) + } + hdr := &tar.Header{ + Name: name, + Mode: 0o644, + Size: int64(len(body)), + } + if err := tw.WriteHeader(hdr); err != nil { + t.Fatalf("tar header for %q: %v", name, err) + } + if _, err := tw.Write([]byte(body)); err != nil { + t.Fatalf("tar body for %q: %v", name, err) + } + } + if err := tw.Close(); err != nil { + t.Fatalf("closing tar: %v", err) + } + if err := gw.Close(); err != nil { + t.Fatalf("closing gzip: %v", err) + } + return &buf +} + +func TestStreamRegistry_StageExtractsAndReturnsPath(t *testing.T) { + reg := NewStreamRegistry(t.TempDir(), nil) + reg.Register("s1", makeTar(t, map[string]string{ + "app.toml": "name = 'demo'", + "main.go": "package main", + })) + + path, err := reg.Stage("s1") + if err != nil { + t.Fatalf("Stage: %v", err) + } + if path == "" { + t.Fatal("Stage returned empty path") + } + + if !stagedFileExists(path, "app.toml") { + t.Errorf("expected app.toml at %s", path) + } + if !stagedFileExists(path, "main.go") { + t.Errorf("expected main.go at %s", path) + } +} + +func TestStreamRegistry_StageIsIdempotent(t *testing.T) { + reg := NewStreamRegistry(t.TempDir(), nil) + reg.Register("s1", makeTar(t, map[string]string{"a": "x"})) + + first, err := reg.Stage("s1") + if err != nil { + t.Fatalf("first Stage: %v", err) + } + + // Second call must return the same path without consuming a reader. + // The reader is gone after the first Stage, so a non-idempotent impl + // would either re-stage (changing path) or fail with ErrStreamUnavailable. + second, err := reg.Stage("s1") + if err != nil { + t.Fatalf("second Stage: %v", err) + } + if first != second { + t.Errorf("Stage paths differ: first=%s second=%s", first, second) + } +} + +func TestStreamRegistry_StageRecoveryWithoutStream(t *testing.T) { + // Simulate: caller staged once, then "crashed" — re-staging without + // ever calling Register again must still succeed because the path is + // remembered in-process. (Note: cross-process recovery would require + // persisting the path; that's a future iteration.) + reg := NewStreamRegistry(t.TempDir(), nil) + reg.Register("s1", makeTar(t, map[string]string{"a": "x"})) + staged, err := reg.Stage("s1") + if err != nil { + t.Fatalf("initial Stage: %v", err) + } + + recovered, err := reg.Stage("s1") + if err != nil { + t.Fatalf("recovery Stage: %v", err) + } + if recovered != staged { + t.Errorf("recovery path mismatch: got %s want %s", recovered, staged) + } +} + +func TestStreamRegistry_StageFailsWhenStreamGoneAndUnstaged(t *testing.T) { + reg := NewStreamRegistry(t.TempDir(), nil) + // Never registered. This is the "crash before any stage attempt" case. + _, err := reg.Stage("never-seen") + if err == nil { + t.Fatal("Stage should fail when nothing registered") + } + if !errors.Is(err, ErrStreamUnavailable) { + t.Errorf("expected ErrStreamUnavailable, got %v", err) + } +} + +func TestStreamRegistry_StageFailsOnBadTar(t *testing.T) { + reg := NewStreamRegistry(t.TempDir(), nil) + reg.Register("s1", strings.NewReader("not a tar")) + + _, err := reg.Stage("s1") + if err == nil { + t.Fatal("Stage should fail on malformed tar") + } + + // Failure must not leave the ID staged; a retry should report + // ErrStreamUnavailable rather than a half-extracted directory. + if _, ok := reg.StagedPath("s1"); ok { + t.Error("StagedPath should be empty after Stage failure") + } + _, err = reg.Stage("s1") + if !errors.Is(err, ErrStreamUnavailable) { + t.Errorf("retry after Stage failure: expected ErrStreamUnavailable, got %v", err) + } +} + +func TestStreamRegistry_CleanupRemovesStagedDir(t *testing.T) { + reg := NewStreamRegistry(t.TempDir(), nil) + reg.Register("s1", makeTar(t, map[string]string{"app.toml": "x"})) + path, err := reg.Stage("s1") + if err != nil { + t.Fatalf("Stage: %v", err) + } + + if err := reg.Cleanup("s1"); err != nil { + t.Fatalf("Cleanup: %v", err) + } + if stagedFileExists(path, "app.toml") { + t.Errorf("expected %s to be removed", path) + } + if _, ok := reg.StagedPath("s1"); ok { + t.Error("StagedPath should be empty after Cleanup") + } +} + +func TestStreamRegistry_CleanupIsIdempotent(t *testing.T) { + reg := NewStreamRegistry(t.TempDir(), nil) + reg.Register("s1", makeTar(t, map[string]string{"a": "x"})) + if _, err := reg.Stage("s1"); err != nil { + t.Fatalf("Stage: %v", err) + } + + // First cleanup removes the dir. + if err := reg.Cleanup("s1"); err != nil { + t.Fatalf("first Cleanup: %v", err) + } + // Second cleanup is a no-op — important because both action compensation + // and a terminal saga step might call it. + if err := reg.Cleanup("s1"); err != nil { + t.Fatalf("second Cleanup: %v", err) + } + // Cleanup on a never-staged ID is also fine. + if err := reg.Cleanup("never-seen"); err != nil { + t.Fatalf("cleanup of unknown id: %v", err) + } +} + +func TestStreamRegistry_RegisterAfterStageDoesNotResetState(t *testing.T) { + // A confused caller re-registering after staging shouldn't blow away + // the staged path. Recovery flows might re-register a reader that's + // already been consumed; we just ignore it. + reg := NewStreamRegistry(t.TempDir(), nil) + reg.Register("s1", makeTar(t, map[string]string{"a": "x"})) + first, err := reg.Stage("s1") + if err != nil { + t.Fatalf("Stage: %v", err) + } + + reg.Register("s1", makeTar(t, map[string]string{"b": "y"})) + second, err := reg.Stage("s1") + if err != nil { + t.Fatalf("Stage after re-register: %v", err) + } + if first != second { + t.Errorf("path changed after Register-after-Stage: first=%s second=%s", first, second) + } +} + +func TestStreamRegistry_ConcurrentStreams(t *testing.T) { + reg := NewStreamRegistry(t.TempDir(), nil) + const n = 8 + for i := 0; i < n; i++ { + id := streamID(i) + reg.Register(id, makeTar(t, map[string]string{id: id})) + } + + var wg sync.WaitGroup + paths := make([]string, n) + errs := make([]error, n) + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + paths[i], errs[i] = reg.Stage(streamID(i)) + }(i) + } + wg.Wait() + + seen := map[string]bool{} + for i, err := range errs { + if err != nil { + t.Errorf("Stage %d: %v", i, err) + continue + } + if seen[paths[i]] { + t.Errorf("duplicate staged path %s for stream %d", paths[i], i) + } + seen[paths[i]] = true + } +} + +func streamID(i int) string { + return "s" + string(rune('a'+i)) +}