Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 76 additions & 20 deletions cmd/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ type natsBundle struct {
jobsKV jetstream.KeyValue
registryKV jetstream.KeyValue
factsKV jetstream.KeyValue
stateKV jetstream.KeyValue
objStore file.ObjectStoreManager
}

Expand Down Expand Up @@ -115,10 +114,10 @@ func setupAPIServer(
)

checker := newHealthChecker(b.nc, b.jobsKV)
auditStore, auditKV, serverOpts := createAuditStore(ctx, log, b.nc, namespace)
metricsProvider := newMetricsProvider(
b.nc, b.jobsKV, b.registryKV, b.factsKV, b.stateKV, auditKV, streamName, b.jobClient,
)
auditStore, serverOpts := createAuditStore(ctx, log, b.nc, namespace)
kvBuckets := configuredKVBuckets(namespace)
objBuckets := configuredObjectBuckets(namespace)
metricsProvider := newMetricsProvider(b.nc, streamName, kvBuckets, objBuckets, b.jobClient)

sm := api.New(appConfig, log, serverOpts...)
registerAPIHandlers(
Expand Down Expand Up @@ -208,7 +207,6 @@ func connectNATSBundle(
jobsKV: jobsKV,
registryKV: registryKV,
factsKV: factsKV,
stateKV: stateKV,
objStore: objStore,
}
}
Expand Down Expand Up @@ -241,14 +239,46 @@ func newHealthChecker(
}
}

// configuredKVBuckets returns the namespaced names of all KV buckets
// declared in osapi.yaml. Only non-empty bucket configs are included.
func configuredKVBuckets(namespace string) []string {
var buckets []string
add := func(name string) {
if name != "" {
buckets = append(buckets, job.ApplyNamespaceToInfraName(namespace, name))
}
}

add(appConfig.NATS.KV.Bucket)
add(appConfig.NATS.KV.ResponseBucket)
add(appConfig.NATS.Audit.Bucket)
add(appConfig.NATS.Registry.Bucket)
add(appConfig.NATS.Facts.Bucket)
add(appConfig.NATS.State.Bucket)
add(appConfig.NATS.FileState.Bucket)

return buckets
}

// configuredObjectBuckets returns the namespaced names of all Object Store
// buckets declared in osapi.yaml.
func configuredObjectBuckets(namespace string) []string {
var buckets []string
if appConfig.NATS.Objects.Bucket != "" {
buckets = append(
buckets,
job.ApplyNamespaceToInfraName(namespace, appConfig.NATS.Objects.Bucket),
)
}

return buckets
}

func newMetricsProvider(
nc messaging.NATSClient,
jobsKV jetstream.KeyValue,
registryKV jetstream.KeyValue,
factsKV jetstream.KeyValue,
stateKV jetstream.KeyValue,
auditKV jetstream.KeyValue,
streamName string,
kvBuckets []string,
objBuckets []string,
jc jobclient.JobClient,
) *health.ClosureMetricsProvider {
return &health.ClosureMetricsProvider{
Expand Down Expand Up @@ -285,19 +315,21 @@ func newMetricsProvider(
}, nil
},
KVInfoFn: func(fnCtx context.Context) ([]health.KVMetrics, error) {
buckets := []jetstream.KeyValue{jobsKV, registryKV, factsKV, stateKV, auditKV}
results := make([]health.KVMetrics, 0, len(buckets))
natsConn, ok := nc.(*natsclient.Client)
if !ok || natsConn.ExtJS == nil {
return nil, fmt.Errorf("jetstream client unavailable")
}

for _, kv := range buckets {
if kv == nil {
results := make([]health.KVMetrics, 0, len(kvBuckets))
for _, name := range kvBuckets {
kv, err := natsConn.ExtJS.KeyValue(fnCtx, name)
if err != nil {
continue
}

status, err := kv.Status(fnCtx)
if err != nil {
continue
}

results = append(results, health.KVMetrics{
Name: status.Bucket(),
Keys: int(status.Values()),
Expand All @@ -307,6 +339,30 @@ func newMetricsProvider(

return results, nil
},
ObjectStoreInfoFn: func(fnCtx context.Context) ([]health.ObjectStoreMetrics, error) {
natsConn, ok := nc.(*natsclient.Client)
if !ok || natsConn.ExtJS == nil {
return nil, fmt.Errorf("jetstream client unavailable")
}

results := make([]health.ObjectStoreMetrics, 0, len(objBuckets))
for _, name := range objBuckets {
obj, err := natsConn.ExtJS.ObjectStore(fnCtx, name)
if err != nil {
continue
}
status, err := obj.Status(fnCtx)
if err != nil {
continue
}
results = append(results, health.ObjectStoreMetrics{
Name: status.Bucket(),
Size: status.Size(),
})
}

return results, nil
},
ConsumerStatsFn: func(fnCtx context.Context) (*health.ConsumerMetrics, error) {
natsConn, ok := nc.(*natsclient.Client)
if !ok || natsConn.ExtJS == nil {
Expand Down Expand Up @@ -389,9 +445,9 @@ func createAuditStore(
log *slog.Logger,
nc messaging.NATSClient,
namespace string,
) (audit.Store, jetstream.KeyValue, []api.Option) {
) (audit.Store, []api.Option) {
if appConfig.NATS.Audit.Bucket == "" {
return nil, nil, nil
return nil, nil
}

auditKVConfig := cli.BuildAuditKVConfig(namespace, appConfig.NATS.Audit)
Expand All @@ -402,7 +458,7 @@ func createAuditStore(

store := audit.NewKVStore(log, auditKV)

return store, auditKV, []api.Option{api.WithAuditStore(store)}
return store, []api.Option{api.WithAuditStore(store)}
}

func registerAPIHandlers(
Expand Down
8 changes: 8 additions & 0 deletions cmd/client_health_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ func displayStatusHealth(
))
}

// Object Stores
for _, o := range data.ObjectStores {
cli.PrintKV("Object Store", fmt.Sprintf(
"%s "+cli.DimStyle.Render("(%s)"),
o.Name, cli.FormatBytes(o.Size),
))
}

// Consumers last — the table can be long with many agents
if data.Consumers != nil {
fmt.Println()
Expand Down
Loading