-
Notifications
You must be signed in to change notification settings - Fork 0
feature: Tracking and inspecting historical chunks #28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
tmosleyIII
merged 3 commits into
main
from
feature/tmosley/update-control-plane-features
Jun 14, 2026
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,222 @@ | ||
| package cmd | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "os" | ||
| "sort" | ||
| "strings" | ||
| "text/tabwriter" | ||
| "time" | ||
|
|
||
| "github.com/spf13/cobra" | ||
| flowctlpb "github.com/withobsrvr/flowctl/proto" | ||
| ) | ||
|
|
||
| var ( | ||
| chunksRunID string | ||
| chunksComponentID string | ||
| chunksStatus string | ||
| chunksLimit int32 | ||
| ) | ||
|
|
||
| var chunksCmd = &cobra.Command{ | ||
| Use: "chunks", | ||
| Short: "Inspect historical range chunk runs", | ||
| Long: `Inspect historical range chunk runs tracked by the control plane. | ||
|
|
||
| Chunks are the durable control-plane records for bounded ledger work units such | ||
| as Bronze backfill/repair ranges. Data-plane components report chunk status, | ||
| row counts, verification results, and typed failure classes.`, | ||
| } | ||
|
|
||
| var chunksListCmd = &cobra.Command{ | ||
| Use: "list", | ||
| Short: "List chunk runs", | ||
| RunE: func(cmd *cobra.Command, args []string) error { | ||
| client, conn, err := connectToControlPlane() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer conn.Close() | ||
|
|
||
| status, err := parseChunkStatus(chunksStatus) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | ||
| defer cancel() | ||
|
|
||
| runID := chunksRunID | ||
| if runID != "" { | ||
| runID, err = resolveRunID(ctx, client, runID) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| resp, err := client.ListChunkRuns(ctx, &flowctlpb.ListChunkRunsRequest{ | ||
| PipelineRunId: runID, | ||
| ComponentId: chunksComponentID, | ||
| Status: status, | ||
| Limit: chunksLimit, | ||
| }) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to list chunk runs: %w", err) | ||
| } | ||
|
|
||
| if len(resp.Chunks) == 0 { | ||
| fmt.Println("No chunk runs found") | ||
| return nil | ||
| } | ||
|
|
||
| sort.Slice(resp.Chunks, func(i, j int) bool { | ||
| if resp.Chunks[i].ChunkStart == resp.Chunks[j].ChunkStart { | ||
| return resp.Chunks[i].Attempt < resp.Chunks[j].Attempt | ||
| } | ||
| return resp.Chunks[i].ChunkStart < resp.Chunks[j].ChunkStart | ||
| }) | ||
|
|
||
| w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) | ||
| fmt.Fprintln(w, "CHUNK ID\tCOMPONENT\tRANGE\tATTEMPT\tSTATUS\tFAILURE\tPHASE\tROWS") | ||
| for _, chunk := range resp.Chunks { | ||
| fmt.Fprintf(w, "%s\t%s\t%d-%d\t%d\t%s\t%s\t%s\t%d\n", | ||
| chunk.ChunkId, | ||
| chunk.ComponentId, | ||
| chunk.ChunkStart, | ||
| chunk.ChunkEnd, | ||
| chunk.Attempt, | ||
| formatChunkStatus(chunk.Status), | ||
| formatFailureClass(chunk.FailureClass), | ||
| chunk.Phase, | ||
| totalRows(chunk.RowCounts), | ||
| ) | ||
| } | ||
| w.Flush() | ||
|
|
||
| fmt.Printf("\n%d chunk run(s) found\n", len(resp.Chunks)) | ||
| return nil | ||
| }, | ||
| } | ||
|
|
||
| var chunksShowCmd = &cobra.Command{ | ||
| Use: "show <chunk-id>", | ||
| Short: "Show detailed information about a chunk run", | ||
| Args: cobra.ExactArgs(1), | ||
| RunE: func(cmd *cobra.Command, args []string) error { | ||
| client, conn, err := connectToControlPlane() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer conn.Close() | ||
|
|
||
| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | ||
| defer cancel() | ||
|
|
||
| chunk, err := client.GetChunkRun(ctx, &flowctlpb.GetChunkRunRequest{ChunkId: args[0]}) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to get chunk run: %w", err) | ||
| } | ||
|
|
||
| fmt.Printf("Chunk ID: %s\n", chunk.ChunkId) | ||
| fmt.Printf("Run ID: %s\n", chunk.PipelineRunId) | ||
| fmt.Printf("Component: %s\n", chunk.ComponentId) | ||
| fmt.Printf("Range: %d-%d\n", chunk.ChunkStart, chunk.ChunkEnd) | ||
| fmt.Printf("Attempt: %d\n", chunk.Attempt) | ||
| fmt.Printf("Status: %s\n", formatChunkStatus(chunk.Status)) | ||
| fmt.Printf("Phase: %s\n", chunk.Phase) | ||
| if chunk.FailureClass != flowctlpb.FailureClass_FAILURE_CLASS_UNKNOWN { | ||
| fmt.Printf("Failure: %s\n", formatFailureClass(chunk.FailureClass)) | ||
| } | ||
| if chunk.Error != "" { | ||
| fmt.Printf("Error: %s\n", chunk.Error) | ||
| } | ||
| if chunk.RecommendedAction != "" { | ||
| fmt.Printf("Next action: %s\n", chunk.RecommendedAction) | ||
| } | ||
| if chunk.StartedAt != nil { | ||
| fmt.Printf("Started: %s\n", chunk.StartedAt.AsTime().Format("2006-01-02 15:04:05 MST")) | ||
| } | ||
| if chunk.CompletedAt != nil { | ||
| fmt.Printf("Completed: %s\n", chunk.CompletedAt.AsTime().Format("2006-01-02 15:04:05 MST")) | ||
| } | ||
| if chunk.VerifiedAt != nil { | ||
| fmt.Printf("Verified: %s\n", chunk.VerifiedAt.AsTime().Format("2006-01-02 15:04:05 MST")) | ||
| } | ||
| if len(chunk.RowCounts) > 0 { | ||
| fmt.Println("\nRow counts:") | ||
| keys := make([]string, 0, len(chunk.RowCounts)) | ||
| for k := range chunk.RowCounts { | ||
| keys = append(keys, k) | ||
| } | ||
| sort.Strings(keys) | ||
| for _, k := range keys { | ||
| fmt.Printf(" %s: %d\n", k, chunk.RowCounts[k]) | ||
| } | ||
| } | ||
| if len(chunk.Verification) > 0 { | ||
| fmt.Println("\nVerification:") | ||
| keys := make([]string, 0, len(chunk.Verification)) | ||
| for k := range chunk.Verification { | ||
| keys = append(keys, k) | ||
| } | ||
| sort.Strings(keys) | ||
| for _, k := range keys { | ||
| fmt.Printf(" %s: %s\n", k, chunk.Verification[k]) | ||
| } | ||
| } | ||
| return nil | ||
| }, | ||
| } | ||
|
|
||
| func parseChunkStatus(value string) (flowctlpb.ChunkStatus, error) { | ||
| if value == "" { | ||
| return flowctlpb.ChunkStatus_CHUNK_STATUS_UNKNOWN, nil | ||
| } | ||
| key := "CHUNK_STATUS_" + strings.ToUpper(strings.ReplaceAll(value, "-", "_")) | ||
| status, ok := flowctlpb.ChunkStatus_value[key] | ||
| if !ok { | ||
| return flowctlpb.ChunkStatus_CHUNK_STATUS_UNKNOWN, fmt.Errorf("unknown chunk status %q", value) | ||
| } | ||
| return flowctlpb.ChunkStatus(status), nil | ||
| } | ||
|
|
||
| func formatChunkStatus(status flowctlpb.ChunkStatus) string { | ||
| return strings.ToLower(strings.TrimPrefix(status.String(), "CHUNK_STATUS_")) | ||
| } | ||
|
|
||
| func formatFailureClass(failure flowctlpb.FailureClass) string { | ||
| if failure == flowctlpb.FailureClass_FAILURE_CLASS_UNKNOWN { | ||
| return "-" | ||
| } | ||
| return strings.ToLower(strings.TrimPrefix(failure.String(), "FAILURE_CLASS_")) | ||
| } | ||
|
|
||
| func shortChunkID(chunkID string) string { | ||
| if len(chunkID) <= 18 { | ||
| return chunkID | ||
| } | ||
| return chunkID[:18] | ||
| } | ||
|
|
||
| func totalRows(rows map[string]int64) int64 { | ||
| var total int64 | ||
| for _, count := range rows { | ||
| total += count | ||
| } | ||
| return total | ||
| } | ||
|
|
||
| func init() { | ||
| rootCmd.AddCommand(chunksCmd) | ||
| chunksCmd.AddCommand(chunksListCmd) | ||
| chunksCmd.AddCommand(chunksShowCmd) | ||
|
|
||
| chunksCmd.PersistentFlags().StringVar(&pipelinesControlPlaneAddr, "control-plane-address", "127.0.0.1", "Control plane address") | ||
| chunksCmd.PersistentFlags().IntVar(&pipelinesControlPlanePort, "control-plane-port", 8080, "Control plane port") | ||
| chunksListCmd.Flags().StringVar(&chunksRunID, "run", "", "Pipeline run ID or unique prefix") | ||
| chunksListCmd.Flags().StringVar(&chunksComponentID, "component", "", "Component ID filter") | ||
| chunksListCmd.Flags().StringVar(&chunksStatus, "status", "", "Chunk status filter (planned, dispatched, running, completed, verified, failed, retry-pending, aborted)") | ||
| chunksListCmd.Flags().Int32Var(&chunksLimit, "limit", 100, "Maximum number of chunks to show") | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| # Component flowctl reporting | ||
|
|
||
| Data-plane components can report bounded historical work-unit state to the flowctl control plane. This is the contract used by historical ledger workers such as Bronze and Silver loaders. | ||
|
|
||
| ## Environment contract | ||
|
|
||
| ```text | ||
| ENABLE_FLOWCTL=true | ||
| FLOWCTL_ENDPOINT=<host:port> | ||
| FLOWCTL_COMPONENT_ID=<stable component id> | ||
| FLOWCTL_RUN_ID=<pipeline run id> | ||
| FLOWCTL_ATTEMPT=<attempt number> | ||
| FLOWCTL_HEARTBEAT_INTERVAL_MS=10000 | ||
| ``` | ||
|
|
||
| `flowctl run` injects `FLOWCTL_ENDPOINT`, `FLOWCTL_COMPONENT_ID`, `FLOWCTL_RUN_ID`, `FLOWCTL_ATTEMPT`, and heartbeat interval values for process-managed components. | ||
|
|
||
| Historical range workers should also accept: | ||
|
|
||
| ```text | ||
| START_LEDGER=<inclusive start> | ||
| END_LEDGER=<inclusive end> | ||
| CHUNK_START=<inclusive chunk start> | ||
| CHUNK_END=<inclusive chunk end> | ||
| ``` | ||
|
|
||
| ## Go helper | ||
|
|
||
| Use `github.com/withobsrvr/flowctl/pkg/component` from a component binary: | ||
|
|
||
| ```go | ||
| cfg := component.ConfigFromEnv() | ||
| reporter, err := component.NewReporter(ctx, cfg) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer reporter.Close() | ||
|
|
||
| if err := reporter.Register(ctx, flowctlpb.ServiceType_SERVICE_TYPE_SOURCE, map[string]string{ | ||
| "pipeline": "obsrvr-mainnet-bronze-repair", | ||
| "network": "pubnet", | ||
| }); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| go reporter.StartHeartbeatLoop(ctx, func() map[string]float64 { | ||
| return map[string]float64{"ledgers_processed": float64(processed)} | ||
| }, nil) | ||
|
|
||
| _ = reporter.ReportChunkProgress(ctx, chunkStart, chunkEnd, "ducklake_push", nil, nil) | ||
|
|
||
| _ = reporter.ReportChunkCompleted(ctx, chunkStart, chunkEnd, true, map[string]int64{ | ||
| "ledgers_row_v2": 250000, | ||
| "transactions_row_v2": 22685377, | ||
| }, map[string]string{ | ||
| "gate": "bronze-silver-readiness-direct", | ||
| "passed": "true", | ||
| }) | ||
| ``` | ||
|
|
||
| On failure: | ||
|
|
||
| ```go | ||
| _ = reporter.ReportChunkFailed( | ||
| ctx, | ||
| chunkStart, | ||
| chunkEnd, | ||
| "verification", | ||
| flowctlpb.FailureClass_FAILURE_CLASS_RETRYABLE_INFRASTRUCTURE, | ||
| "catalog postgres connection timed out", | ||
| "retry_verification", | ||
| ) | ||
| ``` | ||
|
|
||
| ## Operator inspection | ||
|
|
||
| ```bash | ||
| flowctl chunks list --run <run-id> | ||
| flowctl chunks list --run <run-id> --status failed | ||
| flowctl chunks show <chunk-id> | ||
| ``` |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.