Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion components/coordinate/coordinate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,18 @@ func (c *Coordinator) Start(ctx context.Context) error {
appClient := appclient.NewClient(c.Log, loopback)

bs := build.NewBuilder(c.Log, eac, appClient, addonsClient, c.Resolver, c.TempDir, c.LogWriter, c.CloudAuth.DNSHostname, c.BuildKit, c.DataPath)
server.ExposeValue("dev.miren.runtime/build", build_v1alpha.AdaptBuilder(bs))

var buildHandler build_v1alpha.Builder = bs
if labs.Sagas() {
sagaStorage := saga.NewEntityStorage(etcdStore, c.Log)
sagaBuilder := build.NewSagaBuilder(bs, sagaStorage, c.Log)
if err := sagaBuilder.Init(ctx); err != nil {
c.Log.Error("failed to initialize saga builder", "error", err)
return err
}
buildHandler = sagaBuilder
}
server.ExposeValue("dev.miren.runtime/build", build_v1alpha.AdaptBuilder(buildHandler))

ls := logs.NewServer(c.Log, ec, c.Logs)
server.ExposeValue("dev.miren.runtime/logs", app_v1alpha.AdaptLogs(ls))
Expand Down
2 changes: 1 addition & 1 deletion pkg/labs/features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ features:

- name: sagas
predicate: Sagas
description: Use saga-based crash-recoverable workflows for sandbox lifecycle
description: Use saga-based crash-recoverable workflows
default: false
4 changes: 2 additions & 2 deletions pkg/labs/labs.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions pkg/saga/eac_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,15 @@ func (s *EACStorage) Save(ctx context.Context, exec *Execution) error {
sagaEntity.Encode(),
)

_, err = s.eac.Ensure(ctx, ent.Attrs())
if err != nil {
// Put is an upsert (update-then-create): unlike Ensure, it applies our
// attributes even when the entity already exists. Ensure is create-if-absent
// and would silently drop every save after the first, freezing the saga at
// its initial pending state.
rpcEnt := &es.Entity{}
rpcEnt.SetId(exec.ID)
rpcEnt.SetAttrs(ent.Attrs())

if _, err = s.eac.Put(ctx, rpcEnt); err != nil {
return fmt.Errorf("saving saga entity via EAC: %w", err)
}

Expand Down
13 changes: 9 additions & 4 deletions pkg/saga/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,15 +465,20 @@ func (e *Executor) Recover(ctx context.Context) error {
continue
}

e.log.Info("recovering saga", "saga", exec.DefinitionName, "execution", exec.ID, "status", exec.Status)

def, ok := e.registry.Get(exec.DefinitionName)
if !ok {
e.log.Error("saga definition not found for recovery", "saga", exec.DefinitionName)
recoverErrors = append(recoverErrors, fmt.Errorf("definition %q not found", exec.DefinitionName))
// Storage is shared across executors (e.g. build and sandbox each
// run their own), but ListIncomplete returns every executor's
// sagas. A definition we don't recognize belongs to a different
// executor, which will recover it from its own registry. Skip
// quietly rather than treating it as our recovery error.
e.log.Debug("skipping saga with unregistered definition (owned by another executor)",
"saga", exec.DefinitionName, "execution", exec.ID)
continue
}

e.log.Info("recovering saga", "saga", exec.DefinitionName, "execution", exec.ID, "status", exec.Status)

// Check version compatibility
if def.Version != exec.DefinitionVersion {
e.log.Warn("saga definition version mismatch",
Expand Down
13 changes: 11 additions & 2 deletions pkg/saga/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,25 @@ func (s *EntityStorage) Save(ctx context.Context, exec *Execution) error {
Error: exec.Error,
}

// Create or update the entity
// Create or update the entity. EnsureEntity is create-if-absent and
// does NOT apply our attributes when the entity already exists, so on
// every save after the first we must explicitly replace. Without this,
// the saga record stays frozen at its initial pending state and later
// status/action-progress writes are silently dropped.
ent := entity.New(
entity.DBId, entity.Id(exec.ID),
sagaEntity.Encode(),
)

_, _, err = s.store.EnsureEntity(ctx, ent)
_, created, err := s.store.EnsureEntity(ctx, ent)
if err != nil {
return fmt.Errorf("saving saga entity: %w", err)
}
if !created {
if _, err := s.store.ReplaceEntity(ctx, ent); err != nil {
return fmt.Errorf("updating saga entity: %w", err)
}
}

return nil
}
Expand Down
169 changes: 169 additions & 0 deletions pkg/saga/storage_conformance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package saga

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We picked this up in another PR already I'm pretty sure.


import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"miren.dev/runtime/pkg/entity/testutils"
)

// storageFactory pairs a backend's display name with a constructor that builds
// a fresh, empty Storage for one test. The constructor registers any teardown
// via t.Cleanup so callers get a clean backend per subtest.
type storageFactory struct {
// name labels the backend; the tests in this file pass it to t.Run as the
// subtest name.
name string
// make constructs the backend; the tests in this file call it once per
// subtest, handing it that subtest's *testing.T.
make func(t *testing.T) Storage
}

// allStorageBackends lists one factory per production Storage implementation
// so that a single scenario can be replayed against each of them.
//
// Background (MIR-441): EntityStorage and EACStorage both saved with
// create-if-absent semantics and silently dropped every save after the first,
// whereas MemoryStorage was correct. Memory-backed unit tests therefore stayed
// green while production froze every saga at its initial pending state.
// Holding all backends to the same contract is what closes that gap.
//
// This suite complements pkg/entity's Store conformance suite rather than
// duplicating it. That suite proves MockStore behaves like the real EtcdStore
// on the underlying write primitives; this one proves saga storage *uses*
// those primitives correctly (a create-only call where an upsert was needed is
// invisible to store-level tests). Because the entity suite vouches for the
// mock, the EntityStorage backend below is mock-backed on purpose and no
// real-etcd backend is added. The EACStorage backend additionally exercises
// the EntityAccessClient RPC path, which the entity suite does not reach.
func allStorageBackends() []storageFactory {
	// In-process storage; nothing to tear down.
	newMemory := func(t *testing.T) Storage {
		return NewMemoryStorage()
	}

	// Entity-store-backed storage on top of the in-memory entity server.
	newEntity := func(t *testing.T) Storage {
		server, teardown := testutils.NewInMemEntityServer(t)
		t.Cleanup(teardown)
		return NewEntityStorage(server.Store, testutils.TestLogger(t))
	}

	// EAC-backed storage, talking to the same kind of in-memory server through
	// its entity access client.
	newEAC := func(t *testing.T) Storage {
		server, teardown := testutils.NewInMemEntityServer(t)
		t.Cleanup(teardown)
		return NewEACStorage(server.EAC, testutils.TestLogger(t))
	}

	return []storageFactory{
		{name: "MemoryStorage", make: newMemory},
		{name: "EntityStorage", make: newEntity},
		{name: "EACStorage", make: newEAC},
	}
}

// TestStorageConformance_SaveUpdatesExistingExecution is the direct regression
// for the EnsureEntity/Ensure create-if-absent bug. A saga is saved repeatedly
// as it progresses (pending -> running -> completed, accumulating action
// results). Every save after the first must overwrite the stored state. The
// buggy backends dropped these updates, so Get returned a pending execution
// with no recorded actions even though the saga had completed.
func TestStorageConformance_SaveUpdatesExistingExecution(t *testing.T) {
for _, backend := range allStorageBackends() {
t.Run(backend.name, func(t *testing.T) {
ctx := context.Background()
storage := backend.make(t)

exec := &Execution{
ID: "build-from-prepared-abc123",
DefinitionName: "build-from-tar",
Status: StatusPending,
InitialInputs: map[string]any{"app_name": "demo"},
ExecutedActions: map[string]*ActionResult{},
ExecutionOrder: []string{},
}

// First save: creates the entity.
require.NoError(t, storage.Save(ctx, exec))

// Progress the saga and save again. Under the old create-if-absent
// behavior this save was a silent no-op.
exec.Status = StatusRunning
exec.ExecutedActions["receive-tar"] = &ActionResult{
Output: []byte(`{"source_dir":"/tmp/build"}`),
ExecutedAt: time.Unix(0, 0),
}
exec.ExecutionOrder = []string{"receive-tar"}
require.NoError(t, storage.Save(ctx, exec))

// Complete the saga and save a final time.
exec.Status = StatusCompleted
exec.ExecutedActions["create-version"] = &ActionResult{
Output: []byte(`{"version_name":"demo-v1"}`),
ExecutedAt: time.Unix(0, 0),
}
exec.ExecutionOrder = []string{"receive-tar", "create-version"}
require.NoError(t, storage.Save(ctx, exec))

got, err := storage.Get(ctx, exec.ID)
require.NoError(t, err)

assert.Equal(t, StatusCompleted, got.Status,
"final status must persist; a stuck pending status means saves after the first were dropped")
assert.Len(t, got.ExecutedActions, 2,
"executed actions recorded across saves must all persist")
assert.Equal(t, []string{"receive-tar", "create-version"}, got.ExecutionOrder)

if action, ok := got.ExecutedActions["create-version"]; assert.True(t, ok, "create-version action must persist") {
assert.JSONEq(t, `{"version_name":"demo-v1"}`, string(action.Output))
}
})
}
}

// TestStorageConformance_CompletedExecutionLeavesIncompleteList checks that an
// execution drops out of ListIncomplete once it has been saved with a terminal
// status. It covers the operational fallout of the dropped-save bug: with the
// completing save lost, the execution stayed pending in storage forever and
// every process restart re-ran an already-finished saga during recovery.
func TestStorageConformance_CompletedExecutionLeavesIncompleteList(t *testing.T) {
	for _, backend := range allStorageBackends() {
		t.Run(backend.name, func(t *testing.T) {
			ctx := context.Background()
			store := backend.make(t)

			execution := &Execution{
				ID:              "saga-incomplete-check",
				DefinitionName:  "build-from-tar",
				Status:          StatusPending,
				InitialInputs:   map[string]any{},
				ExecutedActions: map[string]*ActionResult{},
				ExecutionOrder:  []string{},
			}

			// listedAsIncomplete asks the backend for its incomplete
			// executions and reports whether ours is among them. A listing
			// error aborts the subtest.
			listedAsIncomplete := func() bool {
				pending, err := store.ListIncomplete(ctx)
				require.NoError(t, err)
				return containsExecution(pending, execution.ID)
			}

			require.NoError(t, store.Save(ctx, execution))
			assert.True(t, listedAsIncomplete(),
				"a pending saga must appear in ListIncomplete")

			execution.Status = StatusCompleted
			require.NoError(t, store.Save(ctx, execution))
			assert.False(t, listedAsIncomplete(),
				"a completed saga must NOT appear in ListIncomplete; if it does, recovery will re-run finished work")
		})
	}
}

// containsExecution reports whether execs holds a non-nil execution whose ID
// equals id. Nil entries are skipped.
func containsExecution(execs []*Execution, id string) bool {
	for i := range execs {
		candidate := execs[i]
		if candidate == nil {
			continue
		}
		if candidate.ID == id {
			return true
		}
	}
	return false
}
Loading
Loading