From 1b1557bad278046acde21970fdc9a768a959003d Mon Sep 17 00:00:00 2001 From: Tillman Mosley III Date: Sat, 20 Jun 2026 08:35:23 -0400 Subject: [PATCH 1/2] Chore: Historical chunks --- README.md | 32 +++ go.mod | 68 ++++--- go.sum | 155 ++++++++------- pkg/component/reporter.go | 343 +++++++++++++++++++++++++++++++++ pkg/component/reporter_test.go | 89 +++++++++ 5 files changed, 593 insertions(+), 94 deletions(-) create mode 100644 pkg/component/reporter.go create mode 100644 pkg/component/reporter_test.go diff --git a/README.md b/README.md index ef53547..901f00f 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ A comprehensive Go SDK for building [flowctl](https://github.com/withObsrvr/flow - **Three Complete SDKs**: Source, Processor, and Consumer - **Event-First API**: Work with strongly-typed `*flowctlv1.Event` objects - **Automatic Control Plane Integration**: Registration, heartbeats, and discovery +- **Historical Chunk Reporting**: Report bounded ledger work-unit progress, completion, verification, and failures - **Built-in Observability**: Health checks, metrics, and logging - **Production-Ready**: Graceful shutdown, error handling, and backpressure - **Developer-Friendly**: 70-85% less code than manual gRPC implementation @@ -143,6 +144,37 @@ if err != nil { } ``` +## Historical Chunk Reporting + +For bounded historical workers, use the component reporter to emit flowctl chunk state: + +```go +import ( + "github.com/withObsrvr/flowctl-sdk/pkg/component" + flowctlpb "github.com/withobsrvr/flowctl/proto" +) + +cfg := component.ConfigFromEnv() +reporter, err := component.NewReporter(ctx, cfg) +if err != nil { + return err +} +defer reporter.Close() + +_ = reporter.Register(ctx, flowctlpb.ServiceType_SERVICE_TYPE_SOURCE, map[string]string{ + "network": "pubnet", +}) + +_ = reporter.ReportChunkProgress(ctx, chunkStart, chunkEnd, "extract", nil, nil) +_ = reporter.ReportChunkCompleted(ctx, chunkStart, chunkEnd, true, map[string]int64{ + "ledgers": chunkEnd - chunkStart + 1, +}, map[string]string{"passed": "true"}) +``` + +The reporter reads the standard `flowctl run` component environment contract: +`ENABLE_FLOWCTL`, `FLOWCTL_ENDPOINT`, `FLOWCTL_COMPONENT_ID`, `FLOWCTL_RUN_ID`, +`FLOWCTL_ATTEMPT`, and `FLOWCTL_HEARTBEAT_INTERVAL_MS`. + ## Custom Metrics Track custom metrics easily: diff --git a/go.mod b/go.mod index 96cadaa..c8d3cca 100644 --- a/go.mod +++ b/go.mod @@ -3,25 +3,31 @@ module github.com/withObsrvr/flowctl-sdk go 1.26.1 require ( - github.com/withObsrvr/flow-proto v0.0.0-20251209215201-bd54ee3e43e9 + github.com/withObsrvr/flow-proto v0.1.3 google.golang.org/grpc v1.79.2 ) require ( github.com/lib/pq v1.10.9 github.com/stellar/go-stellar-sdk v0.5.0 + github.com/withobsrvr/flowctl v0.0.10 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 ) require ( - cloud.google.com/go v0.114.0 // indirect - cloud.google.com/go/auth v0.5.1 // indirect - cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect + cel.dev/expr v0.25.1 // indirect + cloud.google.com/go v0.116.0 // indirect + cloud.google.com/go/auth v0.13.0 // indirect + cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect cloud.google.com/go/compute/metadata v0.9.0 // indirect - cloud.google.com/go/iam v1.1.8 // indirect - cloud.google.com/go/storage v1.42.0 // indirect - github.com/Microsoft/go-winio v0.6.1 // indirect + cloud.google.com/go/iam v1.2.2 // indirect + cloud.google.com/go/monitoring v1.21.2 // indirect + cloud.google.com/go/storage v1.49.0 // indirect + github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 // indirect + github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.48.1 // indirect + github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.48.1 // indirect + github.com/Microsoft/go-winio v0.6.2 // indirect github.com/aws/aws-sdk-go v1.45.27 // indirect github.com/aws/aws-sdk-go-v2 v1.36.5 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11 // indirect @@ -45,57 +51,63 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5 // indirect github.com/creachadair/jrpc2 v1.2.0 // indirect github.com/creachadair/mds v0.13.4 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/djherbis/fscache v0.10.1 // indirect + github.com/envoyproxy/go-control-plane/envoy v1.36.0 // indirect + github.com/envoyproxy/protoc-gen-validate v1.3.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-errors/errors v1.5.1 // indirect + github.com/go-jose/go-jose/v4 v4.1.3 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/protobuf v1.5.4 // indirect - github.com/google/s2a-go v0.1.7 // indirect + github.com/google/s2a-go v0.1.8 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect - github.com/googleapis/gax-go/v2 v2.12.4 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect + github.com/googleapis/gax-go/v2 v2.14.1 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/klauspost/compress v1.17.6 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.17.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/segmentio/go-loggly v0.5.1-0.20171222203950-eb91657e62b2 // indirect github.com/sirupsen/logrus v1.9.3 // indirect + github.com/spiffe/go-spiffe/v2 v2.6.0 // indirect github.com/stellar/go-xdr v0.0.0-20260312225820-cc2b0611aabf // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/stretchr/testify v1.11.1 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect - go.opentelemetry.io/otel v1.39.0 // indirect - go.opentelemetry.io/otel/metric v1.39.0 // indirect - go.opentelemetry.io/otel/trace v1.39.0 // indirect - golang.org/x/crypto v0.46.0 // indirect + go.opentelemetry.io/contrib/detectors/gcp v1.39.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect + go.opentelemetry.io/otel v1.42.0 // indirect + go.opentelemetry.io/otel/metric v1.42.0 // indirect + go.opentelemetry.io/otel/sdk v1.42.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.42.0 // indirect + go.opentelemetry.io/otel/trace v1.42.0 // indirect + golang.org/x/crypto v0.48.0 // indirect golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect - golang.org/x/mod v0.30.0 // indirect - golang.org/x/net v0.48.0 // indirect + golang.org/x/net v0.50.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect golang.org/x/sync v0.19.0 // indirect - golang.org/x/sys v0.39.0 // indirect - golang.org/x/text v0.32.0 // indirect - golang.org/x/time v0.5.0 // indirect - golang.org/x/tools v0.39.0 // indirect - google.golang.org/api v0.183.0 // indirect - google.golang.org/genproto v0.0.0-20240528184218-531527333157 // indirect + golang.org/x/sys v0.41.0 // indirect + golang.org/x/text v0.34.0 // indirect + golang.org/x/time v0.8.0 // indirect + google.golang.org/api v0.215.0 // indirect + google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 // indirect gopkg.in/djherbis/atime.v1 v1.0.0 // indirect gopkg.in/djherbis/stream.v1 v1.3.1 // indirect ) diff --git a/go.sum b/go.sum index 4876474..8b02609 100644 --- a/go.sum +++ b/go.sum @@ -1,23 +1,39 @@ +cel.dev/expr v0.25.1 h1:1KrZg61W6TWSxuNZ37Xy49ps13NUovb66QLprthtwi4= +cel.dev/expr v0.25.1/go.mod h1:hrXvqGP6G6gyx8UAHSHJ5RGk//1Oj5nXQ2NI02Nrsg4= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -cloud.google.com/go v0.114.0 h1:OIPFAdfrFDFO2ve2U7r/H5SwSbBzEdrBdE7xkgwc+kY= -cloud.google.com/go v0.114.0/go.mod h1:ZV9La5YYxctro1HTPug5lXH/GefROyW8PPD4T8n9J8E= -cloud.google.com/go/auth v0.5.1 h1:0QNO7VThG54LUzKiQxv8C6x1YX7lUrzlAa1nVLF8CIw= -cloud.google.com/go/auth v0.5.1/go.mod h1:vbZT8GjzDf3AVqCcQmqeeM32U9HBFc32vVVAbwDsa6s= -cloud.google.com/go/auth/oauth2adapt v0.2.2 h1:+TTV8aXpjeChS9M+aTtN/TjdQnzJvmzKFt//oWu7HX4= -cloud.google.com/go/auth/oauth2adapt v0.2.2/go.mod h1:wcYjgpZI9+Yu7LyYBg4pqSiaRkfEK3GQcpb7C/uyF1Q= +cloud.google.com/go v0.116.0 h1:B3fRrSDkLRt5qSHWe40ERJvhvnQwdZiHu0bJOpldweE= +cloud.google.com/go v0.116.0/go.mod h1:cEPSRWPzZEswwdr9BxE6ChEn01dWlTaF05LiC2Xs70U= +cloud.google.com/go/auth v0.13.0 h1:8Fu8TZy167JkW8Tj3q7dIkr2v4cndv41ouecJx0PAHs= +cloud.google.com/go/auth v0.13.0/go.mod h1:COOjD9gwfKNKz+IIduatIhYJQIc0mG3H102r/EMxX6Q= +cloud.google.com/go/auth/oauth2adapt v0.2.6 h1:V6a6XDu2lTwPZWOawrAa9HUK+DB2zfJyTuciBG5hFkU= +cloud.google.com/go/auth/oauth2adapt v0.2.6/go.mod h1:AlmsELtlEBnaNTL7jCj8VQFLy6mbZv0s4Q7NGBeQ5E8= cloud.google.com/go/compute/metadata v0.9.0 h1:pDUj4QMoPejqq20dK0Pg2N4yG9zIkYGdBtwLoEkH9Zs= cloud.google.com/go/compute/metadata v0.9.0/go.mod h1:E0bWwX5wTnLPedCKqk3pJmVgCBSM6qQI1yTBdEb3C10= -cloud.google.com/go/iam v1.1.8 h1:r7umDwhj+BQyz0ScZMp4QrGXjSTI3ZINnpgU2nlB/K0= -cloud.google.com/go/iam v1.1.8/go.mod h1:GvE6lyMmfxXauzNq8NbgJbeVQNspG+tcdL/W8QO1+zE= -cloud.google.com/go/longrunning v0.5.7 h1:WLbHekDbjK1fVFD3ibpFFVoyizlLRl73I7YKuAKilhU= -cloud.google.com/go/longrunning v0.5.7/go.mod h1:8GClkudohy1Fxm3owmBGid8W0pSgodEMwEAztp38Xng= -cloud.google.com/go/pubsub v1.38.0 h1:J1OT7h51ifATIedjqk/uBNPh+1hkvUaH4VKbz4UuAsc= -cloud.google.com/go/pubsub v1.38.0/go.mod h1:IPMJSWSus/cu57UyR01Jqa/bNOQA+XnPF6Z4dKW4fAA= -cloud.google.com/go/storage v1.42.0 h1:4QtGpplCVt1wz6g5o1ifXd656P5z+yNgzdw1tVfp0cU= -cloud.google.com/go/storage v1.42.0/go.mod h1:HjMXRFq65pGKFn6hxj6x3HCyR41uSB72Z0SO/Vn6JFQ= +cloud.google.com/go/iam v1.2.2 h1:ozUSofHUGf/F4tCNy/mu9tHLTaxZFLOUiKzjcgWHGIA= +cloud.google.com/go/iam v1.2.2/go.mod h1:0Ys8ccaZHdI1dEUilwzqng/6ps2YB6vRsjIe00/+6JY= +cloud.google.com/go/logging v1.12.0 h1:ex1igYcGFd4S/RZWOCU51StlIEuey5bjqwH9ZYjHibk= +cloud.google.com/go/logging v1.12.0/go.mod h1:wwYBt5HlYP1InnrtYI0wtwttpVU1rifnMT7RejksUAM= +cloud.google.com/go/longrunning v0.6.2 h1:xjDfh1pQcWPEvnfjZmwjKQEcHnpz6lHjfy7Fo0MK+hc= +cloud.google.com/go/longrunning v0.6.2/go.mod h1:k/vIs83RN4bE3YCswdXC5PFfWVILjm3hpEUlSko4PiI= +cloud.google.com/go/monitoring v1.21.2 h1:FChwVtClH19E7pJ+e0xUhJPGksctZNVOk2UhMmblmdU= +cloud.google.com/go/monitoring v1.21.2/go.mod h1:hS3pXvaG8KgWTSz+dAdyzPrGUYmi2Q+WFX8g2hqVEZU= +cloud.google.com/go/pubsub v1.45.1 h1:ZC/UzYcrmK12THWn1P72z+Pnp2vu/zCZRXyhAfP1hJY= +cloud.google.com/go/pubsub v1.45.1/go.mod h1:3bn7fTmzZFwaUjllitv1WlsNMkqBgGUb3UdMhI54eCc= +cloud.google.com/go/storage v1.49.0 h1:zenOPBOWHCnojRd9aJZAyQXBYqkJkdQS42dxL55CIMw= +cloud.google.com/go/storage v1.49.0/go.mod h1:k1eHhhpLvrPjVGfo0mOUPEJ4Y2+a/Hv5PiwehZI9qGU= +cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI= +cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= -github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 h1:sBEjpZlNHzK1voKq9695PJSX2o5NEXl7/OL3coiIY0c= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0/go.mod h1:P4WPRUkOhJC13W//jWpyfJNDAIpvRbAUIYLX/4jtlE0= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.48.1 h1:UQ0AhxogsIRZDkElkblfnwjc3IaltCm2HUMvezQaL7s= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.48.1/go.mod h1:jyqM3eLpJ3IbIFDTKVz2rF9T/xWGW0rIriGwnz8l9Tk= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.48.1 h1:oTX4vsorBZo/Zdum6OKPA4o7544hm6smoRv1QjpTwGo= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.48.1/go.mod h1:0wEl7vrAD8mehJyohS9HZy+WyEOaQO2mJx86Cvh93kM= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.48.1 h1:8nn+rsCvTq9axyEh382S0PFLBeaFwNsT43IrPWzctRU= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.48.1/go.mod h1:viRWSEhtMZqz1rhwmOVKkWl6SwmVowfL9O2YR5gI2PE= +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/ajg/form v0.0.0-20160822230020-523a5da1a92f h1:zvClvFQwU++UpIUBGC8YmDlfhUrweEy1R1Fj1gu5iIM= github.com/ajg/form v0.0.0-20160822230020-523a5da1a92f/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= @@ -87,8 +103,11 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.14.0 h1:hbG2kr4RuFj222B6+7T83thSPqLjwBIfQawTkC++2HA= +github.com/envoyproxy/go-control-plane v0.14.0/go.mod h1:NcS5X47pLl/hfqxU70yPwL9ZMkUlwlKxtAohpi2wBEU= github.com/envoyproxy/go-control-plane/envoy v1.36.0 h1:yg/JjO5E7ubRyKX3m07GF3reDNEnfOboJ0QySbH736g= github.com/envoyproxy/go-control-plane/envoy v1.36.0/go.mod h1:ty89S1YCCVruQAm9OtKeEkQLTb+Lkz0k8v9W0Oxsv98= +github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 h1:/G9QYbddjL25KvtKTv3an9lx6VBE2cnb8wp1vEGNYGI= +github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod h1:Wk+tMFAFbCXaJPzVVHnPgRKdUdwW/KdbRt94AzgRee4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.3.0 h1:TvGH1wof4H33rezVKWSpqKz5NXWg5VPuZ0uONDT6eb4= github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA= @@ -98,8 +117,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= -github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= +github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/fsouza/fake-gcs-server v1.49.2 h1:fukDqzEQM50QkA0jAbl6cLqeDu3maQjwZBuys759TR4= github.com/fsouza/fake-gcs-server v1.49.2/go.mod h1:17SYzJEXRcaAA5ATwwvgBkSIqIy7r1icnGM0y/y4foY= github.com/gavv/monotime v0.0.0-20161010190848-47d58efa6955 h1:gmtGRvSexPU4B1T/yYo0sLOKzER1YT+b4kPxPpm0Ty4= @@ -108,6 +127,8 @@ github.com/go-chi/chi v4.1.2+incompatible h1:fGFk2Gmi/YKXk0OmGfBh0WgmN3XB8lVnEyN github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk= github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= +github.com/go-jose/go-jose/v4 v4.1.3 h1:CVLmWDhDVRa6Mi/IgCgaopNosCaHz7zrMeF9MlZRkrs= +github.com/go-jose/go-jose/v4 v4.1.3/go.mod h1:x4oUasVrzR7071A4TnHLGSPpNOm2a21K9Kf04k1rs08= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -143,15 +164,15 @@ github.com/google/martian/v3 v3.3.3 h1:DIhPTQrbPkgs2yJYdXU/eNACCG5DVQjySNRNlflZ9 github.com/google/martian/v3 v3.3.3/go.mod h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1E6RWzmBA0= github.com/google/renameio/v2 v2.0.0 h1:UifI23ZTGY8Tt29JbYFiuyIU3eX+RNFtUwefq9qAhxg= github.com/google/renameio/v2 v2.0.0/go.mod h1:BtmJXm5YlszgC+TD4HOEEUFgkJP3nLxehU6hfe7jRt4= -github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= -github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= +github.com/google/s2a-go v0.1.8 h1:zZDs9gcbt9ZPLV0ndSyQk6Kacx2g/X+SKYovpnz3SMM= +github.com/google/s2a-go v0.1.8/go.mod h1:6iNWHTpQ+nfNRN5E00MSdfDwVesa8hhS32PhPO8deJA= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= -github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= -github.com/googleapis/gax-go/v2 v2.12.4 h1:9gWcmF85Wvq4ryPFvGFaOgPIs1AQX0d0bcbGw4Z96qg= -github.com/googleapis/gax-go/v2 v2.12.4/go.mod h1:KYEYLorsnIGDi/rPC8b5TdlB9kbKoFubselGIoBMCwI= +github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gTgghdIA6Stxb52D5RnLI1SLyw= +github.com/googleapis/enterprise-certificate-proxy v0.3.4/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA= +github.com/googleapis/gax-go/v2 v2.14.1 h1:hb0FFeiPaQskmvakKu5EbCbpntQn48jyHuvrkurSS/Q= +github.com/googleapis/gax-go/v2 v2.14.1/go.mod h1:Hb/NubMaVM88SrNkvl8X/o8XWwDJEPqouaLeN2IUxoA= github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE= github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= @@ -168,8 +189,8 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= -github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI= -github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -202,8 +223,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= -github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= @@ -216,6 +237,8 @@ github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spiffe/go-spiffe/v2 v2.6.0 h1:l+DolpxNWYgruGQVV0xsfeya3CsC7m8iBzDnMpsbLuo= +github.com/spiffe/go-spiffe/v2 v2.6.0/go.mod h1:gm2SeUoMZEtpnzPNs2Csc0D/gX33k1xIx7lEzqblHEs= github.com/stellar/go-stellar-sdk v0.5.0 h1:xpOO+ZTyvGz54wTm7pwl2Gf1e6lZl0ExrJ/tKb+Roj4= github.com/stellar/go-stellar-sdk v0.5.0/go.mod h1:tLKAQPxa2I5UvGMabBbUXcY3fmgYnfDudrMeK7CDX4w= github.com/stellar/go-xdr v0.0.0-20260312225820-cc2b0611aabf h1:GY1RVbX3Hg7poPXEf6yojjP0hyypvgUgZmCqQU9D0xg= @@ -235,8 +258,10 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.34.0 h1:d3AAQJ2DRcxJYHm7OXNXtXt2as1vMDfxeIcFvhmGGm4= github.com/valyala/fasthttp v1.34.0/go.mod h1:epZA5N+7pY6ZaEKRmstzOuYJx9HI8DI1oaCGZpdH4h0= -github.com/withObsrvr/flow-proto v0.0.0-20251209215201-bd54ee3e43e9 h1:Pq+W+0MhM3nCx1wI9lsd1kXl5S/0Kup/HzHpvJNgOXk= -github.com/withObsrvr/flow-proto v0.0.0-20251209215201-bd54ee3e43e9/go.mod h1:gCX0x2CG0FR7Ndg6yPzeQjK1qRIr2K5zgw4d+uFHeNw= +github.com/withObsrvr/flow-proto v0.1.3 h1:L6uCl7HgnHS7g81TX2faQnYLjKJAXRjgudI0elCxW6M= +github.com/withObsrvr/flow-proto v0.1.3/go.mod h1:gCX0x2CG0FR7Ndg6yPzeQjK1qRIr2K5zgw4d+uFHeNw= +github.com/withobsrvr/flowctl v0.0.10 h1:hz5Kv2ZetZ0s4lFeF+IZySTEX1E+jrk3rgRRwlkTR8Y= +github.com/withobsrvr/flowctl v0.0.10/go.mod h1:URr+2oiBtAmQ0kdjGcf9ybkTsYmDwrMsJ+iaO19UrHU= github.com/xdrpp/goxdr v0.1.1 h1:E1B2c6E8eYhOVyd7yEpOyopzTPirUeF6mVOfXfGyJyc= github.com/xdrpp/goxdr v0.1.1/go.mod h1:dXo1scL/l6s7iME1gxHWo2XCppbHEKZS7m/KyYWkNzA= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c= @@ -256,25 +281,29 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= -go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= -go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= -go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= -go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= -go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= -go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= -go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= -go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= -go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= -go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= +go.opentelemetry.io/contrib/detectors/gcp v1.39.0 h1:kWRNZMsfBHZ+uHjiH4y7Etn2FK26LAGkNFw7RHv1DhE= +go.opentelemetry.io/contrib/detectors/gcp v1.39.0/go.mod h1:t/OGqzHBa5v6RHZwrDBJ2OirWc+4q/w2fTbLZwAKjTk= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 h1:F7Jx+6hwnZ41NSFTO5q4LYDtJRXBf2PD0rNBkeB/lus= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0/go.mod h1:UHB22Z8QsdRDrnAtX4PntOl36ajSxcdUMt1sF7Y6E7Q= +go.opentelemetry.io/otel v1.42.0 h1:lSQGzTgVR3+sgJDAU/7/ZMjN9Z+vUip7leaqBKy4sho= +go.opentelemetry.io/otel v1.42.0/go.mod h1:lJNsdRMxCUIWuMlVJWzecSMuNjE7dOYyWlqOXWkdqCc= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.29.0 h1:WDdP9acbMYjbKIyJUhTvtzj601sVJOqgWdUxSdR/Ysc= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.29.0/go.mod h1:BLbf7zbNIONBLPwvFnwNHGj4zge8uTCM/UPIVW1Mq2I= +go.opentelemetry.io/otel/metric v1.42.0 h1:2jXG+3oZLNXEPfNmnpxKDeZsFI5o4J+nz6xUlaFdF/4= +go.opentelemetry.io/otel/metric v1.42.0/go.mod h1:RlUN/7vTU7Ao/diDkEpQpnz3/92J9ko05BIwxYa2SSI= +go.opentelemetry.io/otel/sdk v1.42.0 h1:LyC8+jqk6UJwdrI/8VydAq/hvkFKNHZVIWuslJXYsDo= +go.opentelemetry.io/otel/sdk v1.42.0/go.mod h1:rGHCAxd9DAph0joO4W6OPwxjNTYWghRWmkHuGbayMts= +go.opentelemetry.io/otel/sdk/metric v1.42.0 h1:D/1QR46Clz6ajyZ3G8SgNlTJKBdGp84q9RKCAZ3YGuA= +go.opentelemetry.io/otel/sdk/metric v1.42.0/go.mod h1:Ua6AAlDKdZ7tdvaQKfSmnFTdHx37+J4ba8MwVCYM5hc= +go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4LenLmOYY= +go.opentelemetry.io/otel/trace v1.42.0/go.mod h1:f3K9S+IFqnumBkKhRJMeaZeNk9epyhnCmQh/EysQCdc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.46.0 h1:cKRW/pmt1pKAfetfu+RCEvjvZkA9RimPbh7bhFjGVBU= -golang.org/x/crypto v0.46.0/go.mod h1:Evb/oLKmMraqjZ2iQTwDwvCtJkczlDuTmdJXoZVzqU0= +golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= +golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= @@ -282,8 +311,6 @@ golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTk golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk= -golang.org/x/mod v0.30.0/go.mod h1:lAsf5O2EvJeSFMiBxXDki7sCgAxEUcZHXoXMKT4GJKc= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -294,8 +321,8 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= -golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= -golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= +golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= +golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.34.0 h1:hqK/t4AKgbqWkdkcAeI8XLmbK+4m4G5YeQRrmiotGlw= golang.org/x/oauth2 v0.34.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= @@ -315,8 +342,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= -golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -324,10 +351,10 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= -golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= -golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= -golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= +golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -335,27 +362,23 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.39.0 h1:ik4ho21kwuQln40uelmciQPp9SipgNDdrafrYA4TmQQ= -golang.org/x/tools v0.39.0/go.mod h1:JnefbkDPyD8UU2kI5fuf8ZX4/yUeh9W877ZeBONxUqQ= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= -golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= -google.golang.org/api v0.183.0 h1:PNMeRDwo1pJdgNcFQ9GstuLe/noWKIc89pRWRLMvLwE= -google.golang.org/api v0.183.0/go.mod h1:q43adC5/pHoSZTx5h2mSmdF7NcyfW9JuDyIOJAgS9ZQ= +google.golang.org/api v0.215.0 h1:jdYF4qnyczlEz2ReWIsosNLDuzXyvFHJtI5gcr0J7t0= +google.golang.org/api v0.215.0/go.mod h1:fta3CVtuJYOEdugLNWm6WodzOS8KdFckABwN4I40hzY= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20240528184218-531527333157 h1:u7WMYrIrVvs0TF5yaKwKNbcJyySYf+HAIFXxWltJOXE= -google.golang.org/genproto v0.0.0-20240528184218-531527333157/go.mod h1:ubQlAQnzejB8uZzszhrTCU2Fyp6Vi7ZE5nn0c3W8+qQ= +google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 h1:ToEetK57OidYuqD4Q5w+vfEnPvPpuTwedCNVohYJfNk= +google.golang.org/genproto v0.0.0-20241118233622-e639e219e697/go.mod h1:JJrvXBWRZaFMxBufik1a4RpFw4HhgVtBBWQeQgUj2cc= google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls= google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto= -google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 h1:gRkg/vSppuSQoDjxyiGfN4Upv/h/DQmIR10ZU8dh4Ww= -google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 h1:mWPCjDEyshlQYzBpMNHaEof6UX1PmHcaUODUywQ0uac= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= diff --git a/pkg/component/reporter.go b/pkg/component/reporter.go new file mode 100644 index 0000000..ff398a5 --- /dev/null +++ b/pkg/component/reporter.go @@ -0,0 +1,343 @@ +package component + +import ( + "context" + "fmt" + "os" + "strconv" + "strings" + "time" + + flowctlpb "github.com/withobsrvr/flowctl/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const ( + defaultHeartbeatInterval = 10 * time.Second + defaultDialTimeout = 5 * time.Second +) + +// Config describes the flowctl control-plane connection advertised to a data-plane component. +type Config struct { + Enabled bool + Endpoint string + ComponentID string + RunID string + Attempt int32 + HeartbeatInterval time.Duration +} + +// ConfigFromEnv loads the standard flowctl component environment contract. +func ConfigFromEnv() Config { + attempt := int32(1) + if raw := os.Getenv("FLOWCTL_ATTEMPT"); raw != "" { + if parsed, err := strconv.ParseInt(raw, 10, 32); err == nil && parsed > 0 { + attempt = int32(parsed) + } + } + + interval := defaultHeartbeatInterval + if raw := os.Getenv("FLOWCTL_HEARTBEAT_INTERVAL_MS"); raw != "" { + if parsed, err := strconv.ParseInt(raw, 10, 64); err == nil && parsed > 0 { + interval = time.Duration(parsed) * time.Millisecond + } + } + + return Config{ + Enabled: strings.EqualFold(os.Getenv("ENABLE_FLOWCTL"), "true"), + Endpoint: os.Getenv("FLOWCTL_ENDPOINT"), + ComponentID: os.Getenv("FLOWCTL_COMPONENT_ID"), + RunID: os.Getenv("FLOWCTL_RUN_ID"), + Attempt: attempt, + HeartbeatInterval: interval, + } +} + +// Reporter emits component lifecycle and historical chunk state to flowctl. +type Reporter struct { + cfg Config + conn *grpc.ClientConn + client flowctlpb.ControlPlaneClient + serviceID string +} + +// NewReporter connects to the control plane. If cfg.Enabled is false it returns a disabled no-op reporter. +func NewReporter(ctx context.Context, cfg Config) (*Reporter, error) { + r := &Reporter{cfg: cfg} + if !cfg.Enabled { + return r, nil + } + if cfg.Endpoint == "" { + return nil, fmt.Errorf("FLOWCTL_ENDPOINT is required when ENABLE_FLOWCTL=true") + } + if cfg.ComponentID == "" { + return nil, fmt.Errorf("FLOWCTL_COMPONENT_ID is required when ENABLE_FLOWCTL=true") + } + + dialCtx := ctx + cancel := func() {} + if _, ok := ctx.Deadline(); !ok { + dialCtx, cancel = context.WithTimeout(ctx, defaultDialTimeout) + } + defer cancel() + + conn, err := grpc.DialContext(dialCtx, cfg.Endpoint, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + ) + if err != nil { + return nil, fmt.Errorf("connect to flowctl control plane: %w", err) + } + + r.conn = conn + r.client = flowctlpb.NewControlPlaneClient(conn) + return r, nil +} + +// Close closes the underlying gRPC connection. +func (r *Reporter) Close() error { + if r == nil || r.conn == nil { + return nil + } + return r.conn.Close() +} + +// Enabled reports whether this reporter will send events. +func (r *Reporter) Enabled() bool { + return r != nil && r.cfg.Enabled +} + +// Register announces the component to flowctl. +func (r *Reporter) Register(ctx context.Context, serviceType flowctlpb.ServiceType, metadata map[string]string) error { + if !r.Enabled() { + return nil + } + + meta := copyMap(metadata) + meta["flowctl_run_id"] = r.cfg.RunID + meta["flowctl_attempt"] = strconv.Itoa(int(r.cfg.Attempt)) + + resp, err := r.client.Register(ctx, &flowctlpb.ServiceInfo{ + ServiceId: r.cfg.ComponentID, + ComponentId: r.cfg.ComponentID, + ServiceType: serviceType, + Metadata: meta, + }) + if err != nil { + return fmt.Errorf("register component with flowctl: %w", err) + } + if resp.ServiceId != "" { + r.serviceID = resp.ServiceId + } else { + r.serviceID = r.cfg.ComponentID + } + return nil +} + +// Heartbeat sends a coarse liveness/metric heartbeat. +func (r *Reporter) Heartbeat(ctx context.Context, metrics map[string]float64) error { + if !r.Enabled() { + return nil + } + serviceID := r.serviceID + if serviceID == "" { + serviceID = r.cfg.ComponentID + } + _, err := r.client.Heartbeat(ctx, &flowctlpb.ServiceHeartbeat{ + ServiceId: serviceID, + Metrics: metrics, + }) + if err != nil { + return fmt.Errorf("send flowctl heartbeat: %w", err) + } + return nil +} + +// StartHeartbeatLoop emits heartbeats until ctx is cancelled. Errors are sent to errCh when provided. +func (r *Reporter) StartHeartbeatLoop(ctx context.Context, metrics func() map[string]float64, errCh chan<- error) { + if !r.Enabled() { + return + } + interval := r.cfg.HeartbeatInterval + if interval <= 0 { + interval = defaultHeartbeatInterval + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + var snapshot map[string]float64 + if metrics != nil { + snapshot = metrics() + } + if err := r.Heartbeat(ctx, snapshot); err != nil && errCh != nil { + select { + case errCh <- err: + default: + } + } + } + } +} + +// ChunkUpdate describes a bounded historical work-unit state update. +type ChunkUpdate struct { + ChunkID string + ComponentID string + RunID string + ChunkStart int64 + ChunkEnd int64 + Attempt int32 + Status flowctlpb.ChunkStatus + FailureClass flowctlpb.FailureClass + Phase string + Error string + RecommendedAction string + StartedAt *time.Time + CompletedAt *time.Time + VerifiedAt *time.Time + RowCounts map[string]int64 + Verification map[string]string + Metadata map[string]string +} + +// ReportChunk upserts a historical chunk state record. +func (r *Reporter) ReportChunk(ctx context.Context, update ChunkUpdate) (*flowctlpb.ChunkRun, error) { + if !r.Enabled() { + return nil, nil + } + + componentID := firstNonEmpty(update.ComponentID, r.cfg.ComponentID) + runID := firstNonEmpty(update.RunID, r.cfg.RunID) + attempt := update.Attempt + if attempt == 0 { + attempt = r.cfg.Attempt + } + if attempt == 0 { + attempt = 1 + } + + chunk := &flowctlpb.ChunkRun{ + ChunkId: update.ChunkID, + PipelineRunId: runID, + ComponentId: componentID, + ChunkStart: update.ChunkStart, + ChunkEnd: update.ChunkEnd, + Attempt: attempt, + Status: update.Status, + FailureClass: update.FailureClass, + Phase: update.Phase, + Error: update.Error, + RecommendedAction: update.RecommendedAction, + RowCounts: copyInt64Map(update.RowCounts), + Verification: copyMap(update.Verification), + Metadata: copyMap(update.Metadata), + } + if chunk.Status == flowctlpb.ChunkStatus_CHUNK_STATUS_UNKNOWN { + chunk.Status = flowctlpb.ChunkStatus_CHUNK_STATUS_RUNNING + } + if update.StartedAt != nil { + chunk.StartedAt = timestamppb.New(*update.StartedAt) + } + if update.CompletedAt != nil { + chunk.CompletedAt = timestamppb.New(*update.CompletedAt) + } + if update.VerifiedAt != nil { + chunk.VerifiedAt = timestamppb.New(*update.VerifiedAt) + } + + resp, err := r.client.UpsertChunkRun(ctx, &flowctlpb.UpsertChunkRunRequest{Chunk: chunk}) + if err != nil { + return nil, fmt.Errorf("report flowctl chunk run: %w", err) + } + return resp, nil +} + +// ReportChunkProgress marks a chunk running in a phase. +func (r *Reporter) ReportChunkProgress(ctx context.Context, start, end int64, phase string, rowCounts map[string]int64, metadata map[string]string) error { + now := time.Now() + _, err := r.ReportChunk(ctx, ChunkUpdate{ + ChunkStart: start, + ChunkEnd: end, + Status: flowctlpb.ChunkStatus_CHUNK_STATUS_RUNNING, + Phase: phase, + StartedAt: &now, + RowCounts: rowCounts, + Metadata: metadata, + }) + return err +} + +// ReportChunkCompleted marks a chunk completed or verified. +func (r *Reporter) ReportChunkCompleted(ctx context.Context, start, end int64, verified bool, rowCounts map[string]int64, verification map[string]string) error { + now := time.Now() + status := flowctlpb.ChunkStatus_CHUNK_STATUS_COMPLETED + var verifiedAt *time.Time + if verified { + status = flowctlpb.ChunkStatus_CHUNK_STATUS_VERIFIED + verifiedAt = &now + } + _, err := r.ReportChunk(ctx, ChunkUpdate{ + ChunkStart: start, + ChunkEnd: end, + Status: status, + CompletedAt: &now, + VerifiedAt: verifiedAt, + RowCounts: rowCounts, + Verification: verification, + }) + return err +} + +// ReportChunkFailed marks a chunk failed with a typed failure class. +func (r *Reporter) ReportChunkFailed(ctx context.Context, start, end int64, phase string, failure flowctlpb.FailureClass, errText string, recommendedAction string) error { + now := time.Now() + _, err := r.ReportChunk(ctx, ChunkUpdate{ + ChunkStart: start, + ChunkEnd: end, + Status: flowctlpb.ChunkStatus_CHUNK_STATUS_FAILED, + FailureClass: failure, + Phase: phase, + Error: errText, + RecommendedAction: recommendedAction, + CompletedAt: &now, + }) + return err +} + +func firstNonEmpty(values ...string) string { + for _, value := range values { + if value != "" { + return value + } + } + return "" +} + +func copyMap(src map[string]string) map[string]string { + if len(src) == 0 { + return map[string]string{} + } + dst := make(map[string]string, len(src)) + for k, v := range src { + dst[k] = v + } + return dst +} + +func copyInt64Map(src map[string]int64) map[string]int64 { + if len(src) == 0 { + return map[string]int64{} + } + dst := make(map[string]int64, len(src)) + for k, v := range src { + dst[k] = v + } + return dst +} diff --git a/pkg/component/reporter_test.go b/pkg/component/reporter_test.go new file mode 100644 index 0000000..323bf14 --- /dev/null +++ b/pkg/component/reporter_test.go @@ -0,0 +1,89 @@ +package component + +import ( + "context" + "testing" + "time" + + flowctlpb "github.com/withobsrvr/flowctl/proto" +) + +func TestConfigFromEnvDefaultsWhenDisabled(t *testing.T) { + t.Setenv("ENABLE_FLOWCTL", "") + t.Setenv("FLOWCTL_ENDPOINT", "") + t.Setenv("FLOWCTL_COMPONENT_ID", "") + t.Setenv("FLOWCTL_RUN_ID", "") + t.Setenv("FLOWCTL_ATTEMPT", "") + t.Setenv("FLOWCTL_HEARTBEAT_INTERVAL_MS", "") + + cfg := ConfigFromEnv() + if cfg.Enabled { + t.Fatal("expected flowctl to be disabled") + } + if cfg.Attempt != 1 { + t.Fatalf("expected default attempt 1, got %d", cfg.Attempt) + } + if cfg.HeartbeatInterval != defaultHeartbeatInterval { + t.Fatalf("expected default heartbeat interval %s, got %s", defaultHeartbeatInterval, cfg.HeartbeatInterval) + } +} + +func TestConfigFromEnvParsesFlowctlContract(t *testing.T) { + t.Setenv("ENABLE_FLOWCTL", "true") + t.Setenv("FLOWCTL_ENDPOINT", "flowctl.service.consul:8080") + t.Setenv("FLOWCTL_COMPONENT_ID", "bronze-history-loader") + t.Setenv("FLOWCTL_RUN_ID", "mainnet-bronze-repair-20260612") + t.Setenv("FLOWCTL_ATTEMPT", "3") + t.Setenv("FLOWCTL_HEARTBEAT_INTERVAL_MS", "2500") + + cfg := ConfigFromEnv() + if !cfg.Enabled { + t.Fatal("expected flowctl to be enabled") + } + if cfg.Endpoint != "flowctl.service.consul:8080" { + t.Fatalf("unexpected endpoint: %s", cfg.Endpoint) + } + if cfg.ComponentID != "bronze-history-loader" { + t.Fatalf("unexpected component id: %s", cfg.ComponentID) + } + if cfg.RunID != "mainnet-bronze-repair-20260612" { + t.Fatalf("unexpected run id: %s", cfg.RunID) + } + if cfg.Attempt != 3 { + t.Fatalf("expected attempt 3, got %d", cfg.Attempt) + } + if cfg.HeartbeatInterval != 2500*time.Millisecond { + t.Fatalf("expected heartbeat interval 2500ms, got %s", cfg.HeartbeatInterval) + } +} + +func TestDisabledReporterIsNoop(t *testing.T) { + reporter, err := NewReporter(context.Background(), Config{Enabled: false}) + if err != nil { + t.Fatalf("disabled reporter should not error: %v", err) + } + if reporter.Enabled() { + t.Fatal("expected reporter to be disabled") + } + if err := reporter.Register(context.Background(), flowctlpb.ServiceType_SERVICE_TYPE_SOURCE, nil); err != nil { + t.Fatalf("disabled Register should be noop: %v", err) + } + if err := reporter.Heartbeat(context.Background(), nil); err != nil { + t.Fatalf("disabled Heartbeat should be noop: %v", err) + } + if err := reporter.ReportChunkProgress(context.Background(), 1, 2, "extract", nil, nil); err != nil { + t.Fatalf("disabled ReportChunkProgress should be noop: %v", err) + } +} + +func TestReporterRequiresEndpointAndComponentWhenEnabled(t *testing.T) { + _, err := NewReporter(context.Background(), Config{Enabled: true}) + if err == nil { + t.Fatal("expected missing endpoint error") + } + + _, err = NewReporter(context.Background(), Config{Enabled: true, Endpoint: "127.0.0.1:1"}) + if err == nil { + t.Fatal("expected missing component id error") + } +} From a07d206cdee7e98bd3685f9d43ffebab06c18d15 Mon Sep 17 00:00:00 2001 From: Tillman Mosley III Date: Sat, 20 Jun 2026 08:47:43 -0400 Subject: [PATCH 2/2] pr fixes --- pkg/component/reporter.go | 43 +++++++++++++++++++++++++++------- pkg/component/reporter_test.go | 35 +++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 8 deletions(-) diff --git a/pkg/component/reporter.go b/pkg/component/reporter.go index ff398a5..4265f02 100644 --- a/pkg/component/reporter.go +++ b/pkg/component/reporter.go @@ -6,6 +6,7 @@ import ( "os" "strconv" "strings" + "sync" "time" flowctlpb "github.com/withobsrvr/flowctl/proto" @@ -57,14 +58,18 @@ func ConfigFromEnv() Config { // Reporter emits component lifecycle and historical chunk state to flowctl. type Reporter struct { - cfg Config - conn *grpc.ClientConn - client flowctlpb.ControlPlaneClient + cfg Config + + conn *grpc.ClientConn + client flowctlpb.ControlPlaneClient + + mu sync.RWMutex serviceID string } // NewReporter connects to the control plane. If cfg.Enabled is false it returns a disabled no-op reporter. func NewReporter(ctx context.Context, cfg Config) (*Reporter, error) { + cfg = normalizeConfig(cfg) r := &Reporter{cfg: cfg} if !cfg.Enabled { return r, nil @@ -109,6 +114,28 @@ func (r *Reporter) Enabled() bool { return r != nil && r.cfg.Enabled } +func (r *Reporter) getServiceID() string { + r.mu.RLock() + defer r.mu.RUnlock() + return r.serviceID +} + +func (r *Reporter) setServiceID(serviceID string) { + r.mu.Lock() + defer r.mu.Unlock() + r.serviceID = serviceID +} + +func normalizeConfig(cfg Config) Config { + if cfg.Attempt <= 0 { + cfg.Attempt = 1 + } + if cfg.HeartbeatInterval <= 0 { + cfg.HeartbeatInterval = defaultHeartbeatInterval + } + return cfg +} + // Register announces the component to flowctl. func (r *Reporter) Register(ctx context.Context, serviceType flowctlpb.ServiceType, metadata map[string]string) error { if !r.Enabled() { @@ -128,11 +155,11 @@ func (r *Reporter) Register(ctx context.Context, serviceType flowctlpb.ServiceTy if err != nil { return fmt.Errorf("register component with flowctl: %w", err) } - if resp.ServiceId != "" { - r.serviceID = resp.ServiceId - } else { - r.serviceID = r.cfg.ComponentID + serviceID := resp.ServiceId + if serviceID == "" { + serviceID = r.cfg.ComponentID } + r.setServiceID(serviceID) return nil } @@ -141,7 +168,7 @@ func (r *Reporter) Heartbeat(ctx context.Context, metrics map[string]float64) er if !r.Enabled() { return nil } - serviceID := r.serviceID + serviceID := r.getServiceID() if serviceID == "" { serviceID = r.cfg.ComponentID } diff --git a/pkg/component/reporter_test.go b/pkg/component/reporter_test.go index 323bf14..925e533 100644 --- a/pkg/component/reporter_test.go +++ b/pkg/component/reporter_test.go @@ -65,6 +65,12 @@ func TestDisabledReporterIsNoop(t *testing.T) { if reporter.Enabled() { t.Fatal("expected reporter to be disabled") } + if reporter.cfg.Attempt != 1 { + t.Fatalf("expected normalized attempt 1, got %d", reporter.cfg.Attempt) + } + if reporter.cfg.HeartbeatInterval != defaultHeartbeatInterval { + t.Fatalf("expected normalized heartbeat interval %s, got %s", defaultHeartbeatInterval, reporter.cfg.HeartbeatInterval) + } if err := reporter.Register(context.Background(), flowctlpb.ServiceType_SERVICE_TYPE_SOURCE, nil); err != nil { t.Fatalf("disabled Register should be noop: %v", err) } @@ -76,6 +82,35 @@ func TestDisabledReporterIsNoop(t *testing.T) { } } +func TestReporterServiceIDAccessors(t *testing.T) { + reporter := &Reporter{cfg: Config{ComponentID: "component-a"}} + if got := reporter.getServiceID(); got != "" { + t.Fatalf("expected empty service id, got %q", got) + } + reporter.setServiceID("service-a") + if got := reporter.getServiceID(); got != "service-a" { + t.Fatalf("expected service-a, got %q", got) + } +} + +func TestNormalizeConfig(t *testing.T) { + cfg := normalizeConfig(Config{}) + if cfg.Attempt != 1 { + t.Fatalf("expected default attempt 1, got %d", cfg.Attempt) + } + if cfg.HeartbeatInterval != defaultHeartbeatInterval { + t.Fatalf("expected default heartbeat interval %s, got %s", defaultHeartbeatInterval, cfg.HeartbeatInterval) + } + + cfg = normalizeConfig(Config{Attempt: 4, HeartbeatInterval: 2 * time.Second}) + if cfg.Attempt != 4 { + t.Fatalf("expected attempt to be preserved, got %d", cfg.Attempt) + } + if cfg.HeartbeatInterval != 2*time.Second { + t.Fatalf("expected heartbeat interval to be preserved, got %s", cfg.HeartbeatInterval) + } +} + func TestReporterRequiresEndpointAndComponentWhenEnabled(t *testing.T) { _, err := NewReporter(context.Background(), Config{Enabled: true}) if err == nil {