Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions internal/metrics/consumers/otel/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ func (t *Transformer) TransformAndRecord(event metrics.MetricEvent) error {
return nil
case metrics.MetricTypeNUMAStats:
return t.transformNUMAStats(ctx, event.Data, t.buildAttributes(event))
case metrics.MetricTypePSI:
return t.transformPSIStats(ctx, event.Data, t.buildAttributes(event))
case metrics.MetricTypeCgroupPSI:
return t.transformCgroupPSIStats(ctx, event.Data, t.buildAttributes(event))
default:
t.logger.V(1).Info("Unknown metric type", "type", event.MetricType)
return nil
Expand Down Expand Up @@ -866,3 +870,133 @@ func (t *Transformer) transformNUMAStats(ctx context.Context, data any, attrs []

return nil
}

// transformPSIStats records system-wide Pressure Stall Information (PSI)
// metrics for every resource (CPU, memory, I/O) present in the event payload.
//
// data must hold a non-nil *performance.PSIStats; any other dynamic type, or a
// nil pointer of that type, yields an error instead of a panic. attrs is never
// modified: each resource is recorded with a copy of attrs plus a "resource"
// attribute. Resources whose entry is nil are skipped.
func (t *Transformer) transformPSIStats(ctx context.Context, data any, attrs []attribute.KeyValue) error {
	stats, ok := data.(*performance.PSIStats)
	if !ok {
		return fmt.Errorf("invalid PSI stats data type: %T", data)
	}
	if stats == nil {
		// A typed nil pointer passes the assertion above but cannot be dereferenced.
		return fmt.Errorf("invalid PSI stats data: nil *performance.PSIStats")
	}

	// One scratch buffer with room for the extra "resource" attribute. It is
	// reused for every resource; this is safe because recordPSIResourceMetrics
	// copies the attributes it receives before extending them.
	baseLen := len(attrs)
	scratch := make([]attribute.KeyValue, baseLen, baseLen+1)
	copy(scratch, attrs)

	resources := [...]struct {
		name  string
		stats *performance.PSIResourceStats
	}{
		{"cpu", stats.CPU},
		{"memory", stats.Memory},
		{"io", stats.IO},
	}

	for _, r := range resources {
		if r.stats == nil {
			continue
		}
		resourceAttrs := append(scratch[:baseLen], attribute.String("resource", r.name))
		t.recordPSIResourceMetrics(ctx, r.stats, resourceAttrs)
	}

	return nil
}

// recordPSIResourceMetrics records the PSI metrics of a single resource (CPU,
// memory, or I/O) for both stall types.
//
// For each of "some" (at least one task stalled) and "full" (all non-idle
// tasks stalled) it records the avg10/avg60/avg300 gauges and adds the total
// to the stall-time counter, tagging every measurement with a "stall_type"
// attribute. attrs is copied before being extended, so the caller's slice is
// never written to. A nil stats is ignored.
//
// Note: for CPU at system level, "full" is always 0.
func (t *Transformer) recordPSIResourceMetrics(ctx context.Context, stats *performance.PSIResourceStats, attrs []attribute.KeyValue) {
	if stats == nil {
		return
	}

	// Scratch buffer with room for the stall_type attribute; reused for both
	// stall types by resetting its length.
	baseLen := len(attrs)
	scratch := make([]attribute.KeyValue, baseLen, baseLen+1)
	copy(scratch, attrs)

	// NOTE(review): SomeTotal/FullTotal are passed to Counter.Add, which
	// accumulates. If these fields carry the kernel's cumulative stall time
	// rather than a per-interval delta, the exported counter will over-count —
	// confirm against the collector that produces them.
	someAttrs := append(scratch[:baseLen], attribute.String("stall_type", "some"))
	t.recordPSIStallMetrics(ctx, someAttrs,
		stats.SomeAvg10, stats.SomeAvg60, stats.SomeAvg300, int64(stats.SomeTotal))

	fullAttrs := append(scratch[:baseLen], attribute.String("stall_type", "full"))
	t.recordPSIStallMetrics(ctx, fullAttrs,
		stats.FullAvg10, stats.FullAvg60, stats.FullAvg300, int64(stats.FullTotal))
}

// recordPSIStallMetrics records the three PSI average gauges and the stall-time
// counter for one stall type using the given attributes. Failure to obtain an
// instrument is logged and that single measurement is skipped; the remaining
// measurements are still recorded.
func (t *Transformer) recordPSIStallMetrics(ctx context.Context, attrs []attribute.KeyValue, avg10, avg60, avg300 float64, total int64) {
	// Build the attribute option once and share it across all four measurements.
	opt := metric.WithAttributes(attrs...)

	gauges := [...]struct {
		name  string
		desc  string
		value float64
	}{
		{"system.psi.pressure.avg10", "PSI 10-second average pressure", avg10},
		{"system.psi.pressure.avg60", "PSI 60-second average pressure", avg60},
		{"system.psi.pressure.avg300", "PSI 300-second average pressure", avg300},
	}

	for _, g := range gauges {
		gauge, err := t.getOrCreateFloat64Gauge(g.name, g.desc, "%")
		if err != nil {
			t.logger.V(1).Info("Failed to get PSI gauge", "metric", g.name, "error", err)
			continue
		}
		gauge.Record(ctx, g.value, opt)
	}

	counter, err := t.getOrCreateInt64Counter("system.psi.pressure.total", "PSI total stall time", "us")
	if err != nil {
		t.logger.V(1).Info("Failed to get PSI counter", "metric", "system.psi.pressure.total", "error", err)
		return
	}
	counter.Add(ctx, total, opt)
}

// transformCgroupPSIStats records per-container PSI metrics.
//
// data must hold a []*performance.CgroupPSIStats; any other dynamic type
// yields an error. For every non-nil entry the base attrs are extended with
// "container.id", "cgroup.path" and, when non-empty, "container.name"; each
// resource (CPU, memory, I/O) that is present is then recorded with an extra
// "resource" attribute. Nil entries and nil resources are skipped. attrs
// itself is never modified.
func (t *Transformer) transformCgroupPSIStats(ctx context.Context, data any, attrs []attribute.KeyValue) error {
	statsList, ok := data.([]*performance.CgroupPSIStats)
	if !ok {
		return fmt.Errorf("invalid cgroup PSI stats data type: %T", data)
	}

	// Room for container.id + cgroup.path + container.name + resource.
	const maxExtraAttrs = 4
	baseLen := len(attrs)

	// A single scratch buffer serves every container and resource. Reuse is
	// safe because recordPSIResourceMetrics copies the attributes it is given
	// before extending them, so nothing retains this backing array.
	scratch := make([]attribute.KeyValue, baseLen, baseLen+maxExtraAttrs)
	copy(scratch, attrs)

	for _, stats := range statsList {
		if stats == nil {
			// Guard against a nil entry rather than panicking on field access.
			continue
		}

		containerAttrs := append(scratch[:baseLen],
			attribute.String("container.id", stats.ContainerID),
			attribute.String("cgroup.path", stats.CgroupPath),
		)
		if stats.ContainerName != "" {
			containerAttrs = append(containerAttrs, attribute.String("container.name", stats.ContainerName))
		}
		containerBaseLen := len(containerAttrs)

		resources := [...]struct {
			name  string
			stats *performance.PSIResourceStats
		}{
			{"cpu", stats.CPU},
			{"memory", stats.Memory},
			{"io", stats.IO},
		}

		for _, r := range resources {
			if r.stats == nil {
				continue
			}
			resourceAttrs := append(containerAttrs[:containerBaseLen], attribute.String("resource", r.name))
			t.recordPSIResourceMetrics(ctx, r.stats, resourceAttrs)
		}
	}

	return nil
}
2 changes: 2 additions & 0 deletions internal/metrics/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ const (
MetricTypeKernel MetricType = "kernel"
MetricTypeSystem MetricType = "system"
MetricTypeNUMAStats MetricType = "numa_stats"
MetricTypePSI MetricType = "psi" // Pressure Stall Information
// Runtime Container Statistics
MetricTypeCgroupCPU MetricType = "cgroup_cpu"
MetricTypeCgroupMemory MetricType = "cgroup_memory"
MetricTypeCgroupPSI MetricType = "cgroup_psi" // Pressure Stall Information per container
MetricTypeCgroupIO MetricType = "cgroup_io" // Future
MetricTypeCgroupNetwork MetricType = "cgroup_network" // Future
// Hardware configuration collectors
Expand Down
140 changes: 140 additions & 0 deletions pkg/performance/collectors/cgroup_psi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright Antimetal, Inc. All rights reserved.
//
// Use of this source code is governed by a source available license that can be found in the
// LICENSE file or at:
// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt

package collectors

import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/antimetal/agent/pkg/containers"
"github.com/antimetal/agent/pkg/performance"
"github.com/go-logr/logr"
)

func init() {
performance.Register(performance.MetricTypeCgroupPSI, performance.PartialNewContinuousPointCollector(
func(logger logr.Logger, config performance.CollectionConfig) (performance.PointCollector, error) {
return NewCgroupPSICollector(logger, config)
},
))
}

// Compile-time assertion that *CgroupPSICollector implements performance.PointCollector.
var _ performance.PointCollector = (*CgroupPSICollector)(nil)

// CgroupPSICollector collects Pressure Stall Information (PSI) from container cgroups.
// PSI provides metrics about resource contention (CPU, memory, I/O) per container.
// Reference: https://www.kernel.org/doc/html/latest/accounting/psi.html
//
// PSI is standard in cgroup v2 (kernel 4.20+) but rarely available in cgroup v1.
// Missing PSI files are handled gracefully: an unavailable resource is skipped,
// and a container is only reported as failed when none of its PSI files yield data.
type CgroupPSICollector struct {
	performance.BaseCollector
	// cgroupPath is the cgroup filesystem root, built as <HostSysPath>/fs/cgroup.
	cgroupPath string
	// discovery detects the cgroup version and enumerates containers under cgroupPath.
	discovery *containers.Discovery
}

// NewCgroupPSICollector builds a CgroupPSICollector rooted at
// <config.HostSysPath>/fs/cgroup.
//
// The config is validated with RequireHostSysPath set; a validation failure is
// returned unchanged. The collector advertises one-shot support only and a
// minimum kernel version of 4.20.0, the release that introduced PSI.
func NewCgroupPSICollector(logger logr.Logger, config performance.CollectionConfig) (*CgroupPSICollector, error) {
	validation := performance.ValidateOptions{RequireHostSysPath: true}
	if err := config.Validate(validation); err != nil {
		return nil, err
	}

	root := filepath.Join(config.HostSysPath, "fs", "cgroup")

	base := performance.NewBaseCollector(
		performance.MetricTypeCgroupPSI,
		"Cgroup PSI Collector",
		logger,
		config,
		performance.CollectorCapabilities{
			SupportsOneShot:      true,
			SupportsContinuous:   false,
			RequiredCapabilities: nil,
			MinKernelVersion:     "4.20.0", // PSI introduced in 4.20
		},
	)

	collector := &CgroupPSICollector{
		BaseCollector: base,
		cgroupPath:    root,
		discovery:     containers.NewDiscovery(root),
	}
	return collector, nil
}

// Collect gathers PSI statistics for every discovered container and returns
// them as a single event whose Data is a []*performance.CgroupPSIStats.
//
// Failure to detect the cgroup version or to discover containers aborts the
// collection with a wrapped error. A container whose PSI data cannot be
// collected is logged at V(1) and skipped. If ctx is done before a container
// is processed, the stats gathered so far are returned together with ctx.Err().
func (c *CgroupPSICollector) Collect(ctx context.Context) (performance.Event, error) {
	cgroupVersion, err := c.discovery.DetectCgroupVersion()
	if err != nil {
		return performance.Event{}, fmt.Errorf("failed to detect cgroup version: %w", err)
	}
	c.Logger().V(2).Info("Detected cgroup version", "version", cgroupVersion)

	// PSI files live in the container's root cgroup, so no subsystem is given.
	found, err := c.discovery.DiscoverContainers("", cgroupVersion)
	if err != nil {
		return performance.Event{}, fmt.Errorf("failed to discover containers: %w", err)
	}

	var collected []*performance.CgroupPSIStats
	for _, ctr := range found {
		// Stop early on cancellation, handing back whatever was gathered.
		if ctxErr := ctx.Err(); ctxErr != nil {
			partial := performance.Event{Metric: performance.MetricTypeCgroupPSI, Data: collected}
			return partial, ctxErr
		}

		psi, psiErr := c.collectContainerPSI(ctr)
		if psiErr != nil {
			c.Logger().V(1).Info("Failed to collect PSI for container",
				"containerID", ctr.ID,
				"error", psiErr)
			continue
		}
		collected = append(collected, psi)
	}

	event := performance.Event{Metric: performance.MetricTypeCgroupPSI, Data: collected}
	return event, nil
}

// collectContainerPSI reads cpu.pressure, memory.pressure and io.pressure from
// the container's cgroup directory and returns the parsed results.
//
// Each file is optional (it may not exist on cgroup v1): a resource whose file
// cannot be read or parsed is left nil. An error is returned only when none of
// the three resources produced data.
func (c *CgroupPSICollector) collectContainerPSI(container containers.Container) (*performance.CgroupPSIStats, error) {
	stats := &performance.CgroupPSIStats{
		ContainerID: container.ID,
		CgroupPath:  container.CgroupPath,
	}

	stats.CPU = c.readCgroupPressureFile(container, "cpu.pressure")
	stats.Memory = c.readCgroupPressureFile(container, "memory.pressure")
	stats.IO = c.readCgroupPressureFile(container, "io.pressure")

	// Return error only if ALL PSI files are missing or unusable.
	if stats.CPU == nil && stats.Memory == nil && stats.IO == nil {
		return nil, fmt.Errorf("no PSI data available for container %s (cgroup v1 may not support PSI)", container.ID)
	}

	return stats, nil
}

// readCgroupPressureFile reads and parses one PSI file (e.g. "cpu.pressure")
// from the container's cgroup directory. It returns nil when the file cannot
// be read or parsed. A missing file is expected and stays silent; any other
// read failure is logged at V(2) and a parse failure at V(1), so problems such
// as permission errors or malformed content remain diagnosable.
func (c *CgroupPSICollector) readCgroupPressureFile(container containers.Container, name string) *performance.PSIResourceStats {
	path := filepath.Join(container.CgroupPath, name)

	data, err := os.ReadFile(path)
	if err != nil {
		if !os.IsNotExist(err) {
			c.Logger().V(2).Info("Failed to read PSI file",
				"containerID", container.ID,
				"path", path,
				"error", err)
		}
		return nil
	}

	parsed, err := ParsePSIData(string(data))
	if err != nil {
		c.Logger().V(1).Info("Failed to parse PSI file",
			"containerID", container.ID,
			"path", path,
			"error", err)
		return nil
	}

	return parsed
}
Loading
Loading