diff --git a/internal/metrics/consumers/otel/transformer.go b/internal/metrics/consumers/otel/transformer.go index 917b6393..1c83d138 100644 --- a/internal/metrics/consumers/otel/transformer.go +++ b/internal/metrics/consumers/otel/transformer.go @@ -99,6 +99,10 @@ func (t *Transformer) TransformAndRecord(event metrics.MetricEvent) error { return nil case metrics.MetricTypeNUMAStats: return t.transformNUMAStats(ctx, event.Data, t.buildAttributes(event)) + case metrics.MetricTypePSI: + return t.transformPSIStats(ctx, event.Data, t.buildAttributes(event)) + case metrics.MetricTypeCgroupPSI: + return t.transformCgroupPSIStats(ctx, event.Data, t.buildAttributes(event)) default: t.logger.V(1).Info("Unknown metric type", "type", event.MetricType) return nil @@ -866,3 +870,133 @@ func (t *Transformer) transformNUMAStats(ctx context.Context, data any, attrs [] return nil } + +// transformPSIStats transforms Pressure Stall Information statistics +func (t *Transformer) transformPSIStats(ctx context.Context, data any, attrs []attribute.KeyValue) error { + stats, ok := data.(*performance.PSIStats) + if !ok { + return fmt.Errorf("invalid PSI stats data type") + } + + // Pre-allocate slice with capacity for resource attribute + baseLen := len(attrs) + resourceAttrs := make([]attribute.KeyValue, baseLen, baseLen+1) + copy(resourceAttrs, attrs) + + // Transform CPU pressure + if stats.CPU != nil { + cpuAttrs := append(resourceAttrs[:baseLen], attribute.String("resource", "cpu")) + t.recordPSIResourceMetrics(ctx, stats.CPU, cpuAttrs) + } + + // Transform Memory pressure + if stats.Memory != nil { + memAttrs := append(resourceAttrs[:baseLen], attribute.String("resource", "memory")) + t.recordPSIResourceMetrics(ctx, stats.Memory, memAttrs) + } + + // Transform I/O pressure + if stats.IO != nil { + ioAttrs := append(resourceAttrs[:baseLen], attribute.String("resource", "io")) + t.recordPSIResourceMetrics(ctx, stats.IO, ioAttrs) + } + + return nil +} + +// recordPSIResourceMetrics records PSI metrics for a single resource (CPU, memory, or I/O) +func (t *Transformer) recordPSIResourceMetrics(ctx context.Context, stats *performance.PSIResourceStats, attrs []attribute.KeyValue) { + // Pre-allocate slice with capacity for stall_type attribute to avoid hidden allocations + stallAttrs := make([]attribute.KeyValue, len(attrs), len(attrs)+1) + copy(stallAttrs, attrs) + + // "some" metrics - at least one task stalled + someAttrs := append(stallAttrs, attribute.String("stall_type", "some")) + + if gauge, err := t.getOrCreateFloat64Gauge("system.psi.pressure.avg10", "PSI 10-second average pressure", "%"); err == nil { + gauge.Record(ctx, stats.SomeAvg10, metric.WithAttributes(someAttrs...)) + } + + if gauge, err := t.getOrCreateFloat64Gauge("system.psi.pressure.avg60", "PSI 60-second average pressure", "%"); err == nil { + gauge.Record(ctx, stats.SomeAvg60, metric.WithAttributes(someAttrs...)) + } + + if gauge, err := t.getOrCreateFloat64Gauge("system.psi.pressure.avg300", "PSI 300-second average pressure", "%"); err == nil { + gauge.Record(ctx, stats.SomeAvg300, metric.WithAttributes(someAttrs...)) + } + + if counter, err := t.getOrCreateInt64Counter("system.psi.pressure.total", "PSI total stall time", "us"); err == nil { + counter.Add(ctx, int64(stats.SomeTotal), metric.WithAttributes(someAttrs...)) + } + + // "full" metrics - all non-idle tasks stalled + // Note: For CPU at system level, "full" is always 0 + // Reuse stallAttrs backing array by resetting length + fullAttrs := append(stallAttrs[:len(attrs)], attribute.String("stall_type", "full")) + + if gauge, err := t.getOrCreateFloat64Gauge("system.psi.pressure.avg10", "PSI 10-second average pressure", "%"); err == nil { + gauge.Record(ctx, stats.FullAvg10, metric.WithAttributes(fullAttrs...)) + } + + if gauge, err := t.getOrCreateFloat64Gauge("system.psi.pressure.avg60", "PSI 60-second average pressure", "%"); err == nil { + gauge.Record(ctx, stats.FullAvg60, metric.WithAttributes(fullAttrs...)) + } + + if gauge, err := t.getOrCreateFloat64Gauge("system.psi.pressure.avg300", "PSI 300-second average pressure", "%"); err == nil { + gauge.Record(ctx, stats.FullAvg300, metric.WithAttributes(fullAttrs...)) + } + + if counter, err := t.getOrCreateInt64Counter("system.psi.pressure.total", "PSI total stall time", "us"); err == nil { + counter.Add(ctx, int64(stats.FullTotal), metric.WithAttributes(fullAttrs...)) + } +} + +// transformCgroupPSIStats transforms per-container PSI statistics +func (t *Transformer) transformCgroupPSIStats(ctx context.Context, data any, attrs []attribute.KeyValue) error { + statsList, ok := data.([]*performance.CgroupPSIStats) + if !ok { + return fmt.Errorf("invalid cgroup PSI stats data type") + } + + // Pre-allocate slice with capacity for container attrs + resource type + // to avoid repeated allocations in the loop: base + container.id + cgroup.path + container.name + resource + const maxExtraAttrs = 4 + baseLen := len(attrs) + + for _, stats := range statsList { + // Reuse a single pre-allocated slice per container iteration + containerAttrs := make([]attribute.KeyValue, baseLen, baseLen+maxExtraAttrs) + copy(containerAttrs, attrs) + + containerAttrs = append(containerAttrs, + attribute.String("container.id", stats.ContainerID), + attribute.String("cgroup.path", stats.CgroupPath), + ) + + if stats.ContainerName != "" { + containerAttrs = append(containerAttrs, attribute.String("container.name", stats.ContainerName)) + } + + containerBaseLen := len(containerAttrs) + + // Transform CPU pressure + if stats.CPU != nil { + cpuAttrs := append(containerAttrs[:containerBaseLen:cap(containerAttrs)], attribute.String("resource", "cpu")) + t.recordPSIResourceMetrics(ctx, stats.CPU, cpuAttrs) + } + + // Transform Memory pressure + if stats.Memory != nil { + memAttrs := append(containerAttrs[:containerBaseLen:cap(containerAttrs)], attribute.String("resource", "memory")) + t.recordPSIResourceMetrics(ctx, stats.Memory, memAttrs) + } + + // Transform I/O pressure + if stats.IO != nil { + ioAttrs := append(containerAttrs[:containerBaseLen:cap(containerAttrs)], attribute.String("resource", "io")) + t.recordPSIResourceMetrics(ctx, stats.IO, ioAttrs) + } + } + + return nil +} diff --git a/internal/metrics/event.go b/internal/metrics/event.go index 05f1051d..37cb89fb 100644 --- a/internal/metrics/event.go +++ b/internal/metrics/event.go @@ -29,9 +29,11 @@ const ( MetricTypeKernel MetricType = "kernel" MetricTypeSystem MetricType = "system" MetricTypeNUMAStats MetricType = "numa_stats" + MetricTypePSI MetricType = "psi" // Pressure Stall Information // Runtime Container Statistics MetricTypeCgroupCPU MetricType = "cgroup_cpu" MetricTypeCgroupMemory MetricType = "cgroup_memory" + MetricTypeCgroupPSI MetricType = "cgroup_psi" // Pressure Stall Information per container MetricTypeCgroupIO MetricType = "cgroup_io" // Future MetricTypeCgroupNetwork MetricType = "cgroup_network" // Future // Hardware configuration collectors diff --git a/pkg/performance/collectors/cgroup_psi.go b/pkg/performance/collectors/cgroup_psi.go new file mode 100644 index 00000000..7207b55a --- /dev/null +++ b/pkg/performance/collectors/cgroup_psi.go @@ -0,0 +1,140 @@ +// Copyright Antimetal, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +package collectors + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "github.com/antimetal/agent/pkg/containers" + "github.com/antimetal/agent/pkg/performance" + "github.com/go-logr/logr" +) + +func init() { + performance.Register(performance.MetricTypeCgroupPSI, performance.PartialNewContinuousPointCollector( + func(logger logr.Logger, config performance.CollectionConfig) (performance.PointCollector, error) { + return NewCgroupPSICollector(logger, config) + }, + )) +} + +var _ performance.PointCollector = (*CgroupPSICollector)(nil) + +// CgroupPSICollector collects Pressure Stall Information from container cgroups +// PSI provides metrics about resource contention (CPU, memory, I/O) per container +// Reference: https://www.kernel.org/doc/html/latest/accounting/psi.html +// +// PSI is standard in cgroup v2 (kernel 4.20+) but rarely available in cgroup v1 +// Gracefully handles missing PSI files by skipping unavailable resources +type CgroupPSICollector struct { + performance.BaseCollector + cgroupPath string + discovery *containers.Discovery +} + +func NewCgroupPSICollector(logger logr.Logger, config performance.CollectionConfig) (*CgroupPSICollector, error) { + if err := config.Validate(performance.ValidateOptions{RequireHostSysPath: true}); err != nil { + return nil, err + } + + capabilities := performance.CollectorCapabilities{ + SupportsOneShot: true, + SupportsContinuous: false, + RequiredCapabilities: nil, + MinKernelVersion: "4.20.0", // PSI introduced in 4.20 + } + + cgroupPath := filepath.Join(config.HostSysPath, "fs", "cgroup") + + return &CgroupPSICollector{ + BaseCollector: performance.NewBaseCollector( + performance.MetricTypeCgroupPSI, + "Cgroup PSI Collector", + logger, + config, + capabilities, + ), + cgroupPath: cgroupPath, + discovery: containers.NewDiscovery(cgroupPath), + }, nil +} + +func (c *CgroupPSICollector) Collect(ctx context.Context) (performance.Event, error) { + version, err := c.discovery.DetectCgroupVersion() + if err != nil { + return performance.Event{}, fmt.Errorf("failed to detect cgroup version: %w", err) + } + + c.Logger().V(2).Info("Detected cgroup version", "version", version) + + // Discover containers - use empty subsystem since PSI files are in container's root cgroup + discoveredContainers, err := c.discovery.DiscoverContainers("", version) + if err != nil { + return performance.Event{}, fmt.Errorf("failed to discover containers: %w", err) + } + + var stats []*performance.CgroupPSIStats + for _, container := range discoveredContainers { + select { + case <-ctx.Done(): + return performance.Event{Metric: performance.MetricTypeCgroupPSI, Data: stats}, ctx.Err() + default: + } + + stat, err := c.collectContainerPSI(container) + if err != nil { + c.Logger().V(1).Info("Failed to collect PSI for container", + "containerID", container.ID, + "error", err) + continue + } + stats = append(stats, stat) + } + + return performance.Event{Metric: performance.MetricTypeCgroupPSI, Data: stats}, nil +} + +func (c *CgroupPSICollector) collectContainerPSI(container containers.Container) (*performance.CgroupPSIStats, error) { + stats := &performance.CgroupPSIStats{ + ContainerID: container.ID, + CgroupPath: container.CgroupPath, + } + + // Read cpu.pressure (optional - may not exist in v1) + cpuPath := filepath.Join(container.CgroupPath, "cpu.pressure") + if data, err := os.ReadFile(cpuPath); err == nil { + if parsed, err := ParsePSIData(string(data)); err == nil { + stats.CPU = parsed + } + } + + // Read memory.pressure (optional - may not exist in v1) + memPath := filepath.Join(container.CgroupPath, "memory.pressure") + if data, err := os.ReadFile(memPath); err == nil { + if parsed, err := ParsePSIData(string(data)); err == nil { + stats.Memory = parsed + } + } + + // Read io.pressure (optional - may not exist in v1) + ioPath := filepath.Join(container.CgroupPath, "io.pressure") + if data, err := os.ReadFile(ioPath); err == nil { + if parsed, err := ParsePSIData(string(data)); err == nil { + stats.IO = parsed + } + } + + // Return error only if ALL PSI files are missing + if stats.CPU == nil && stats.Memory == nil && stats.IO == nil { + return nil, fmt.Errorf("no PSI data available for container %s (cgroup v1 may not support PSI)", container.ID) + } + + return stats, nil +} diff --git a/pkg/performance/collectors/cgroup_psi_test.go b/pkg/performance/collectors/cgroup_psi_test.go new file mode 100644 index 00000000..8201f358 --- /dev/null +++ b/pkg/performance/collectors/cgroup_psi_test.go @@ -0,0 +1,240 @@ +// Copyright Antimetal, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +//go:build !integration + +package collectors_test + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/antimetal/agent/pkg/performance" + "github.com/antimetal/agent/pkg/performance/collectors" + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Reuse PSI test data from psi_test.go +const ( + cgroupValidCPUPSI = `some avg10=0.50 avg60=1.05 avg300=1.50 total=5000000 +full avg10=0.00 avg60=0.00 avg300=0.00 total=0` + + cgroupValidMemoryPSI = `some avg10=8.20 avg60=10.50 avg300=12.75 total=8000000000 +full avg10=2.10 avg60=3.50 avg300=5.20 total=2000000000` + + cgroupValidIOPSI = `some avg10=15.50 avg60=18.30 avg300=20.10 total=15000000000 +full avg10=5.00 avg60=7.50 avg300=10.00 total=5000000000` +) + +// TestCgroupPSICollector_Constructor tests cgroup PSI collector initialization +func TestCgroupPSICollector_Constructor(t *testing.T) { + t.Run("valid configuration", func(t *testing.T) { + tmpDir := t.TempDir() + cgroupDir := filepath.Join(tmpDir, "fs", "cgroup") + require.NoError(t, os.MkdirAll(cgroupDir, 0755)) + + // Create cgroup v2 marker + require.NoError(t, os.WriteFile(filepath.Join(cgroupDir, "cgroup.controllers"), []byte("cpu memory io"), 0644)) + + config := performance.CollectionConfig{ + HostSysPath: tmpDir, + } + + collector, err := collectors.NewCgroupPSICollector(logr.Discard(), config) + require.NoError(t, err) + require.NotNil(t, collector) + }) + + t.Run("missing host sys path", func(t *testing.T) { + config := performance.CollectionConfig{ + HostSysPath: "", + } + + collector, err := collectors.NewCgroupPSICollector(logr.Discard(), config) + assert.Error(t, err) + assert.Nil(t, collector) + }) +} + +// TestCgroupPSICollector_CgroupV2 tests collection from cgroup v2 +func TestCgroupPSICollector_CgroupV2(t *testing.T) { + t.Run("collect from single container", func(t *testing.T) { + tmpDir := t.TempDir() + cgroupDir := filepath.Join(tmpDir, "fs", "cgroup") + require.NoError(t, os.MkdirAll(cgroupDir, 0755)) + + // Create cgroup v2 marker + require.NoError(t, os.WriteFile(filepath.Join(cgroupDir, "cgroup.controllers"), []byte("cpu memory io"), 0644)) + + // Create a container with PSI files (container IDs must be 12+ hex chars) + containerID := "abc123def456" + containerPath := filepath.Join(cgroupDir, "docker", containerID) + require.NoError(t, os.MkdirAll(containerPath, 0755)) + require.NoError(t, os.WriteFile(filepath.Join(containerPath, "cgroup.procs"), []byte("1234\n"), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(containerPath, "cpu.pressure"), []byte(cgroupValidCPUPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(containerPath, "memory.pressure"), []byte(cgroupValidMemoryPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(containerPath, "io.pressure"), []byte(cgroupValidIOPSI), 0644)) + + config := performance.CollectionConfig{HostSysPath: tmpDir} + collector, err := collectors.NewCgroupPSICollector(logr.Discard(), config) + require.NoError(t, err) + + event, err := collector.Collect(context.Background()) + require.NoError(t, err) + + stats, ok := event.Data.([]*performance.CgroupPSIStats) + require.True(t, ok) + require.Len(t, stats, 1) + + // Verify container stats + assert.Contains(t, stats[0].ContainerID, containerID) + assert.NotNil(t, stats[0].CPU) + assert.NotNil(t, stats[0].Memory) + assert.NotNil(t, stats[0].IO) + + // Verify CPU pressure + assert.Equal(t, 0.50, stats[0].CPU.SomeAvg10) + assert.Equal(t, uint64(5000000), stats[0].CPU.SomeTotal) + + // Verify Memory pressure + assert.Equal(t, 8.20, stats[0].Memory.SomeAvg10) + assert.Equal(t, 2.10, stats[0].Memory.FullAvg10) + }) + + t.Run("collect from multiple containers", func(t *testing.T) { + tmpDir := t.TempDir() + cgroupDir := filepath.Join(tmpDir, "fs", "cgroup") + require.NoError(t, os.MkdirAll(cgroupDir, 0755)) + + require.NoError(t, os.WriteFile(filepath.Join(cgroupDir, "cgroup.controllers"), []byte("cpu memory io"), 0644)) + + // Container 1 (container IDs must be 12+ hex chars) + container1Path := filepath.Join(cgroupDir, "docker", "aabbccdd1111") + require.NoError(t, os.MkdirAll(container1Path, 0755)) + require.NoError(t, os.WriteFile(filepath.Join(container1Path, "cgroup.procs"), []byte("100\n"), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(container1Path, "cpu.pressure"), []byte(cgroupValidCPUPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(container1Path, "memory.pressure"), []byte(cgroupValidMemoryPSI), 0644)) + + // Container 2 + container2Path := filepath.Join(cgroupDir, "docker", "aabbccdd2222") + require.NoError(t, os.MkdirAll(container2Path, 0755)) + require.NoError(t, os.WriteFile(filepath.Join(container2Path, "cgroup.procs"), []byte("200\n"), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(container2Path, "cpu.pressure"), []byte(cgroupValidCPUPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(container2Path, "io.pressure"), []byte(cgroupValidIOPSI), 0644)) + + config := performance.CollectionConfig{HostSysPath: tmpDir} + collector, err := collectors.NewCgroupPSICollector(logr.Discard(), config) + require.NoError(t, err) + + event, err := collector.Collect(context.Background()) + require.NoError(t, err) + + stats := event.Data.([]*performance.CgroupPSIStats) + require.Len(t, stats, 2) + }) + + t.Run("partial PSI files - graceful degradation", func(t *testing.T) { + tmpDir := t.TempDir() + cgroupDir := filepath.Join(tmpDir, "fs", "cgroup") + require.NoError(t, os.MkdirAll(cgroupDir, 0755)) + + require.NoError(t, os.WriteFile(filepath.Join(cgroupDir, "cgroup.controllers"), []byte("cpu memory io"), 0644)) + + // Container with only memory.pressure (CPU and IO missing) + // Container IDs must be 12+ hex chars + containerPath := filepath.Join(cgroupDir, "docker", "aabbccdd3333") + require.NoError(t, os.MkdirAll(containerPath, 0755)) + require.NoError(t, os.WriteFile(filepath.Join(containerPath, "cgroup.procs"), []byte("1\n"), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(containerPath, "memory.pressure"), []byte(cgroupValidMemoryPSI), 0644)) + + config := performance.CollectionConfig{HostSysPath: tmpDir} + collector, err := collectors.NewCgroupPSICollector(logr.Discard(), config) + require.NoError(t, err) + + event, err := collector.Collect(context.Background()) + require.NoError(t, err) + + stats := event.Data.([]*performance.CgroupPSIStats) + require.Len(t, stats, 1) + + // Should have memory but not CPU/IO + assert.Nil(t, stats[0].CPU, "CPU PSI should be nil when file missing") + assert.NotNil(t, stats[0].Memory, "Memory PSI should be present") + assert.Nil(t, stats[0].IO, "IO PSI should be nil when file missing") + }) + + t.Run("no PSI files - skip container", func(t *testing.T) { + tmpDir := t.TempDir() + cgroupDir := filepath.Join(tmpDir, "fs", "cgroup") + require.NoError(t, os.MkdirAll(cgroupDir, 0755)) + + require.NoError(t, os.WriteFile(filepath.Join(cgroupDir, "cgroup.controllers"), []byte("cpu memory io"), 0644)) + + // Container with no PSI files (cgroup v1 scenario) + // Container IDs must be 12+ hex chars + containerPath := filepath.Join(cgroupDir, "docker", "aabbccdd4444") + require.NoError(t, os.MkdirAll(containerPath, 0755)) + require.NoError(t, os.WriteFile(filepath.Join(containerPath, "cgroup.procs"), []byte("1\n"), 0644)) + + config := performance.CollectionConfig{HostSysPath: tmpDir} + collector, err := collectors.NewCgroupPSICollector(logr.Discard(), config) + require.NoError(t, err) + + event, err := collector.Collect(context.Background()) + require.NoError(t, err) + + stats := event.Data.([]*performance.CgroupPSIStats) + assert.Len(t, stats, 0, "Containers without any PSI files should be skipped") + }) +} + +// TestCgroupPSICollector_ErrorHandling tests error scenarios +func TestCgroupPSICollector_ErrorHandling(t *testing.T) { + t.Run("missing cgroup directory", func(t *testing.T) { + tmpDir := t.TempDir() + + config := performance.CollectionConfig{HostSysPath: tmpDir} + collector, err := collectors.NewCgroupPSICollector(logr.Discard(), config) + require.NoError(t, err) + + _, err = collector.Collect(context.Background()) + // Should return error when cgroup directory structure is invalid + assert.Error(t, err) + }) + + t.Run("malformed PSI file - skip that resource", func(t *testing.T) { + tmpDir := t.TempDir() + cgroupDir := filepath.Join(tmpDir, "fs", "cgroup") + require.NoError(t, os.MkdirAll(cgroupDir, 0755)) + + require.NoError(t, os.WriteFile(filepath.Join(cgroupDir, "cgroup.controllers"), []byte("cpu memory io"), 0644)) + + // Container IDs must be 12+ hex chars + containerPath := filepath.Join(cgroupDir, "docker", "aabbccdd5555") + require.NoError(t, os.MkdirAll(containerPath, 0755)) + require.NoError(t, os.WriteFile(filepath.Join(containerPath, "cgroup.procs"), []byte("1\n"), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(containerPath, "cpu.pressure"), []byte("invalid data"), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(containerPath, "memory.pressure"), []byte(cgroupValidMemoryPSI), 0644)) + + config := performance.CollectionConfig{HostSysPath: tmpDir} + collector, err := collectors.NewCgroupPSICollector(logr.Discard(), config) + require.NoError(t, err) + + event, err := collector.Collect(context.Background()) + require.NoError(t, err) + + stats := event.Data.([]*performance.CgroupPSIStats) + require.Len(t, stats, 1) + + // CPU should be nil (parse failed), but memory should work + assert.Nil(t, stats[0].CPU, "Malformed CPU PSI should be skipped") + assert.NotNil(t, stats[0].Memory, "Valid memory PSI should be parsed") + }) +} diff --git a/pkg/performance/collectors/psi.go b/pkg/performance/collectors/psi.go new file mode 100644 index 00000000..3104581f --- /dev/null +++ b/pkg/performance/collectors/psi.go @@ -0,0 +1,208 @@ +// Copyright Antimetal, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +package collectors + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/antimetal/agent/pkg/performance" + "github.com/go-logr/logr" +) + +func init() { + performance.Register(performance.MetricTypePSI, performance.PartialNewContinuousPointCollector( + func(logger logr.Logger, config performance.CollectionConfig) (performance.PointCollector, error) { + return NewPSICollector(logger, config) + }, + )) +} + +var _ performance.PointCollector = (*PSICollector)(nil) + +// PSICollector collects Pressure Stall Information from /proc/pressure/ +// PSI provides metrics about resource contention (CPU, memory, I/O) +// Reference: https://www.kernel.org/doc/html/latest/accounting/psi.html +// Available since kernel 4.20 +type PSICollector struct { + performance.BaseCollector + cpuPath string + memoryPath string + ioPath string +} + +func NewPSICollector(logger logr.Logger, config performance.CollectionConfig) (*PSICollector, error) { + if err := config.Validate(performance.ValidateOptions{RequireHostProcPath: true}); err != nil { + return nil, err + } + + capabilities := performance.CollectorCapabilities{ + SupportsOneShot: true, + SupportsContinuous: false, + RequiredCapabilities: nil, + MinKernelVersion: "4.20.0", + } + + pressureDir := filepath.Join(config.HostProcPath, "pressure") + if _, err := os.Stat(pressureDir); err != nil { + return nil, fmt.Errorf("PSI not available (kernel < 4.20 or CONFIG_PSI=n): %w", err) + } + + return &PSICollector{ + BaseCollector: performance.NewBaseCollector( + performance.MetricTypePSI, + "Pressure Stall Information Collector", + logger, + config, + capabilities, + ), + cpuPath: filepath.Join(pressureDir, "cpu"), + memoryPath: filepath.Join(pressureDir, "memory"), + ioPath: filepath.Join(pressureDir, "io"), + }, nil +} + +func (c *PSICollector) Collect(ctx context.Context) (performance.Event, error) { + stats, err := c.collectPSIStats() + if err != nil { + return performance.Event{}, err + } + return performance.Event{Metric: performance.MetricTypePSI, Data: stats}, nil +} + +func (c *PSICollector) collectPSIStats() (*performance.PSIStats, error) { + stats := &performance.PSIStats{} + + cpu, err := c.readPSIFile(c.cpuPath) + if err != nil { + return nil, fmt.Errorf("failed to read CPU pressure: %w", err) + } + stats.CPU = cpu + + memory, err := c.readPSIFile(c.memoryPath) + if err != nil { + return nil, fmt.Errorf("failed to read memory pressure: %w", err) + } + stats.Memory = memory + + io, err := c.readPSIFile(c.ioPath) + if err != nil { + return nil, fmt.Errorf("failed to read I/O pressure: %w", err) + } + stats.IO = io + + return stats, nil +} + +func (c *PSICollector) readPSIFile(path string) (*performance.PSIResourceStats, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed to read %s: %w", path, err) + } + + return ParsePSIData(string(data)) +} + +// ParsePSIData parses PSI file content into PSIResourceStats +// Format: two lines with "some" and "full" prefix, followed by key=value pairs +// Used by both system-level and cgroup PSI collectors +func ParsePSIData(data string) (*performance.PSIResourceStats, error) { + stats := &performance.PSIResourceStats{} + trimmedData := strings.TrimSpace(data) + + // Empty content is valid - return zero values for graceful degradation + if trimmedData == "" { + return stats, nil + } + + lines := strings.Split(trimmedData, "\n") + validLinesFound := false + + for _, line := range lines { + line = strings.TrimSpace(line) + if line == "" { + continue + } + + if strings.HasPrefix(line, "some ") { + if err := parsePSILine(strings.TrimPrefix(line, "some "), &stats.SomeAvg10, &stats.SomeAvg60, &stats.SomeAvg300, &stats.SomeTotal); err != nil { + return nil, fmt.Errorf("failed to parse 'some' line: %w", err) + } + validLinesFound = true + } else if strings.HasPrefix(line, "full ") { + if err := parsePSILine(strings.TrimPrefix(line, "full "), &stats.FullAvg10, &stats.FullAvg60, &stats.FullAvg300, &stats.FullTotal); err != nil { + return nil, fmt.Errorf("failed to parse 'full' line: %w", err) + } + validLinesFound = true + } + } + + // Non-empty content with no valid PSI lines is an error (invalid/malformed file) + if !validLinesFound { + return nil, fmt.Errorf("no valid PSI data found") + } + + return stats, nil +} + +func parsePSILine(line string, avg10, avg60, avg300 *float64, total *uint64) error { + fields := strings.Fields(line) + + for _, field := range fields { + parts := strings.SplitN(field, "=", 2) + if len(parts) != 2 { + continue + } + + key, value := parts[0], parts[1] + + switch key { + case "avg10": + f, err := parsePSIPercentage(value) + if err != nil { + return fmt.Errorf("failed to parse avg10: %w", err) + } + *avg10 = f + case "avg60": + f, err := parsePSIPercentage(value) + if err != nil { + return fmt.Errorf("failed to parse avg60: %w", err) + } + *avg60 = f + case "avg300": + f, err := parsePSIPercentage(value) + if err != nil { + return fmt.Errorf("failed to parse avg300: %w", err) + } + *avg300 = f + case "total": + t, err := strconv.ParseUint(value, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse total: %w", err) + } + *total = t + } + } + + return nil +} + +// parsePSIPercentage parses a PSI percentage value and validates it's in range [0, 100] +func parsePSIPercentage(value string) (float64, error) { + f, err := strconv.ParseFloat(value, 64) + if err != nil { + return 0, err + } + if f < 0 || f > 100 { + return 0, fmt.Errorf("value %.2f out of valid range [0, 100]", f) + } + return f, nil +} diff --git a/pkg/performance/collectors/psi_test.go b/pkg/performance/collectors/psi_test.go new file mode 100644 index 00000000..ef0e6240 --- /dev/null +++ b/pkg/performance/collectors/psi_test.go @@ -0,0 +1,474 @@ +// Copyright Antimetal, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +//go:build !integration + +package collectors_test + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/antimetal/agent/pkg/performance" + "github.com/antimetal/agent/pkg/performance/collectors" + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Test data fixtures - realistic PSI output from real systems + +const ( + validCPUPSI = `some avg10=0.00 avg60=0.05 avg300=0.10 total=1234567890 +full avg10=0.00 avg60=0.00 avg300=0.00 total=0` + + validMemoryPSI = `some avg10=12.50 avg60=15.20 avg300=18.75 total=9876543210 +full avg10=5.25 avg60=8.10 avg300=12.30 total=5432109876` + + validIOPSI = `some avg10=2.30 avg60=3.45 avg300=4.67 total=3333333333 +full avg10=1.10 avg60=1.50 avg300=2.00 total=1111111111` + + zeroPSI = `some avg10=0.00 avg60=0.00 avg300=0.00 total=0 +full avg10=0.00 avg60=0.00 avg300=0.00 total=0` + + highPressurePSI = `some avg10=95.50 avg60=90.25 avg300=85.75 total=999999999999 +full avg10=75.00 avg60=70.50 avg300=65.25 total=888888888888` + + malformedPSI = `some avg10=invalid avg60=0.00 avg300=0.00 total=0 +full avg10=0.00 avg60=0.00 avg300=0.00 total=0` + + emptyPSI = `` +) + +// TestPSICollector_Constructor tests PSI collector initialization +func TestPSICollector_Constructor(t *testing.T) { + t.Run("valid configuration", func(t *testing.T) { + tmpDir := t.TempDir() + pressureDir := filepath.Join(tmpDir, "pressure") + require.NoError(t, os.MkdirAll(pressureDir, 0755)) + + // Create pressure files + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "cpu"), []byte(validCPUPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "memory"), []byte(validMemoryPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "io"), []byte(validIOPSI), 0644)) + + config := performance.CollectionConfig{ + HostProcPath: tmpDir, + } + + collector, err := collectors.NewPSICollector(logr.Discard(), config) + require.NoError(t, err) + require.NotNil(t, collector) + }) + + t.Run("missing pressure directory - PSI not available", func(t *testing.T) { + tmpDir := t.TempDir() + + config := performance.CollectionConfig{ + HostProcPath: tmpDir, + } + + collector, err := collectors.NewPSICollector(logr.Discard(), config) + assert.Error(t, err, "Should fail when /proc/pressure doesn't exist") + assert.Nil(t, collector) + assert.Contains(t, err.Error(), "PSI not available") + }) + + t.Run("missing host proc path", func(t *testing.T) { + config := performance.CollectionConfig{ + HostProcPath: "", + } + + collector, err := collectors.NewPSICollector(logr.Discard(), config) + assert.Error(t, err, "Should fail with missing HostProcPath") + assert.Nil(t, collector) + }) +} + +// TestPSICollector_Collect tests PSI data collection +func TestPSICollector_Collect(t *testing.T) { + t.Run("collect all PSI metrics successfully", func(t *testing.T) { + tmpDir := t.TempDir() + pressureDir := filepath.Join(tmpDir, "pressure") + require.NoError(t, os.MkdirAll(pressureDir, 0755)) + + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "cpu"), []byte(validCPUPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "memory"), []byte(validMemoryPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "io"), []byte(validIOPSI), 0644)) + + config := performance.CollectionConfig{HostProcPath: tmpDir} + collector, err := collectors.NewPSICollector(logr.Discard(), config) + require.NoError(t, err) + + event, err := collector.Collect(context.Background()) + require.NoError(t, err) + + stats, ok := event.Data.(*performance.PSIStats) + require.True(t, ok, "Event data should be PSIStats") + + // Verify CPU pressure + require.NotNil(t, stats.CPU) + assert.Equal(t, 0.00, stats.CPU.SomeAvg10) + assert.Equal(t, 0.05, stats.CPU.SomeAvg60) + assert.Equal(t, 0.10, stats.CPU.SomeAvg300) + assert.Equal(t, uint64(1234567890), stats.CPU.SomeTotal) + assert.Equal(t, 0.00, stats.CPU.FullAvg10) // CPU full is always 0 + + // Verify Memory pressure + require.NotNil(t, stats.Memory) + assert.Equal(t, 12.50, stats.Memory.SomeAvg10) + assert.Equal(t, 15.20, stats.Memory.SomeAvg60) + assert.Equal(t, 18.75, stats.Memory.SomeAvg300) + assert.Equal(t, uint64(9876543210), stats.Memory.SomeTotal) + assert.Equal(t, 5.25, stats.Memory.FullAvg10) + assert.Equal(t, 8.10, stats.Memory.FullAvg60) + assert.Equal(t, 12.30, stats.Memory.FullAvg300) + assert.Equal(t, uint64(5432109876), stats.Memory.FullTotal) + + // Verify I/O pressure + require.NotNil(t, stats.IO) + assert.Equal(t, 2.30, stats.IO.SomeAvg10) + assert.Equal(t, 3.45, stats.IO.SomeAvg60) + assert.Equal(t, 4.67, stats.IO.SomeAvg300) + assert.Equal(t, uint64(3333333333), stats.IO.SomeTotal) + }) + + t.Run("zero pressure values", func(t *testing.T) { + tmpDir := t.TempDir() + pressureDir := filepath.Join(tmpDir, "pressure") + require.NoError(t, os.MkdirAll(pressureDir, 0755)) + + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "cpu"), []byte(zeroPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "memory"), []byte(zeroPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "io"), []byte(zeroPSI), 0644)) + + config := performance.CollectionConfig{HostProcPath: tmpDir} + collector, err := collectors.NewPSICollector(logr.Discard(), config) + require.NoError(t, err) + + event, err := collector.Collect(context.Background()) + require.NoError(t, err) + + stats := event.Data.(*performance.PSIStats) + assert.Equal(t, 0.00, stats.CPU.SomeAvg10) + assert.Equal(t, uint64(0), stats.CPU.SomeTotal) + assert.Equal(t, 0.00, stats.Memory.SomeAvg10) + assert.Equal(t, uint64(0), stats.Memory.SomeTotal) + }) + + t.Run("high pressure scenario", func(t *testing.T) { + tmpDir := t.TempDir() + pressureDir := filepath.Join(tmpDir, "pressure") + require.NoError(t, os.MkdirAll(pressureDir, 0755)) + + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "cpu"), []byte(highPressurePSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "memory"), []byte(highPressurePSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "io"), []byte(highPressurePSI), 0644)) + + config := performance.CollectionConfig{HostProcPath: tmpDir} + collector, err := collectors.NewPSICollector(logr.Discard(), config) + require.NoError(t, err) + + event, err := collector.Collect(context.Background()) + require.NoError(t, err) + + stats := event.Data.(*performance.PSIStats) + assert.Equal(t, 95.50, stats.CPU.SomeAvg10) + assert.Equal(t, 75.00, stats.Memory.FullAvg10) + }) +} + +// TestPSICollector_ErrorHandling tests error scenarios +func TestPSICollector_ErrorHandling(t *testing.T) { + t.Run("missing CPU pressure file", func(t *testing.T) { + tmpDir := t.TempDir() + pressureDir := filepath.Join(tmpDir, "pressure") + require.NoError(t, os.MkdirAll(pressureDir, 0755)) + + // Only create memory and io files + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "memory"), []byte(validMemoryPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "io"), []byte(validIOPSI), 0644)) + + config := performance.CollectionConfig{HostProcPath: tmpDir} + collector, err := collectors.NewPSICollector(logr.Discard(), config) + require.NoError(t, err) + + _, err = collector.Collect(context.Background()) + assert.Error(t, err, "Should fail when CPU pressure file is missing") + assert.Contains(t, err.Error(), "CPU pressure") + }) + + t.Run("malformed PSI data", func(t *testing.T) { + tmpDir := t.TempDir() + pressureDir := filepath.Join(tmpDir, "pressure") + require.NoError(t, os.MkdirAll(pressureDir, 0755)) + + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "cpu"), []byte(malformedPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "memory"), []byte(validMemoryPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "io"), []byte(validIOPSI), 0644)) + + config := performance.CollectionConfig{HostProcPath: tmpDir} + collector, err := collectors.NewPSICollector(logr.Discard(), config) + require.NoError(t, err) + + _, err = collector.Collect(context.Background()) + assert.Error(t, err, "Should fail with malformed PSI data") + }) + + t.Run("empty PSI file", func(t *testing.T) { + tmpDir := t.TempDir() + pressureDir := filepath.Join(tmpDir, "pressure") + require.NoError(t, os.MkdirAll(pressureDir, 0755)) + + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "cpu"), []byte(emptyPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "memory"), []byte(validMemoryPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "io"), []byte(validIOPSI), 0644)) + + config := performance.CollectionConfig{HostProcPath: tmpDir} + collector, err := collectors.NewPSICollector(logr.Discard(), config) + require.NoError(t, err) + + event, err := collector.Collect(context.Background()) + require.NoError(t, err, "Empty PSI should parse successfully with zero values") + + stats := event.Data.(*performance.PSIStats) + assert.NotNil(t, stats.CPU) + }) +} + +// TestPSIParsing_EdgeCases tests PSI parsing edge cases +func TestPSIParsing_EdgeCases(t *testing.T) { + testCases := []struct { + name string + psiContent string + expectError bool + description string + }{ + { + name: "whitespace variations", + psiContent: `some avg10=1.23 avg60=2.34 avg300=3.45 total=12345 +full avg10=0.12 avg60=0.23 avg300=0.34 total=1234`, + expectError: false, + description: "Should handle extra whitespace", + }, + { + name: "only some line", + psiContent: `some avg10=1.00 avg60=2.00 avg300=3.00 total=100`, + expectError: false, + description: "Should handle missing full line", + }, + { + name: "decimal precision", + psiContent: `some avg10=0.123456 avg60=1.234567 avg300=2.345678 total=123456789012345 +full avg10=0.000001 avg60=0.000002 avg300=0.000003 total=1`, + expectError: false, + description: "Should handle high decimal precision", + }, + { + name: "missing avg fields", + psiContent: `some avg10=1.00 total=100`, + expectError: false, + description: "Should handle missing avg fields (defaults to 0)", + }, + { + name: "missing total field", + psiContent: `some avg10=1.00 avg60=2.00 avg300=3.00`, + expectError: false, + description: "Should handle missing total (defaults to 0)", + }, + { + name: "negative values", + psiContent: `some avg10=-1.00 avg60=2.00 avg300=3.00 total=100`, + expectError: true, + description: "Should reject negative percentage values", + }, + { + name: "values over 100", + psiContent: `some avg10=150.00 avg60=2.00 avg300=3.00 total=100`, + expectError: true, + description: "Should reject percentage values over 100", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tmpDir := t.TempDir() + pressureDir := filepath.Join(tmpDir, "pressure") + require.NoError(t, os.MkdirAll(pressureDir, 0755)) + + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "cpu"), []byte(tc.psiContent), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "memory"), []byte(validMemoryPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "io"), []byte(validIOPSI), 0644)) + + config := performance.CollectionConfig{HostProcPath: tmpDir} + collector, err := collectors.NewPSICollector(logr.Discard(), config) + require.NoError(t, err) + + event, err := collector.Collect(context.Background()) + + if tc.expectError { + assert.Error(t, err, tc.description) + } else { + assert.NoError(t, err, tc.description) + stats := event.Data.(*performance.PSIStats) + assert.NotNil(t, stats.CPU, tc.description) + } + }) + } +} + +// TestPSIParsing_RealWorldData tests parsing with real PSI output from different systems +func TestPSIParsing_RealWorldData(t *testing.T) { + testCases := []struct { + name string + cpuPSI string + memoryPSI string + ioPSI string + verifyFunc func(*testing.T, *performance.PSIStats) + }{ + { + name: "idle system", + cpuPSI: `some avg10=0.00 avg60=0.00 avg300=0.00 total=123456 +full avg10=0.00 avg60=0.00 avg300=0.00 total=0`, + memoryPSI: `some avg10=0.00 avg60=0.00 avg300=0.00 total=234567 +full avg10=0.00 avg60=0.00 avg300=0.00 total=0`, + ioPSI: `some avg10=0.00 avg60=0.00 avg300=0.00 total=345678 +full avg10=0.00 avg60=0.00 avg300=0.00 total=0`, + verifyFunc: func(t *testing.T, stats *performance.PSIStats) { + assert.Equal(t, 0.00, stats.CPU.SomeAvg10, "Idle system should have zero averages") + assert.Greater(t, stats.CPU.SomeTotal, uint64(0), "Total should accumulate even when idle") + }, + }, + { + name: "memory constrained system", + cpuPSI: validCPUPSI, + memoryPSI: `some avg10=45.50 avg60=42.30 avg300=40.10 total=99999999999 +full avg10=25.20 avg60=22.10 avg300=20.50 total=55555555555`, + ioPSI: validIOPSI, + verifyFunc: func(t *testing.T, stats *performance.PSIStats) { + assert.Greater(t, stats.Memory.SomeAvg10, 40.0, "Memory constrained system") + assert.Greater(t, stats.Memory.FullAvg10, 20.0, "Should have full memory stalls") + }, + }, + { + name: "I/O constrained system", + cpuPSI: validCPUPSI, + memoryPSI: validMemoryPSI, + ioPSI: `some avg10=65.75 avg60=60.50 avg300=55.25 total=77777777777 +full avg10=45.50 avg60=42.30 avg300=40.10 total=44444444444`, + verifyFunc: func(t *testing.T, stats *performance.PSIStats) { + assert.Greater(t, stats.IO.SomeAvg10, 60.0, "I/O constrained system") + assert.Greater(t, stats.IO.FullAvg10, 40.0, "Should have full I/O stalls") + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tmpDir := t.TempDir() + pressureDir := filepath.Join(tmpDir, "pressure") + require.NoError(t, os.MkdirAll(pressureDir, 0755)) + + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "cpu"), []byte(tc.cpuPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "memory"), []byte(tc.memoryPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "io"), []byte(tc.ioPSI), 0644)) + + config := performance.CollectionConfig{HostProcPath: tmpDir} + collector, err := collectors.NewPSICollector(logr.Discard(), config) + require.NoError(t, err) + + event, err := collector.Collect(context.Background()) + require.NoError(t, err) + + stats := event.Data.(*performance.PSIStats) + tc.verifyFunc(t, stats) + }) + } +} + +// TestPSIParsing_CPUNofull tests that CPU "full" is always zero +func TestPSIParsing_CPUNoFull(t *testing.T) { + tmpDir := t.TempDir() + pressureDir := filepath.Join(tmpDir, "pressure") + require.NoError(t, os.MkdirAll(pressureDir, 0755)) + + // CPU pressure with non-zero full values (shouldn't happen in real systems post-5.13) + cpuPSI := `some avg10=5.00 avg60=4.00 avg300=3.00 total=1000000 +full avg10=0.00 avg60=0.00 avg300=0.00 total=0` + + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "cpu"), []byte(cpuPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "memory"), []byte(validMemoryPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "io"), []byte(validIOPSI), 0644)) + + config := performance.CollectionConfig{HostProcPath: tmpDir} + collector, err := collectors.NewPSICollector(logr.Discard(), config) + require.NoError(t, err) + + event, err := collector.Collect(context.Background()) + require.NoError(t, err) + + stats := event.Data.(*performance.PSIStats) + assert.Equal(t, 0.00, stats.CPU.FullAvg10, "CPU full should be zero at system level") + assert.Equal(t, 0.00, stats.CPU.FullAvg60, "CPU full should be zero at system level") + assert.Equal(t, 0.00, stats.CPU.FullAvg300, "CPU full should be zero at system level") + assert.Equal(t, uint64(0), stats.CPU.FullTotal, "CPU full total should be zero") +} + +// TestPSICollector_FilePermissions tests permission handling +func TestPSICollector_FilePermissions(t *testing.T) { + if os.Getuid() == 0 { + t.Skip("Skipping permission test when running as root") + } + + t.Run("unreadable pressure file", func(t *testing.T) { + tmpDir := t.TempDir() + pressureDir := filepath.Join(tmpDir, "pressure") + require.NoError(t, os.MkdirAll(pressureDir, 0755)) + + cpuFile := filepath.Join(pressureDir, "cpu") + require.NoError(t, os.WriteFile(cpuFile, []byte(validCPUPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "memory"), []byte(validMemoryPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "io"), []byte(validIOPSI), 0644)) + + // Remove read permission + require.NoError(t, os.Chmod(cpuFile, 0000)) + defer func() { _ = os.Chmod(cpuFile, 0644) }() + + config := performance.CollectionConfig{HostProcPath: tmpDir} + collector, err := collectors.NewPSICollector(logr.Discard(), config) + require.NoError(t, err) + + _, err = collector.Collect(context.Background()) + assert.Error(t, err, "Should fail when pressure file is unreadable") + }) +} + +// TestPSICollector_LargeValues tests handling of very large total values +func TestPSICollector_LargeValues(t *testing.T) { + tmpDir := t.TempDir() + pressureDir := filepath.Join(tmpDir, "pressure") + require.NoError(t, os.MkdirAll(pressureDir, 0755)) + + // Large total values (systems that have been up for a long time) + largeTotalPSI := `some avg10=1.00 avg60=2.00 avg300=3.00 total=18446744073709551615 +full avg10=0.50 avg60=1.00 avg300=1.50 total=9223372036854775807` + + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "cpu"), []byte(largeTotalPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "memory"), []byte(largeTotalPSI), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(pressureDir, "io"), []byte(largeTotalPSI), 0644)) + + config := performance.CollectionConfig{HostProcPath: tmpDir} + collector, err := collectors.NewPSICollector(logr.Discard(), config) + require.NoError(t, err) + + event, err := collector.Collect(context.Background()) + require.NoError(t, err, "Should handle large uint64 values") + + stats := event.Data.(*performance.PSIStats) + assert.Equal(t, uint64(18446744073709551615), stats.CPU.SomeTotal, "Should preserve max uint64") +} diff --git a/pkg/performance/types.go b/pkg/performance/types.go index 76255c4d..018ddf50 100644 --- a/pkg/performance/types.go +++ b/pkg/performance/types.go @@ -27,9 +27,11 @@ const ( MetricTypeKernel MetricType = "kernel" MetricTypeSystem MetricType = "system" MetricTypeNUMAStats MetricType = "numa_stats" + MetricTypePSI MetricType = "psi" // Pressure Stall Information // Runtime Container Statistics MetricTypeCgroupCPU MetricType = "cgroup_cpu" MetricTypeCgroupMemory MetricType = "cgroup_memory" + MetricTypeCgroupPSI MetricType = "cgroup_psi" // Pressure Stall Information per container MetricTypeCgroupIO MetricType = "cgroup_io" // Future MetricTypeCgroupNetwork MetricType = "cgroup_network" // Future // Hardware configuration collectors @@ -159,6 +161,35 @@ type LoadStats struct { Uptime time.Duration `json:"uptime"` } +// PSIStats represents Pressure Stall Information for system-level resources +// Data sourced from /proc/pressure/{cpu,memory,io} +// Reference: https://www.kernel.org/doc/html/latest/accounting/psi.html +// Available since kernel 4.20 +type PSIStats struct { + CPU *PSIResourceStats `json:"cpu"` // CPU pressure from /proc/pressure/cpu + Memory *PSIResourceStats `json:"memory"` // Memory pressure from /proc/pressure/memory + IO *PSIResourceStats `json:"io"` // I/O pressure from /proc/pressure/io +} + +// PSIResourceStats represents PSI metrics for a single resource (CPU, memory, or I/O) +// Each file contains two lines: "some" and "full" with time-averaged percentages +// "some" = at least some tasks were stalled +// "full" = all non-idle tasks were stalled simultaneously (CPU "full" is always 0) +type PSIResourceStats struct { + // "some" metrics - at least one task stalled + SomeAvg10 float64 `json:"some_avg10"` // 10-second average percentage + SomeAvg60 float64 `json:"some_avg60"` // 60-second average percentage + SomeAvg300 float64 `json:"some_avg300"` // 300-second average percentage + SomeTotal uint64 `json:"some_total"` // Absolute stall time in microseconds (cumulative counter) + + // "full" metrics - all non-idle tasks stalled + // Note: For CPU, "full" is undefined at system level (always 0) + FullAvg10 float64 `json:"full_avg10"` // 10-second average percentage + FullAvg60 float64 `json:"full_avg60"` // 60-second average percentage + FullAvg300 float64 `json:"full_avg300"` // 300-second average percentage + FullTotal uint64 `json:"full_total"` // Absolute stall time in microseconds (cumulative counter) +} + // MemoryStats represents runtime memory usage statistics from /proc/meminfo // Used by MemoryCollector for operational monitoring and performance analysis type MemoryStats struct { @@ -587,9 +618,11 @@ func DefaultCollectionConfig() CollectionConfig { MetricTypeTCP: true, MetricTypeSystem: true, MetricTypeKernel: true, + MetricTypePSI: true, // Runtime container resource collectors MetricTypeCgroupCPU: true, MetricTypeCgroupMemory: true, + MetricTypeCgroupPSI: true, // Hardware configuration collectors MetricTypeCPUInfo: true, MetricTypeMemoryInfo: true, @@ -896,6 +929,22 @@ type CgroupMemoryStats struct { CachePercent float64 // Cache as percentage of total usage } +// CgroupPSIStats represents Pressure Stall Information for a container/cgroup +// Data sourced from cgroup {cpu,memory,io}.pressure files +// Reference: https://www.kernel.org/doc/html/latest/accounting/psi.html +// Available in cgroup v2 and some cgroup v1 configurations (kernel 4.20+) +type CgroupPSIStats struct { + // Container identification + ContainerID string + ContainerName string // If available from runtime + CgroupPath string + + // PSI metrics per resource + CPU *PSIResourceStats `json:"cpu"` // CPU pressure from cpu.pressure + Memory *PSIResourceStats `json:"memory"` // Memory pressure from memory.pressure + IO *PSIResourceStats `json:"io"` // I/O pressure from io.pressure +} + // ContainerInfo provides container runtime metadata type ContainerInfo struct { ID string diff --git a/pkg/performance/types_test.go b/pkg/performance/types_test.go index bbc4f921..41a530b4 100644 --- a/pkg/performance/types_test.go +++ b/pkg/performance/types_test.go @@ -32,9 +32,11 @@ func TestCollectionConfig_ApplyDefaults(t *testing.T) { MetricTypeTCP: true, MetricTypeSystem: true, MetricTypeKernel: true, + MetricTypePSI: true, // Runtime container resource collectors MetricTypeCgroupCPU: true, MetricTypeCgroupMemory: true, + MetricTypeCgroupPSI: true, // Hardware configuration collectors MetricTypeCPUInfo: true, MetricTypeMemoryInfo: true, @@ -65,9 +67,11 @@ func TestCollectionConfig_ApplyDefaults(t *testing.T) { MetricTypeTCP: true, MetricTypeSystem: true, MetricTypeKernel: true, + MetricTypePSI: true, // Runtime container resource collectors MetricTypeCgroupCPU: true, MetricTypeCgroupMemory: true, + MetricTypeCgroupPSI: true, // Hardware configuration collectors MetricTypeCPUInfo: true, MetricTypeMemoryInfo: true,