From dfdc45746650828852baa16d1011f6fe324e40f1 Mon Sep 17 00:00:00 2001 From: Ted Kim Date: Thu, 25 Jun 2026 11:56:29 -0400 Subject: [PATCH 1/8] feat(config): declarative function schedulers in volcano-config.yaml (VOL-271) Add schedulers as a first-class declarative resource so they join the declare -> test-local -> promote-to-cloud flow alongside buckets/functions. - manifest: SchedulerManifest nested under functions (name+cron required, optional enabled/payload/regions); validateFunctions relaxed to 'public OR schedulers'; structural-only validation (cron/region deferred to server) - function.Service.UpdateSchedulerByID: in-place full update preserving the scheduler UUID (no delete+recreate) - deploy: reconcileSchedulers diffs by name and creates/updates/leaves unchanged; NON-DESTRUCTIVE (never deletes undeclared/ad-hoc schedulers); idempotent payload (canonical JSON) and region (server-managed when omitted) comparison; Summary + output extended - guard reconcileFunctions against nil Public for schedulers-only entries Reuses the existing cloud scheduler REST API + client, so the same manifest applies to localmode and cloud. --- internal/function/function.go | 14 + internal/output/projectconfig.go | 5 + internal/projectconfig/deploy.go | 197 ++++++++++- internal/projectconfig/deploy_test.go | 418 +++++++++++++++++++++++- internal/projectconfig/manifest.go | 59 +++- internal/projectconfig/manifest_test.go | 91 +++++- 6 files changed, 748 insertions(+), 36 deletions(-) diff --git a/internal/function/function.go b/internal/function/function.go index b22d723..06ed121 100644 --- a/internal/function/function.go +++ b/internal/function/function.go @@ -340,6 +340,20 @@ func (s Service) CreateSchedulerByID(ctx context.Context, functionID uuid.UUID, return scheduler, nil } +// UpdateSchedulerByID updates one scheduler by ID, preserving the scheduler UUID. +func (s Service) UpdateSchedulerByID(ctx context.Context, functionID, schedulerID uuid.UUID, input api.FunctionSchedulerInput) (*apiclient.FunctionScheduler, error) { + authenticated, err := s.sessions.CurrentProject() + if err != nil { + return nil, err + } + + scheduler, err := authenticated.API.UpdateFunctionScheduler(ctx, authenticated.ProjectID, functionID, schedulerID, input) + if err != nil { + return nil, fmt.Errorf("failed to update scheduler: %w", err) + } + return scheduler, nil +} + // EnableScheduler enables one scheduler for a function. func (s Service) EnableScheduler(ctx context.Context, identifier string, schedulerID uuid.UUID) (*apiclient.FunctionScheduler, error) { return s.setSchedulerEnabled(ctx, identifier, schedulerID, true) diff --git a/internal/output/projectconfig.go b/internal/output/projectconfig.go index db13161..4358e9f 100644 --- a/internal/output/projectconfig.go +++ b/internal/output/projectconfig.go @@ -27,4 +27,9 @@ func ProjectConfigDeploySummary(w io.Writer, summary *projectconfig.Summary) { summary.FunctionsUpdated, summary.FunctionsUnchanged, ) + fmt.Fprintf(w, "Schedulers: %d created, %d updated, %d unchanged\n", + summary.SchedulersCreated, + summary.SchedulersUpdated, + summary.SchedulersUnchanged, + ) } diff --git a/internal/projectconfig/deploy.go b/internal/projectconfig/deploy.go index a049d08..16ac38c 100644 --- a/internal/projectconfig/deploy.go +++ b/internal/projectconfig/deploy.go @@ -2,12 +2,16 @@ package projectconfig import ( "context" + "encoding/json" "errors" "fmt" "net/http" + "reflect" "sort" "strings" + "github.com/google/uuid" + "github.com/Kong/volcano-cli/internal/api" "github.com/Kong/volcano-cli/internal/apiclient" apicommon "github.com/Kong/volcano-cli/internal/apiclient/common" @@ -20,15 +24,18 @@ import ( // non-decreasing across reconciliation steps so callers can render a single // "buckets: X created, Y updated, Z unchanged" line per resource type. type Summary struct { - BucketsCreated int - BucketsUpdated int - BucketsUnchanged int - PoliciesCreated int - PoliciesUpdated int - PoliciesDeleted int - PoliciesUnchanged int - FunctionsUpdated int - FunctionsUnchanged int + BucketsCreated int + BucketsUpdated int + BucketsUnchanged int + PoliciesCreated int + PoliciesUpdated int + PoliciesDeleted int + PoliciesUnchanged int + FunctionsUpdated int + FunctionsUnchanged int + SchedulersCreated int + SchedulersUpdated int + SchedulersUnchanged int } // StorageReconciler is the subset of internal/storage.Service that Deploy @@ -50,25 +57,36 @@ type FunctionReconciler interface { UpdateVisibility(ctx context.Context, identifier string, isPublic bool) (*apiclient.Function, error) } +// SchedulerReconciler is the subset of internal/function.Service used to +// reconcile schedulers from a manifest. +type SchedulerReconciler interface { + ListSchedulers(ctx context.Context, identifier string) (*apiclient.Function, *apiclient.FunctionSchedulerListResponse, error) + CreateSchedulerByID(ctx context.Context, functionID uuid.UUID, input api.FunctionSchedulerInput) (*apiclient.FunctionScheduler, error) + UpdateSchedulerByID(ctx context.Context, functionID, schedulerID uuid.UUID, input api.FunctionSchedulerInput) (*apiclient.FunctionScheduler, error) +} + // Service deploys declarative project configuration to the Volcano API. type Service struct { - storage StorageReconciler - functions FunctionReconciler + storage StorageReconciler + functions FunctionReconciler + schedulers SchedulerReconciler } // NewService wires the projectconfig Service against the storage and function // services. func NewService(deps cliruntime.Deps) Service { + fnService := clifunction.NewService(deps) return Service{ - storage: clistorage.NewService(deps), - functions: clifunction.NewService(deps), + storage: clistorage.NewService(deps), + functions: fnService, + schedulers: fnService, } } // NewServiceWithReconcilers returns a Service that uses the supplied // reconcilers. Intended for tests. -func NewServiceWithReconcilers(storage StorageReconciler, functions FunctionReconciler) Service { - return Service{storage: storage, functions: functions} +func NewServiceWithReconcilers(storage StorageReconciler, functions FunctionReconciler, schedulers SchedulerReconciler) Service { + return Service{storage: storage, functions: functions, schedulers: schedulers} } // Deploy reconciles the project state to match the manifest. Buckets and their @@ -93,6 +111,10 @@ func (s Service) Deploy(ctx context.Context, manifest *Manifest) (*Summary, erro if err := s.reconcileFunctions(ctx, manifest.Functions, summary); err != nil { return summary, err } + + if err := s.reconcileSchedulers(ctx, manifest.Functions, summary); err != nil { + return summary, err + } return summary, nil } @@ -243,6 +265,11 @@ func (s Service) reconcileFunctions(ctx context.Context, functions []FunctionMan } for _, target := range functions { + // Skip visibility reconciliation if Public is not set (schedulers-only entry) + if target.Public == nil { + continue + } + fn, ok := byName[target.Name] if !ok { available := make([]string, 0, len(deployed)) @@ -309,3 +336,143 @@ func stringSlicesEqual(a, b []string) bool { } return true } + +func (s Service) reconcileSchedulers(ctx context.Context, functions []FunctionManifest, summary *Summary) error { + for _, fnManifest := range functions { + if len(fnManifest.Schedulers) == 0 { + continue + } + + // Resolve the function and list existing schedulers + fn, listResp, err := s.schedulers.ListSchedulers(ctx, fnManifest.Name) + if err != nil { + return fmt.Errorf("function %q: failed to list schedulers: %w", fnManifest.Name, err) + } + if fn == nil { + return fmt.Errorf("function %q: not found", fnManifest.Name) + } + + // Build map of existing schedulers by Name + existingByName := make(map[string]apiclient.FunctionScheduler) + if listResp != nil { + for _, existing := range listResp.Data { + if existing.Name != nil { + name := *existing.Name + if _, duplicate := existingByName[name]; duplicate { + return fmt.Errorf("function %q: duplicate scheduler name %q exists on server (cannot reconcile unambiguously)", fnManifest.Name, name) + } + existingByName[name] = existing + } + } + } + + // Reconcile each desired scheduler + for _, desired := range fnManifest.Schedulers { + input := buildSchedulerInput(desired) + + existing, found := existingByName[desired.Name] + if !found { + // Create new scheduler + if _, err := s.schedulers.CreateSchedulerByID(ctx, fn.Id, input); err != nil { + return fmt.Errorf("function %q: failed to create scheduler %q: %w", fnManifest.Name, desired.Name, err) + } + summary.SchedulersCreated++ + continue + } + + // Check if update needed + if schedulerNeedsUpdate(existing, desired) { + if existing.Id == nil { + return fmt.Errorf("function %q scheduler %q: existing scheduler has no ID", fnManifest.Name, desired.Name) + } + schedulerID, err := uuid.Parse(existing.Id.String()) + if err != nil { + return fmt.Errorf("function %q scheduler %q: invalid scheduler ID: %w", fnManifest.Name, desired.Name, err) + } + if _, err := s.schedulers.UpdateSchedulerByID(ctx, fn.Id, schedulerID, input); err != nil { + return fmt.Errorf("function %q: failed to update scheduler %q: %w", fnManifest.Name, desired.Name, err) + } + summary.SchedulersUpdated++ + } else { + summary.SchedulersUnchanged++ + } + } + } + return nil +} + +func buildSchedulerInput(manifest SchedulerManifest) api.FunctionSchedulerInput { + return api.FunctionSchedulerInput{ + Name: manifest.Name, + CronExpression: manifest.Cron, + Payload: manifest.Payload, + Regions: manifest.Regions, + Enabled: manifest.Enabled, + } +} + +func schedulerNeedsUpdate(existing apiclient.FunctionScheduler, desired SchedulerManifest) bool { + // Compare cron expression + if existing.CronExpression == nil || *existing.CronExpression != desired.Cron { + return true + } + + // Compare enabled (default true if not set in manifest) + desiredEnabled := true + if desired.Enabled != nil { + desiredEnabled = *desired.Enabled + } + existingEnabled := false + if existing.Enabled != nil { + existingEnabled = *existing.Enabled + } + if existingEnabled != desiredEnabled { + return true + } + + // Compare payload using a JSON-canonical comparison. The manifest payload + // comes from YAML (numbers decode as int), while the server returns JSON + // (numbers decode as float64); reflect.DeepEqual would flag those as + // different every deploy. Marshalling both to JSON normalizes number + // formatting and key order, and treats an omitted payload as the server's + // empty-object default. + var existingPayload map[string]any + if existing.Payload != nil { + existingPayload = *existing.Payload + } + if !payloadEqual(existingPayload, desired.Payload) { + return true + } + + // Compare regions only when the manifest explicitly declares them. When + // regions are omitted the server auto-assigns a deployed region, so a + // comparison against the empty manifest value would report a spurious + // update on every deploy. Treat omitted regions as server-managed. + if len(desired.Regions) > 0 { + var existingRegions []string + if existing.Regions != nil { + existingRegions = *existing.Regions + } + if !stringSlicesEqual(existingRegions, desired.Regions) { + return true + } + } + + return false +} + +// payloadEqual reports whether two scheduler payloads are equivalent. An empty +// or nil payload is treated as equal to the server's default empty object. +// Comparison is done on canonical JSON so YAML ints and server float64s (and +// map key ordering) do not produce false differences. +func payloadEqual(a, b map[string]any) bool { + if len(a) == 0 && len(b) == 0 { + return true + } + aJSON, errA := json.Marshal(a) + bJSON, errB := json.Marshal(b) + if errA != nil || errB != nil { + return reflect.DeepEqual(a, b) + } + return string(aJSON) == string(bJSON) +} diff --git a/internal/projectconfig/deploy_test.go b/internal/projectconfig/deploy_test.go index ce9c1b3..6e0db36 100644 --- a/internal/projectconfig/deploy_test.go +++ b/internal/projectconfig/deploy_test.go @@ -196,7 +196,7 @@ func (f *fakeFunctions) UpdateVisibility(_ context.Context, identifier string, i func TestDeployCreatesMissingBucketAndPolicies(t *testing.T) { storage := newFakeStorage() functions := &fakeFunctions{} - svc := NewServiceWithReconcilers(storage, functions) + svc := NewServiceWithReconcilers(storage, functions, newFakeSchedulers()) limit := int64(2048) manifest := &Manifest{ @@ -243,7 +243,7 @@ func TestDeployUpdatesBucketWhenMimeTypesDiffer(t *testing.T) { AllowedMimeTypes: &[]string{"image/png"}, } - svc := NewServiceWithReconcilers(storage, &fakeFunctions{}) + svc := NewServiceWithReconcilers(storage, &fakeFunctions{}, newFakeSchedulers()) newLimit := int64(1024) manifest := &Manifest{ Version: 1, @@ -276,7 +276,7 @@ func TestDeployLeavesBucketUnchangedWhenFieldsMatch(t *testing.T) { AllowedMimeTypes: &[]string{"image/png"}, } - svc := NewServiceWithReconcilers(storage, &fakeFunctions{}) + svc := NewServiceWithReconcilers(storage, &fakeFunctions{}, newFakeSchedulers()) manifest := &Manifest{ Version: 1, Buckets: []BucketManifest{{ @@ -305,7 +305,7 @@ func TestDeployRecreatesPolicyWhenDefinitionChanges(t *testing.T) { Definition: "true", }} - svc := NewServiceWithReconcilers(storage, &fakeFunctions{}) + svc := NewServiceWithReconcilers(storage, &fakeFunctions{}, newFakeSchedulers()) manifest := &Manifest{ Version: 1, Buckets: []BucketManifest{{ @@ -340,7 +340,7 @@ func TestDeployDeletesPolicyMissingFromManifest(t *testing.T) { Definition: "false", }} - svc := NewServiceWithReconcilers(storage, &fakeFunctions{}) + svc := NewServiceWithReconcilers(storage, &fakeFunctions{}, newFakeSchedulers()) manifest := &Manifest{ Version: 1, Buckets: []BucketManifest{{Name: "uploads"}}, @@ -361,7 +361,7 @@ func TestDeployUpdatesFunctionVisibility(t *testing.T) { {Id: uuid.New(), Name: "world", IsPublic: true}, }, } - svc := NewServiceWithReconcilers(newFakeStorage(), functions) + svc := NewServiceWithReconcilers(newFakeStorage(), functions, newFakeSchedulers()) pub := true stay := true @@ -390,7 +390,7 @@ func TestDeployFunctionMissingReturnsAvailableList(t *testing.T) { {Id: uuid.New(), Name: "world"}, }, } - svc := NewServiceWithReconcilers(newFakeStorage(), functions) + svc := NewServiceWithReconcilers(newFakeStorage(), functions, newFakeSchedulers()) pub := true manifest := &Manifest{ Version: 1, @@ -404,7 +404,7 @@ func TestDeployFunctionMissingReturnsAvailableList(t *testing.T) { } func TestDeployFunctionMissingNoFunctions(t *testing.T) { - svc := NewServiceWithReconcilers(newFakeStorage(), &fakeFunctions{}) + svc := NewServiceWithReconcilers(newFakeStorage(), &fakeFunctions{}, newFakeSchedulers()) pub := true manifest := &Manifest{ Version: 1, @@ -419,7 +419,7 @@ func TestDeployFunctionMissingNoFunctions(t *testing.T) { func TestDeployBucketFetchErrorPropagates(t *testing.T) { storage := newFakeStorage() storage.getBucketErr = errors.New("boom") - svc := NewServiceWithReconcilers(storage, &fakeFunctions{}) + svc := NewServiceWithReconcilers(storage, &fakeFunctions{}, newFakeSchedulers()) manifest := &Manifest{ Version: 1, @@ -431,7 +431,7 @@ func TestDeployBucketFetchErrorPropagates(t *testing.T) { } func TestDeployNilManifest(t *testing.T) { - svc := NewServiceWithReconcilers(newFakeStorage(), &fakeFunctions{}) + svc := NewServiceWithReconcilers(newFakeStorage(), &fakeFunctions{}, newFakeSchedulers()) _, err := svc.Deploy(context.Background(), nil) require.Error(t, err) assert.Contains(t, err.Error(), "manifest is required") @@ -452,7 +452,7 @@ func TestDeployRollsBackPolicyWhenRecreateFails(t *testing.T) { // Fail the new-definition create; let the rollback create succeed. storage.createPolicyErrs = []error{errors.New("boom"), nil} - svc := NewServiceWithReconcilers(storage, &fakeFunctions{}) + svc := NewServiceWithReconcilers(storage, &fakeFunctions{}, newFakeSchedulers()) manifest := &Manifest{ Version: 1, Buckets: []BucketManifest{{ @@ -490,7 +490,7 @@ func TestDeployPolicyRollbackFailureSurfacesBothErrors(t *testing.T) { }} storage.createPolicyErrs = []error{errors.New("create failed"), errors.New("rollback failed")} - svc := NewServiceWithReconcilers(storage, &fakeFunctions{}) + svc := NewServiceWithReconcilers(storage, &fakeFunctions{}, newFakeSchedulers()) manifest := &Manifest{ Version: 1, Buckets: []BucketManifest{{ @@ -550,7 +550,7 @@ func TestDeployListsFunctionsAcrossMultiplePages(t *testing.T) { {{Id: secondID, Name: "second", IsPublic: false}}, }, } - svc := NewServiceWithReconcilers(newFakeStorage(), functions) + svc := NewServiceWithReconcilers(newFakeStorage(), functions, newFakeSchedulers()) pub := true manifest := &Manifest{ @@ -580,7 +580,7 @@ func TestDeployLeavesBucketUnchangedWhenMimeTypesDifferInOrder(t *testing.T) { AllowedMimeTypes: &[]string{"image/jpeg", "image/png"}, } - svc := NewServiceWithReconcilers(storage, &fakeFunctions{}) + svc := NewServiceWithReconcilers(storage, &fakeFunctions{}, newFakeSchedulers()) manifest := &Manifest{ Version: 1, Buckets: []BucketManifest{{ @@ -595,3 +595,393 @@ func TestDeployLeavesBucketUnchangedWhenMimeTypesDifferInOrder(t *testing.T) { assert.Equal(t, 1, summary.BucketsUnchanged) assert.Empty(t, storage.updateBuckets, "MIME types differing only in order must not trigger an update") } + +type fakeSchedulers struct { + functions map[string]*apiclient.Function + schedulers map[string][]apiclient.FunctionScheduler + createdCalls []schedulerCreateCall + updatedCalls []schedulerUpdateCall + listErr error + createErr error + updateErr error + duplicateNames map[string]bool // function names with duplicate scheduler names +} + +type schedulerCreateCall struct { + FunctionID uuid.UUID + Input api.FunctionSchedulerInput +} + +type schedulerUpdateCall struct { + FunctionID uuid.UUID + SchedulerID uuid.UUID + Input api.FunctionSchedulerInput +} + +func newFakeSchedulers() *fakeSchedulers { + return &fakeSchedulers{ + functions: make(map[string]*apiclient.Function), + schedulers: make(map[string][]apiclient.FunctionScheduler), + duplicateNames: make(map[string]bool), + } +} + +func (f *fakeSchedulers) ListSchedulers(_ context.Context, identifier string) (*apiclient.Function, *apiclient.FunctionSchedulerListResponse, error) { + if f.listErr != nil { + return nil, nil, f.listErr + } + fn, ok := f.functions[identifier] + if !ok { + return nil, nil, api.ErrNotFound + } + schedulers := f.schedulers[identifier] + // Simulate duplicate names on server if marked + if f.duplicateNames[identifier] { + // Already has duplicates in the slice + } + resp := &apiclient.FunctionSchedulerListResponse{ + Data: append([]apiclient.FunctionScheduler(nil), schedulers...), + } + return fn, resp, nil +} + +func (f *fakeSchedulers) CreateSchedulerByID(_ context.Context, functionID uuid.UUID, input api.FunctionSchedulerInput) (*apiclient.FunctionScheduler, error) { + f.createdCalls = append(f.createdCalls, schedulerCreateCall{FunctionID: functionID, Input: input}) + if f.createErr != nil { + return nil, f.createErr + } + schedulerID := uuid.New() + scheduler := apiclient.FunctionScheduler{ + Id: &schedulerID, + FunctionId: &functionID, + Name: &input.Name, + CronExpression: &input.CronExpression, + Enabled: input.Enabled, + } + if input.Payload != nil { + payload := make(map[string]interface{}) + for k, v := range input.Payload { + payload[k] = v + } + scheduler.Payload = &payload + } + if len(input.Regions) > 0 { + regions := append([]string(nil), input.Regions...) + scheduler.Regions = ®ions + } + // Store by function name (need to find it) + for name, fn := range f.functions { + if fn.Id == functionID { + f.schedulers[name] = append(f.schedulers[name], scheduler) + break + } + } + return &scheduler, nil +} + +func (f *fakeSchedulers) UpdateSchedulerByID(_ context.Context, functionID, schedulerID uuid.UUID, input api.FunctionSchedulerInput) (*apiclient.FunctionScheduler, error) { + f.updatedCalls = append(f.updatedCalls, schedulerUpdateCall{FunctionID: functionID, SchedulerID: schedulerID, Input: input}) + if f.updateErr != nil { + return nil, f.updateErr + } + // Find and update the scheduler + for name, fn := range f.functions { + if fn.Id == functionID { + for i := range f.schedulers[name] { + if f.schedulers[name][i].Id != nil && *f.schedulers[name][i].Id == schedulerID { + f.schedulers[name][i].Name = &input.Name + f.schedulers[name][i].CronExpression = &input.CronExpression + f.schedulers[name][i].Enabled = input.Enabled + if input.Payload != nil { + payload := make(map[string]interface{}) + for k, v := range input.Payload { + payload[k] = v + } + f.schedulers[name][i].Payload = &payload + } else { + f.schedulers[name][i].Payload = nil + } + if len(input.Regions) > 0 { + regions := append([]string(nil), input.Regions...) + f.schedulers[name][i].Regions = ®ions + } else { + f.schedulers[name][i].Regions = nil + } + return &f.schedulers[name][i], nil + } + } + break + } + } + return nil, fmt.Errorf("scheduler not found") +} + +func TestReconcileSchedulersCreatesNew(t *testing.T) { + functionID := uuid.New() + schedulers := newFakeSchedulers() + schedulers.functions["hello"] = &apiclient.Function{ + Id: functionID, + Name: "hello", + } + + svc := NewServiceWithReconcilers(newFakeStorage(), &fakeFunctions{}, schedulers) + manifest := &Manifest{ + Version: 1, + Functions: []FunctionManifest{{ + Name: "hello", + Schedulers: []SchedulerManifest{{ + Name: "daily", + Cron: "0 0 * * *", + }}, + }}, + } + + summary, err := svc.Deploy(context.Background(), manifest) + require.NoError(t, err) + assert.Equal(t, 1, summary.SchedulersCreated) + assert.Equal(t, 0, summary.SchedulersUpdated) + assert.Equal(t, 0, summary.SchedulersUnchanged) + + require.Len(t, schedulers.createdCalls, 1) + assert.Equal(t, functionID, schedulers.createdCalls[0].FunctionID) + assert.Equal(t, "daily", schedulers.createdCalls[0].Input.Name) + assert.Equal(t, "0 0 * * *", schedulers.createdCalls[0].Input.CronExpression) +} + +func TestReconcileSchedulersUpdatesChanged(t *testing.T) { + functionID := uuid.New() + schedulerID := uuid.New() + schedulers := newFakeSchedulers() + schedulers.functions["hello"] = &apiclient.Function{ + Id: functionID, + Name: "hello", + } + cron := "0 0 * * *" + enabled := true + schedulers.schedulers["hello"] = []apiclient.FunctionScheduler{{ + Id: &schedulerID, + FunctionId: &functionID, + Name: strPtr("daily"), + CronExpression: &cron, + Enabled: &enabled, + }} + + svc := NewServiceWithReconcilers(newFakeStorage(), &fakeFunctions{}, schedulers) + newCron := "0 12 * * *" // Changed time + manifest := &Manifest{ + Version: 1, + Functions: []FunctionManifest{{ + Name: "hello", + Schedulers: []SchedulerManifest{{ + Name: "daily", + Cron: newCron, + }}, + }}, + } + + summary, err := svc.Deploy(context.Background(), manifest) + require.NoError(t, err) + assert.Equal(t, 0, summary.SchedulersCreated) + assert.Equal(t, 1, summary.SchedulersUpdated) + assert.Equal(t, 0, summary.SchedulersUnchanged) + + require.Len(t, schedulers.updatedCalls, 1) + assert.Equal(t, functionID, schedulers.updatedCalls[0].FunctionID) + assert.Equal(t, schedulerID, schedulers.updatedCalls[0].SchedulerID) + assert.Equal(t, "daily", schedulers.updatedCalls[0].Input.Name) + assert.Equal(t, newCron, schedulers.updatedCalls[0].Input.CronExpression) +} + +func TestReconcileSchedulersUnchanged(t *testing.T) { + functionID := uuid.New() + schedulerID := uuid.New() + schedulers := newFakeSchedulers() + schedulers.functions["hello"] = &apiclient.Function{ + Id: functionID, + Name: "hello", + } + cron := "0 0 * * *" + enabled := true + schedulers.schedulers["hello"] = []apiclient.FunctionScheduler{{ + Id: &schedulerID, + FunctionId: &functionID, + Name: strPtr("daily"), + CronExpression: &cron, + Enabled: &enabled, + }} + + svc := NewServiceWithReconcilers(newFakeStorage(), &fakeFunctions{}, schedulers) + manifest := &Manifest{ + Version: 1, + Functions: []FunctionManifest{{ + Name: "hello", + Schedulers: []SchedulerManifest{{ + Name: "daily", + Cron: "0 0 * * *", + Enabled: &enabled, + }}, + }}, + } + + summary, err := svc.Deploy(context.Background(), manifest) + require.NoError(t, err) + assert.Equal(t, 0, summary.SchedulersCreated) + assert.Equal(t, 0, summary.SchedulersUpdated) + assert.Equal(t, 1, summary.SchedulersUnchanged) + + assert.Empty(t, schedulers.createdCalls) + assert.Empty(t, schedulers.updatedCalls) +} + +func TestReconcileSchedulersDoesNotDeleteUndeclared(t *testing.T) { + functionID := uuid.New() + schedulerID := uuid.New() + schedulers := newFakeSchedulers() + schedulers.functions["hello"] = &apiclient.Function{ + Id: functionID, + Name: "hello", + } + cron := "0 0 * * *" + enabled := true + // Server has an existing scheduler not in manifest + schedulers.schedulers["hello"] = []apiclient.FunctionScheduler{{ + Id: &schedulerID, + FunctionId: &functionID, + Name: strPtr("adhoc"), + CronExpression: &cron, + Enabled: &enabled, + }} + + svc := NewServiceWithReconcilers(newFakeStorage(), &fakeFunctions{}, schedulers) + // Manifest declares a different scheduler + manifest := &Manifest{ + Version: 1, + Functions: []FunctionManifest{{ + Name: "hello", + Schedulers: []SchedulerManifest{{ + Name: "daily", + Cron: "0 12 * * *", + }}, + }}, + } + + summary, err := svc.Deploy(context.Background(), manifest) + require.NoError(t, err) + assert.Equal(t, 1, summary.SchedulersCreated) // Creates "daily" + assert.Equal(t, 0, summary.SchedulersUpdated) + assert.Equal(t, 0, summary.SchedulersUnchanged) + + // Verify "adhoc" was not deleted (still in fake storage) + assert.Len(t, schedulers.schedulers["hello"], 2, "undeclared scheduler should not be deleted") +} + +func TestReconcileSchedulersDuplicateNameError(t *testing.T) { + functionID := uuid.New() + schedulerID1 := uuid.New() + schedulerID2 := uuid.New() + schedulers := newFakeSchedulers() + schedulers.functions["hello"] = &apiclient.Function{ + Id: functionID, + Name: "hello", + } + cron := "0 0 * * *" + enabled := true + // Server has duplicate scheduler names + name := "daily" + schedulers.schedulers["hello"] = []apiclient.FunctionScheduler{ + { + Id: &schedulerID1, + FunctionId: &functionID, + Name: &name, + CronExpression: &cron, + Enabled: &enabled, + }, + { + Id: &schedulerID2, + FunctionId: &functionID, + Name: &name, + CronExpression: &cron, + Enabled: &enabled, + }, + } + schedulers.duplicateNames["hello"] = true + + svc := NewServiceWithReconcilers(newFakeStorage(), &fakeFunctions{}, schedulers) + manifest := &Manifest{ + Version: 1, + Functions: []FunctionManifest{{ + Name: "hello", + Schedulers: []SchedulerManifest{{ + Name: "daily", + Cron: "0 0 * * *", + }}, + }}, + } + + _, err := svc.Deploy(context.Background(), manifest) + require.Error(t, err) + assert.Contains(t, err.Error(), "duplicate scheduler name") + assert.Contains(t, err.Error(), "daily") +} + +func strPtr(s string) *string { + return &s +} + +func TestSchedulerNeedsUpdateIdempotency(t *testing.T) { + cron := "0 9 * * *" + enabled := true + + base := apiclient.FunctionScheduler{ + CronExpression: &cron, + Enabled: &enabled, + } + + t.Run("omitted regions are server-managed", func(t *testing.T) { + existing := base + regions := []string{"us-east-1"} + existing.Regions = ®ions + desired := SchedulerManifest{Name: "daily", Cron: cron} // regions omitted + if schedulerNeedsUpdate(existing, desired) { + t.Fatalf("expected no update when manifest omits regions (server-managed)") + } + }) + + t.Run("payload int equals server float64", func(t *testing.T) { + existing := base + p := map[string]any{"count": float64(5)} + existing.Payload = &p + desired := SchedulerManifest{Name: "daily", Cron: cron, Payload: map[string]any{"count": 5}} + if schedulerNeedsUpdate(existing, desired) { + t.Fatalf("expected no update for numerically equivalent payload") + } + }) + + t.Run("omitted payload equals server empty object", func(t *testing.T) { + existing := base + empty := map[string]any{} + existing.Payload = &empty + desired := SchedulerManifest{Name: "daily", Cron: cron} // payload omitted + if schedulerNeedsUpdate(existing, desired) { + t.Fatalf("expected no update when payload omitted") + } + }) + + t.Run("cron change triggers update", func(t *testing.T) { + existing := base + desired := SchedulerManifest{Name: "daily", Cron: "0 10 * * *"} + if !schedulerNeedsUpdate(existing, desired) { + t.Fatalf("expected update when cron differs") + } + }) + + t.Run("explicit region mismatch triggers update", func(t *testing.T) { + existing := base + regions := []string{"us-east-1"} + existing.Regions = ®ions + desired := SchedulerManifest{Name: "daily", Cron: cron, Regions: []string{"eu-west-1"}} + if !schedulerNeedsUpdate(existing, desired) { + t.Fatalf("expected update when explicit regions differ") + } + }) +} diff --git a/internal/projectconfig/manifest.go b/internal/projectconfig/manifest.go index dfbbf7d..7d95ed1 100644 --- a/internal/projectconfig/manifest.go +++ b/internal/projectconfig/manifest.go @@ -47,8 +47,18 @@ type PolicyManifest struct { // FunctionManifest declares the desired visibility for one deployed function. type FunctionManifest struct { - Name string `yaml:"name"` - Public *bool `yaml:"public,omitempty"` + Name string `yaml:"name"` + Public *bool `yaml:"public,omitempty"` + Schedulers []SchedulerManifest `yaml:"schedulers,omitempty"` +} + +// SchedulerManifest declares one scheduler attached to a function. +type SchedulerManifest struct { + Name string `yaml:"name"` + Cron string `yaml:"cron"` + Enabled *bool `yaml:"enabled,omitempty"` + Payload map[string]any `yaml:"payload,omitempty"` + Regions []string `yaml:"regions,omitempty"` } // Load reads, parses, and validates a manifest from disk. The returned path is @@ -125,7 +135,10 @@ func (m *Manifest) Validate() error { if err := validateBuckets(m.Buckets); err != nil { return err } - return validateFunctions(m.Functions) + if err := validateFunctions(m.Functions); err != nil { + return err + } + return validateAllSchedulers(m.Functions) } func validateBuckets(buckets []BucketManifest) error { @@ -198,9 +211,45 @@ func validateFunctions(functions []FunctionManifest) error { return fmt.Errorf("duplicate function name %q in manifest", function.Name) } seen[function.Name] = struct{}{} - if function.Public == nil { - return fmt.Errorf("function %q: public is required", function.Name) + // A function entry is valid if it sets public OR declares at least one scheduler + if function.Public == nil && len(function.Schedulers) == 0 { + return fmt.Errorf("function %q: must set 'public' or declare at least one scheduler", function.Name) + } + } + return nil +} + +func validateAllSchedulers(functions []FunctionManifest) error { + for i := range functions { + if err := validateSchedulers(&functions[i]); err != nil { + return err + } + } + return nil +} + +func validateSchedulers(function *FunctionManifest) error { + if len(function.Schedulers) == 0 { + return nil + } + + seen := make(map[string]struct{}, len(function.Schedulers)) + for j := range function.Schedulers { + scheduler := &function.Schedulers[j] + scheduler.Name = strings.TrimSpace(scheduler.Name) + if scheduler.Name == "" { + return fmt.Errorf("function %q: scheduler name is required", function.Name) + } + if _, exists := seen[scheduler.Name]; exists { + return fmt.Errorf("function %q: duplicate scheduler name %q", function.Name, scheduler.Name) + } + seen[scheduler.Name] = struct{}{} + + scheduler.Cron = strings.TrimSpace(scheduler.Cron) + if scheduler.Cron == "" { + return fmt.Errorf("function %q scheduler %q: cron is required", function.Name, scheduler.Name) } + // Do not parse/validate cron client-side; the server owns cron/region rules } return nil } diff --git a/internal/projectconfig/manifest_test.go b/internal/projectconfig/manifest_test.go index 3d11845..f9e9668 100644 --- a/internal/projectconfig/manifest_test.go +++ b/internal/projectconfig/manifest_test.go @@ -126,12 +126,82 @@ func TestValidate(t *testing.T) { errContains: "definition is required", }, { - name: "missing function public flag", + name: "missing function public flag and no schedulers", manifest: Manifest{ Version: 1, Functions: []FunctionManifest{{Name: "hello"}}, }, - errContains: `function "hello": public is required`, + errContains: `function "hello": must set 'public' or declare at least one scheduler`, + }, + { + name: "function with schedulers only (no public)", + manifest: Manifest{ + Version: 1, + Functions: []FunctionManifest{{ + Name: "hello", + Schedulers: []SchedulerManifest{{ + Name: "daily", + Cron: "0 0 * * *", + }}, + }}, + }, + }, + { + name: "function with both public and schedulers", + manifest: Manifest{ + Version: 1, + Functions: []FunctionManifest{{ + Name: "hello", + Public: &pub, + Schedulers: []SchedulerManifest{{ + Name: "hourly", + Cron: "0 * * * *", + }}, + }}, + }, + }, + { + name: "scheduler missing name", + manifest: Manifest{ + Version: 1, + Functions: []FunctionManifest{{ + Name: "hello", + Public: &pub, + Schedulers: []SchedulerManifest{{ + Cron: "0 0 * * *", + }}, + }}, + }, + errContains: `function "hello": scheduler name is required`, + }, + { + name: "scheduler missing cron", + manifest: Manifest{ + Version: 1, + Functions: []FunctionManifest{{ + Name: "hello", + Public: &pub, + Schedulers: []SchedulerManifest{{ + Name: "daily", + }}, + }}, + }, + errContains: `function "hello" scheduler "daily": cron is required`, + }, + { + name: "duplicate scheduler name", + manifest: Manifest{ + Version: 1, + Functions: []FunctionManifest{{ + Name: "hello", + Public: &pub, + Schedulers: []SchedulerManifest{ + {Name: "daily", Cron: "0 0 * * *"}, + {Name: "daily", Cron: "0 12 * * *"}, + }, + }}, + }, + errContains: `function "hello": duplicate scheduler name "daily"`, }, { name: "duplicate function name", @@ -144,6 +214,23 @@ func TestValidate(t *testing.T) { }, errContains: `duplicate function name "hello"`, }, + { + name: "scheduler with full fields", + manifest: Manifest{ + Version: 1, + Functions: []FunctionManifest{{ + Name: "hello", + Public: &pub, + Schedulers: []SchedulerManifest{{ + Name: "daily", + Cron: "0 0 * * *", + Enabled: &pub, + Payload: map[string]any{"key": "value"}, + Regions: []string{"us-east-1"}, + }}, + }}, + }, + }, { name: "normalization: operation lowercase, file size limit kept", manifest: Manifest{ From 6ffff0b9c4ac82d581b1c1cf7fee555c28f89778 Mon Sep 17 00:00:00 2001 From: Ted Kim Date: Thu, 25 Jun 2026 12:38:48 -0400 Subject: [PATCH 2/8] feat(localmode): --image flag to run a locally-built server image Let the CLI knowingly run an unpublished, locally-built local-mode image: - start/restart accept --image (highest precedence: flag > VOLCANO_IMAGE env > project .env.local > bundled default kong/volcano:local-nightly) - Service.WithImage option + resolveImage() single source of precedence - pre-flight: when the resolved image is custom (differs from the default), it must already exist locally; the CLI never pulls it. Missing -> hard fail with an actionable build message instead of a confusing compose registry error - the published default is left to compose's normal pull-if-missing behavior Tests: resolveImage precedence (flag/env/.env.local/default + custom detection) and Start hard-fail when a custom image is absent. --- internal/cmd/localmode/localmode.go | 30 +++++++--- internal/localmode/compose.go | 39 +++++++++--- internal/localmode/service.go | 22 +++++++ internal/localmode/start_test.go | 93 +++++++++++++++++++++++++++++ 4 files changed, 169 insertions(+), 15 deletions(-) diff --git a/internal/cmd/localmode/localmode.go b/internal/cmd/localmode/localmode.go index 7f23459..5922b52 100644 --- a/internal/cmd/localmode/localmode.go +++ b/internal/cmd/localmode/localmode.go @@ -10,18 +10,26 @@ import ( // NewStart returns the start command. func NewStart(deps cliruntime.Deps) *cobra.Command { - return &cobra.Command{ + var image string + cmd := &cobra.Command{ Use: "start", Short: "Start the local Volcano development environment", Long: `Start PostgreSQL, Redis, and the Volcano local-mode server with Docker Compose. -To pin or select a specific server image, set VOLCANO_IMAGE: - VOLCANO_IMAGE=kong/volcano:local-nightly volcano start`, +To run a specific or locally-built server image, use --image (highest precedence) +or set VOLCANO_IMAGE: + volcano start --image kong/volcano:local-dev + VOLCANO_IMAGE=kong/volcano:local-nightly volcano start + +An explicitly selected image must already exist locally: the CLI never pulls an +unpublished local-mode image and fails fast if it is missing.`, Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, _ []string) error { - return localmodecore.NewService(deps).Start(cmd.Context(), cmd.OutOrStdout()) + return localmodecore.NewService(deps, localmodecore.WithImage(image)).Start(cmd.Context(), cmd.OutOrStdout()) }, } + cmd.Flags().StringVar(&image, "image", "", "Local-mode server image to run (overrides VOLCANO_IMAGE and the bundled default; must already exist locally)") + return cmd } // NewStatus returns the status command. @@ -58,13 +66,19 @@ Use --clean to also remove all data volumes and local dev state.`, // NewRestart returns the restart command. func NewRestart(deps cliruntime.Deps) *cobra.Command { - return &cobra.Command{ + var image string + cmd := &cobra.Command{ Use: "restart", Short: "Restart the local Volcano development environment", - Long: "Stop and start the local Volcano development environment while preserving data.", - Args: cobra.NoArgs, + Long: `Stop and start the local Volcano development environment while preserving data. + +Use --image (or VOLCANO_IMAGE) to select the server image; an explicitly +selected image must already exist locally and is never pulled.`, + Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, _ []string) error { - return localmodecore.NewService(deps).Restart(cmd.Context(), cmd.OutOrStdout()) + return localmodecore.NewService(deps, localmodecore.WithImage(image)).Restart(cmd.Context(), cmd.OutOrStdout()) }, } + cmd.Flags().StringVar(&image, "image", "", "Local-mode server image to run (overrides VOLCANO_IMAGE and the bundled default; must already exist locally)") + return cmd } diff --git a/internal/localmode/compose.go b/internal/localmode/compose.go index 28a7263..7c5499e 100644 --- a/internal/localmode/compose.go +++ b/internal/localmode/compose.go @@ -52,19 +52,44 @@ func (s Service) composeEnvironment() ([]string, string, error) { } env = append(env, overrides...) - image := defaultVolcanoImage - if fileImage, ok := envValue(overrides, "VOLCANO_IMAGE"); ok && strings.TrimSpace(fileImage) != "" { - image = strings.TrimSpace(fileImage) - } - if processImage := strings.TrimSpace(s.getenv("VOLCANO_IMAGE")); processImage != "" { - image = processImage - } + image, _ := s.resolveImage() env = withoutEnvKey(env, "VOLCANO_IMAGE") env = append(env, "VOLCANO_IMAGE="+image) return env, image, nil } +// resolveImage returns the local-mode server image to run and whether it is a +// custom image (i.e. differs from the bundled default). Precedence (highest +// first): explicit image (WithImage/--image) > VOLCANO_IMAGE process env > +// project .env.local > defaultVolcanoImage. A custom image is treated as +// local-only: it is never pulled and must already exist locally. The bundled +// default is left to Docker Compose's normal pull-if-missing behavior even when +// it is selected explicitly. +func (s Service) resolveImage() (string, bool) { + image := defaultVolcanoImage + switch { + case s.image != "": + image = s.image + case strings.TrimSpace(s.getenv("VOLCANO_IMAGE")) != "": + image = strings.TrimSpace(s.getenv("VOLCANO_IMAGE")) + default: + if overrides, err := localEnvOverrides(); err == nil { + if fileImage, ok := envValue(overrides, "VOLCANO_IMAGE"); ok && strings.TrimSpace(fileImage) != "" { + image = strings.TrimSpace(fileImage) + } + } + } + return image, image != defaultVolcanoImage +} + +// imageExistsLocally reports whether a Docker image reference is present in the +// local image store. It never contacts a registry. +func (s Service) imageExistsLocally(ctx context.Context, ref string) bool { + _, err := s.runDocker(ctx, "image", "inspect", ref) + return err == nil +} + func (s Service) startDockerServices(ctx context.Context, env []string) error { composePath, cleanup, err := s.writeComposeFile() if err != nil { diff --git a/internal/localmode/service.go b/internal/localmode/service.go index f9a71d1..416394d 100644 --- a/internal/localmode/service.go +++ b/internal/localmode/service.go @@ -40,6 +40,7 @@ type Service struct { environ func() []string getenv func(string) string tempDir string + image string } // Option configures a Service. @@ -106,6 +107,16 @@ func WithTempDir(tempDir string) Option { } } +// WithImage sets an explicit local-mode server image, taking precedence over the +// VOLCANO_IMAGE environment variable, any project .env.local value, and the +// bundled default. An empty value is ignored. An explicitly selected image is +// never pulled: it must already exist locally (see Start's pre-flight check). +func WithImage(image string) Option { + return func(s *Service) { + s.image = strings.TrimSpace(image) + } +} + // NewService returns a local-mode environment service. func NewService(deps cliruntime.Deps, opts ...Option) Service { healthClient := deps.HTTPClient @@ -163,6 +174,17 @@ func (s Service) Start(ctx context.Context, w io.Writer) error { } fmt.Fprintf(w, "Using Docker image: %s\n", image) + // When the image is an explicit override (--image / VOLCANO_IMAGE / .env.local) + // it must already exist locally. The CLI never pulls unpublished local-mode + // images, so fail fast with an actionable message instead of letting + // `docker compose up` emit a confusing registry-pull error. + if _, overridden := s.resolveImage(); overridden { + if !s.imageExistsLocally(ctx, image) { + return fmt.Errorf("image %q not found locally; the CLI does not pull unpublished local-mode images. Build it (e.g. in volcano-hosting: make docker-build DOCKER_TAG=) and ensure the tag matches, or run `docker pull %s` first if it is published", image, image) + } + output.Success(w, "Using local image %q (not pulled)", image) + } + if err := s.startDockerServices(ctx, composeEnv); err != nil { return fmt.Errorf("failed to start Docker services: %w", err) } diff --git a/internal/localmode/start_test.go b/internal/localmode/start_test.go index 27512bf..536fcfd 100644 --- a/internal/localmode/start_test.go +++ b/internal/localmode/start_test.go @@ -316,3 +316,96 @@ func lastEnvValue(env []string, key string) (string, bool) { } return "", false } + +func TestResolveImagePrecedence(t *testing.T) { + setLocalDevTestHome(t) + withTempWorkingDir(t) + + emptyEnv := func(string) string { return "" } + newSvc := func(image string, getenv func(string) string) Service { + return NewService(cliruntime.Deps{}, + WithImage(image), + WithEnvironment(func() []string { return []string{"PATH=/bin"} }, getenv), + ) + } + envWith := func(value string) func(string) string { + return func(k string) string { + if k == "VOLCANO_IMAGE" { + return value + } + return "" + } + } + + t.Run("default when nothing set", func(t *testing.T) { + _ = os.Remove(".env.local") + img, overridden := newSvc("", emptyEnv).resolveImage() + assert.Equal(t, defaultVolcanoImage, img) + assert.False(t, overridden) + }) + + t.Run("env overrides default", func(t *testing.T) { + _ = os.Remove(".env.local") + img, overridden := newSvc("", envWith("kong/volcano:from-env")).resolveImage() + assert.Equal(t, "kong/volcano:from-env", img) + assert.True(t, overridden) + }) + + t.Run(".env.local used when env unset", func(t *testing.T) { + require.NoError(t, os.WriteFile(".env.local", []byte("VOLCANO_IMAGE=kong/volcano:from-file\n"), 0o600)) + img, overridden := newSvc("", emptyEnv).resolveImage() + assert.Equal(t, "kong/volcano:from-file", img) + assert.True(t, overridden) + }) + + t.Run("flag beats env and .env.local", func(t *testing.T) { + require.NoError(t, os.WriteFile(".env.local", []byte("VOLCANO_IMAGE=kong/volcano:from-file\n"), 0o600)) + img, overridden := newSvc("kong/volcano:from-flag", envWith("kong/volcano:from-env")).resolveImage() + assert.Equal(t, "kong/volcano:from-flag", img) + assert.True(t, overridden) + }) + + t.Run("explicit default value is not treated as custom", func(t *testing.T) { + _ = os.Remove(".env.local") + img, overridden := newSvc(defaultVolcanoImage, emptyEnv).resolveImage() + assert.Equal(t, defaultVolcanoImage, img) + assert.False(t, overridden) + }) +} + +func TestStartFailsWhenCustomImageMissing(t *testing.T) { + setLocalDevTestHome(t) + withTempWorkingDir(t) + + runner := &fakeCommandRunner{ + run: func(_ context.Context, command Command) ([]byte, error) { + switch { + case commandIs(command, "docker", "inspect", "--format={{.State.Running}}", serverContainerName): + return []byte("false\n"), nil + case commandIs(command, "docker", "version"): + return []byte("Docker version 1\n"), nil + case commandIs(command, "docker", "image", "inspect", "kong/volcano:local-dev"): + return nil, errors.New("Error: No such image: kong/volcano:local-dev") + case command.Name == "docker" && slices.Contains(command.Args, "up"): + t.Fatalf("compose up must not run when the custom image is missing") + return nil, nil + default: + return nil, nil + } + }, + } + + var out bytes.Buffer + service := NewService( + cliruntime.Deps{}, + WithDockerRunner(runner), + WithImage("kong/volcano:local-dev"), + WithEnvironment(func() []string { return []string{"PATH=/bin"} }, func(string) string { return "" }), + WithTempDir(t.TempDir()), + ) + + err := service.Start(context.Background(), &out) + require.Error(t, err) + assert.Contains(t, err.Error(), "not found locally") + assert.False(t, runner.calledWithArg("docker", "up")) +} From 213f5c8869eac95be3bd74629568d911c758a711 Mon Sep 17 00:00:00 2001 From: Ted Kim Date: Thu, 25 Jun 2026 14:10:23 -0400 Subject: [PATCH 3/8] docs+ux(localmode): document declarative schedulers; check image before announcing (review) - README: document the schedulers block in volcano-config.yaml, the non-destructive reconcile (never deletes; remove via imperative delete), and that omitted regions are server-managed. - start: run the custom-image local-presence check before printing 'Using Docker image' so we never announce an image we then reject. --- README.md | 30 ++++++++++++++++++++++++++++++ internal/localmode/service.go | 19 ++++++++++--------- 2 files changed, 40 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 6d63b7e..9a02b62 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,36 @@ files, migrations directory, and README). Use a template to add language-specific files: `javascript` (aliases: `js`, `node`, `nodejs`), `nextjs`, `python`, or `ruby`. +## Project configuration (`volcano-config.yaml`) + +`volcano config deploy` reconciles declarative project configuration +(`volcano/volcano-config.yaml` or `./volcano-config.yaml`) against the active +target — the same manifest applies to local mode and cloud. + +Functions may declare scheduled invocations. `name` and `cron` are required; +`enabled` (default `true`), `payload`, and `regions` are optional. A function +entry is valid if it sets `public` **or** declares at least one scheduler. + +```yaml +version: 1 +functions: + - name: hello + public: false + schedulers: + - name: refresh-cache # required, unique per function (the reconcile key) + cron: "*/5 * * * *" + enabled: true + payload: { job: refresh } + regions: [us-east-1] # omit to let the server pick one deployed region +``` + +Scheduler reconciliation is **non-destructive**: `config deploy` creates and +updates the schedulers a function declares (matched by `name`, preserving the +scheduler ID) but never deletes one. Removing a scheduler from the manifest is +a no-op — delete it explicitly with `volcano functions schedulers delete`. +Likewise, `cron`/`payload`/`enabled` are reconciled, but `regions` are only +enforced when declared (an omitted `regions` is left server-managed). + ## Contributing See `CONTRIBUTING.md` for local workflows, generated-code guidance, release diff --git a/internal/localmode/service.go b/internal/localmode/service.go index 416394d..eb24d9d 100644 --- a/internal/localmode/service.go +++ b/internal/localmode/service.go @@ -172,16 +172,17 @@ func (s Service) Start(ctx context.Context, w io.Writer) error { if err != nil { return err } - fmt.Fprintf(w, "Using Docker image: %s\n", image) - // When the image is an explicit override (--image / VOLCANO_IMAGE / .env.local) - // it must already exist locally. The CLI never pulls unpublished local-mode - // images, so fail fast with an actionable message instead of letting - // `docker compose up` emit a confusing registry-pull error. - if _, overridden := s.resolveImage(); overridden { - if !s.imageExistsLocally(ctx, image) { - return fmt.Errorf("image %q not found locally; the CLI does not pull unpublished local-mode images. Build it (e.g. in volcano-hosting: make docker-build DOCKER_TAG=) and ensure the tag matches, or run `docker pull %s` first if it is published", image, image) - } + // it must already exist locally before we announce or start it. The CLI never + // pulls unpublished local-mode images, so fail fast with an actionable message + // instead of letting `docker compose up` emit a confusing registry-pull error. + _, customImage := s.resolveImage() + if customImage && !s.imageExistsLocally(ctx, image) { + return fmt.Errorf("image %q not found locally; the CLI does not pull unpublished local-mode images. Build it (e.g. in volcano-hosting: make docker-build DOCKER_TAG=) and ensure the tag matches, or run `docker pull %s` first if it is published", image, image) + } + + fmt.Fprintf(w, "Using Docker image: %s\n", image) + if customImage { output.Success(w, "Using local image %q (not pulled)", image) } From 7c6b9fe86e5bc29af9c4ecf480cacabe5608829a Mon Sep 17 00:00:00 2001 From: Ted Kim Date: Thu, 25 Jun 2026 14:46:21 -0400 Subject: [PATCH 4/8] fix(config): treat nil scheduler Enabled as enabled in diff (Copilot review) schedulerNeedsUpdate defaulted a nil existing.Enabled to false while schedulerState renders nil as enabled; an API that omits enabled would then force a spurious update every deploy. Default existingEnabled to true to keep reconcile idempotent; add a regression subtest. --- internal/projectconfig/deploy.go | 5 ++++- internal/projectconfig/deploy_test.go | 8 ++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/internal/projectconfig/deploy.go b/internal/projectconfig/deploy.go index 16ac38c..383cde7 100644 --- a/internal/projectconfig/deploy.go +++ b/internal/projectconfig/deploy.go @@ -422,7 +422,10 @@ func schedulerNeedsUpdate(existing apiclient.FunctionScheduler, desired Schedule if desired.Enabled != nil { desiredEnabled = *desired.Enabled } - existingEnabled := false + // A nil Enabled from the API means enabled (matches schedulerState rendering), + // so default to true; otherwise an omitted field would force a spurious update + // on every deploy. + existingEnabled := true if existing.Enabled != nil { existingEnabled = *existing.Enabled } diff --git a/internal/projectconfig/deploy_test.go b/internal/projectconfig/deploy_test.go index 6e0db36..06b73ee 100644 --- a/internal/projectconfig/deploy_test.go +++ b/internal/projectconfig/deploy_test.go @@ -984,4 +984,12 @@ func TestSchedulerNeedsUpdateIdempotency(t *testing.T) { t.Fatalf("expected update when explicit regions differ") } }) + + t.Run("nil existing enabled treated as enabled", func(t *testing.T) { + existing := apiclient.FunctionScheduler{CronExpression: &cron} // Enabled nil + desired := SchedulerManifest{Name: "daily", Cron: cron} // enabled omitted -> true + if schedulerNeedsUpdate(existing, desired) { + t.Fatalf("expected no update: nil existing Enabled should be treated as enabled") + } + }) } From 045cd7d837ad3991423db3e453016667f75d8564 Mon Sep 17 00:00:00 2001 From: Ted Kim Date: Thu, 25 Jun 2026 15:05:11 -0400 Subject: [PATCH 5/8] style(config): satisfy golangci-lint in scheduler reconcile + tests CI 'check' (golangci-lint) flagged issues my local go vet/gofmt missed: - deploy.go: bytes.Equal instead of string(a)==string(b) (gocritic) - deploy_test.go fake: maps.Clone for payload copies (modernize), invert+ continue to reduce nesting (gocritic), errors.New over fmt.Errorf (perfsprint), and remove the dead duplicateNames flag + empty block (revive). Verified with 'go tool golangci-lint run ./...' (0 issues). --- internal/projectconfig/deploy.go | 3 +- internal/projectconfig/deploy_test.go | 72 +++++++++++---------------- 2 files changed, 32 insertions(+), 43 deletions(-) diff --git a/internal/projectconfig/deploy.go b/internal/projectconfig/deploy.go index 383cde7..51ad726 100644 --- a/internal/projectconfig/deploy.go +++ b/internal/projectconfig/deploy.go @@ -1,6 +1,7 @@ package projectconfig import ( + "bytes" "context" "encoding/json" "errors" @@ -477,5 +478,5 @@ func payloadEqual(a, b map[string]any) bool { if errA != nil || errB != nil { return reflect.DeepEqual(a, b) } - return string(aJSON) == string(bJSON) + return bytes.Equal(aJSON, bJSON) } diff --git a/internal/projectconfig/deploy_test.go b/internal/projectconfig/deploy_test.go index 06b73ee..0772ebb 100644 --- a/internal/projectconfig/deploy_test.go +++ b/internal/projectconfig/deploy_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "maps" "net/http" "testing" @@ -597,14 +598,13 @@ func TestDeployLeavesBucketUnchangedWhenMimeTypesDifferInOrder(t *testing.T) { } type fakeSchedulers struct { - functions map[string]*apiclient.Function - schedulers map[string][]apiclient.FunctionScheduler - createdCalls []schedulerCreateCall - updatedCalls []schedulerUpdateCall - listErr error - createErr error - updateErr error - duplicateNames map[string]bool // function names with duplicate scheduler names + functions map[string]*apiclient.Function + schedulers map[string][]apiclient.FunctionScheduler + createdCalls []schedulerCreateCall + updatedCalls []schedulerUpdateCall + listErr error + createErr error + updateErr error } type schedulerCreateCall struct { @@ -620,9 +620,8 @@ type schedulerUpdateCall struct { func newFakeSchedulers() *fakeSchedulers { return &fakeSchedulers{ - functions: make(map[string]*apiclient.Function), - schedulers: make(map[string][]apiclient.FunctionScheduler), - duplicateNames: make(map[string]bool), + functions: make(map[string]*apiclient.Function), + schedulers: make(map[string][]apiclient.FunctionScheduler), } } @@ -635,10 +634,6 @@ func (f *fakeSchedulers) ListSchedulers(_ context.Context, identifier string) (* return nil, nil, api.ErrNotFound } schedulers := f.schedulers[identifier] - // Simulate duplicate names on server if marked - if f.duplicateNames[identifier] { - // Already has duplicates in the slice - } resp := &apiclient.FunctionSchedulerListResponse{ Data: append([]apiclient.FunctionScheduler(nil), schedulers...), } @@ -659,10 +654,7 @@ func (f *fakeSchedulers) CreateSchedulerByID(_ context.Context, functionID uuid. Enabled: input.Enabled, } if input.Payload != nil { - payload := make(map[string]interface{}) - for k, v := range input.Payload { - payload[k] = v - } + payload := maps.Clone(input.Payload) scheduler.Payload = &payload } if len(input.Regions) > 0 { @@ -688,32 +680,30 @@ func (f *fakeSchedulers) UpdateSchedulerByID(_ context.Context, functionID, sche for name, fn := range f.functions { if fn.Id == functionID { for i := range f.schedulers[name] { - if f.schedulers[name][i].Id != nil && *f.schedulers[name][i].Id == schedulerID { - f.schedulers[name][i].Name = &input.Name - f.schedulers[name][i].CronExpression = &input.CronExpression - f.schedulers[name][i].Enabled = input.Enabled - if input.Payload != nil { - payload := make(map[string]interface{}) - for k, v := range input.Payload { - payload[k] = v - } - f.schedulers[name][i].Payload = &payload - } else { - f.schedulers[name][i].Payload = nil - } - if len(input.Regions) > 0 { - regions := append([]string(nil), input.Regions...) - f.schedulers[name][i].Regions = ®ions - } else { - f.schedulers[name][i].Regions = nil - } - return &f.schedulers[name][i], nil + if f.schedulers[name][i].Id == nil || *f.schedulers[name][i].Id != schedulerID { + continue } + f.schedulers[name][i].Name = &input.Name + f.schedulers[name][i].CronExpression = &input.CronExpression + f.schedulers[name][i].Enabled = input.Enabled + if input.Payload != nil { + payload := maps.Clone(input.Payload) + f.schedulers[name][i].Payload = &payload + } else { + f.schedulers[name][i].Payload = nil + } + if len(input.Regions) > 0 { + regions := append([]string(nil), input.Regions...) + f.schedulers[name][i].Regions = ®ions + } else { + f.schedulers[name][i].Regions = nil + } + return &f.schedulers[name][i], nil } break } } - return nil, fmt.Errorf("scheduler not found") + return nil, errors.New("scheduler not found") } func TestReconcileSchedulersCreatesNew(t *testing.T) { @@ -904,8 +894,6 @@ func TestReconcileSchedulersDuplicateNameError(t *testing.T) { Enabled: &enabled, }, } - schedulers.duplicateNames["hello"] = true - svc := NewServiceWithReconcilers(newFakeStorage(), &fakeFunctions{}, schedulers) manifest := &Manifest{ Version: 1, From c9de2cb999cd1879c83416ed76b658fa27e0cffa Mon Sep 17 00:00:00 2001 From: Ted Kim Date: Thu, 25 Jun 2026 17:23:20 -0400 Subject: [PATCH 6/8] fix(config): treat omitted scheduler payload as server-managed (review) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit swkeever: an existing non-empty payload + a manifest that omits payload would enter the update path forever — api.UpdateFunctionScheduler doesn't serialize a nil payload, so the server keeps its value and the deploy never converges. Only diff/reconcile payload when the manifest declares one (mirrors omitted-regions handling). Adds a regression subtest. --- internal/projectconfig/deploy.go | 27 +++++++++++++++------------ internal/projectconfig/deploy_test.go | 12 ++++++++++++ 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/internal/projectconfig/deploy.go b/internal/projectconfig/deploy.go index 51ad726..d9cb307 100644 --- a/internal/projectconfig/deploy.go +++ b/internal/projectconfig/deploy.go @@ -434,18 +434,21 @@ func schedulerNeedsUpdate(existing apiclient.FunctionScheduler, desired Schedule return true } - // Compare payload using a JSON-canonical comparison. The manifest payload - // comes from YAML (numbers decode as int), while the server returns JSON - // (numbers decode as float64); reflect.DeepEqual would flag those as - // different every deploy. Marshalling both to JSON normalizes number - // formatting and key order, and treats an omitted payload as the server's - // empty-object default. - var existingPayload map[string]any - if existing.Payload != nil { - existingPayload = *existing.Payload - } - if !payloadEqual(existingPayload, desired.Payload) { - return true + // Only reconcile payload when the manifest declares one. An omitted payload is + // treated as server-managed: the API does not serialize a nil payload on update + // (so we could never clear it), and the server defaults it to {}. Diffing an + // omitted payload against an existing non-empty one would report an update that + // can never converge, so we leave it alone — mirroring how omitted regions are + // handled below. When a payload IS declared, compare on canonical JSON so YAML + // ints vs server float64s (and key order) don't produce false differences. + if len(desired.Payload) > 0 { + var existingPayload map[string]any + if existing.Payload != nil { + existingPayload = *existing.Payload + } + if !payloadEqual(existingPayload, desired.Payload) { + return true + } } // Compare regions only when the manifest explicitly declares them. When diff --git a/internal/projectconfig/deploy_test.go b/internal/projectconfig/deploy_test.go index 0772ebb..ca00cc6 100644 --- a/internal/projectconfig/deploy_test.go +++ b/internal/projectconfig/deploy_test.go @@ -980,4 +980,16 @@ func TestSchedulerNeedsUpdateIdempotency(t *testing.T) { t.Fatalf("expected no update: nil existing Enabled should be treated as enabled") } }) + + t.Run("omitted payload is server-managed (no churn)", func(t *testing.T) { + existing := base + serverPayload := map[string]any{"job": "refresh"} + existing.Payload = &serverPayload + desired := SchedulerManifest{Name: "daily", Cron: cron} // payload omitted + // Manifest omits payload; the API can't clear it and the server keeps its + // value, so reporting an update would never converge. Expect no update. + if schedulerNeedsUpdate(existing, desired) { + t.Fatalf("expected no update when manifest omits payload (server-managed)") + } + }) } From e54ab31b9650ea5348543135369e2639fdc2aa6d Mon Sep 17 00:00:00 2001 From: Mark Kim Date: Fri, 26 Jun 2026 12:09:00 -0400 Subject: [PATCH 7/8] fix(config,localmode): address PR #18 review feedback Two correctness fixes from the PR review: config: when a manifest omits 'enabled', buildSchedulerInput forwarded a nil *bool, which UpdateFunctionScheduler drops on the wire (enabled,omitempty) and the server leaves unchanged. schedulerNeedsUpdate, however, defaults a desired enabled to true, so a server-disabled scheduler was reported as needing an update on every deploy without ever converging. Default the input to true (matching the comparison and the documented default) so the update actually re-enables the scheduler. localmode: move the custom-image existence preflight into ensureCustomImageAvailable and call it before teardown in Restart, so 'restart --image ' fails fast while the running stack stays intact instead of being stopped first. Adds regression tests for re-enable convergence and restart preflight. --- internal/localmode/compose.go | 13 +++++++++ internal/localmode/restart_test.go | 41 +++++++++++++++++++++++++++ internal/localmode/service.go | 20 ++++++++----- internal/projectconfig/deploy.go | 11 ++++++- internal/projectconfig/deploy_test.go | 41 +++++++++++++++++++++++++++ 5 files changed, 118 insertions(+), 8 deletions(-) diff --git a/internal/localmode/compose.go b/internal/localmode/compose.go index 7c5499e..5487f4d 100644 --- a/internal/localmode/compose.go +++ b/internal/localmode/compose.go @@ -90,6 +90,19 @@ func (s Service) imageExistsLocally(ctx context.Context, ref string) bool { return err == nil } +// ensureCustomImageAvailable fails fast when an explicitly selected (custom) +// local-mode image is not present locally. The CLI never pulls unpublished +// local-mode images, so this surfaces an actionable build message instead of a +// confusing registry-pull error. The bundled default is left to Compose's +// normal pull-if-missing behavior even when selected explicitly. +func (s Service) ensureCustomImageAvailable(ctx context.Context) error { + image, customImage := s.resolveImage() + if customImage && !s.imageExistsLocally(ctx, image) { + return fmt.Errorf("image %q not found locally; the CLI does not pull unpublished local-mode images. Build it (e.g. in volcano-hosting: make docker-build DOCKER_TAG=) and ensure the tag matches, or run `docker pull %s` first if it is published", image, image) + } + return nil +} + func (s Service) startDockerServices(ctx context.Context, env []string) error { composePath, cleanup, err := s.writeComposeFile() if err != nil { diff --git a/internal/localmode/restart_test.go b/internal/localmode/restart_test.go index 2310809..3b4c3fe 100644 --- a/internal/localmode/restart_test.go +++ b/internal/localmode/restart_test.go @@ -3,6 +3,7 @@ package localmode import ( "bytes" "context" + "errors" "net/http" "net/http/httptest" "slices" @@ -72,3 +73,43 @@ func TestRestartStopsBeforeStarting(t *testing.T) { require.NoError(t, service.Restart(context.Background(), &out)) assert.Equal(t, []string{"down", "up"}, order) } + +func TestRestartFailsBeforeTeardownWhenCustomImageMissing(t *testing.T) { + setLocalDevTestHome(t) + withTempWorkingDir(t) + + runner := &fakeCommandRunner{ + run: func(_ context.Context, command Command) ([]byte, error) { + switch { + case commandIs(command, "docker", "inspect", "--format={{.State.Running}}", serverContainerName): + return []byte("true\n"), nil + case commandIs(command, "docker", "version"): + return nil, nil + case commandIs(command, "docker", "image", "inspect", "kong/volcano:local-dev"): + return nil, errors.New("Error: No such image: kong/volcano:local-dev") + case commandIsComposeDown(command, false): + t.Fatalf("compose down must not run when the custom image is missing") + return nil, nil + case command.Name == "docker" && slices.Contains(command.Args, "up"): + t.Fatalf("compose up must not run when the custom image is missing") + return nil, nil + default: + return nil, nil + } + }, + } + + var out bytes.Buffer + service := NewService( + cliruntime.Deps{}, + WithDockerRunner(runner), + WithImage("kong/volcano:local-dev"), + WithEnvironment(func() []string { return []string{"PATH=/bin"} }, func(string) string { return "" }), + WithTempDir(t.TempDir()), + ) + + err := service.Restart(context.Background(), &out) + require.Error(t, err) + assert.Contains(t, err.Error(), "not found locally") + assert.False(t, runner.calledWithArg("docker", "down"), "environment must not be torn down on a bad --image") +} diff --git a/internal/localmode/service.go b/internal/localmode/service.go index eb24d9d..efe2f50 100644 --- a/internal/localmode/service.go +++ b/internal/localmode/service.go @@ -168,18 +168,19 @@ func (s Service) Start(ctx context.Context, w io.Writer) error { } output.Success(w, "Docker is available") + // When the image is an explicit override (--image / VOLCANO_IMAGE / .env.local) + // it must already exist locally before we announce or start it. Fail fast here + // (Restart calls this before tearing the environment down) instead of letting + // `docker compose up` emit a confusing registry-pull error. + if err := s.ensureCustomImageAvailable(ctx); err != nil { + return err + } + composeEnv, image, err := s.composeEnvironment() if err != nil { return err } - // When the image is an explicit override (--image / VOLCANO_IMAGE / .env.local) - // it must already exist locally before we announce or start it. The CLI never - // pulls unpublished local-mode images, so fail fast with an actionable message - // instead of letting `docker compose up` emit a confusing registry-pull error. _, customImage := s.resolveImage() - if customImage && !s.imageExistsLocally(ctx, image) { - return fmt.Errorf("image %q not found locally; the CLI does not pull unpublished local-mode images. Build it (e.g. in volcano-hosting: make docker-build DOCKER_TAG=) and ensure the tag matches, or run `docker pull %s` first if it is published", image, image) - } fmt.Fprintf(w, "Using Docker image: %s\n", image) if customImage { @@ -273,6 +274,11 @@ func (s Service) Stop(ctx context.Context, w io.Writer, clean bool) error { // Restart restarts the local Volcano Docker stack while preserving data. func (s Service) Restart(ctx context.Context, w io.Writer) error { + // Validate a custom image before tearing the environment down, so a bad + // --image leaves the running stack intact instead of stopped. + if err := s.ensureCustomImageAvailable(ctx); err != nil { + return err + } if err := s.Stop(ctx, w, false); err != nil { return err } diff --git a/internal/projectconfig/deploy.go b/internal/projectconfig/deploy.go index d9cb307..b6a6860 100644 --- a/internal/projectconfig/deploy.go +++ b/internal/projectconfig/deploy.go @@ -403,12 +403,21 @@ func (s Service) reconcileSchedulers(ctx context.Context, functions []FunctionMa } func buildSchedulerInput(manifest SchedulerManifest) api.FunctionSchedulerInput { + // Default enabled to true when the manifest omits it, matching both the + // documented default and schedulerNeedsUpdate's comparison. Sending an + // explicit value (rather than nil) ensures the update actually applies: a nil + // Enabled is omitted on the wire and left server-managed, so an update that + // relied on it would never converge. + enabled := true + if manifest.Enabled != nil { + enabled = *manifest.Enabled + } return api.FunctionSchedulerInput{ Name: manifest.Name, CronExpression: manifest.Cron, Payload: manifest.Payload, Regions: manifest.Regions, - Enabled: manifest.Enabled, + Enabled: &enabled, } } diff --git a/internal/projectconfig/deploy_test.go b/internal/projectconfig/deploy_test.go index ca00cc6..7eb8075 100644 --- a/internal/projectconfig/deploy_test.go +++ b/internal/projectconfig/deploy_test.go @@ -993,3 +993,44 @@ func TestSchedulerNeedsUpdateIdempotency(t *testing.T) { } }) } + +func TestReconcileSchedulersReenablesAndConverges(t *testing.T) { + functionID := uuid.New() + schedulerID := uuid.New() + schedulers := newFakeSchedulers() + schedulers.functions["hello"] = &apiclient.Function{Id: functionID, Name: "hello"} + cron := "0 0 * * *" + disabled := false + // Server scheduler is disabled; the manifest omits enabled (defaults to true). + schedulers.schedulers["hello"] = []apiclient.FunctionScheduler{{ + Id: &schedulerID, + FunctionId: &functionID, + Name: strPtr("daily"), + CronExpression: &cron, + Enabled: &disabled, + }} + + svc := NewServiceWithReconcilers(newFakeStorage(), &fakeFunctions{}, schedulers) + manifest := &Manifest{ + Version: 1, + Functions: []FunctionManifest{{ + Name: "hello", + Schedulers: []SchedulerManifest{{Name: "daily", Cron: cron}}, // enabled omitted + }}, + } + + // First deploy: re-enables the scheduler with an explicit Enabled=true. + summary, err := svc.Deploy(context.Background(), manifest) + require.NoError(t, err) + assert.Equal(t, 1, summary.SchedulersUpdated) + require.Len(t, schedulers.updatedCalls, 1) + require.NotNil(t, schedulers.updatedCalls[0].Input.Enabled) + assert.True(t, *schedulers.updatedCalls[0].Input.Enabled, "omitted enabled must be sent as true so the update applies") + + // Second deploy: scheduler is now enabled, so the run converges (no churn). + summary, err = svc.Deploy(context.Background(), manifest) + require.NoError(t, err) + assert.Equal(t, 0, summary.SchedulersUpdated) + assert.Equal(t, 1, summary.SchedulersUnchanged) + assert.Len(t, schedulers.updatedCalls, 1, "second deploy must not issue another update") +} From 07f6f5ede1dd021ed495abddf5fe2ad5f763b24b Mon Sep 17 00:00:00 2001 From: Ted Kim Date: Fri, 26 Jun 2026 13:09:18 -0400 Subject: [PATCH 8/8] fix(config): follow server semantics for omitted scheduler fields + guard paginated list MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Team decision: the manifest follows the server's own reconcile contract — declared fields are enforced; omitted fields are server-managed. - enabled: supersede the re-enable approach in e54ab31. buildSchedulerInput now forwards the manifest's enabled as-is (nil when omitted, which the update body drops so the server keeps the current state; create still defaults true), and schedulerNeedsUpdate only diffs enabled when the manifest declares it. This mirrors the server (omitted enabled = keep existing) and stops config deploy from re-enabling a deliberately-disabled scheduler — matching how payload and regions are already handled. Replaces the re-enable convergence test. - pagination (swkeever): the scheduler list is read as a single page and the client has no page param; fail fast if has_more is ever set rather than silently recreating schedulers beyond page 1. - README: document the single rule (declared=enforced, omitted=server-managed; deploy never deletes or disables; cron always required). Keeps marckong's localmode restart --image preflight from e54ab31 intact. --- README.md | 19 ++++--- internal/projectconfig/deploy.go | 48 +++++++++-------- internal/projectconfig/deploy_test.go | 76 ++++++++++++++++++++++----- 3 files changed, 101 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 9a02b62..60a2d95 100644 --- a/README.md +++ b/README.md @@ -63,12 +63,19 @@ functions: regions: [us-east-1] # omit to let the server pick one deployed region ``` -Scheduler reconciliation is **non-destructive**: `config deploy` creates and -updates the schedulers a function declares (matched by `name`, preserving the -scheduler ID) but never deletes one. Removing a scheduler from the manifest is -a no-op — delete it explicitly with `volcano functions schedulers delete`. -Likewise, `cron`/`payload`/`enabled` are reconciled, but `regions` are only -enforced when declared (an omitted `regions` is left server-managed). +Reconciliation follows one rule that mirrors the server: **fields you declare +are enforced; fields you omit are left server-managed.** An omitted `enabled`, +`payload`, or `regions` keeps whatever the scheduler already has on the server +(on first create the server applies its defaults — `enabled: true`, an empty +payload, and one chosen region). In particular, `config deploy` will not +re-enable a scheduler you disabled out of band unless the manifest sets +`enabled: true`. `cron` is always required and enforced. + +Reconciliation is also **non-destructive**: it creates and updates the +schedulers a function declares (matched by `name`, preserving the scheduler ID) +but never deletes or disables one. A scheduler the manifest no longer declares +is left running; to remove or disable one, use the imperative commands +(`volcano functions schedulers delete` / `disable`). ## Contributing diff --git a/internal/projectconfig/deploy.go b/internal/projectconfig/deploy.go index b6a6860..3300fe6 100644 --- a/internal/projectconfig/deploy.go +++ b/internal/projectconfig/deploy.go @@ -352,6 +352,13 @@ func (s Service) reconcileSchedulers(ctx context.Context, functions []FunctionMa if fn == nil { return fmt.Errorf("function %q: not found", fnManifest.Name) } + // The function-scoped scheduler list returns the full set in one response + // (has_more is always false today and the client exposes no page parameter). + // Guard against a future server change that paginates: schedulers beyond page + // 1 would be missed and wrongly recreated, so fail loudly instead. + if listResp != nil && listResp.HasMore { + return fmt.Errorf("function %q: scheduler list is paginated (has_more=true) but the CLI cannot page it; aborting to avoid duplicate creation", fnManifest.Name) + } // Build map of existing schedulers by Name existingByName := make(map[string]apiclient.FunctionScheduler) @@ -403,21 +410,17 @@ func (s Service) reconcileSchedulers(ctx context.Context, functions []FunctionMa } func buildSchedulerInput(manifest SchedulerManifest) api.FunctionSchedulerInput { - // Default enabled to true when the manifest omits it, matching both the - // documented default and schedulerNeedsUpdate's comparison. Sending an - // explicit value (rather than nil) ensures the update actually applies: a nil - // Enabled is omitted on the wire and left server-managed, so an update that - // relied on it would never converge. - enabled := true - if manifest.Enabled != nil { - enabled = *manifest.Enabled - } + // Forward the manifest's enabled as-is. A nil (omitted) value is dropped from + // the update body so the server keeps the scheduler's current state, and on + // create the server defaults it to true. This follows the server's own + // "omitted = keep existing" contract and avoids re-enabling a deliberately + // disabled scheduler; schedulerNeedsUpdate only flags enabled when declared. return api.FunctionSchedulerInput{ Name: manifest.Name, CronExpression: manifest.Cron, Payload: manifest.Payload, Regions: manifest.Regions, - Enabled: &enabled, + Enabled: manifest.Enabled, } } @@ -427,20 +430,19 @@ func schedulerNeedsUpdate(existing apiclient.FunctionScheduler, desired Schedule return true } - // Compare enabled (default true if not set in manifest) - desiredEnabled := true + // Only reconcile enabled when the manifest declares it. This mirrors the + // server, whose update contract treats an omitted enabled as "keep existing" + // (payload/regions below are handled the same way). Forcing the default would + // re-enable a deliberately-disabled scheduler on every deploy. A nil existing + // Enabled from the API means enabled. if desired.Enabled != nil { - desiredEnabled = *desired.Enabled - } - // A nil Enabled from the API means enabled (matches schedulerState rendering), - // so default to true; otherwise an omitted field would force a spurious update - // on every deploy. - existingEnabled := true - if existing.Enabled != nil { - existingEnabled = *existing.Enabled - } - if existingEnabled != desiredEnabled { - return true + existingEnabled := true + if existing.Enabled != nil { + existingEnabled = *existing.Enabled + } + if existingEnabled != *desired.Enabled { + return true + } } // Only reconcile payload when the manifest declares one. An omitted payload is diff --git a/internal/projectconfig/deploy_test.go b/internal/projectconfig/deploy_test.go index 7eb8075..755333e 100644 --- a/internal/projectconfig/deploy_test.go +++ b/internal/projectconfig/deploy_test.go @@ -605,6 +605,7 @@ type fakeSchedulers struct { listErr error createErr error updateErr error + hasMore bool } type schedulerCreateCall struct { @@ -635,7 +636,8 @@ func (f *fakeSchedulers) ListSchedulers(_ context.Context, identifier string) (* } schedulers := f.schedulers[identifier] resp := &apiclient.FunctionSchedulerListResponse{ - Data: append([]apiclient.FunctionScheduler(nil), schedulers...), + Data: append([]apiclient.FunctionScheduler(nil), schedulers...), + HasMore: f.hasMore, } return fn, resp, nil } @@ -992,16 +994,49 @@ func TestSchedulerNeedsUpdateIdempotency(t *testing.T) { t.Fatalf("expected no update when manifest omits payload (server-managed)") } }) + + t.Run("existing disabled + omitted enabled is server-managed (no churn)", func(t *testing.T) { + existing := base + disabled := false + existing.Enabled = &disabled + desired := SchedulerManifest{Name: "daily", Cron: cron} // enabled omitted + // Following server behavior, omitted enabled is left alone; flagging an update + // would never converge and would re-enable a deliberately-disabled scheduler. + if schedulerNeedsUpdate(existing, desired) { + t.Fatalf("expected no update: omitted enabled is server-managed (must converge)") + } + }) + + t.Run("existing disabled + explicit enable triggers update", func(t *testing.T) { + existing := base + disabled := false + existing.Enabled = &disabled + want := true + desired := SchedulerManifest{Name: "daily", Cron: cron, Enabled: &want} + if !schedulerNeedsUpdate(existing, desired) { + t.Fatalf("expected update when manifest explicitly enables a disabled scheduler") + } + }) + + t.Run("existing enabled + explicit disable triggers update", func(t *testing.T) { + existing := base // enabled + want := false + desired := SchedulerManifest{Name: "daily", Cron: cron, Enabled: &want} + if !schedulerNeedsUpdate(existing, desired) { + t.Fatalf("expected update when manifest explicitly disables an enabled scheduler") + } + }) } -func TestReconcileSchedulersReenablesAndConverges(t *testing.T) { +func TestReconcileSchedulersLeavesDisabledWhenEnabledOmitted(t *testing.T) { functionID := uuid.New() schedulerID := uuid.New() schedulers := newFakeSchedulers() schedulers.functions["hello"] = &apiclient.Function{Id: functionID, Name: "hello"} cron := "0 0 * * *" disabled := false - // Server scheduler is disabled; the manifest omits enabled (defaults to true). + // Server scheduler is disabled (e.g. an operator ran `schedulers disable`); the + // manifest re-declares it but omits enabled. schedulers.schedulers["hello"] = []apiclient.FunctionScheduler{{ Id: &schedulerID, FunctionId: &functionID, @@ -1019,18 +1054,33 @@ func TestReconcileSchedulersReenablesAndConverges(t *testing.T) { }}, } - // First deploy: re-enables the scheduler with an explicit Enabled=true. + // Following server behavior, an omitted enabled is server-managed: the deploy + // must NOT re-enable the scheduler, and it converges immediately (no update). summary, err := svc.Deploy(context.Background(), manifest) require.NoError(t, err) - assert.Equal(t, 1, summary.SchedulersUpdated) - require.Len(t, schedulers.updatedCalls, 1) - require.NotNil(t, schedulers.updatedCalls[0].Input.Enabled) - assert.True(t, *schedulers.updatedCalls[0].Input.Enabled, "omitted enabled must be sent as true so the update applies") - - // Second deploy: scheduler is now enabled, so the run converges (no churn). - summary, err = svc.Deploy(context.Background(), manifest) - require.NoError(t, err) assert.Equal(t, 0, summary.SchedulersUpdated) assert.Equal(t, 1, summary.SchedulersUnchanged) - assert.Len(t, schedulers.updatedCalls, 1, "second deploy must not issue another update") + assert.Empty(t, schedulers.updatedCalls, "omitted enabled must not trigger an update that re-enables a disabled scheduler") +} + +func TestReconcileSchedulersAbortsOnPaginatedList(t *testing.T) { + functionID := uuid.New() + schedulers := newFakeSchedulers() + schedulers.functions["hello"] = &apiclient.Function{Id: functionID, Name: "hello"} + schedulers.hasMore = true // simulate a server that paginates the scheduler list + + svc := NewServiceWithReconcilers(newFakeStorage(), &fakeFunctions{}, schedulers) + manifest := &Manifest{ + Version: 1, + Functions: []FunctionManifest{{ + Name: "hello", + Schedulers: []SchedulerManifest{{Name: "daily", Cron: "0 0 * * *"}}, + }}, + } + + _, err := svc.Deploy(context.Background(), manifest) + require.Error(t, err) + assert.Contains(t, err.Error(), "paginated") + // Must not create when it cannot see the full set (avoids duplicate creation). + assert.Empty(t, schedulers.createdCalls) }