diff --git a/cmd/chunks.go b/cmd/chunks.go new file mode 100644 index 0000000..fb2a803 --- /dev/null +++ b/cmd/chunks.go @@ -0,0 +1,222 @@ +package cmd + +import ( + "context" + "fmt" + "os" + "sort" + "strings" + "text/tabwriter" + "time" + + "github.com/spf13/cobra" + flowctlpb "github.com/withobsrvr/flowctl/proto" +) + +var ( + chunksRunID string + chunksComponentID string + chunksStatus string + chunksLimit int32 +) + +var chunksCmd = &cobra.Command{ + Use: "chunks", + Short: "Inspect historical range chunk runs", + Long: `Inspect historical range chunk runs tracked by the control plane. + +Chunks are the durable control-plane records for bounded ledger work units such +as Bronze backfill/repair ranges. Data-plane components report chunk status, +row counts, verification results, and typed failure classes.`, +} + +var chunksListCmd = &cobra.Command{ + Use: "list", + Short: "List chunk runs", + RunE: func(cmd *cobra.Command, args []string) error { + client, conn, err := connectToControlPlane() + if err != nil { + return err + } + defer conn.Close() + + status, err := parseChunkStatus(chunksStatus) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + runID := chunksRunID + if runID != "" { + runID, err = resolveRunID(ctx, client, runID) + if err != nil { + return err + } + } + + resp, err := client.ListChunkRuns(ctx, &flowctlpb.ListChunkRunsRequest{ + PipelineRunId: runID, + ComponentId: chunksComponentID, + Status: status, + Limit: chunksLimit, + }) + if err != nil { + return fmt.Errorf("failed to list chunk runs: %w", err) + } + + if len(resp.Chunks) == 0 { + fmt.Println("No chunk runs found") + return nil + } + + sort.Slice(resp.Chunks, func(i, j int) bool { + if resp.Chunks[i].ChunkStart == resp.Chunks[j].ChunkStart { + return resp.Chunks[i].Attempt < resp.Chunks[j].Attempt + } + return resp.Chunks[i].ChunkStart < resp.Chunks[j].ChunkStart + }) + + w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) + fmt.Fprintln(w, "CHUNK ID\tCOMPONENT\tRANGE\tATTEMPT\tSTATUS\tFAILURE\tPHASE\tROWS") + for _, chunk := range resp.Chunks { + fmt.Fprintf(w, "%s\t%s\t%d-%d\t%d\t%s\t%s\t%s\t%d\n", + chunk.ChunkId, + chunk.ComponentId, + chunk.ChunkStart, + chunk.ChunkEnd, + chunk.Attempt, + formatChunkStatus(chunk.Status), + formatFailureClass(chunk.FailureClass), + chunk.Phase, + totalRows(chunk.RowCounts), + ) + } + w.Flush() + + fmt.Printf("\n%d chunk run(s) found\n", len(resp.Chunks)) + return nil + }, +} + +var chunksShowCmd = &cobra.Command{ + Use: "show ", + Short: "Show detailed information about a chunk run", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + client, conn, err := connectToControlPlane() + if err != nil { + return err + } + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + chunk, err := client.GetChunkRun(ctx, &flowctlpb.GetChunkRunRequest{ChunkId: args[0]}) + if err != nil { + return fmt.Errorf("failed to get chunk run: %w", err) + } + + fmt.Printf("Chunk ID: %s\n", chunk.ChunkId) + fmt.Printf("Run ID: %s\n", chunk.PipelineRunId) + fmt.Printf("Component: %s\n", chunk.ComponentId) + fmt.Printf("Range: %d-%d\n", chunk.ChunkStart, chunk.ChunkEnd) + fmt.Printf("Attempt: %d\n", chunk.Attempt) + fmt.Printf("Status: %s\n", formatChunkStatus(chunk.Status)) + fmt.Printf("Phase: %s\n", chunk.Phase) + if chunk.FailureClass != flowctlpb.FailureClass_FAILURE_CLASS_UNKNOWN { + fmt.Printf("Failure: %s\n", formatFailureClass(chunk.FailureClass)) + } + if chunk.Error != "" { + fmt.Printf("Error: %s\n", chunk.Error) + } + if chunk.RecommendedAction != "" { + fmt.Printf("Next action: %s\n", chunk.RecommendedAction) + } + if chunk.StartedAt != nil { + fmt.Printf("Started: %s\n", chunk.StartedAt.AsTime().Format("2006-01-02 15:04:05 MST")) + } + if chunk.CompletedAt != nil { + fmt.Printf("Completed: %s\n", chunk.CompletedAt.AsTime().Format("2006-01-02 15:04:05 MST")) + } + if chunk.VerifiedAt != nil { + fmt.Printf("Verified: %s\n", chunk.VerifiedAt.AsTime().Format("2006-01-02 15:04:05 MST")) + } + if len(chunk.RowCounts) > 0 { + fmt.Println("\nRow counts:") + keys := make([]string, 0, len(chunk.RowCounts)) + for k := range chunk.RowCounts { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + fmt.Printf(" %s: %d\n", k, chunk.RowCounts[k]) + } + } + if len(chunk.Verification) > 0 { + fmt.Println("\nVerification:") + keys := make([]string, 0, len(chunk.Verification)) + for k := range chunk.Verification { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + fmt.Printf(" %s: %s\n", k, chunk.Verification[k]) + } + } + return nil + }, +} + +func parseChunkStatus(value string) (flowctlpb.ChunkStatus, error) { + if value == "" { + return flowctlpb.ChunkStatus_CHUNK_STATUS_UNKNOWN, nil + } + key := "CHUNK_STATUS_" + strings.ToUpper(strings.ReplaceAll(value, "-", "_")) + status, ok := flowctlpb.ChunkStatus_value[key] + if !ok { + return flowctlpb.ChunkStatus_CHUNK_STATUS_UNKNOWN, fmt.Errorf("unknown chunk status %q", value) + } + return flowctlpb.ChunkStatus(status), nil +} + +func formatChunkStatus(status flowctlpb.ChunkStatus) string { + return strings.ToLower(strings.TrimPrefix(status.String(), "CHUNK_STATUS_")) +} + +func formatFailureClass(failure flowctlpb.FailureClass) string { + if failure == flowctlpb.FailureClass_FAILURE_CLASS_UNKNOWN { + return "-" + } + return strings.ToLower(strings.TrimPrefix(failure.String(), "FAILURE_CLASS_")) +} + +func shortChunkID(chunkID string) string { + if len(chunkID) <= 18 { + return chunkID + } + return chunkID[:18] +} + +func totalRows(rows map[string]int64) int64 { + var total int64 + for _, count := range rows { + total += count + } + return total +} + +func init() { + rootCmd.AddCommand(chunksCmd) + chunksCmd.AddCommand(chunksListCmd) + chunksCmd.AddCommand(chunksShowCmd) + + chunksCmd.PersistentFlags().StringVar(&pipelinesControlPlaneAddr, "control-plane-address", "127.0.0.1", "Control plane address") + chunksCmd.PersistentFlags().IntVar(&pipelinesControlPlanePort, "control-plane-port", 8080, "Control plane port") + chunksListCmd.Flags().StringVar(&chunksRunID, "run", "", "Pipeline run ID or unique prefix") + chunksListCmd.Flags().StringVar(&chunksComponentID, "component", "", "Component ID filter") + chunksListCmd.Flags().StringVar(&chunksStatus, "status", "", "Chunk status filter (planned, dispatched, running, completed, verified, failed, retry-pending, aborted)") + chunksListCmd.Flags().Int32Var(&chunksLimit, "limit", 100, "Maximum number of chunks to show") +} diff --git a/docs/component-flowctl-reporting.md b/docs/component-flowctl-reporting.md new file mode 100644 index 0000000..e60c24c --- /dev/null +++ b/docs/component-flowctl-reporting.md @@ -0,0 +1,81 @@ +# Component flowctl reporting + +Data-plane components can report bounded historical work-unit state to the flowctl control plane. This is the contract used by historical ledger workers such as Bronze and Silver loaders. + +## Environment contract + +```text +ENABLE_FLOWCTL=true +FLOWCTL_ENDPOINT= +FLOWCTL_COMPONENT_ID= +FLOWCTL_RUN_ID= +FLOWCTL_ATTEMPT= +FLOWCTL_HEARTBEAT_INTERVAL_MS=10000 +``` + +`flowctl run` injects `FLOWCTL_ENDPOINT`, `FLOWCTL_COMPONENT_ID`, `FLOWCTL_RUN_ID`, `FLOWCTL_ATTEMPT`, and heartbeat interval values for process-managed components. + +Historical range workers should also accept: + +```text +START_LEDGER= +END_LEDGER= +CHUNK_START= +CHUNK_END= +``` + +## Go helper + +Use `github.com/withobsrvr/flowctl/pkg/component` from a component binary: + +```go +cfg := component.ConfigFromEnv() +reporter, err := component.NewReporter(ctx, cfg) +if err != nil { + return err +} +defer reporter.Close() + +if err := reporter.Register(ctx, flowctlpb.ServiceType_SERVICE_TYPE_SOURCE, map[string]string{ + "pipeline": "obsrvr-mainnet-bronze-repair", + "network": "pubnet", +}); err != nil { + return err +} + +go reporter.StartHeartbeatLoop(ctx, func() map[string]float64 { + return map[string]float64{"ledgers_processed": float64(processed)} +}, nil) + +_ = reporter.ReportChunkProgress(ctx, chunkStart, chunkEnd, "ducklake_push", nil, nil) + +_ = reporter.ReportChunkCompleted(ctx, chunkStart, chunkEnd, true, map[string]int64{ + "ledgers_row_v2": 250000, + "transactions_row_v2": 22685377, +}, map[string]string{ + "gate": "bronze-silver-readiness-direct", + "passed": "true", +}) +``` + +On failure: + +```go +_ = reporter.ReportChunkFailed( + ctx, + chunkStart, + chunkEnd, + "verification", + flowctlpb.FailureClass_FAILURE_CLASS_RETRYABLE_INFRASTRUCTURE, + "catalog postgres connection timed out", + "retry_verification", +) +``` + +## Operator inspection + +```bash +flowctl chunks list --run +flowctl chunks list --run --status failed +flowctl chunks show +``` diff --git a/internal/api/control_plane.go b/internal/api/control_plane.go index 718a72d..64eede9 100644 --- a/internal/api/control_plane.go +++ b/internal/api/control_plane.go @@ -3,10 +3,12 @@ package api import ( "context" "fmt" + "strconv" "sync" "time" "google.golang.org/grpc" + gproto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -675,6 +677,104 @@ func (s *ControlPlaneServer) StopPipelineRun(ctx context.Context, req *flowctlpb return stoppedRun, nil } +// UpsertChunkRun implements the UpsertChunkRun RPC. +func (s *ControlPlaneServer) UpsertChunkRun(ctx context.Context, req *flowctlpb.UpsertChunkRunRequest) (*flowctlpb.ChunkRun, error) { + if s.storage == nil { + return nil, fmt.Errorf("storage not configured") + } + if req.Chunk == nil { + return nil, fmt.Errorf("chunk is required") + } + + chunk := req.Chunk + if chunk.PipelineRunId == "" { + return nil, fmt.Errorf("chunk pipeline_run_id is required") + } + if chunk.ComponentId == "" { + return nil, fmt.Errorf("chunk component_id is required") + } + if chunk.ChunkStart > chunk.ChunkEnd { + return nil, fmt.Errorf("chunk_start must be less than or equal to chunk_end") + } + if chunk.Attempt == 0 { + chunk.Attempt = 1 + } + if chunk.ChunkId == "" { + chunk.ChunkId = fmt.Sprintf("%s:%s:%d-%d:%d", chunk.PipelineRunId, chunk.ComponentId, chunk.ChunkStart, chunk.ChunkEnd, chunk.Attempt) + } + + if existing, err := s.storage.GetChunkRun(ctx, chunk.ChunkId); err == nil && existing != nil && existing.Chunk != nil { + chunk = mergeChunkRun(existing.Chunk, chunk) + } else if err != nil && !storage.IsNotFound(err) { + return nil, fmt.Errorf("failed to check existing chunk run: %w", err) + } + + if err := s.storage.UpsertChunkRun(ctx, &storage.ChunkRunInfo{Chunk: chunk}); err != nil { + logger.Error("Failed to upsert chunk run", + zap.String("chunk_id", chunk.ChunkId), + zap.Error(err)) + return nil, fmt.Errorf("failed to upsert chunk run: %w", err) + } + + logger.Debug("Chunk run upserted", + zap.String("chunk_id", chunk.ChunkId), + zap.String("pipeline_run_id", chunk.PipelineRunId), + zap.String("component_id", chunk.ComponentId), + zap.Int64("chunk_start", chunk.ChunkStart), + zap.Int64("chunk_end", chunk.ChunkEnd), + zap.String("status", chunk.Status.String())) + + return chunk, nil +} + +// GetChunkRun implements the GetChunkRun RPC. +func (s *ControlPlaneServer) GetChunkRun(ctx context.Context, req *flowctlpb.GetChunkRunRequest) (*flowctlpb.ChunkRun, error) { + if s.storage == nil { + return nil, fmt.Errorf("storage not configured") + } + + chunkInfo, err := s.storage.GetChunkRun(ctx, req.ChunkId) + if err != nil { + if storage.IsNotFound(err) { + return nil, fmt.Errorf("chunk run not found: %s", req.ChunkId) + } + logger.Error("Failed to get chunk run", + zap.String("chunk_id", req.ChunkId), + zap.Error(err)) + return nil, fmt.Errorf("failed to get chunk run: %w", err) + } + + return chunkInfo.Chunk, nil +} + +// ListChunkRuns implements the ListChunkRuns RPC. +func (s *ControlPlaneServer) ListChunkRuns(ctx context.Context, req *flowctlpb.ListChunkRunsRequest) (*flowctlpb.ListChunkRunsResponse, error) { + if s.storage == nil { + return &flowctlpb.ListChunkRunsResponse{Chunks: []*flowctlpb.ChunkRun{}}, nil + } + + limit := req.Limit + if limit == 0 { + limit = 100 + } + + chunkInfos, err := s.storage.ListChunkRuns(ctx, req.PipelineRunId, req.ComponentId, req.Status, limit) + if err != nil { + logger.Error("Failed to list chunk runs", + zap.String("pipeline_run_id", req.PipelineRunId), + zap.String("component_id", req.ComponentId), + zap.Error(err)) + return nil, fmt.Errorf("failed to list chunk runs: %w", err) + } + + chunks := make([]*flowctlpb.ChunkRun, len(chunkInfos)) + for i, chunkInfo := range chunkInfos { + chunks[i] = chunkInfo.Chunk + } + + return &flowctlpb.ListChunkRunsResponse{Chunks: chunks}, nil +} + // ControlPlaneWrapper wraps ControlPlaneServer to implement the flowctlpb.ControlPlane interface // This is needed because flowctlpb and v1 have methods with the same names but different signatures type ControlPlaneWrapper struct { @@ -732,6 +832,7 @@ func (w *ControlPlaneWrapper) Register(ctx context.Context, req *flowctlpb.Servi func (w *ControlPlaneWrapper) Heartbeat(ctx context.Context, req *flowctlpb.ServiceHeartbeat) (*emptypb.Empty, error) { v1Req := &flowctlv1.HeartbeatRequest{ ServiceId: req.ServiceId, + Metrics: float64MapToStringMap(req.Metrics), } return w.server.Heartbeat(ctx, v1Req) @@ -785,6 +886,18 @@ func (w *ControlPlaneWrapper) StopPipelineRun(ctx context.Context, req *flowctlp return w.server.StopPipelineRun(ctx, req) } +func (w *ControlPlaneWrapper) UpsertChunkRun(ctx context.Context, req *flowctlpb.UpsertChunkRunRequest) (*flowctlpb.ChunkRun, error) { + return w.server.UpsertChunkRun(ctx, req) +} + +func (w *ControlPlaneWrapper) GetChunkRun(ctx context.Context, req *flowctlpb.GetChunkRunRequest) (*flowctlpb.ChunkRun, error) { + return w.server.GetChunkRun(ctx, req) +} + +func (w *ControlPlaneWrapper) ListChunkRuns(ctx context.Context, req *flowctlpb.ListChunkRunsRequest) (*flowctlpb.ListChunkRunsResponse, error) { + return w.server.ListChunkRuns(ctx, req) +} + func copyStringMap(src map[string]string) map[string]string { if len(src) == 0 { return map[string]string{} @@ -820,6 +933,69 @@ func mergeStringMaps(base, updates map[string]string) map[string]string { return merged } +func float64MapToStringMap(src map[string]float64) map[string]string { + if len(src) == 0 { + return map[string]string{} + } + converted := make(map[string]string, len(src)) + for k, v := range src { + converted[k] = strconv.FormatFloat(v, 'f', -1, 64) + } + return converted +} + +func mergeInt64Maps(base, updates map[string]int64) map[string]int64 { + if len(base) == 0 && len(updates) == 0 { + return map[string]int64{} + } + merged := make(map[string]int64, len(base)+len(updates)) + for k, v := range base { + merged[k] = v + } + for k, v := range updates { + merged[k] = v + } + return merged +} + +func mergeChunkRun(existing, update *flowctlpb.ChunkRun) *flowctlpb.ChunkRun { + merged := gproto.Clone(existing).(*flowctlpb.ChunkRun) + merged.ChunkId = update.ChunkId + merged.PipelineRunId = update.PipelineRunId + merged.ComponentId = update.ComponentId + merged.ChunkStart = update.ChunkStart + merged.ChunkEnd = update.ChunkEnd + merged.Attempt = update.Attempt + if update.Status != flowctlpb.ChunkStatus_CHUNK_STATUS_UNKNOWN { + merged.Status = update.Status + } + if update.FailureClass != flowctlpb.FailureClass_FAILURE_CLASS_UNKNOWN { + merged.FailureClass = update.FailureClass + } + if update.Phase != "" { + merged.Phase = update.Phase + } + if update.Error != "" { + merged.Error = update.Error + } + if update.RecommendedAction != "" { + merged.RecommendedAction = update.RecommendedAction + } + if update.StartedAt != nil { + merged.StartedAt = update.StartedAt + } + if update.CompletedAt != nil { + merged.CompletedAt = update.CompletedAt + } + if update.VerifiedAt != nil { + merged.VerifiedAt = update.VerifiedAt + } + merged.RowCounts = mergeInt64Maps(existing.RowCounts, update.RowCounts) + merged.Verification = mergeStringMaps(existing.Verification, update.Verification) + merged.Metadata = mergeStringMaps(existing.Metadata, update.Metadata) + return merged +} + // Helper functions to convert between v1 and flowctlpb types func convertToV1ComponentType(t flowctlpb.ServiceType) flowctlv1.ComponentType { switch t { diff --git a/internal/api/control_plane_test.go b/internal/api/control_plane_test.go index 258c58d..452e8ca 100644 --- a/internal/api/control_plane_test.go +++ b/internal/api/control_plane_test.go @@ -402,8 +402,108 @@ func TestControlPlaneWrapper_Register(t *testing.T) { // Heartbeat should work with the returned ServiceId _, err = wrapper.Heartbeat(ctx, &flowctlpb.ServiceHeartbeat{ ServiceId: ack.ServiceId, + Metrics: map[string]float64{ + "ledgers_processed": 42, + }, }) if err != nil { t.Fatalf("Heartbeat failed with returned ServiceId: %v", err) } + if got := stored.Status.Metrics["ledgers_processed"]; got != "42" { + t.Fatalf("expected heartbeat metrics to be converted and stored, got %q", got) + } +} + +func TestControlPlaneServer_ChunkRuns(t *testing.T) { + memStorage, cleanup := createTestStorage() + defer cleanup() + + server := NewControlPlaneServer(memStorage) + wrapper := NewControlPlaneWrapper(server) + ctx := context.Background() + + chunk, err := wrapper.UpsertChunkRun(ctx, &flowctlpb.UpsertChunkRunRequest{Chunk: &flowctlpb.ChunkRun{ + PipelineRunId: "run-1", + ComponentId: "bronze-history-loader", + ChunkStart: 30750003, + ChunkEnd: 31000002, + Status: flowctlpb.ChunkStatus_CHUNK_STATUS_RUNNING, + Phase: "ducklake_push", + RowCounts: map[string]int64{ + "ledgers_row_v2": 250000, + }, + }}) + if err != nil { + t.Fatalf("UpsertChunkRun failed: %v", err) + } + if chunk.ChunkId == "" { + t.Fatal("expected generated chunk id") + } + if chunk.Attempt != 1 { + t.Fatalf("expected default attempt 1, got %d", chunk.Attempt) + } + + got, err := wrapper.GetChunkRun(ctx, &flowctlpb.GetChunkRunRequest{ChunkId: chunk.ChunkId}) + if err != nil { + t.Fatalf("GetChunkRun failed: %v", err) + } + if got.Phase != "ducklake_push" { + t.Fatalf("expected phase ducklake_push, got %q", got.Phase) + } + + completed, err := wrapper.UpsertChunkRun(ctx, &flowctlpb.UpsertChunkRunRequest{Chunk: &flowctlpb.ChunkRun{ + ChunkId: chunk.ChunkId, + PipelineRunId: "run-1", + ComponentId: "bronze-history-loader", + ChunkStart: 30750003, + ChunkEnd: 31000002, + Attempt: 1, + Status: flowctlpb.ChunkStatus_CHUNK_STATUS_COMPLETED, + }}) + if err != nil { + t.Fatalf("terminal UpsertChunkRun failed: %v", err) + } + if completed.Phase != "ducklake_push" { + t.Fatalf("expected terminal update to preserve phase, got %q", completed.Phase) + } + if completed.RowCounts["ledgers_row_v2"] != 250000 { + t.Fatalf("expected terminal update to preserve row counts, got %v", completed.RowCounts) + } + + list, err := wrapper.ListChunkRuns(ctx, &flowctlpb.ListChunkRunsRequest{ + PipelineRunId: "run-1", + ComponentId: "bronze-history-loader", + Status: flowctlpb.ChunkStatus_CHUNK_STATUS_COMPLETED, + }) + if err != nil { + t.Fatalf("ListChunkRuns failed: %v", err) + } + if len(list.Chunks) != 1 { + t.Fatalf("expected 1 chunk, got %d", len(list.Chunks)) + } +} + +func TestControlPlaneServer_ChunkRunValidation(t *testing.T) { + memStorage, cleanup := createTestStorage() + defer cleanup() + + wrapper := NewControlPlaneWrapper(NewControlPlaneServer(memStorage)) + ctx := context.Background() + + cases := []struct { + name string + chunk *flowctlpb.ChunkRun + }{ + {name: "missing run", chunk: &flowctlpb.ChunkRun{ComponentId: "component", ChunkStart: 1, ChunkEnd: 2}}, + {name: "missing component", chunk: &flowctlpb.ChunkRun{PipelineRunId: "run", ChunkStart: 1, ChunkEnd: 2}}, + {name: "reversed range", chunk: &flowctlpb.ChunkRun{PipelineRunId: "run", ComponentId: "component", ChunkStart: 3, ChunkEnd: 2}}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if _, err := wrapper.UpsertChunkRun(ctx, &flowctlpb.UpsertChunkRunRequest{Chunk: tc.chunk}); err == nil { + t.Fatal("expected validation error") + } + }) + } } diff --git a/internal/orchestrator/process.go b/internal/orchestrator/process.go index e5bc5d0..bd462a4 100644 --- a/internal/orchestrator/process.go +++ b/internal/orchestrator/process.go @@ -455,6 +455,7 @@ func (p *ProcessOrchestrator) buildEnvironment(component *Component) []string { "ENABLE_FLOWCTL=true", fmt.Sprintf("FLOWCTL_ENDPOINT=%s", p.controlPlaneEndpoint), "FLOWCTL_HEARTBEAT_INTERVAL=10s", + "FLOWCTL_HEARTBEAT_INTERVAL_MS=10000", fmt.Sprintf("FLOWCTL_SERVICE_ID=%s", component.ID), fmt.Sprintf("FLOWCTL_COMPONENT_ID=%s", component.ID), // Component ID for registration matching ) diff --git a/internal/runner/pipeline_runner.go b/internal/runner/pipeline_runner.go index 51e6054..7bb33a1 100644 --- a/internal/runner/pipeline_runner.go +++ b/internal/runner/pipeline_runner.go @@ -379,6 +379,17 @@ func (r *PipelineRunner) startComponents() error { // convertToOrchestratorComponent converts a model.Component to orchestrator.Component func (r *PipelineRunner) convertToOrchestratorComponent(modelComp model.Component, componentType string) *orchestrator.Component { + env := make(map[string]string, len(modelComp.Env)+2) + for key, value := range modelComp.Env { + env[key] = value + } + if r.runID != "" { + env["FLOWCTL_RUN_ID"] = r.runID + } + if _, ok := env["FLOWCTL_ATTEMPT"]; !ok { + env["FLOWCTL_ATTEMPT"] = "1" + } + orchComp := &orchestrator.Component{ ID: modelComp.ID, Type: modelComp.Type, @@ -386,7 +397,7 @@ func (r *PipelineRunner) convertToOrchestratorComponent(modelComp model.Componen Image: modelComp.Image, Command: modelComp.Command, Args: modelComp.Args, - Environment: modelComp.Env, + Environment: env, Dependencies: modelComp.Inputs, RestartPolicy: modelComp.RestartPolicy, } diff --git a/internal/storage/bolt.go b/internal/storage/bolt.go index 244b024..8c739e7 100644 --- a/internal/storage/bolt.go +++ b/internal/storage/bolt.go @@ -31,6 +31,9 @@ var serviceBucket = []byte("services") // pipelineRunBucket is the name of the bucket where pipeline run information is stored var pipelineRunBucket = []byte("pipeline_runs") +// chunkRunBucket is the name of the bucket where historical chunk run information is stored. +var chunkRunBucket = []byte("chunk_runs") + // BoltDBStorage implements the ServiceStorage interface using BoltDB type BoltDBStorage struct { db *bolt.DB @@ -66,7 +69,7 @@ func NewBoltDBStorage(opts *BoltOptions) *BoltDBStorage { } return &BoltDBStorage{ - path: opts.Path, + path: opts.Path, options: opts, } } @@ -83,7 +86,7 @@ func (s *BoltDBStorage) Open() error { // Open the database opts := &bolt.Options{Timeout: DefaultBoltTimeout} fileMode := DefaultBoltFileMode - + if s.options != nil { if s.options.Timeout > 0 { opts.Timeout = s.options.Timeout @@ -92,7 +95,7 @@ func (s *BoltDBStorage) Open() error { fileMode = s.options.FileMode } } - + db, err := bolt.Open(s.path, fileMode, opts) if err != nil { return fmt.Errorf("failed to open BoltDB: %w", err) @@ -109,6 +112,10 @@ func (s *BoltDBStorage) Open() error { if err != nil { return fmt.Errorf("failed to create pipeline_runs bucket: %w", err) } + _, err = tx.CreateBucketIfNotExists(chunkRunBucket) + if err != nil { + return fmt.Errorf("failed to create chunk_runs bucket: %w", err) + } return nil }) if err != nil { @@ -515,4 +522,109 @@ func (s *BoltDBStorage) DeletePipelineRun(ctx context.Context, runID string) err return nil }) -} \ No newline at end of file +} + +// UpsertChunkRun creates or replaces a chunk run in the registry. +func (s *BoltDBStorage) UpsertChunkRun(ctx context.Context, chunk *ChunkRunInfo) error { + logger.Debug("Upserting chunk run", zap.String("chunk_id", chunk.Chunk.ChunkId)) + return s.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(chunkRunBucket) + if b == nil { + return fmt.Errorf("chunk_runs bucket not found") + } + + data, err := json.Marshal(chunk) + if err != nil { + return fmt.Errorf("failed to marshal chunk run: %w", err) + } + + if err := b.Put([]byte(chunk.Chunk.ChunkId), data); err != nil { + return fmt.Errorf("failed to store chunk run: %w", err) + } + return nil + }) +} + +// GetChunkRun retrieves a chunk run by its ID. +func (s *BoltDBStorage) GetChunkRun(ctx context.Context, chunkID string) (*ChunkRunInfo, error) { + logger.Debug("Getting chunk run", zap.String("chunk_id", chunkID)) + var chunk *ChunkRunInfo + err := s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(chunkRunBucket) + if b == nil { + return fmt.Errorf("chunk_runs bucket not found") + } + + data := b.Get([]byte(chunkID)) + if data == nil { + return ErrChunkRunNotFound{ChunkID: chunkID} + } + + var c ChunkRunInfo + if err := json.Unmarshal(data, &c); err != nil { + return fmt.Errorf("failed to unmarshal chunk run: %w", err) + } + chunk = &c + return nil + }) + return chunk, err +} + +// ListChunkRuns retrieves chunk runs, optionally filtered by pipeline run, component, and status. +func (s *BoltDBStorage) ListChunkRuns(ctx context.Context, pipelineRunID string, componentID string, status flowctlpb.ChunkStatus, limit int32) ([]*ChunkRunInfo, error) { + logger.Debug("Listing chunk runs", zap.String("pipeline_run_id", pipelineRunID), zap.String("component_id", componentID), zap.Any("status", status), zap.Int32("limit", limit)) + var chunks []*ChunkRunInfo + err := s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(chunkRunBucket) + if b == nil { + return fmt.Errorf("chunk_runs bucket not found") + } + + count := int32(0) + cursor := b.Cursor() + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + if limit > 0 && count >= limit { + break + } + + var chunk ChunkRunInfo + if err := json.Unmarshal(v, &chunk); err != nil { + return fmt.Errorf("failed to unmarshal chunk run: %w", err) + } + if pipelineRunID != "" && chunk.Chunk.PipelineRunId != pipelineRunID { + continue + } + if componentID != "" && chunk.Chunk.ComponentId != componentID { + continue + } + if status != flowctlpb.ChunkStatus_CHUNK_STATUS_UNKNOWN && chunk.Chunk.Status != status { + continue + } + + chunks = append(chunks, &chunk) + count++ + } + return nil + }) + return chunks, err +} + +// DeleteChunkRun removes a chunk run from the registry. +func (s *BoltDBStorage) DeleteChunkRun(ctx context.Context, chunkID string) error { + logger.Debug("Deleting chunk run", zap.String("chunk_id", chunkID)) + return s.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(chunkRunBucket) + if b == nil { + return fmt.Errorf("chunk_runs bucket not found") + } + + key := []byte(chunkID) + if b.Get(key) == nil { + return ErrChunkRunNotFound{ChunkID: chunkID} + } + if err := b.Delete(key); err != nil { + return fmt.Errorf("failed to delete chunk run: %w", err) + } + return nil + }) +} diff --git a/internal/storage/interface.go b/internal/storage/interface.go index 52edb34..5fe7bbc 100644 --- a/internal/storage/interface.go +++ b/internal/storage/interface.go @@ -23,26 +23,31 @@ type PipelineRunInfo struct { Run *flowctlpb.PipelineRun } +// ChunkRunInfo represents the complete information about a historical chunk run. +type ChunkRunInfo struct { + Chunk *flowctlpb.ChunkRun +} + // ServiceStorage defines the interface for persistent storage of service registry type ServiceStorage interface { // Open initializes the storage and makes it ready for use Open() error - + // Close closes the storage and releases any resources Close() error - + // RegisterService stores a new service in the registry RegisterService(ctx context.Context, service *ServiceInfo) error - + // UpdateService updates an existing service in the registry UpdateService(ctx context.Context, serviceID string, updater func(*ServiceInfo) error) error - + // GetService retrieves a service by its ID GetService(ctx context.Context, serviceID string) (*ServiceInfo, error) - + // ListServices retrieves all services in the registry ListServices(ctx context.Context) ([]*ServiceInfo, error) - + // DeleteService removes a service from the registry DeleteService(ctx context.Context, serviceID string) error @@ -69,22 +74,36 @@ type ServiceStorage interface { // DeletePipelineRun removes a pipeline run from the registry DeletePipelineRun(ctx context.Context, runID string) error + + // Chunk run management methods + + // UpsertChunkRun creates or replaces a chunk run record. + UpsertChunkRun(ctx context.Context, chunk *ChunkRunInfo) error + + // GetChunkRun retrieves a chunk run by its ID. + GetChunkRun(ctx context.Context, chunkID string) (*ChunkRunInfo, error) + + // ListChunkRuns retrieves chunk runs, optionally filtered by pipeline run, component, and status. + ListChunkRuns(ctx context.Context, pipelineRunID string, componentID string, status flowctlpb.ChunkStatus, limit int32) ([]*ChunkRunInfo, error) + + // DeleteChunkRun removes a chunk run from the registry. + DeleteChunkRun(ctx context.Context, chunkID string) error } // Transaction represents a storage transaction type Transaction interface { // RegisterService stores a new service in the registry within a transaction RegisterService(service *ServiceInfo) error - + // UpdateService updates an existing service in the registry within a transaction UpdateService(serviceID string, updater func(*ServiceInfo) error) error - + // GetService retrieves a service by its ID within a transaction GetService(serviceID string) (*ServiceInfo, error) - + // ListServices retrieves all services in the registry within a transaction ListServices() ([]*ServiceInfo, error) - + // DeleteService removes a service from the registry within a transaction DeleteService(serviceID string) error } @@ -109,9 +128,20 @@ func (e ErrPipelineRunNotFound) Error() string { return "pipeline run not found: " + e.RunID } -// IsNotFound returns true if the error is ErrServiceNotFound or ErrPipelineRunNotFound +// ErrChunkRunNotFound is returned when a chunk run with the specified ID is not found. +type ErrChunkRunNotFound struct { + ChunkID string +} + +// Error implements the error interface. +func (e ErrChunkRunNotFound) Error() string { + return "chunk run not found: " + e.ChunkID +} + +// IsNotFound returns true if the error is ErrServiceNotFound, ErrPipelineRunNotFound, or ErrChunkRunNotFound. func IsNotFound(err error) bool { _, okService := err.(ErrServiceNotFound) _, okRun := err.(ErrPipelineRunNotFound) - return okService || okRun -} \ No newline at end of file + _, okChunk := err.(ErrChunkRunNotFound) + return okService || okRun || okChunk +} diff --git a/internal/storage/memory.go b/internal/storage/memory.go index 82e42fe..59dee03 100644 --- a/internal/storage/memory.go +++ b/internal/storage/memory.go @@ -14,6 +14,7 @@ type MemoryStorage struct { mu sync.RWMutex services map[string]*ServiceInfo pipelineRuns map[string]*PipelineRunInfo + chunkRuns map[string]*ChunkRunInfo } // NewMemoryStorage creates a new in-memory storage for testing @@ -21,6 +22,7 @@ func NewMemoryStorage() ServiceStorage { return &MemoryStorage{ services: make(map[string]*ServiceInfo), pipelineRuns: make(map[string]*PipelineRunInfo), + chunkRuns: make(map[string]*ChunkRunInfo), } } @@ -237,4 +239,66 @@ func (s *MemoryStorage) DeletePipelineRun(ctx context.Context, runID string) err delete(s.pipelineRuns, runID) return nil -} \ No newline at end of file +} + +// UpsertChunkRun creates or replaces a chunk run in the registry. +func (s *MemoryStorage) UpsertChunkRun(ctx context.Context, chunk *ChunkRunInfo) error { + s.mu.Lock() + defer s.mu.Unlock() + + logger.Debug("Upserting chunk run in memory", zap.String("chunk_id", chunk.Chunk.ChunkId)) + s.chunkRuns[chunk.Chunk.ChunkId] = chunk + return nil +} + +// GetChunkRun retrieves a chunk run by its ID. +func (s *MemoryStorage) GetChunkRun(ctx context.Context, chunkID string) (*ChunkRunInfo, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + chunk, ok := s.chunkRuns[chunkID] + if !ok { + return nil, ErrChunkRunNotFound{ChunkID: chunkID} + } + + return chunk, nil +} + +// ListChunkRuns retrieves chunk runs, optionally filtered by pipeline run, component, and status. +func (s *MemoryStorage) ListChunkRuns(ctx context.Context, pipelineRunID string, componentID string, status flowctlpb.ChunkStatus, limit int32) ([]*ChunkRunInfo, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + chunks := make([]*ChunkRunInfo, 0) + count := int32(0) + for _, chunk := range s.chunkRuns { + if limit > 0 && count >= limit { + break + } + if pipelineRunID != "" && chunk.Chunk.PipelineRunId != pipelineRunID { + continue + } + if componentID != "" && chunk.Chunk.ComponentId != componentID { + continue + } + if status != flowctlpb.ChunkStatus_CHUNK_STATUS_UNKNOWN && chunk.Chunk.Status != status { + continue + } + chunks = append(chunks, chunk) + count++ + } + + return chunks, nil +} + +// DeleteChunkRun removes a chunk run from the registry. +func (s *MemoryStorage) DeleteChunkRun(ctx context.Context, chunkID string) error { + s.mu.Lock() + defer s.mu.Unlock() + + if _, ok := s.chunkRuns[chunkID]; !ok { + return ErrChunkRunNotFound{ChunkID: chunkID} + } + delete(s.chunkRuns, chunkID) + return nil +} diff --git a/pkg/component/reporter.go b/pkg/component/reporter.go new file mode 100644 index 0000000..ff398a5 --- /dev/null +++ b/pkg/component/reporter.go @@ -0,0 +1,343 @@ +package component + +import ( + "context" + "fmt" + "os" + "strconv" + "strings" + "time" + + flowctlpb "github.com/withobsrvr/flowctl/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const ( + defaultHeartbeatInterval = 10 * time.Second + defaultDialTimeout = 5 * time.Second +) + +// Config describes the flowctl control-plane connection advertised to a data-plane component. +type Config struct { + Enabled bool + Endpoint string + ComponentID string + RunID string + Attempt int32 + HeartbeatInterval time.Duration +} + +// ConfigFromEnv loads the standard flowctl component environment contract. +func ConfigFromEnv() Config { + attempt := int32(1) + if raw := os.Getenv("FLOWCTL_ATTEMPT"); raw != "" { + if parsed, err := strconv.ParseInt(raw, 10, 32); err == nil && parsed > 0 { + attempt = int32(parsed) + } + } + + interval := defaultHeartbeatInterval + if raw := os.Getenv("FLOWCTL_HEARTBEAT_INTERVAL_MS"); raw != "" { + if parsed, err := strconv.ParseInt(raw, 10, 64); err == nil && parsed > 0 { + interval = time.Duration(parsed) * time.Millisecond + } + } + + return Config{ + Enabled: strings.EqualFold(os.Getenv("ENABLE_FLOWCTL"), "true"), + Endpoint: os.Getenv("FLOWCTL_ENDPOINT"), + ComponentID: os.Getenv("FLOWCTL_COMPONENT_ID"), + RunID: os.Getenv("FLOWCTL_RUN_ID"), + Attempt: attempt, + HeartbeatInterval: interval, + } +} + +// Reporter emits component lifecycle and historical chunk state to flowctl. +type Reporter struct { + cfg Config + conn *grpc.ClientConn + client flowctlpb.ControlPlaneClient + serviceID string +} + +// NewReporter connects to the control plane. If cfg.Enabled is false it returns a disabled no-op reporter. +func NewReporter(ctx context.Context, cfg Config) (*Reporter, error) { + r := &Reporter{cfg: cfg} + if !cfg.Enabled { + return r, nil + } + if cfg.Endpoint == "" { + return nil, fmt.Errorf("FLOWCTL_ENDPOINT is required when ENABLE_FLOWCTL=true") + } + if cfg.ComponentID == "" { + return nil, fmt.Errorf("FLOWCTL_COMPONENT_ID is required when ENABLE_FLOWCTL=true") + } + + dialCtx := ctx + cancel := func() {} + if _, ok := ctx.Deadline(); !ok { + dialCtx, cancel = context.WithTimeout(ctx, defaultDialTimeout) + } + defer cancel() + + conn, err := grpc.DialContext(dialCtx, cfg.Endpoint, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + ) + if err != nil { + return nil, fmt.Errorf("connect to flowctl control plane: %w", err) + } + + r.conn = conn + r.client = flowctlpb.NewControlPlaneClient(conn) + return r, nil +} + +// Close closes the underlying gRPC connection. +func (r *Reporter) Close() error { + if r == nil || r.conn == nil { + return nil + } + return r.conn.Close() +} + +// Enabled reports whether this reporter will send events. +func (r *Reporter) Enabled() bool { + return r != nil && r.cfg.Enabled +} + +// Register announces the component to flowctl. +func (r *Reporter) Register(ctx context.Context, serviceType flowctlpb.ServiceType, metadata map[string]string) error { + if !r.Enabled() { + return nil + } + + meta := copyMap(metadata) + meta["flowctl_run_id"] = r.cfg.RunID + meta["flowctl_attempt"] = strconv.Itoa(int(r.cfg.Attempt)) + + resp, err := r.client.Register(ctx, &flowctlpb.ServiceInfo{ + ServiceId: r.cfg.ComponentID, + ComponentId: r.cfg.ComponentID, + ServiceType: serviceType, + Metadata: meta, + }) + if err != nil { + return fmt.Errorf("register component with flowctl: %w", err) + } + if resp.ServiceId != "" { + r.serviceID = resp.ServiceId + } else { + r.serviceID = r.cfg.ComponentID + } + return nil +} + +// Heartbeat sends a coarse liveness/metric heartbeat. +func (r *Reporter) Heartbeat(ctx context.Context, metrics map[string]float64) error { + if !r.Enabled() { + return nil + } + serviceID := r.serviceID + if serviceID == "" { + serviceID = r.cfg.ComponentID + } + _, err := r.client.Heartbeat(ctx, &flowctlpb.ServiceHeartbeat{ + ServiceId: serviceID, + Metrics: metrics, + }) + if err != nil { + return fmt.Errorf("send flowctl heartbeat: %w", err) + } + return nil +} + +// StartHeartbeatLoop emits heartbeats until ctx is cancelled. Errors are sent to errCh when provided. +func (r *Reporter) StartHeartbeatLoop(ctx context.Context, metrics func() map[string]float64, errCh chan<- error) { + if !r.Enabled() { + return + } + interval := r.cfg.HeartbeatInterval + if interval <= 0 { + interval = defaultHeartbeatInterval + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + var snapshot map[string]float64 + if metrics != nil { + snapshot = metrics() + } + if err := r.Heartbeat(ctx, snapshot); err != nil && errCh != nil { + select { + case errCh <- err: + default: + } + } + } + } +} + +// ChunkUpdate describes a bounded historical work-unit state update. +type ChunkUpdate struct { + ChunkID string + ComponentID string + RunID string + ChunkStart int64 + ChunkEnd int64 + Attempt int32 + Status flowctlpb.ChunkStatus + FailureClass flowctlpb.FailureClass + Phase string + Error string + RecommendedAction string + StartedAt *time.Time + CompletedAt *time.Time + VerifiedAt *time.Time + RowCounts map[string]int64 + Verification map[string]string + Metadata map[string]string +} + +// ReportChunk upserts a historical chunk state record. +func (r *Reporter) ReportChunk(ctx context.Context, update ChunkUpdate) (*flowctlpb.ChunkRun, error) { + if !r.Enabled() { + return nil, nil + } + + componentID := firstNonEmpty(update.ComponentID, r.cfg.ComponentID) + runID := firstNonEmpty(update.RunID, r.cfg.RunID) + attempt := update.Attempt + if attempt == 0 { + attempt = r.cfg.Attempt + } + if attempt == 0 { + attempt = 1 + } + + chunk := &flowctlpb.ChunkRun{ + ChunkId: update.ChunkID, + PipelineRunId: runID, + ComponentId: componentID, + ChunkStart: update.ChunkStart, + ChunkEnd: update.ChunkEnd, + Attempt: attempt, + Status: update.Status, + FailureClass: update.FailureClass, + Phase: update.Phase, + Error: update.Error, + RecommendedAction: update.RecommendedAction, + RowCounts: copyInt64Map(update.RowCounts), + Verification: copyMap(update.Verification), + Metadata: copyMap(update.Metadata), + } + if chunk.Status == flowctlpb.ChunkStatus_CHUNK_STATUS_UNKNOWN { + chunk.Status = flowctlpb.ChunkStatus_CHUNK_STATUS_RUNNING + } + if update.StartedAt != nil { + chunk.StartedAt = timestamppb.New(*update.StartedAt) + } + if update.CompletedAt != nil { + chunk.CompletedAt = timestamppb.New(*update.CompletedAt) + } + if update.VerifiedAt != nil { + chunk.VerifiedAt = timestamppb.New(*update.VerifiedAt) + } + + resp, err := r.client.UpsertChunkRun(ctx, &flowctlpb.UpsertChunkRunRequest{Chunk: chunk}) + if err != nil { + return nil, fmt.Errorf("report flowctl chunk run: %w", err) + } + return resp, nil +} + +// ReportChunkProgress marks a chunk running in a phase. +func (r *Reporter) ReportChunkProgress(ctx context.Context, start, end int64, phase string, rowCounts map[string]int64, metadata map[string]string) error { + now := time.Now() + _, err := r.ReportChunk(ctx, ChunkUpdate{ + ChunkStart: start, + ChunkEnd: end, + Status: flowctlpb.ChunkStatus_CHUNK_STATUS_RUNNING, + Phase: phase, + StartedAt: &now, + RowCounts: rowCounts, + Metadata: metadata, + }) + return err +} + +// ReportChunkCompleted marks a chunk completed or verified. +func (r *Reporter) ReportChunkCompleted(ctx context.Context, start, end int64, verified bool, rowCounts map[string]int64, verification map[string]string) error { + now := time.Now() + status := flowctlpb.ChunkStatus_CHUNK_STATUS_COMPLETED + var verifiedAt *time.Time + if verified { + status = flowctlpb.ChunkStatus_CHUNK_STATUS_VERIFIED + verifiedAt = &now + } + _, err := r.ReportChunk(ctx, ChunkUpdate{ + ChunkStart: start, + ChunkEnd: end, + Status: status, + CompletedAt: &now, + VerifiedAt: verifiedAt, + RowCounts: rowCounts, + Verification: verification, + }) + return err +} + +// ReportChunkFailed marks a chunk failed with a typed failure class. +func (r *Reporter) ReportChunkFailed(ctx context.Context, start, end int64, phase string, failure flowctlpb.FailureClass, errText string, recommendedAction string) error { + now := time.Now() + _, err := r.ReportChunk(ctx, ChunkUpdate{ + ChunkStart: start, + ChunkEnd: end, + Status: flowctlpb.ChunkStatus_CHUNK_STATUS_FAILED, + FailureClass: failure, + Phase: phase, + Error: errText, + RecommendedAction: recommendedAction, + CompletedAt: &now, + }) + return err +} + +func firstNonEmpty(values ...string) string { + for _, value := range values { + if value != "" { + return value + } + } + return "" +} + +func copyMap(src map[string]string) map[string]string { + if len(src) == 0 { + return map[string]string{} + } + dst := make(map[string]string, len(src)) + for k, v := range src { + dst[k] = v + } + return dst +} + +func copyInt64Map(src map[string]int64) map[string]int64 { + if len(src) == 0 { + return map[string]int64{} + } + dst := make(map[string]int64, len(src)) + for k, v := range src { + dst[k] = v + } + return dst +} diff --git a/pkg/component/reporter_test.go b/pkg/component/reporter_test.go new file mode 100644 index 0000000..51a75d8 --- /dev/null +++ b/pkg/component/reporter_test.go @@ -0,0 +1,94 @@ +package component + +import ( + "context" + "os" + "testing" + "time" + + flowctlpb "github.com/withobsrvr/flowctl/proto" +) + +func TestConfigFromEnvDefaultsWhenDisabled(t *testing.T) { + t.Setenv("ENABLE_FLOWCTL", "") + t.Setenv("FLOWCTL_ENDPOINT", "") + t.Setenv("FLOWCTL_COMPONENT_ID", "") + t.Setenv("FLOWCTL_RUN_ID", "") + t.Setenv("FLOWCTL_ATTEMPT", "") + t.Setenv("FLOWCTL_HEARTBEAT_INTERVAL_MS", "") + + cfg := ConfigFromEnv() + if cfg.Enabled { + t.Fatal("expected flowctl to be disabled") + } + if cfg.Attempt != 1 { + t.Fatalf("expected default attempt 1, got %d", cfg.Attempt) + } + if cfg.HeartbeatInterval != defaultHeartbeatInterval { + t.Fatalf("expected default heartbeat interval %s, got %s", defaultHeartbeatInterval, cfg.HeartbeatInterval) + } +} + +func TestConfigFromEnvParsesFlowctlContract(t *testing.T) { + t.Setenv("ENABLE_FLOWCTL", "true") + t.Setenv("FLOWCTL_ENDPOINT", "flowctl.service.consul:8080") + t.Setenv("FLOWCTL_COMPONENT_ID", "bronze-history-loader") + t.Setenv("FLOWCTL_RUN_ID", "mainnet-bronze-repair-20260612") + t.Setenv("FLOWCTL_ATTEMPT", "3") + t.Setenv("FLOWCTL_HEARTBEAT_INTERVAL_MS", "2500") + + cfg := ConfigFromEnv() + if !cfg.Enabled { + t.Fatal("expected flowctl to be enabled") + } + if cfg.Endpoint != "flowctl.service.consul:8080" { + t.Fatalf("unexpected endpoint: %s", cfg.Endpoint) + } + if cfg.ComponentID != "bronze-history-loader" { + t.Fatalf("unexpected component id: %s", cfg.ComponentID) + } + if cfg.RunID != "mainnet-bronze-repair-20260612" { + t.Fatalf("unexpected run id: %s", cfg.RunID) + } + if cfg.Attempt != 3 { + t.Fatalf("expected attempt 3, got %d", cfg.Attempt) + } + if cfg.HeartbeatInterval != 2500*time.Millisecond { + t.Fatalf("expected heartbeat interval 2500ms, got %s", cfg.HeartbeatInterval) + } +} + +func TestDisabledReporterIsNoop(t *testing.T) { + reporter, err := NewReporter(context.Background(), Config{Enabled: false}) + if err != nil { + t.Fatalf("disabled reporter should not error: %v", err) + } + if reporter.Enabled() { + t.Fatal("expected reporter to be disabled") + } + if err := reporter.Register(context.Background(), flowctlpb.ServiceType_SERVICE_TYPE_SOURCE, nil); err != nil { + t.Fatalf("disabled Register should be noop: %v", err) + } + if err := reporter.Heartbeat(context.Background(), nil); err != nil { + t.Fatalf("disabled Heartbeat should be noop: %v", err) + } + if err := reporter.ReportChunkProgress(context.Background(), 1, 2, "extract", nil, nil); err != nil { + t.Fatalf("disabled ReportChunkProgress should be noop: %v", err) + } +} + +func TestReportChunkRequiresEndpointAndComponentWhenEnabled(t *testing.T) { + _, err := NewReporter(context.Background(), Config{Enabled: true}) + if err == nil { + t.Fatal("expected missing endpoint error") + } + + _, err = NewReporter(context.Background(), Config{Enabled: true, Endpoint: "127.0.0.1:1"}) + if err == nil { + t.Fatal("expected missing component id error") + } +} + +func TestMain(m *testing.M) { + os.Exit(m.Run()) +} diff --git a/proto/control_plane.pb.go b/proto/control_plane.pb.go index 1c52cd7..89246cf 100644 --- a/proto/control_plane.pb.go +++ b/proto/control_plane.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.11 -// protoc v7.34.0 +// protoc-gen-go v1.36.10 +// protoc v6.32.1 // source: proto/control_plane.proto package flowctlpb @@ -138,6 +138,136 @@ func (RunStatus) EnumDescriptor() ([]byte, []int) { return file_proto_control_plane_proto_rawDescGZIP(), []int{1} } +// Historical chunk lifecycle status. +type ChunkStatus int32 + +const ( + ChunkStatus_CHUNK_STATUS_UNKNOWN ChunkStatus = 0 + ChunkStatus_CHUNK_STATUS_PLANNED ChunkStatus = 1 + ChunkStatus_CHUNK_STATUS_DISPATCHED ChunkStatus = 2 + ChunkStatus_CHUNK_STATUS_RUNNING ChunkStatus = 3 + ChunkStatus_CHUNK_STATUS_COMPLETED ChunkStatus = 4 + ChunkStatus_CHUNK_STATUS_VERIFIED ChunkStatus = 5 + ChunkStatus_CHUNK_STATUS_FAILED ChunkStatus = 6 + ChunkStatus_CHUNK_STATUS_RETRY_PENDING ChunkStatus = 7 + ChunkStatus_CHUNK_STATUS_ABORTED ChunkStatus = 8 +) + +// Enum value maps for ChunkStatus. +var ( + ChunkStatus_name = map[int32]string{ + 0: "CHUNK_STATUS_UNKNOWN", + 1: "CHUNK_STATUS_PLANNED", + 2: "CHUNK_STATUS_DISPATCHED", + 3: "CHUNK_STATUS_RUNNING", + 4: "CHUNK_STATUS_COMPLETED", + 5: "CHUNK_STATUS_VERIFIED", + 6: "CHUNK_STATUS_FAILED", + 7: "CHUNK_STATUS_RETRY_PENDING", + 8: "CHUNK_STATUS_ABORTED", + } + ChunkStatus_value = map[string]int32{ + "CHUNK_STATUS_UNKNOWN": 0, + "CHUNK_STATUS_PLANNED": 1, + "CHUNK_STATUS_DISPATCHED": 2, + "CHUNK_STATUS_RUNNING": 3, + "CHUNK_STATUS_COMPLETED": 4, + "CHUNK_STATUS_VERIFIED": 5, + "CHUNK_STATUS_FAILED": 6, + "CHUNK_STATUS_RETRY_PENDING": 7, + "CHUNK_STATUS_ABORTED": 8, + } +) + +func (x ChunkStatus) Enum() *ChunkStatus { + p := new(ChunkStatus) + *p = x + return p +} + +func (x ChunkStatus) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ChunkStatus) Descriptor() protoreflect.EnumDescriptor { + return file_proto_control_plane_proto_enumTypes[2].Descriptor() +} + +func (ChunkStatus) Type() protoreflect.EnumType { + return &file_proto_control_plane_proto_enumTypes[2] +} + +func (x ChunkStatus) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ChunkStatus.Descriptor instead. +func (ChunkStatus) EnumDescriptor() ([]byte, []int) { + return file_proto_control_plane_proto_rawDescGZIP(), []int{2} +} + +// Typed failure class reported by data-plane components. +type FailureClass int32 + +const ( + FailureClass_FAILURE_CLASS_UNKNOWN FailureClass = 0 + FailureClass_FAILURE_CLASS_RETRYABLE_INFRASTRUCTURE FailureClass = 1 + FailureClass_FAILURE_CLASS_NON_RETRYABLE_DATA FailureClass = 2 + FailureClass_FAILURE_CLASS_NON_RETRYABLE_SCHEMA FailureClass = 3 + FailureClass_FAILURE_CLASS_VERIFICATION FailureClass = 4 + FailureClass_FAILURE_CLASS_ALLOCATION_LOST FailureClass = 5 + FailureClass_FAILURE_CLASS_OOM FailureClass = 6 +) + +// Enum value maps for FailureClass. +var ( + FailureClass_name = map[int32]string{ + 0: "FAILURE_CLASS_UNKNOWN", + 1: "FAILURE_CLASS_RETRYABLE_INFRASTRUCTURE", + 2: "FAILURE_CLASS_NON_RETRYABLE_DATA", + 3: "FAILURE_CLASS_NON_RETRYABLE_SCHEMA", + 4: "FAILURE_CLASS_VERIFICATION", + 5: "FAILURE_CLASS_ALLOCATION_LOST", + 6: "FAILURE_CLASS_OOM", + } + FailureClass_value = map[string]int32{ + "FAILURE_CLASS_UNKNOWN": 0, + "FAILURE_CLASS_RETRYABLE_INFRASTRUCTURE": 1, + "FAILURE_CLASS_NON_RETRYABLE_DATA": 2, + "FAILURE_CLASS_NON_RETRYABLE_SCHEMA": 3, + "FAILURE_CLASS_VERIFICATION": 4, + "FAILURE_CLASS_ALLOCATION_LOST": 5, + "FAILURE_CLASS_OOM": 6, + } +) + +func (x FailureClass) Enum() *FailureClass { + p := new(FailureClass) + *p = x + return p +} + +func (x FailureClass) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (FailureClass) Descriptor() protoreflect.EnumDescriptor { + return file_proto_control_plane_proto_enumTypes[3].Descriptor() +} + +func (FailureClass) Type() protoreflect.EnumType { + return &file_proto_control_plane_proto_enumTypes[3] +} + +func (x FailureClass) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use FailureClass.Descriptor instead. +func (FailureClass) EnumDescriptor() ([]byte, []int) { + return file_proto_control_plane_proto_rawDescGZIP(), []int{3} +} + // Service registration information type ServiceInfo struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -1088,6 +1218,383 @@ func (x *StopPipelineRunRequest) GetRunId() string { return "" } +// Historical chunk run record. +type ChunkRun struct { + state protoimpl.MessageState `protogen:"open.v1"` + ChunkId string `protobuf:"bytes,1,opt,name=chunk_id,json=chunkId,proto3" json:"chunk_id,omitempty"` + PipelineRunId string `protobuf:"bytes,2,opt,name=pipeline_run_id,json=pipelineRunId,proto3" json:"pipeline_run_id,omitempty"` + ComponentId string `protobuf:"bytes,3,opt,name=component_id,json=componentId,proto3" json:"component_id,omitempty"` + ChunkStart int64 `protobuf:"varint,4,opt,name=chunk_start,json=chunkStart,proto3" json:"chunk_start,omitempty"` + ChunkEnd int64 `protobuf:"varint,5,opt,name=chunk_end,json=chunkEnd,proto3" json:"chunk_end,omitempty"` + Attempt int32 `protobuf:"varint,6,opt,name=attempt,proto3" json:"attempt,omitempty"` + Status ChunkStatus `protobuf:"varint,7,opt,name=status,proto3,enum=flowctl.ChunkStatus" json:"status,omitempty"` + FailureClass FailureClass `protobuf:"varint,8,opt,name=failure_class,json=failureClass,proto3,enum=flowctl.FailureClass" json:"failure_class,omitempty"` + Phase string `protobuf:"bytes,9,opt,name=phase,proto3" json:"phase,omitempty"` + Error string `protobuf:"bytes,10,opt,name=error,proto3" json:"error,omitempty"` + RecommendedAction string `protobuf:"bytes,11,opt,name=recommended_action,json=recommendedAction,proto3" json:"recommended_action,omitempty"` + StartedAt *timestamppb.Timestamp `protobuf:"bytes,12,opt,name=started_at,json=startedAt,proto3" json:"started_at,omitempty"` + CompletedAt *timestamppb.Timestamp `protobuf:"bytes,13,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"` + VerifiedAt *timestamppb.Timestamp `protobuf:"bytes,14,opt,name=verified_at,json=verifiedAt,proto3" json:"verified_at,omitempty"` + RowCounts map[string]int64 `protobuf:"bytes,15,rep,name=row_counts,json=rowCounts,proto3" json:"row_counts,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"` + Verification map[string]string `protobuf:"bytes,16,rep,name=verification,proto3" json:"verification,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + Metadata map[string]string `protobuf:"bytes,17,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ChunkRun) Reset() { + *x = ChunkRun{} + mi := &file_proto_control_plane_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ChunkRun) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ChunkRun) ProtoMessage() {} + +func (x *ChunkRun) ProtoReflect() protoreflect.Message { + mi := &file_proto_control_plane_proto_msgTypes[14] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ChunkRun.ProtoReflect.Descriptor instead. +func (*ChunkRun) Descriptor() ([]byte, []int) { + return file_proto_control_plane_proto_rawDescGZIP(), []int{14} +} + +func (x *ChunkRun) GetChunkId() string { + if x != nil { + return x.ChunkId + } + return "" +} + +func (x *ChunkRun) GetPipelineRunId() string { + if x != nil { + return x.PipelineRunId + } + return "" +} + +func (x *ChunkRun) GetComponentId() string { + if x != nil { + return x.ComponentId + } + return "" +} + +func (x *ChunkRun) GetChunkStart() int64 { + if x != nil { + return x.ChunkStart + } + return 0 +} + +func (x *ChunkRun) GetChunkEnd() int64 { + if x != nil { + return x.ChunkEnd + } + return 0 +} + +func (x *ChunkRun) GetAttempt() int32 { + if x != nil { + return x.Attempt + } + return 0 +} + +func (x *ChunkRun) GetStatus() ChunkStatus { + if x != nil { + return x.Status + } + return ChunkStatus_CHUNK_STATUS_UNKNOWN +} + +func (x *ChunkRun) GetFailureClass() FailureClass { + if x != nil { + return x.FailureClass + } + return FailureClass_FAILURE_CLASS_UNKNOWN +} + +func (x *ChunkRun) GetPhase() string { + if x != nil { + return x.Phase + } + return "" +} + +func (x *ChunkRun) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *ChunkRun) GetRecommendedAction() string { + if x != nil { + return x.RecommendedAction + } + return "" +} + +func (x *ChunkRun) GetStartedAt() *timestamppb.Timestamp { + if x != nil { + return x.StartedAt + } + return nil +} + +func (x *ChunkRun) GetCompletedAt() *timestamppb.Timestamp { + if x != nil { + return x.CompletedAt + } + return nil +} + +func (x *ChunkRun) GetVerifiedAt() *timestamppb.Timestamp { + if x != nil { + return x.VerifiedAt + } + return nil +} + +func (x *ChunkRun) GetRowCounts() map[string]int64 { + if x != nil { + return x.RowCounts + } + return nil +} + +func (x *ChunkRun) GetVerification() map[string]string { + if x != nil { + return x.Verification + } + return nil +} + +func (x *ChunkRun) GetMetadata() map[string]string { + if x != nil { + return x.Metadata + } + return nil +} + +// Upsert chunk run request. +type UpsertChunkRunRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Chunk *ChunkRun `protobuf:"bytes,1,opt,name=chunk,proto3" json:"chunk,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *UpsertChunkRunRequest) Reset() { + *x = UpsertChunkRunRequest{} + mi := &file_proto_control_plane_proto_msgTypes[15] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *UpsertChunkRunRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UpsertChunkRunRequest) ProtoMessage() {} + +func (x *UpsertChunkRunRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_control_plane_proto_msgTypes[15] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UpsertChunkRunRequest.ProtoReflect.Descriptor instead. +func (*UpsertChunkRunRequest) Descriptor() ([]byte, []int) { + return file_proto_control_plane_proto_rawDescGZIP(), []int{15} +} + +func (x *UpsertChunkRunRequest) GetChunk() *ChunkRun { + if x != nil { + return x.Chunk + } + return nil +} + +// Get chunk run request. +type GetChunkRunRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + ChunkId string `protobuf:"bytes,1,opt,name=chunk_id,json=chunkId,proto3" json:"chunk_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetChunkRunRequest) Reset() { + *x = GetChunkRunRequest{} + mi := &file_proto_control_plane_proto_msgTypes[16] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetChunkRunRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetChunkRunRequest) ProtoMessage() {} + +func (x *GetChunkRunRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_control_plane_proto_msgTypes[16] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetChunkRunRequest.ProtoReflect.Descriptor instead. +func (*GetChunkRunRequest) Descriptor() ([]byte, []int) { + return file_proto_control_plane_proto_rawDescGZIP(), []int{16} +} + +func (x *GetChunkRunRequest) GetChunkId() string { + if x != nil { + return x.ChunkId + } + return "" +} + +// List chunk runs request. +type ListChunkRunsRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + PipelineRunId string `protobuf:"bytes,1,opt,name=pipeline_run_id,json=pipelineRunId,proto3" json:"pipeline_run_id,omitempty"` + ComponentId string `protobuf:"bytes,2,opt,name=component_id,json=componentId,proto3" json:"component_id,omitempty"` + Status ChunkStatus `protobuf:"varint,3,opt,name=status,proto3,enum=flowctl.ChunkStatus" json:"status,omitempty"` + Limit int32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ListChunkRunsRequest) Reset() { + *x = ListChunkRunsRequest{} + mi := &file_proto_control_plane_proto_msgTypes[17] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ListChunkRunsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListChunkRunsRequest) ProtoMessage() {} + +func (x *ListChunkRunsRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_control_plane_proto_msgTypes[17] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListChunkRunsRequest.ProtoReflect.Descriptor instead. +func (*ListChunkRunsRequest) Descriptor() ([]byte, []int) { + return file_proto_control_plane_proto_rawDescGZIP(), []int{17} +} + +func (x *ListChunkRunsRequest) GetPipelineRunId() string { + if x != nil { + return x.PipelineRunId + } + return "" +} + +func (x *ListChunkRunsRequest) GetComponentId() string { + if x != nil { + return x.ComponentId + } + return "" +} + +func (x *ListChunkRunsRequest) GetStatus() ChunkStatus { + if x != nil { + return x.Status + } + return ChunkStatus_CHUNK_STATUS_UNKNOWN +} + +func (x *ListChunkRunsRequest) GetLimit() int32 { + if x != nil { + return x.Limit + } + return 0 +} + +// List chunk runs response. +type ListChunkRunsResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Chunks []*ChunkRun `protobuf:"bytes,1,rep,name=chunks,proto3" json:"chunks,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ListChunkRunsResponse) Reset() { + *x = ListChunkRunsResponse{} + mi := &file_proto_control_plane_proto_msgTypes[18] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ListChunkRunsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListChunkRunsResponse) ProtoMessage() {} + +func (x *ListChunkRunsResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_control_plane_proto_msgTypes[18] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListChunkRunsResponse.ProtoReflect.Descriptor instead. +func (*ListChunkRunsResponse) Descriptor() ([]byte, []int) { + return file_proto_control_plane_proto_rawDescGZIP(), []int{18} +} + +func (x *ListChunkRunsResponse) GetChunks() []*ChunkRun { + if x != nil { + return x.Chunks + } + return nil +} + var File_proto_control_plane_proto protoreflect.FileDescriptor const file_proto_control_plane_proto_rawDesc = "" + @@ -1188,7 +1695,50 @@ const file_proto_control_plane_proto_rawDesc = "" + "\x18ListPipelineRunsResponse\x12(\n" + "\x04runs\x18\x01 \x03(\v2\x14.flowctl.PipelineRunR\x04runs\"/\n" + "\x16StopPipelineRunRequest\x12\x15\n" + - "\x06run_id\x18\x01 \x01(\tR\x05runId*\x92\x01\n" + + "\x06run_id\x18\x01 \x01(\tR\x05runId\"\xc7\a\n" + + "\bChunkRun\x12\x19\n" + + "\bchunk_id\x18\x01 \x01(\tR\achunkId\x12&\n" + + "\x0fpipeline_run_id\x18\x02 \x01(\tR\rpipelineRunId\x12!\n" + + "\fcomponent_id\x18\x03 \x01(\tR\vcomponentId\x12\x1f\n" + + "\vchunk_start\x18\x04 \x01(\x03R\n" + + "chunkStart\x12\x1b\n" + + "\tchunk_end\x18\x05 \x01(\x03R\bchunkEnd\x12\x18\n" + + "\aattempt\x18\x06 \x01(\x05R\aattempt\x12,\n" + + "\x06status\x18\a \x01(\x0e2\x14.flowctl.ChunkStatusR\x06status\x12:\n" + + "\rfailure_class\x18\b \x01(\x0e2\x15.flowctl.FailureClassR\ffailureClass\x12\x14\n" + + "\x05phase\x18\t \x01(\tR\x05phase\x12\x14\n" + + "\x05error\x18\n" + + " \x01(\tR\x05error\x12-\n" + + "\x12recommended_action\x18\v \x01(\tR\x11recommendedAction\x129\n" + + "\n" + + "started_at\x18\f \x01(\v2\x1a.google.protobuf.TimestampR\tstartedAt\x12=\n" + + "\fcompleted_at\x18\r \x01(\v2\x1a.google.protobuf.TimestampR\vcompletedAt\x12;\n" + + "\vverified_at\x18\x0e \x01(\v2\x1a.google.protobuf.TimestampR\n" + + "verifiedAt\x12?\n" + + "\n" + + "row_counts\x18\x0f \x03(\v2 .flowctl.ChunkRun.RowCountsEntryR\trowCounts\x12G\n" + + "\fverification\x18\x10 \x03(\v2#.flowctl.ChunkRun.VerificationEntryR\fverification\x12;\n" + + "\bmetadata\x18\x11 \x03(\v2\x1f.flowctl.ChunkRun.MetadataEntryR\bmetadata\x1a<\n" + + "\x0eRowCountsEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\x03R\x05value:\x028\x01\x1a?\n" + + "\x11VerificationEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\x1a;\n" + + "\rMetadataEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"@\n" + + "\x15UpsertChunkRunRequest\x12'\n" + + "\x05chunk\x18\x01 \x01(\v2\x11.flowctl.ChunkRunR\x05chunk\"/\n" + + "\x12GetChunkRunRequest\x12\x19\n" + + "\bchunk_id\x18\x01 \x01(\tR\achunkId\"\xa5\x01\n" + + "\x14ListChunkRunsRequest\x12&\n" + + "\x0fpipeline_run_id\x18\x01 \x01(\tR\rpipelineRunId\x12!\n" + + "\fcomponent_id\x18\x02 \x01(\tR\vcomponentId\x12,\n" + + "\x06status\x18\x03 \x01(\x0e2\x14.flowctl.ChunkStatusR\x06status\x12\x14\n" + + "\x05limit\x18\x04 \x01(\x05R\x05limit\"B\n" + + "\x15ListChunkRunsResponse\x12)\n" + + "\x06chunks\x18\x01 \x03(\v2\x11.flowctl.ChunkRunR\x06chunks*\x92\x01\n" + "\vServiceType\x12\x1c\n" + "\x18SERVICE_TYPE_UNSPECIFIED\x10\x00\x12\x17\n" + "\x13SERVICE_TYPE_SOURCE\x10\x01\x12\x1a\n" + @@ -1201,7 +1751,25 @@ const file_proto_control_plane_proto_rawDesc = "" + "\x12RUN_STATUS_RUNNING\x10\x02\x12\x18\n" + "\x14RUN_STATUS_COMPLETED\x10\x03\x12\x15\n" + "\x11RUN_STATUS_FAILED\x10\x04\x12\x16\n" + - "\x12RUN_STATUS_STOPPED\x10\x052\x91\x05\n" + + "\x12RUN_STATUS_STOPPED\x10\x05*\x82\x02\n" + + "\vChunkStatus\x12\x18\n" + + "\x14CHUNK_STATUS_UNKNOWN\x10\x00\x12\x18\n" + + "\x14CHUNK_STATUS_PLANNED\x10\x01\x12\x1b\n" + + "\x17CHUNK_STATUS_DISPATCHED\x10\x02\x12\x18\n" + + "\x14CHUNK_STATUS_RUNNING\x10\x03\x12\x1a\n" + + "\x16CHUNK_STATUS_COMPLETED\x10\x04\x12\x19\n" + + "\x15CHUNK_STATUS_VERIFIED\x10\x05\x12\x17\n" + + "\x13CHUNK_STATUS_FAILED\x10\x06\x12\x1e\n" + + "\x1aCHUNK_STATUS_RETRY_PENDING\x10\a\x12\x18\n" + + "\x14CHUNK_STATUS_ABORTED\x10\b*\xfd\x01\n" + + "\fFailureClass\x12\x19\n" + + "\x15FAILURE_CLASS_UNKNOWN\x10\x00\x12*\n" + + "&FAILURE_CLASS_RETRYABLE_INFRASTRUCTURE\x10\x01\x12$\n" + + " FAILURE_CLASS_NON_RETRYABLE_DATA\x10\x02\x12&\n" + + "\"FAILURE_CLASS_NON_RETRYABLE_SCHEMA\x10\x03\x12\x1e\n" + + "\x1aFAILURE_CLASS_VERIFICATION\x10\x04\x12!\n" + + "\x1dFAILURE_CLASS_ALLOCATION_LOST\x10\x05\x12\x15\n" + + "\x11FAILURE_CLASS_OOM\x10\x062\xe5\x06\n" + "\fControlPlane\x12:\n" + "\bRegister\x12\x14.flowctl.ServiceInfo\x1a\x18.flowctl.RegistrationAck\x12>\n" + "\tHeartbeat\x12\x19.flowctl.ServiceHeartbeat\x1a\x16.google.protobuf.Empty\x12@\n" + @@ -1211,7 +1779,10 @@ const file_proto_control_plane_proto_rawDesc = "" + "\x11UpdatePipelineRun\x12!.flowctl.UpdatePipelineRunRequest\x1a\x14.flowctl.PipelineRun\x12F\n" + "\x0eGetPipelineRun\x12\x1e.flowctl.GetPipelineRunRequest\x1a\x14.flowctl.PipelineRun\x12W\n" + "\x10ListPipelineRuns\x12 .flowctl.ListPipelineRunsRequest\x1a!.flowctl.ListPipelineRunsResponse\x12H\n" + - "\x0fStopPipelineRun\x12\x1f.flowctl.StopPipelineRunRequest\x1a\x14.flowctl.PipelineRunB/Z-github.com/withobsrvr/flowctl/proto;flowctlpbb\x06proto3" + "\x0fStopPipelineRun\x12\x1f.flowctl.StopPipelineRunRequest\x1a\x14.flowctl.PipelineRun\x12C\n" + + "\x0eUpsertChunkRun\x12\x1e.flowctl.UpsertChunkRunRequest\x1a\x11.flowctl.ChunkRun\x12=\n" + + "\vGetChunkRun\x12\x1b.flowctl.GetChunkRunRequest\x1a\x11.flowctl.ChunkRun\x12N\n" + + "\rListChunkRuns\x12\x1d.flowctl.ListChunkRunsRequest\x1a\x1e.flowctl.ListChunkRunsResponseB/Z-github.com/withobsrvr/flowctl/proto;flowctlpbb\x06proto3" var ( file_proto_control_plane_proto_rawDescOnce sync.Once @@ -1225,76 +1796,103 @@ func file_proto_control_plane_proto_rawDescGZIP() []byte { return file_proto_control_plane_proto_rawDescData } -var file_proto_control_plane_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_proto_control_plane_proto_msgTypes = make([]protoimpl.MessageInfo, 19) +var file_proto_control_plane_proto_enumTypes = make([]protoimpl.EnumInfo, 4) +var file_proto_control_plane_proto_msgTypes = make([]protoimpl.MessageInfo, 27) var file_proto_control_plane_proto_goTypes = []any{ (ServiceType)(0), // 0: flowctl.ServiceType (RunStatus)(0), // 1: flowctl.RunStatus - (*ServiceInfo)(nil), // 2: flowctl.ServiceInfo - (*RegistrationAck)(nil), // 3: flowctl.RegistrationAck - (*ServiceHeartbeat)(nil), // 4: flowctl.ServiceHeartbeat - (*ServiceStatus)(nil), // 5: flowctl.ServiceStatus - (*ServiceList)(nil), // 6: flowctl.ServiceList - (*RunMetrics)(nil), // 7: flowctl.RunMetrics - (*ComponentMetrics)(nil), // 8: flowctl.ComponentMetrics - (*PipelineRun)(nil), // 9: flowctl.PipelineRun - (*CreatePipelineRunRequest)(nil), // 10: flowctl.CreatePipelineRunRequest - (*UpdatePipelineRunRequest)(nil), // 11: flowctl.UpdatePipelineRunRequest - (*GetPipelineRunRequest)(nil), // 12: flowctl.GetPipelineRunRequest - (*ListPipelineRunsRequest)(nil), // 13: flowctl.ListPipelineRunsRequest - (*ListPipelineRunsResponse)(nil), // 14: flowctl.ListPipelineRunsResponse - (*StopPipelineRunRequest)(nil), // 15: flowctl.StopPipelineRunRequest - nil, // 16: flowctl.ServiceInfo.MetadataEntry - nil, // 17: flowctl.RegistrationAck.ConnectionInfoEntry - nil, // 18: flowctl.ServiceHeartbeat.MetricsEntry - nil, // 19: flowctl.ServiceStatus.MetricsEntry - nil, // 20: flowctl.RunMetrics.ComponentMetricsEntry - (*timestamppb.Timestamp)(nil), // 21: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 22: google.protobuf.Empty + (ChunkStatus)(0), // 2: flowctl.ChunkStatus + (FailureClass)(0), // 3: flowctl.FailureClass + (*ServiceInfo)(nil), // 4: flowctl.ServiceInfo + (*RegistrationAck)(nil), // 5: flowctl.RegistrationAck + (*ServiceHeartbeat)(nil), // 6: flowctl.ServiceHeartbeat + (*ServiceStatus)(nil), // 7: flowctl.ServiceStatus + (*ServiceList)(nil), // 8: flowctl.ServiceList + (*RunMetrics)(nil), // 9: flowctl.RunMetrics + (*ComponentMetrics)(nil), // 10: flowctl.ComponentMetrics + (*PipelineRun)(nil), // 11: flowctl.PipelineRun + (*CreatePipelineRunRequest)(nil), // 12: flowctl.CreatePipelineRunRequest + (*UpdatePipelineRunRequest)(nil), // 13: flowctl.UpdatePipelineRunRequest + (*GetPipelineRunRequest)(nil), // 14: flowctl.GetPipelineRunRequest + (*ListPipelineRunsRequest)(nil), // 15: flowctl.ListPipelineRunsRequest + (*ListPipelineRunsResponse)(nil), // 16: flowctl.ListPipelineRunsResponse + (*StopPipelineRunRequest)(nil), // 17: flowctl.StopPipelineRunRequest + (*ChunkRun)(nil), // 18: flowctl.ChunkRun + (*UpsertChunkRunRequest)(nil), // 19: flowctl.UpsertChunkRunRequest + (*GetChunkRunRequest)(nil), // 20: flowctl.GetChunkRunRequest + (*ListChunkRunsRequest)(nil), // 21: flowctl.ListChunkRunsRequest + (*ListChunkRunsResponse)(nil), // 22: flowctl.ListChunkRunsResponse + nil, // 23: flowctl.ServiceInfo.MetadataEntry + nil, // 24: flowctl.RegistrationAck.ConnectionInfoEntry + nil, // 25: flowctl.ServiceHeartbeat.MetricsEntry + nil, // 26: flowctl.ServiceStatus.MetricsEntry + nil, // 27: flowctl.RunMetrics.ComponentMetricsEntry + nil, // 28: flowctl.ChunkRun.RowCountsEntry + nil, // 29: flowctl.ChunkRun.VerificationEntry + nil, // 30: flowctl.ChunkRun.MetadataEntry + (*timestamppb.Timestamp)(nil), // 31: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 32: google.protobuf.Empty } var file_proto_control_plane_proto_depIdxs = []int32{ 0, // 0: flowctl.ServiceInfo.service_type:type_name -> flowctl.ServiceType - 16, // 1: flowctl.ServiceInfo.metadata:type_name -> flowctl.ServiceInfo.MetadataEntry - 17, // 2: flowctl.RegistrationAck.connection_info:type_name -> flowctl.RegistrationAck.ConnectionInfoEntry - 21, // 3: flowctl.ServiceHeartbeat.timestamp:type_name -> google.protobuf.Timestamp - 18, // 4: flowctl.ServiceHeartbeat.metrics:type_name -> flowctl.ServiceHeartbeat.MetricsEntry + 23, // 1: flowctl.ServiceInfo.metadata:type_name -> flowctl.ServiceInfo.MetadataEntry + 24, // 2: flowctl.RegistrationAck.connection_info:type_name -> flowctl.RegistrationAck.ConnectionInfoEntry + 31, // 3: flowctl.ServiceHeartbeat.timestamp:type_name -> google.protobuf.Timestamp + 25, // 4: flowctl.ServiceHeartbeat.metrics:type_name -> flowctl.ServiceHeartbeat.MetricsEntry 0, // 5: flowctl.ServiceStatus.service_type:type_name -> flowctl.ServiceType - 21, // 6: flowctl.ServiceStatus.last_heartbeat:type_name -> google.protobuf.Timestamp - 19, // 7: flowctl.ServiceStatus.metrics:type_name -> flowctl.ServiceStatus.MetricsEntry - 5, // 8: flowctl.ServiceList.services:type_name -> flowctl.ServiceStatus - 20, // 9: flowctl.RunMetrics.component_metrics:type_name -> flowctl.RunMetrics.ComponentMetricsEntry + 31, // 6: flowctl.ServiceStatus.last_heartbeat:type_name -> google.protobuf.Timestamp + 26, // 7: flowctl.ServiceStatus.metrics:type_name -> flowctl.ServiceStatus.MetricsEntry + 7, // 8: flowctl.ServiceList.services:type_name -> flowctl.ServiceStatus + 27, // 9: flowctl.RunMetrics.component_metrics:type_name -> flowctl.RunMetrics.ComponentMetricsEntry 1, // 10: flowctl.PipelineRun.status:type_name -> flowctl.RunStatus - 21, // 11: flowctl.PipelineRun.start_time:type_name -> google.protobuf.Timestamp - 21, // 12: flowctl.PipelineRun.end_time:type_name -> google.protobuf.Timestamp - 7, // 13: flowctl.PipelineRun.metrics:type_name -> flowctl.RunMetrics + 31, // 11: flowctl.PipelineRun.start_time:type_name -> google.protobuf.Timestamp + 31, // 12: flowctl.PipelineRun.end_time:type_name -> google.protobuf.Timestamp + 9, // 13: flowctl.PipelineRun.metrics:type_name -> flowctl.RunMetrics 1, // 14: flowctl.UpdatePipelineRunRequest.status:type_name -> flowctl.RunStatus - 7, // 15: flowctl.UpdatePipelineRunRequest.metrics:type_name -> flowctl.RunMetrics + 9, // 15: flowctl.UpdatePipelineRunRequest.metrics:type_name -> flowctl.RunMetrics 1, // 16: flowctl.ListPipelineRunsRequest.status:type_name -> flowctl.RunStatus - 9, // 17: flowctl.ListPipelineRunsResponse.runs:type_name -> flowctl.PipelineRun - 8, // 18: flowctl.RunMetrics.ComponentMetricsEntry.value:type_name -> flowctl.ComponentMetrics - 2, // 19: flowctl.ControlPlane.Register:input_type -> flowctl.ServiceInfo - 4, // 20: flowctl.ControlPlane.Heartbeat:input_type -> flowctl.ServiceHeartbeat - 2, // 21: flowctl.ControlPlane.GetServiceStatus:input_type -> flowctl.ServiceInfo - 22, // 22: flowctl.ControlPlane.ListServices:input_type -> google.protobuf.Empty - 10, // 23: flowctl.ControlPlane.CreatePipelineRun:input_type -> flowctl.CreatePipelineRunRequest - 11, // 24: flowctl.ControlPlane.UpdatePipelineRun:input_type -> flowctl.UpdatePipelineRunRequest - 12, // 25: flowctl.ControlPlane.GetPipelineRun:input_type -> flowctl.GetPipelineRunRequest - 13, // 26: flowctl.ControlPlane.ListPipelineRuns:input_type -> flowctl.ListPipelineRunsRequest - 15, // 27: flowctl.ControlPlane.StopPipelineRun:input_type -> flowctl.StopPipelineRunRequest - 3, // 28: flowctl.ControlPlane.Register:output_type -> flowctl.RegistrationAck - 22, // 29: flowctl.ControlPlane.Heartbeat:output_type -> google.protobuf.Empty - 5, // 30: flowctl.ControlPlane.GetServiceStatus:output_type -> flowctl.ServiceStatus - 6, // 31: flowctl.ControlPlane.ListServices:output_type -> flowctl.ServiceList - 9, // 32: flowctl.ControlPlane.CreatePipelineRun:output_type -> flowctl.PipelineRun - 9, // 33: flowctl.ControlPlane.UpdatePipelineRun:output_type -> flowctl.PipelineRun - 9, // 34: flowctl.ControlPlane.GetPipelineRun:output_type -> flowctl.PipelineRun - 14, // 35: flowctl.ControlPlane.ListPipelineRuns:output_type -> flowctl.ListPipelineRunsResponse - 9, // 36: flowctl.ControlPlane.StopPipelineRun:output_type -> flowctl.PipelineRun - 28, // [28:37] is the sub-list for method output_type - 19, // [19:28] is the sub-list for method input_type - 19, // [19:19] is the sub-list for extension type_name - 19, // [19:19] is the sub-list for extension extendee - 0, // [0:19] is the sub-list for field type_name + 11, // 17: flowctl.ListPipelineRunsResponse.runs:type_name -> flowctl.PipelineRun + 2, // 18: flowctl.ChunkRun.status:type_name -> flowctl.ChunkStatus + 3, // 19: flowctl.ChunkRun.failure_class:type_name -> flowctl.FailureClass + 31, // 20: flowctl.ChunkRun.started_at:type_name -> google.protobuf.Timestamp + 31, // 21: flowctl.ChunkRun.completed_at:type_name -> google.protobuf.Timestamp + 31, // 22: flowctl.ChunkRun.verified_at:type_name -> google.protobuf.Timestamp + 28, // 23: flowctl.ChunkRun.row_counts:type_name -> flowctl.ChunkRun.RowCountsEntry + 29, // 24: flowctl.ChunkRun.verification:type_name -> flowctl.ChunkRun.VerificationEntry + 30, // 25: flowctl.ChunkRun.metadata:type_name -> flowctl.ChunkRun.MetadataEntry + 18, // 26: flowctl.UpsertChunkRunRequest.chunk:type_name -> flowctl.ChunkRun + 2, // 27: flowctl.ListChunkRunsRequest.status:type_name -> flowctl.ChunkStatus + 18, // 28: flowctl.ListChunkRunsResponse.chunks:type_name -> flowctl.ChunkRun + 10, // 29: flowctl.RunMetrics.ComponentMetricsEntry.value:type_name -> flowctl.ComponentMetrics + 4, // 30: flowctl.ControlPlane.Register:input_type -> flowctl.ServiceInfo + 6, // 31: flowctl.ControlPlane.Heartbeat:input_type -> flowctl.ServiceHeartbeat + 4, // 32: flowctl.ControlPlane.GetServiceStatus:input_type -> flowctl.ServiceInfo + 32, // 33: flowctl.ControlPlane.ListServices:input_type -> google.protobuf.Empty + 12, // 34: flowctl.ControlPlane.CreatePipelineRun:input_type -> flowctl.CreatePipelineRunRequest + 13, // 35: flowctl.ControlPlane.UpdatePipelineRun:input_type -> flowctl.UpdatePipelineRunRequest + 14, // 36: flowctl.ControlPlane.GetPipelineRun:input_type -> flowctl.GetPipelineRunRequest + 15, // 37: flowctl.ControlPlane.ListPipelineRuns:input_type -> flowctl.ListPipelineRunsRequest + 17, // 38: flowctl.ControlPlane.StopPipelineRun:input_type -> flowctl.StopPipelineRunRequest + 19, // 39: flowctl.ControlPlane.UpsertChunkRun:input_type -> flowctl.UpsertChunkRunRequest + 20, // 40: flowctl.ControlPlane.GetChunkRun:input_type -> flowctl.GetChunkRunRequest + 21, // 41: flowctl.ControlPlane.ListChunkRuns:input_type -> flowctl.ListChunkRunsRequest + 5, // 42: flowctl.ControlPlane.Register:output_type -> flowctl.RegistrationAck + 32, // 43: flowctl.ControlPlane.Heartbeat:output_type -> google.protobuf.Empty + 7, // 44: flowctl.ControlPlane.GetServiceStatus:output_type -> flowctl.ServiceStatus + 8, // 45: flowctl.ControlPlane.ListServices:output_type -> flowctl.ServiceList + 11, // 46: flowctl.ControlPlane.CreatePipelineRun:output_type -> flowctl.PipelineRun + 11, // 47: flowctl.ControlPlane.UpdatePipelineRun:output_type -> flowctl.PipelineRun + 11, // 48: flowctl.ControlPlane.GetPipelineRun:output_type -> flowctl.PipelineRun + 16, // 49: flowctl.ControlPlane.ListPipelineRuns:output_type -> flowctl.ListPipelineRunsResponse + 11, // 50: flowctl.ControlPlane.StopPipelineRun:output_type -> flowctl.PipelineRun + 18, // 51: flowctl.ControlPlane.UpsertChunkRun:output_type -> flowctl.ChunkRun + 18, // 52: flowctl.ControlPlane.GetChunkRun:output_type -> flowctl.ChunkRun + 22, // 53: flowctl.ControlPlane.ListChunkRuns:output_type -> flowctl.ListChunkRunsResponse + 42, // [42:54] is the sub-list for method output_type + 30, // [30:42] is the sub-list for method input_type + 30, // [30:30] is the sub-list for extension type_name + 30, // [30:30] is the sub-list for extension extendee + 0, // [0:30] is the sub-list for field type_name } func init() { file_proto_control_plane_proto_init() } @@ -1307,8 +1905,8 @@ func file_proto_control_plane_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_control_plane_proto_rawDesc), len(file_proto_control_plane_proto_rawDesc)), - NumEnums: 2, - NumMessages: 19, + NumEnums: 4, + NumMessages: 27, NumExtensions: 0, NumServices: 1, }, diff --git a/proto/control_plane.proto b/proto/control_plane.proto index 9a21bc4..5a56f4e 100644 --- a/proto/control_plane.proto +++ b/proto/control_plane.proto @@ -66,6 +66,30 @@ enum RunStatus { RUN_STATUS_STOPPED = 5; } +// Historical chunk lifecycle status. +enum ChunkStatus { + CHUNK_STATUS_UNKNOWN = 0; + CHUNK_STATUS_PLANNED = 1; + CHUNK_STATUS_DISPATCHED = 2; + CHUNK_STATUS_RUNNING = 3; + CHUNK_STATUS_COMPLETED = 4; + CHUNK_STATUS_VERIFIED = 5; + CHUNK_STATUS_FAILED = 6; + CHUNK_STATUS_RETRY_PENDING = 7; + CHUNK_STATUS_ABORTED = 8; +} + +// Typed failure class reported by data-plane components. +enum FailureClass { + FAILURE_CLASS_UNKNOWN = 0; + FAILURE_CLASS_RETRYABLE_INFRASTRUCTURE = 1; + FAILURE_CLASS_NON_RETRYABLE_DATA = 2; + FAILURE_CLASS_NON_RETRYABLE_SCHEMA = 3; + FAILURE_CLASS_VERIFICATION = 4; + FAILURE_CLASS_ALLOCATION_LOST = 5; + FAILURE_CLASS_OOM = 6; +} + // Metrics for a pipeline run message RunMetrics { int64 events_processed = 1; @@ -135,6 +159,50 @@ message StopPipelineRunRequest { string run_id = 1; } +// Historical chunk run record. +message ChunkRun { + string chunk_id = 1; + string pipeline_run_id = 2; + string component_id = 3; + int64 chunk_start = 4; + int64 chunk_end = 5; + int32 attempt = 6; + ChunkStatus status = 7; + FailureClass failure_class = 8; + string phase = 9; + string error = 10; + string recommended_action = 11; + google.protobuf.Timestamp started_at = 12; + google.protobuf.Timestamp completed_at = 13; + google.protobuf.Timestamp verified_at = 14; + map row_counts = 15; + map verification = 16; + map metadata = 17; +} + +// Upsert chunk run request. +message UpsertChunkRunRequest { + ChunkRun chunk = 1; +} + +// Get chunk run request. +message GetChunkRunRequest { + string chunk_id = 1; +} + +// List chunk runs request. +message ListChunkRunsRequest { + string pipeline_run_id = 1; + string component_id = 2; + ChunkStatus status = 3; + int32 limit = 4; +} + +// List chunk runs response. +message ListChunkRunsResponse { + repeated ChunkRun chunks = 1; +} + // Control plane service definition service ControlPlane { // Register a new service with the control plane @@ -155,4 +223,9 @@ service ControlPlane { rpc GetPipelineRun(GetPipelineRunRequest) returns (PipelineRun); rpc ListPipelineRuns(ListPipelineRunsRequest) returns (ListPipelineRunsResponse); rpc StopPipelineRun(StopPipelineRunRequest) returns (PipelineRun); + + // Historical range chunk management + rpc UpsertChunkRun(UpsertChunkRunRequest) returns (ChunkRun); + rpc GetChunkRun(GetChunkRunRequest) returns (ChunkRun); + rpc ListChunkRuns(ListChunkRunsRequest) returns (ListChunkRunsResponse); } \ No newline at end of file diff --git a/proto/control_plane_grpc.pb.go b/proto/control_plane_grpc.pb.go index c8d5740..2abcb2c 100644 --- a/proto/control_plane_grpc.pb.go +++ b/proto/control_plane_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.6.0 -// - protoc v7.34.0 +// - protoc v6.32.1 // source: proto/control_plane.proto package flowctlpb @@ -29,6 +29,9 @@ const ( ControlPlane_GetPipelineRun_FullMethodName = "/flowctl.ControlPlane/GetPipelineRun" ControlPlane_ListPipelineRuns_FullMethodName = "/flowctl.ControlPlane/ListPipelineRuns" ControlPlane_StopPipelineRun_FullMethodName = "/flowctl.ControlPlane/StopPipelineRun" + ControlPlane_UpsertChunkRun_FullMethodName = "/flowctl.ControlPlane/UpsertChunkRun" + ControlPlane_GetChunkRun_FullMethodName = "/flowctl.ControlPlane/GetChunkRun" + ControlPlane_ListChunkRuns_FullMethodName = "/flowctl.ControlPlane/ListChunkRuns" ) // ControlPlaneClient is the client API for ControlPlane service. @@ -51,6 +54,10 @@ type ControlPlaneClient interface { GetPipelineRun(ctx context.Context, in *GetPipelineRunRequest, opts ...grpc.CallOption) (*PipelineRun, error) ListPipelineRuns(ctx context.Context, in *ListPipelineRunsRequest, opts ...grpc.CallOption) (*ListPipelineRunsResponse, error) StopPipelineRun(ctx context.Context, in *StopPipelineRunRequest, opts ...grpc.CallOption) (*PipelineRun, error) + // Historical range chunk management + UpsertChunkRun(ctx context.Context, in *UpsertChunkRunRequest, opts ...grpc.CallOption) (*ChunkRun, error) + GetChunkRun(ctx context.Context, in *GetChunkRunRequest, opts ...grpc.CallOption) (*ChunkRun, error) + ListChunkRuns(ctx context.Context, in *ListChunkRunsRequest, opts ...grpc.CallOption) (*ListChunkRunsResponse, error) } type controlPlaneClient struct { @@ -151,6 +158,36 @@ func (c *controlPlaneClient) StopPipelineRun(ctx context.Context, in *StopPipeli return out, nil } +func (c *controlPlaneClient) UpsertChunkRun(ctx context.Context, in *UpsertChunkRunRequest, opts ...grpc.CallOption) (*ChunkRun, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ChunkRun) + err := c.cc.Invoke(ctx, ControlPlane_UpsertChunkRun_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *controlPlaneClient) GetChunkRun(ctx context.Context, in *GetChunkRunRequest, opts ...grpc.CallOption) (*ChunkRun, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ChunkRun) + err := c.cc.Invoke(ctx, ControlPlane_GetChunkRun_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *controlPlaneClient) ListChunkRuns(ctx context.Context, in *ListChunkRunsRequest, opts ...grpc.CallOption) (*ListChunkRunsResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ListChunkRunsResponse) + err := c.cc.Invoke(ctx, ControlPlane_ListChunkRuns_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // ControlPlaneServer is the server API for ControlPlane service. // All implementations must embed UnimplementedControlPlaneServer // for forward compatibility. @@ -171,6 +208,10 @@ type ControlPlaneServer interface { GetPipelineRun(context.Context, *GetPipelineRunRequest) (*PipelineRun, error) ListPipelineRuns(context.Context, *ListPipelineRunsRequest) (*ListPipelineRunsResponse, error) StopPipelineRun(context.Context, *StopPipelineRunRequest) (*PipelineRun, error) + // Historical range chunk management + UpsertChunkRun(context.Context, *UpsertChunkRunRequest) (*ChunkRun, error) + GetChunkRun(context.Context, *GetChunkRunRequest) (*ChunkRun, error) + ListChunkRuns(context.Context, *ListChunkRunsRequest) (*ListChunkRunsResponse, error) mustEmbedUnimplementedControlPlaneServer() } @@ -208,6 +249,15 @@ func (UnimplementedControlPlaneServer) ListPipelineRuns(context.Context, *ListPi func (UnimplementedControlPlaneServer) StopPipelineRun(context.Context, *StopPipelineRunRequest) (*PipelineRun, error) { return nil, status.Error(codes.Unimplemented, "method StopPipelineRun not implemented") } +func (UnimplementedControlPlaneServer) UpsertChunkRun(context.Context, *UpsertChunkRunRequest) (*ChunkRun, error) { + return nil, status.Error(codes.Unimplemented, "method UpsertChunkRun not implemented") +} +func (UnimplementedControlPlaneServer) GetChunkRun(context.Context, *GetChunkRunRequest) (*ChunkRun, error) { + return nil, status.Error(codes.Unimplemented, "method GetChunkRun not implemented") +} +func (UnimplementedControlPlaneServer) ListChunkRuns(context.Context, *ListChunkRunsRequest) (*ListChunkRunsResponse, error) { + return nil, status.Error(codes.Unimplemented, "method ListChunkRuns not implemented") +} func (UnimplementedControlPlaneServer) mustEmbedUnimplementedControlPlaneServer() {} func (UnimplementedControlPlaneServer) testEmbeddedByValue() {} @@ -391,6 +441,60 @@ func _ControlPlane_StopPipelineRun_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _ControlPlane_UpsertChunkRun_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(UpsertChunkRunRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ControlPlaneServer).UpsertChunkRun(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ControlPlane_UpsertChunkRun_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ControlPlaneServer).UpsertChunkRun(ctx, req.(*UpsertChunkRunRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _ControlPlane_GetChunkRun_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetChunkRunRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ControlPlaneServer).GetChunkRun(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ControlPlane_GetChunkRun_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ControlPlaneServer).GetChunkRun(ctx, req.(*GetChunkRunRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _ControlPlane_ListChunkRuns_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ListChunkRunsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ControlPlaneServer).ListChunkRuns(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ControlPlane_ListChunkRuns_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ControlPlaneServer).ListChunkRuns(ctx, req.(*ListChunkRunsRequest)) + } + return interceptor(ctx, in, info, handler) +} + // ControlPlane_ServiceDesc is the grpc.ServiceDesc for ControlPlane service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -434,6 +538,18 @@ var ControlPlane_ServiceDesc = grpc.ServiceDesc{ MethodName: "StopPipelineRun", Handler: _ControlPlane_StopPipelineRun_Handler, }, + { + MethodName: "UpsertChunkRun", + Handler: _ControlPlane_UpsertChunkRun_Handler, + }, + { + MethodName: "GetChunkRun", + Handler: _ControlPlane_GetChunkRun_Handler, + }, + { + MethodName: "ListChunkRuns", + Handler: _ControlPlane_ListChunkRuns_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "proto/control_plane.proto",