From f5f4dfe511878ef6b97b9998ac6d3a3dadc98186 Mon Sep 17 00:00:00 2001 From: thiagoftsm Date: Mon, 4 May 2026 14:41:43 +0000 Subject: [PATCH 1/5] tests: Add more unit tests (P1) --- gotests/main_test.go | 379 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 379 insertions(+) diff --git a/gotests/main_test.go b/gotests/main_test.go index 4f327810..dc780b60 100644 --- a/gotests/main_test.go +++ b/gotests/main_test.go @@ -569,3 +569,382 @@ func TestUnitTestEnv(t *testing.T) { } }) } + +func TestMapDataHelpers(t *testing.T) { + t.Run("roundUpSize aligns to boundary", func(t *testing.T) { + cases := []struct{ value, align, want int }{ + {0, 8, 0}, + {1, 8, 8}, + {7, 8, 8}, + {8, 8, 8}, + {9, 8, 16}, + {3, 4, 4}, + {4, 4, 4}, + {5, 4, 8}, + } + for _, tc := range cases { + if got := roundUpSize(tc.value, tc.align); got != tc.want { + t.Fatalf("roundUpSize(%d, %d) = %d, want %d", tc.value, tc.align, got, tc.want) + } + } + }) + + t.Run("mapValueStride returns raw size for non-percpu", func(t *testing.T) { + meta := mapMeta{Type: bpfMapTypeHash, ValueSize: 10} + if got := mapValueStride(meta); got != 10 { + t.Fatalf("unexpected stride for hash map: %d", got) + } + }) + + t.Run("mapValueStride rounds up for percpu", func(t *testing.T) { + cases := []struct { + mapType uint32 + valueSize uint32 + want int + }{ + {bpfMapTypePerCPUHash, 10, 16}, + {bpfMapTypePerCPUArray, 8, 8}, + {bpfMapTypePerCPUHash, 1, 8}, + {bpfMapTypePerCPUArray, 17, 24}, + } + for _, tc := range cases { + meta := mapMeta{Type: tc.mapType, ValueSize: tc.valueSize} + if got := mapValueStride(meta); got != tc.want { + t.Fatalf("mapValueStride(type=%d, size=%d) = %d, want %d", tc.mapType, tc.valueSize, got, tc.want) + } + } + }) + + t.Run("mapValueLength ignores nprocesses for non-percpu", func(t *testing.T) { + meta := mapMeta{Type: bpfMapTypeHash, ValueSize: 10} + if got := mapValueLength(meta, 4); got != 10 { + t.Fatalf("unexpected length: %d", got) + } + }) + + t.Run("mapValueLength multiplies 
stride by nprocesses for percpu", func(t *testing.T) { + meta := mapMeta{Type: bpfMapTypePerCPUHash, ValueSize: 10} + // stride=16, nprocesses=4 → 64 + if got := mapValueLength(meta, 4); got != 64 { + t.Fatalf("unexpected percpu length: %d", got) + } + }) + + t.Run("mapValueLength clamps nprocesses below 1 to 1", func(t *testing.T) { + meta := mapMeta{Type: bpfMapTypePerCPUHash, ValueSize: 10} + // stride=16, nprocesses clamped to 1 → 16 + if got := mapValueLength(meta, 0); got != 16 { + t.Fatalf("unexpected clamped length: %d", got) + } + }) + + t.Run("controllerEntryLimit returns constant when MaxEntries exceeds it", func(t *testing.T) { + meta := mapMeta{MaxEntries: 100} + if got := controllerEntryLimit(meta); got != netdataControllerEnd { + t.Fatalf("unexpected limit: got %d want %d", got, netdataControllerEnd) + } + }) + + t.Run("controllerEntryLimit returns MaxEntries when smaller than constant", func(t *testing.T) { + meta := mapMeta{MaxEntries: 3} + if got := controllerEntryLimit(meta); got != 3 { + t.Fatalf("unexpected limit: got %d want 3", got) + } + }) + + t.Run("controllerEntryLimit treats zero MaxEntries as no constraint", func(t *testing.T) { + meta := mapMeta{MaxEntries: 0} + if got := controllerEntryLimit(meta); got != netdataControllerEnd { + t.Fatalf("unexpected limit for zero MaxEntries: got %d want %d", got, netdataControllerEnd) + } + }) + + t.Run("fillScalarValue writes 8-byte little-endian", func(t *testing.T) { + dst := make([]byte, 8) + fillScalarValue(dst, 8, 0x0102030405060708) + want := []byte{0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01} + if !bytes.Equal(dst, want) { + t.Fatalf("unexpected 8-byte encoding: got %v want %v", dst, want) + } + }) + + t.Run("fillScalarValue writes 4-byte little-endian when valueSize < 8", func(t *testing.T) { + dst := make([]byte, 4) + fillScalarValue(dst, 4, 0x12345678) + want := []byte{0x78, 0x56, 0x34, 0x12} + if !bytes.Equal(dst, want) { + t.Fatalf("unexpected 4-byte encoding: got %v want %v", dst, 
want) + } + }) + + t.Run("fillScalarValue falls through to 4-byte when buffer too short for 8", func(t *testing.T) { + dst := make([]byte, 4) + fillScalarValue(dst, 8, 0x12345678) + want := []byte{0x78, 0x56, 0x34, 0x12} + if !bytes.Equal(dst, want) { + t.Fatalf("unexpected fallback 4-byte encoding: got %v want %v", dst, want) + } + }) + + t.Run("fillScalarValue does not write when valueSize too small", func(t *testing.T) { + dst := make([]byte, 8) + fillScalarValue(dst, 2, 0xFFFF) + if !bytes.Equal(dst, make([]byte, 8)) { + t.Fatalf("expected no write for valueSize=2: %v", dst) + } + }) + + t.Run("allocateTableData sizes slices from mapValueLength", func(t *testing.T) { + meta := mapMeta{Type: bpfMapTypePerCPUHash, KeySize: 4, ValueSize: 10, MaxEntries: 8} + td := allocateTableData(meta, 2) + // stride=16, nprocesses=2 → valueLength=32 + if len(td.key) != 4 || len(td.nextKey) != 4 { + t.Fatalf("unexpected key lengths: key=%d nextKey=%d", len(td.key), len(td.nextKey)) + } + if len(td.value) != 32 || len(td.defValue) != 32 { + t.Fatalf("unexpected value lengths: value=%d defValue=%d", len(td.value), len(td.defValue)) + } + if td.keyLength != 4 || td.valueLength != 32 { + t.Fatalf("unexpected stored lengths: keyLength=%d valueLength=%d", td.keyLength, td.valueLength) + } + }) +} + +func TestMapTypePredicates(t *testing.T) { + percpuTypes := []uint32{bpfMapTypePerCPUHash, bpfMapTypePerCPUArray} + nonPercpuTypes := []uint32{bpfMapTypeHash, bpfMapTypeArray, bpfMapTypeRingBuf, bpfMapTypeUserRingBuf} + + for _, mt := range percpuTypes { + if !isPerCPUMapType(mt) { + t.Fatalf("expected map type %d to be percpu", mt) + } + } + for _, mt := range nonPercpuTypes { + if isPerCPUMapType(mt) { + t.Fatalf("expected map type %d to not be percpu", mt) + } + } + + ringbufTypes := []uint32{bpfMapTypeRingBuf, bpfMapTypeUserRingBuf} + nonRingbufTypes := []uint32{bpfMapTypeHash, bpfMapTypeArray, bpfMapTypePerCPUHash, bpfMapTypePerCPUArray} + + for _, mt := range ringbufTypes { + if 
!isRingBufferMapType(mt) { + t.Fatalf("expected map type %d to be ringbuf", mt) + } + } + for _, mt := range nonRingbufTypes { + if isRingBufferMapType(mt) { + t.Fatalf("expected map type %d to not be ringbuf", mt) + } + } + + if !isUserRingBufferMapType(bpfMapTypeUserRingBuf) { + t.Fatal("expected user_ringbuf to be user ringbuf type") + } + if isUserRingBufferMapType(bpfMapTypeRingBuf) { + t.Fatal("expected ringbuf to not be user ringbuf type") + } +} + +func TestModeSuffix(t *testing.T) { + if got := modeSuffix(false, false); got != "" { + t.Fatalf("unexpected plain mode suffix: %q", got) + } + if got := modeSuffix(true, false); got != "_buffer" { + t.Fatalf("unexpected buffer mode suffix: %q", got) + } + if got := modeSuffix(false, true); got != "_arena" { + t.Fatalf("unexpected arena mode suffix: %q", got) + } + // arena takes precedence over buffer + if got := modeSuffix(true, true); got != "_arena" { + t.Fatalf("arena must take precedence over buffer, got %q", got) + } +} + +func TestModuleModeLookup(t *testing.T) { + bufferArenaModules := []string{"cachestat", "dc", "fd", "oomkill", "process", "shm", "swap", "vfs", "dns"} + for _, name := range bufferArenaModules { + if !moduleHasBuffer(name) { + t.Fatalf("expected %q to have buffer support", name) + } + if !moduleHasArena(name) { + t.Fatalf("expected %q to have arena support", name) + } + } + + plainOnlyModules := []string{"btrfs", "disk", "ext4", "hardirq", "mdflush", "mount", "nfs", "network_viewer", "softirq", "socket", "xfs", "zfs"} + for _, name := range plainOnlyModules { + if moduleHasBuffer(name) { + t.Fatalf("expected %q to not have buffer support", name) + } + if moduleHasArena(name) { + t.Fatalf("expected %q to not have arena support", name) + } + } +} + +func TestFindOptionalName(t *testing.T) { + names := []specifyName{ + {programName: "netdata_foo", functionToAttach: "foo_fn"}, + {programName: "netdata_bar", functionToAttach: "bar_fn"}, + } + + got := findOptionalName(&names, "netdata_foo") + 
if got == nil || got.programName != "netdata_foo" { + t.Fatal("expected to find netdata_foo") + } + + if got := findOptionalName(&names, "netdata_baz"); got != nil { + t.Fatal("expected nil for absent name") + } + + if got := findOptionalName(nil, "netdata_foo"); got != nil { + t.Fatal("expected nil for nil slice") + } +} + +func TestSetCommonFlag(t *testing.T) { + got := setCommonFlag() + + included := []uint64{flagCachestat, flagDC, flagDisk, flagFD, flagSync, flagHardIRQ, + flagMount, flagNetworkViewer, flagOOMKill, flagProcess, flagSHM, flagSocket, + flagSoftIRQ, flagSwap, flagDNS} + for _, f := range included { + if got&f == 0 { + t.Fatalf("expected flag %#x to be included in setCommonFlag", f) + } + } + + excluded := []uint64{flagBtrfs, flagExt4, flagVFS, flagNFS, flagXFS, flagZFS, + flagMDFlush, flagContent, flagLoadBinary} + for _, f := range excluded { + if got&f != 0 { + t.Fatalf("expected flag %#x to be excluded from setCommonFlag", f) + } + } +} + +func TestDescribeError(t *testing.T) { + if got := describeError(0); got != "No error information" { + t.Fatalf("unexpected zero error description: %q", got) + } + + posDesc := describeError(int(syscall.ENOENT)) + negDesc := describeError(-int(syscall.ENOENT)) + if posDesc != negDesc { + t.Fatalf("positive and negative errno must yield the same description: %q vs %q", posDesc, negDesc) + } + if !strings.Contains(strings.ToLower(posDesc), "no such") { + t.Fatalf("unexpected ENOENT description: %q", posDesc) + } +} + +func TestResolveBinaryDir(t *testing.T) { + dir := t.TempDir() + if got := resolveBinaryDir(dir); got != dir { + t.Fatalf("unexpected resolved path: got %q want %q", got, dir) + } + + // empty input falls back to cwd — just verify it returns something non-empty + if got := resolveBinaryDir(""); got == "" { + t.Fatal("expected non-empty path for empty input") + } +} + +func TestWriteSupportedMapTypes(t *testing.T) { + supported := map[uint32]bool{ + bpfMapTypeHash: true, + bpfMapTypeArray: false, + 
bpfMapTypePerCPUHash: true, + bpfMapTypePerCPUArray: false, + bpfMapTypeRingBuf: false, + bpfMapTypeUserRingBuf: false, + } + + var out bytes.Buffer + writeSupportedMapTypes(&out, supported) + got := out.String() + + if !strings.HasPrefix(got, "[") || !strings.HasSuffix(got, "]") { + t.Fatalf("expected JSON array format, got %q", got) + } + if !strings.Contains(got, `"hash"`) { + t.Fatalf("expected hash in output: %s", got) + } + if !strings.Contains(got, `"percpu_hash"`) { + t.Fatalf("expected percpu_hash in output: %s", got) + } + if strings.Contains(got, `"array"`) { + t.Fatalf("array must not appear (disabled): %s", got) + } + if strings.Contains(got, `"ringbuf"`) { + t.Fatalf("ringbuf must not appear (disabled): %s", got) + } +} + +func TestCandidateVersionIndex(t *testing.T) { + cases := []struct { + name string + filename string + module string + rhf int + kernels uint32 + maxIndex uint32 + arenaMode bool + wantIndex int + }{ + { + name: "rhf 5.14 matches at index 7", + filename: "pnetdata_ebpf_swap.5.14.rhf.o", + module: "swap", rhf: 1, kernels: netdataV514, maxIndex: 7, + wantIndex: 7, + }, + { + name: "non-rhf masks out V514", + filename: "pnetdata_ebpf_swap.5.14.rhf.o", + module: "swap", rhf: -1, kernels: netdataV514, maxIndex: 10, + wantIndex: -1, + }, + { + name: "non-rhf 6.8 matches at index 10", + filename: "pnetdata_ebpf_swap.6.8.o", + module: "swap", rhf: -1, kernels: netdataV68, maxIndex: 10, + wantIndex: 10, + }, + { + name: "picks file version from multi-version kernel set", + filename: "pnetdata_ebpf_swap.5.4.o", + module: "swap", rhf: -1, kernels: netdataV54 | netdataV68, maxIndex: 10, + wantIndex: 4, + }, + { + name: "wrong module name returns -1", + filename: "pnetdata_ebpf_process.6.8.o", + module: "swap", rhf: -1, kernels: netdataV68, maxIndex: 10, + wantIndex: -1, + }, + { + name: "arena file matches with arenaMode enabled", + filename: "pnetdata_ebpf_swap_arena.6.12.o", + module: "swap", rhf: -1, kernels: netdataV612, maxIndex: 11, 
arenaMode: true, + wantIndex: 11, + }, + { + name: "arena file rejected without arenaMode", + filename: "pnetdata_ebpf_swap_arena.6.12.o", + module: "swap", rhf: -1, kernels: netdataV612, maxIndex: 11, arenaMode: false, + wantIndex: -1, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := candidateVersionIndex(tc.filename, tc.module, false, tc.rhf, tc.kernels, tc.maxIndex, false, tc.arenaMode) + if got != tc.wantIndex { + t.Fatalf("unexpected index: got %d want %d", got, tc.wantIndex) + } + }) + } +} From 4c5ea1eef9c844332a88239085d435d7c5cfe3a6 Mon Sep 17 00:00:00 2001 From: thiagoftsm Date: Mon, 4 May 2026 18:00:02 +0000 Subject: [PATCH 2/5] tests: Add socket tests --- gotests/main.go | 10 +- gotests/socket.go | 231 +++++++++++++++++++++++++++ gotests/socket_test.go | 337 ++++++++++++++++++++++++++++++++++++++ tests/Makefile | 2 +- tests/tester_socket.c | 355 +++++++++++++++++++++++++++++++++++++++++ tests/tester_socket.h | 11 ++ tests/tester_user.c | 13 +- 7 files changed, 951 insertions(+), 8 deletions(-) create mode 100644 gotests/socket.go create mode 100644 gotests/socket_test.go create mode 100644 tests/tester_socket.c create mode 100644 tests/tester_socket.h diff --git a/gotests/main.go b/gotests/main.go index 5f8dffa5..148882be 100644 --- a/gotests/main.go +++ b/gotests/main.go @@ -1223,10 +1223,14 @@ func ebpfTester(w io.Writer, filename string, names *[]specifyName, maps bool, c } if maps { - if ctrl != "" { - fillCtrl(obj, ctrl, opts.mapLevel, nprocesses) + if hasSocketTable(obj) { + runSocketTableTester(w, obj, opts.iterations) + } else { + if ctrl != "" { + fillCtrl(obj, ctrl, opts.mapLevel, nprocesses) + } + testMaps(w, obj, ctrl, opts.iterations, nprocesses) } - testMaps(w, obj, ctrl, opts.iterations, nprocesses) } for _, link := range summary.links { diff --git a/gotests/socket.go b/gotests/socket.go new file mode 100644 index 00000000..a6898cc0 --- /dev/null +++ b/gotests/socket.go @@ -0,0 +1,231 @@ +package main 
+ +import ( + "encoding/binary" + "fmt" + "io" + "net" + "strings" + "syscall" + "time" +) + +const ( + socketSleepSec = 5 + socketNameLen = 16 + + // Binary offsets within netdata_socket_idx_t (40 bytes). + socketIdxSaddrOffset = 0 + socketIdxDaddrOffset = 16 + socketIdxDportOffset = 32 + socketIdxPidOffset = 36 + + // Binary offsets within netdata_socket_t (112 bytes). + socketValNameOffset = 0 + socketValProtocolOffset = 32 + socketValFamilyOffset = 34 + socketValTCPSentCalls = 40 + socketValTCPRecvCalls = 44 + socketValTCPBytesSent = 48 + socketValTCPBytesRecv = 56 + socketValTCPClose = 64 + socketValTCPRetransmit = 68 + socketValTCPIPv4Connect = 72 + socketValTCPIPv6Connect = 76 + socketValTCPState = 80 + socketValUDPSentCalls = 88 + socketValUDPRecvCalls = 92 + socketValUDPBytesSent = 96 + socketValUDPBytesRecv = 104 +) + +type socketEntry struct { + saddr [16]byte + daddr [16]byte + dport uint16 + pid uint32 + name string + protocol uint16 + family uint16 + tcpSentCalls uint32 + tcpRecvCalls uint32 + tcpBytesSent uint64 + tcpBytesRecv uint64 + tcpClose uint32 + tcpRetransmit uint32 + tcpIPv4Connect uint32 + tcpIPv6Connect uint32 + tcpState uint32 + udpSentCalls uint32 + udpRecvCalls uint32 + udpBytesSent uint64 + udpBytesRecv uint64 +} + +func socketDecodeKey(key []byte) (e socketEntry) { + copy(e.saddr[:], key[socketIdxSaddrOffset:socketIdxSaddrOffset+16]) + copy(e.daddr[:], key[socketIdxDaddrOffset:socketIdxDaddrOffset+16]) + e.dport = binary.LittleEndian.Uint16(key[socketIdxDportOffset:]) + e.pid = binary.LittleEndian.Uint32(key[socketIdxPidOffset:]) + return e +} + +func socketRoundUp8(n int) int { + return (n + 7) & ^7 +} + +/* + * socketAggregatePerCPU sums all numeric fields across ncpus CPU slots. + * Name, protocol, and family come from the first non-empty slot. 
// socketJSONEscapeName makes a process name safe to place between double
// quotes in the hand-written JSON output.
//
// Backslash and double quote get a backslash prefix. Control characters below
// U+0020 are emitted as \u00XX escapes, because JSON forbids them raw inside a
// string. Bytes that are not valid UTF-8 are replaced with U+FFFD so the
// result is always valid UTF-8.
func socketJSONEscapeName(name string) string {
	const hexDigits = "0123456789abcdef"

	var b strings.Builder
	b.Grow(len(name))

	// Ranging over a string yields U+FFFD for each invalid byte, so the
	// default branch also takes care of malformed input.
	for _, r := range name {
		switch {
		case r == '\\':
			b.WriteString(`\\`)
		case r == '"':
			b.WriteString(`\"`)
		case r < 0x20:
			b.WriteString(`\u00`)
			b.WriteByte(hexDigits[r>>4])
			b.WriteByte(hexDigits[r&0x0f])
		default:
			b.WriteRune(r)
		}
	}
	return b.String()
}
socketFormatIP(e.family, e.saddr) + dstIP := socketFormatIP(e.family, e.daddr) + + fmt.Fprintf(w, + " "+ + "{ \"src_ip\" : \"%s\", \"dst_ip\" : \"%s\", "+ + "\"dst_port\" : %d, \"pid\" : %d, \"name\" : \"%s\", "+ + "\"protocol\" : %d, \"family\" : %d, "+ + "\"tcp\" : { \"sent_calls\" : %d, \"recv_calls\" : %d, "+ + "\"bytes_sent\" : %d, \"bytes_recv\" : %d, "+ + "\"close\" : %d, \"retransmit\" : %d, "+ + "\"ipv4_connect\" : %d, \"ipv6_connect\" : %d, "+ + "\"state\" : %d }, "+ + "\"udp\" : { \"sent_calls\" : %d, \"recv_calls\" : %d, "+ + "\"bytes_sent\" : %d, \"bytes_recv\" : %d } }", + srcIP, dstIP, + e.dport, e.pid, socketJSONEscapeName(e.name), + e.protocol, e.family, + e.tcpSentCalls, e.tcpRecvCalls, + e.tcpBytesSent, e.tcpBytesRecv, + e.tcpClose, e.tcpRetransmit, + e.tcpIPv4Connect, e.tcpIPv6Connect, + e.tcpState, + e.udpSentCalls, e.udpRecvCalls, + e.udpBytesSent, e.udpBytesRecv) +} + +func hasSocketTable(obj *bpfObject) bool { + return obj.findMapByName("tbl_nd_socket") != nil +} + +func runSocketTableTester(w io.Writer, obj *bpfObject, iterations int) { + m := obj.findMapByName("tbl_nd_socket") + if m == nil { + fmt.Fprint(w, " \"Total tables\" : 0\n") + return + } + + meta := m.meta() + ncpus := libbpfNumPossibleCPUs() + if ncpus <= 0 { + ncpus = 1 + } + + stride := socketRoundUp8(int(meta.ValueSize)) + collectionSeconds := iterations * socketSleepSec + + fmt.Fprintf(w, + " \"socket_connections\" : {\n"+ + " \"Info\" : { \"Length\" : { \"Key\" : %d, \"Value\" : %d},\n"+ + " \"Type\" : %d,\n"+ + " \"FD\" : %d,\n"+ + " \"ncpus\" : %d,\n"+ + " \"Collection Seconds\" : %d,\n"+ + " \"Data\" : [\n", + meta.KeySize, meta.ValueSize, meta.Type, meta.FD, ncpus, collectionSeconds) + + time.Sleep(time.Duration(collectionSeconds) * time.Second) + + socketReadEntries(w, meta.FD, int(meta.KeySize), stride, ncpus) + + fmt.Fprint(w, + " ]\n"+ + " }\n"+ + " },\n"+ + " \"Total tables\" : 1\n") +} + +func socketReadEntries(w io.Writer, fd, keySize, stride, ncpus int) { + 
keyBuf := make([]byte, keySize) + nextKey := make([]byte, keySize) + percpuBuf := make([]byte, stride*ncpus) + first := true + + if bpfMapGetNextKey(fd, nil, nextKey) != 0 { + return + } + + for { + if bpfMapLookupElem(fd, nextKey, percpuBuf) == 0 { + e := socketDecodeKey(nextKey) + socketAggregatePerCPU(&e, percpuBuf, stride, ncpus) + + if !first { + fmt.Fprint(w, ",\n") + } + socketWriteEntryJSON(w, &e) + first = false + } + + copy(keyBuf, nextKey) + if bpfMapGetNextKey(fd, keyBuf, nextKey) != 0 { + break + } + } + + if !first { + fmt.Fprint(w, "\n") + } +} diff --git a/gotests/socket_test.go b/gotests/socket_test.go new file mode 100644 index 00000000..55c52758 --- /dev/null +++ b/gotests/socket_test.go @@ -0,0 +1,337 @@ +package main + +import ( + "bytes" + "encoding/binary" + "net" + "strings" + "syscall" + "testing" +) + +// buildSocketKey constructs a raw 40-byte netdata_socket_idx_t buffer. +func buildSocketKey(saddr, daddr [16]byte, dport uint16, pid uint32) []byte { + key := make([]byte, socketIdxPidOffset+4) // 40 bytes + copy(key[socketIdxSaddrOffset:], saddr[:]) + copy(key[socketIdxDaddrOffset:], daddr[:]) + binary.LittleEndian.PutUint16(key[socketIdxDportOffset:], dport) + binary.LittleEndian.PutUint32(key[socketIdxPidOffset:], pid) + return key +} + +// buildSocketValue constructs a raw netdata_socket_t buffer for one CPU slot. 
+func buildSocketValue(name string, protocol, family uint16, + tcpSent, tcpRecv uint32, tcpBytesSent, tcpBytesRecv uint64, + tcpClose, tcpRetransmit, tcpIPv4, tcpIPv6, tcpState uint32, + udpSent, udpRecv uint32, udpBytesSent, udpBytesRecv uint64) []byte { + + slot := make([]byte, socketValUDPBytesRecv+8) // 112 bytes + n := copy(slot[socketValNameOffset:], name) + if n < socketNameLen { + slot[socketValNameOffset+n] = 0 + } + binary.LittleEndian.PutUint16(slot[socketValProtocolOffset:], protocol) + binary.LittleEndian.PutUint16(slot[socketValFamilyOffset:], family) + binary.LittleEndian.PutUint32(slot[socketValTCPSentCalls:], tcpSent) + binary.LittleEndian.PutUint32(slot[socketValTCPRecvCalls:], tcpRecv) + binary.LittleEndian.PutUint64(slot[socketValTCPBytesSent:], tcpBytesSent) + binary.LittleEndian.PutUint64(slot[socketValTCPBytesRecv:], tcpBytesRecv) + binary.LittleEndian.PutUint32(slot[socketValTCPClose:], tcpClose) + binary.LittleEndian.PutUint32(slot[socketValTCPRetransmit:], tcpRetransmit) + binary.LittleEndian.PutUint32(slot[socketValTCPIPv4Connect:], tcpIPv4) + binary.LittleEndian.PutUint32(slot[socketValTCPIPv6Connect:], tcpIPv6) + binary.LittleEndian.PutUint32(slot[socketValTCPState:], tcpState) + binary.LittleEndian.PutUint32(slot[socketValUDPSentCalls:], udpSent) + binary.LittleEndian.PutUint32(slot[socketValUDPRecvCalls:], udpRecv) + binary.LittleEndian.PutUint64(slot[socketValUDPBytesSent:], udpBytesSent) + binary.LittleEndian.PutUint64(slot[socketValUDPBytesRecv:], udpBytesRecv) + return slot +} + +func TestSocketDecodeKey(t *testing.T) { + var saddr, daddr [16]byte + copy(saddr[:], net.ParseIP("192.168.1.1").To4()) + copy(daddr[:], net.ParseIP("8.8.8.8").To4()) + + raw := buildSocketKey(saddr, daddr, 53, 1234) + e := socketDecodeKey(raw) + + if e.saddr != saddr { + t.Errorf("saddr mismatch: got %v want %v", e.saddr, saddr) + } + if e.daddr != daddr { + t.Errorf("daddr mismatch: got %v want %v", e.daddr, daddr) + } + if e.dport != 53 { + 
t.Errorf("dport: got %d want 53", e.dport) + } + if e.pid != 1234 { + t.Errorf("pid: got %d want 1234", e.pid) + } +} + +func TestSocketDecodeKeyIPv6(t *testing.T) { + var saddr, daddr [16]byte + copy(saddr[:], net.ParseIP("2001:db8::1")) + copy(daddr[:], net.ParseIP("2001:4860:4860::8888")) + + raw := buildSocketKey(saddr, daddr, 443, 9999) + e := socketDecodeKey(raw) + + if e.saddr != saddr { + t.Errorf("IPv6 saddr mismatch") + } + if e.dport != 443 { + t.Errorf("dport: got %d want 443", e.dport) + } + if e.pid != 9999 { + t.Errorf("pid: got %d want 9999", e.pid) + } +} + +func TestSocketAggregatePerCPUSingleSlot(t *testing.T) { + slot := buildSocketValue("nginx", syscall.IPPROTO_TCP, syscall.AF_INET, + 10, 5, 1024, 2048, 1, 0, 3, 0, 1, + 0, 0, 0, 0) + + e := socketEntry{} + socketAggregatePerCPU(&e, slot, len(slot), 1) + + if e.name != "nginx" { + t.Errorf("name: got %q want %q", e.name, "nginx") + } + if e.protocol != syscall.IPPROTO_TCP { + t.Errorf("protocol: got %d want %d", e.protocol, syscall.IPPROTO_TCP) + } + if e.family != syscall.AF_INET { + t.Errorf("family: got %d want %d", e.family, syscall.AF_INET) + } + if e.tcpSentCalls != 10 { + t.Errorf("tcpSentCalls: got %d want 10", e.tcpSentCalls) + } + if e.tcpBytesRecv != 2048 { + t.Errorf("tcpBytesRecv: got %d want 2048", e.tcpBytesRecv) + } + if e.tcpIPv4Connect != 3 { + t.Errorf("tcpIPv4Connect: got %d want 3", e.tcpIPv4Connect) + } + if e.tcpState != 1 { + t.Errorf("tcpState: got %d want 1", e.tcpState) + } +} + +func TestSocketAggregatePerCPUMultipleSlots(t *testing.T) { + stride := socketRoundUp8(112) + + // CPU 0: has name and some stats + slot0 := buildSocketValue("sshd", syscall.IPPROTO_TCP, syscall.AF_INET, + 5, 3, 500, 600, 0, 1, 1, 0, 4, + 0, 0, 0, 0) + + // CPU 1: no name (zeroed), but has additional stats + slot1 := buildSocketValue("", syscall.IPPROTO_TCP, syscall.AF_INET, + 3, 2, 200, 400, 0, 0, 0, 0, 0, + 0, 0, 0, 0) + + buf := make([]byte, 2*stride) + copy(buf[0*stride:], slot0) + 
copy(buf[1*stride:], slot1) + + e := socketEntry{} + socketAggregatePerCPU(&e, buf, stride, 2) + + if e.name != "sshd" { + t.Errorf("name: got %q want %q", e.name, "sshd") + } + if e.tcpSentCalls != 8 { // 5 + 3 + t.Errorf("tcpSentCalls: got %d want 8", e.tcpSentCalls) + } + if e.tcpBytesSent != 700 { // 500 + 200 + t.Errorf("tcpBytesSent: got %d want 700", e.tcpBytesSent) + } + if e.tcpBytesRecv != 1000 { // 600 + 400 + t.Errorf("tcpBytesRecv: got %d want 1000", e.tcpBytesRecv) + } + if e.tcpRetransmit != 1 { + t.Errorf("tcpRetransmit: got %d want 1", e.tcpRetransmit) + } + if e.tcpState != 4 { + t.Errorf("tcpState: got %d want 4", e.tcpState) + } +} + +func TestSocketAggregatePerCPUNameFromSecondSlot(t *testing.T) { + stride := socketRoundUp8(112) + + // CPU 0: no name + slot0 := buildSocketValue("", 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + // CPU 1: has name + slot1 := buildSocketValue("curl", syscall.IPPROTO_TCP, syscall.AF_INET6, + 1, 1, 100, 100, 0, 0, 0, 1, 6, + 0, 0, 0, 0) + + buf := make([]byte, 2*stride) + copy(buf[0*stride:], slot0) + copy(buf[1*stride:], slot1) + + e := socketEntry{} + socketAggregatePerCPU(&e, buf, stride, 2) + + if e.name != "curl" { + t.Errorf("name from second slot: got %q want %q", e.name, "curl") + } + if e.family != syscall.AF_INET6 { + t.Errorf("family: got %d want AF_INET6=%d", e.family, syscall.AF_INET6) + } +} + +func TestSocketRoundUp8(t *testing.T) { + cases := []struct{ in, want int }{ + {0, 0}, {1, 8}, {8, 8}, {9, 16}, {112, 112}, {113, 120}, + } + for _, c := range cases { + if got := socketRoundUp8(c.in); got != c.want { + t.Errorf("socketRoundUp8(%d) = %d, want %d", c.in, got, c.want) + } + } +} + +func TestSocketFormatIPv4(t *testing.T) { + var raw [16]byte + copy(raw[:], net.ParseIP("10.0.0.1").To4()) + + got := socketFormatIP(syscall.AF_INET, raw) + if got != "10.0.0.1" { + t.Errorf("IPv4 format: got %q want %q", got, "10.0.0.1") + } +} + +func TestSocketFormatIPv6(t *testing.T) { + var raw [16]byte + 
copy(raw[:], net.ParseIP("2001:db8::1")) + + got := socketFormatIP(syscall.AF_INET6, raw) + if got != "2001:db8::1" { + t.Errorf("IPv6 format: got %q want %q", got, "2001:db8::1") + } +} + +func TestSocketJSONEscapeName(t *testing.T) { + cases := []struct{ in, want string }{ + {"nginx", "nginx"}, + {"my\"app", `my\"app`}, + {"back\\slash", `back\\slash`}, + {"", ""}, + } + for _, c := range cases { + got := socketJSONEscapeName(c.in) + if got != c.want { + t.Errorf("escape(%q) = %q, want %q", c.in, got, c.want) + } + } +} + +func TestSocketWriteEntryJSON(t *testing.T) { + var saddr, daddr [16]byte + copy(saddr[:], net.ParseIP("192.168.0.1").To4()) + copy(daddr[:], net.ParseIP("93.184.216.34").To4()) + + e := socketEntry{ + saddr: saddr, + daddr: daddr, + dport: 80, + pid: 1001, + name: "wget", + protocol: uint16(syscall.IPPROTO_TCP), + family: uint16(syscall.AF_INET), + tcpSentCalls: 7, + tcpRecvCalls: 3, + tcpBytesSent: 4096, + tcpBytesRecv: 8192, + tcpClose: 1, + tcpRetransmit: 0, + tcpIPv4Connect: 1, + tcpIPv6Connect: 0, + tcpState: 5, + udpSentCalls: 0, + udpRecvCalls: 0, + udpBytesSent: 0, + udpBytesRecv: 0, + } + + var buf bytes.Buffer + socketWriteEntryJSON(&buf, &e) + out := buf.String() + + checks := []string{ + `"src_ip" : "192.168.0.1"`, + `"dst_ip" : "93.184.216.34"`, + `"dst_port" : 80`, + `"pid" : 1001`, + `"name" : "wget"`, + `"sent_calls" : 7`, + `"recv_calls" : 3`, + `"bytes_sent" : 4096`, + `"bytes_recv" : 8192`, + `"close" : 1`, + `"ipv4_connect" : 1`, + `"state" : 5`, + } + for _, want := range checks { + if !strings.Contains(out, want) { + t.Errorf("JSON missing %q\ngot: %s", want, out) + } + } +} + +func TestSocketWriteEntryJSONIPv6(t *testing.T) { + var saddr, daddr [16]byte + copy(saddr[:], net.ParseIP("::1")) + copy(daddr[:], net.ParseIP("2001:db8::2")) + + e := socketEntry{ + saddr: saddr, + daddr: daddr, + dport: 443, + pid: 2002, + name: "curl", + protocol: uint16(syscall.IPPROTO_TCP), + family: uint16(syscall.AF_INET6), + } + + var buf 
bytes.Buffer + socketWriteEntryJSON(&buf, &e) + out := buf.String() + + if !strings.Contains(out, `"src_ip" : "::1"`) { + t.Errorf("expected IPv6 src_ip, got: %s", out) + } + if !strings.Contains(out, `"dst_port" : 443`) { + t.Errorf("expected dst_port 443, got: %s", out) + } +} + +func TestSocketWriteEntryJSONEscapedName(t *testing.T) { + e := socketEntry{ + name: `proc"name`, + family: uint16(syscall.AF_INET), + } + + var buf bytes.Buffer + socketWriteEntryJSON(&buf, &e) + out := buf.String() + + if !strings.Contains(out, `"name" : "proc\"name"`) { + t.Errorf("expected escaped name, got: %s", out) + } +} + +func TestSocketReadEntriesEmpty(t *testing.T) { + // socketReadEntries with no map data: verifies it produces no output. + // We pass fd=-1 which bpfMapGetNextKey will reject immediately (returns non-zero). + var buf bytes.Buffer + socketReadEntries(&buf, -1, socketIdxPidOffset+4, socketRoundUp8(112), 1) + if buf.Len() != 0 { + t.Errorf("expected empty output for invalid fd, got: %q", buf.String()) + } +} diff --git a/tests/Makefile b/tests/Makefile index 7324bc7e..c126528d 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -5,7 +5,7 @@ LIBBPF = ../libbpf LIBBPF_READY = ../.local_libbpf/libbpf.a TESTER_BIN = legacy_test GO_TESTER_BIN = ../gotests/go_tester -TESTER_SRCS = tester_user.c tester_dns.c +TESTER_SRCS = tester_user.c tester_dns.c tester_socket.c GO_TESTER_SRCS = $(wildcard ../gotests/*.go ../gotests/go.mod ../gotests/go.sum) TESTER_DEBUG_CFLAGS ?= -g GO_TESTER_GCFLAGS ?= all=-dwarf=true diff --git a/tests/tester_socket.c b/tests/tester_socket.c new file mode 100644 index 00000000..db8d2000 --- /dev/null +++ b/tests/tester_socket.c @@ -0,0 +1,355 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "tester_socket.h" + +/* + * Binary layout of netdata_socket_idx_t (40 bytes). + * Offsets must match kernel struct in includes/netdata_socket.h. 
+ */ +#define SOCKET_IDX_SADDR_OFFSET 0 /* union netdata_ip: 16 bytes */ +#define SOCKET_IDX_DADDR_OFFSET 16 /* union netdata_ip: 16 bytes */ +#define SOCKET_IDX_DPORT_OFFSET 32 /* __u16: 2 bytes */ +#define SOCKET_IDX_PID_OFFSET 36 /* __u32: 4 bytes (after 2B pad) */ +#define SOCKET_IDX_SIZE 40 + +/* + * Binary layout of netdata_socket_t (112 bytes). + * Offsets must match kernel struct in includes/netdata_socket.h. + */ +#define SOCKET_VAL_NAME_OFFSET 0 /* char[16] */ +#define SOCKET_VAL_FIRST_OFFSET 16 /* __u64 */ +#define SOCKET_VAL_CT_OFFSET 24 /* __u64 */ +#define SOCKET_VAL_PROTOCOL_OFFSET 32 /* __u16 */ +#define SOCKET_VAL_FAMILY_OFFSET 34 /* __u16 */ +#define SOCKET_VAL_EXTORIGIN_OFFSET 36 /* __u32 */ +/* tcp sub-struct starts at offset 40 */ +#define SOCKET_VAL_TCP_SENT_CALLS 40 /* __u32 */ +#define SOCKET_VAL_TCP_RECV_CALLS 44 /* __u32 */ +#define SOCKET_VAL_TCP_BYTES_SENT 48 /* __u64 */ +#define SOCKET_VAL_TCP_BYTES_RECV 56 /* __u64 */ +#define SOCKET_VAL_TCP_CLOSE 64 /* __u32 */ +#define SOCKET_VAL_TCP_RETRANSMIT 68 /* __u32 */ +#define SOCKET_VAL_TCP_IPV4_CONNECT 72 /* __u32 */ +#define SOCKET_VAL_TCP_IPV6_CONNECT 76 /* __u32 */ +#define SOCKET_VAL_TCP_STATE 80 /* __u32 */ +/* 4 bytes implicit padding at 84 to align udp to 8-byte boundary */ +/* udp sub-struct starts at offset 88 */ +#define SOCKET_VAL_UDP_SENT_CALLS 88 /* __u32 */ +#define SOCKET_VAL_UDP_RECV_CALLS 92 /* __u32 */ +#define SOCKET_VAL_UDP_BYTES_SENT 96 /* __u64 */ +#define SOCKET_VAL_UDP_BYTES_RECV 104 /* __u64 */ +#define SOCKET_VAL_SIZE 112 + +#define SOCKET_NAME_LEN 16 +#define SOCKET_SLEEP_SEC 5 + +/* ------------------------------------------------------------------------- + * Low-level read helpers (little-endian, same as the kernel writes them). 
+ * -------------------------------------------------------------------------*/ + +static uint16_t sock_read_u16(const uint8_t *src) +{ + uint16_t v; + memcpy(&v, src, sizeof(v)); + return v; +} + +static uint32_t sock_read_u32(const uint8_t *src) +{ + uint32_t v; + memcpy(&v, src, sizeof(v)); + return v; +} + +static uint64_t sock_read_u64(const uint8_t *src) +{ + uint64_t v; + memcpy(&v, src, sizeof(v)); + return v; +} + +/* ------------------------------------------------------------------------- + * Decoded, aggregated representation of one connection entry. + * -------------------------------------------------------------------------*/ + +typedef struct { + uint8_t saddr[16]; + uint8_t daddr[16]; + uint16_t dport; + uint32_t pid; + char name[SOCKET_NAME_LEN + 1]; + uint16_t protocol; + uint16_t family; + + uint32_t tcp_sent_calls; + uint32_t tcp_recv_calls; + uint64_t tcp_bytes_sent; + uint64_t tcp_bytes_recv; + uint32_t tcp_close; + uint32_t tcp_retransmit; + uint32_t tcp_ipv4_connect; + uint32_t tcp_ipv6_connect; + uint32_t tcp_state; + + uint32_t udp_sent_calls; + uint32_t udp_recv_calls; + uint64_t udp_bytes_sent; + uint64_t udp_bytes_recv; +} socket_entry_t; + +static void socket_decode_key(socket_entry_t *e, const uint8_t *key) +{ + memcpy(e->saddr, key + SOCKET_IDX_SADDR_OFFSET, 16); + memcpy(e->daddr, key + SOCKET_IDX_DADDR_OFFSET, 16); + e->dport = sock_read_u16(key + SOCKET_IDX_DPORT_OFFSET); + e->pid = sock_read_u32(key + SOCKET_IDX_PID_OFFSET); +} + +/* + * Aggregate ncpus per-CPU slots (each 'stride' bytes apart) into one entry. + * Non-name scalar fields are summed; name and family come from the first + * non-empty CPU slot encountered. 
+ */ +static void socket_aggregate_percpu(socket_entry_t *e, const uint8_t *buf, + size_t stride, int ncpus) +{ + int cpu; + int meta_found = 0; + + memset(e->name, 0, sizeof(e->name)); + e->protocol = 0; + e->family = 0; + e->tcp_sent_calls = e->tcp_recv_calls = 0; + e->tcp_bytes_sent = e->tcp_bytes_recv = 0; + e->tcp_close = e->tcp_retransmit = 0; + e->tcp_ipv4_connect = e->tcp_ipv6_connect = 0; + e->tcp_state = 0; + e->udp_sent_calls = e->udp_recv_calls = 0; + e->udp_bytes_sent = e->udp_bytes_recv = 0; + + for (cpu = 0; cpu < ncpus; cpu++) { + const uint8_t *slot = buf + (size_t)cpu * stride; + + if (!meta_found && slot[SOCKET_VAL_NAME_OFFSET] != '\0') { + memcpy(e->name, slot + SOCKET_VAL_NAME_OFFSET, SOCKET_NAME_LEN); + e->name[SOCKET_NAME_LEN] = '\0'; + e->protocol = sock_read_u16(slot + SOCKET_VAL_PROTOCOL_OFFSET); + e->family = sock_read_u16(slot + SOCKET_VAL_FAMILY_OFFSET); + meta_found = 1; + } + + e->tcp_sent_calls += sock_read_u32(slot + SOCKET_VAL_TCP_SENT_CALLS); + e->tcp_recv_calls += sock_read_u32(slot + SOCKET_VAL_TCP_RECV_CALLS); + e->tcp_bytes_sent += sock_read_u64(slot + SOCKET_VAL_TCP_BYTES_SENT); + e->tcp_bytes_recv += sock_read_u64(slot + SOCKET_VAL_TCP_BYTES_RECV); + e->tcp_close += sock_read_u32(slot + SOCKET_VAL_TCP_CLOSE); + e->tcp_retransmit += sock_read_u32(slot + SOCKET_VAL_TCP_RETRANSMIT); + e->tcp_ipv4_connect += sock_read_u32(slot + SOCKET_VAL_TCP_IPV4_CONNECT); + e->tcp_ipv6_connect += sock_read_u32(slot + SOCKET_VAL_TCP_IPV6_CONNECT); + e->udp_sent_calls += sock_read_u32(slot + SOCKET_VAL_UDP_SENT_CALLS); + e->udp_recv_calls += sock_read_u32(slot + SOCKET_VAL_UDP_RECV_CALLS); + e->udp_bytes_sent += sock_read_u64(slot + SOCKET_VAL_UDP_BYTES_SENT); + e->udp_bytes_recv += sock_read_u64(slot + SOCKET_VAL_UDP_BYTES_RECV); + + { + uint32_t st = sock_read_u32(slot + SOCKET_VAL_TCP_STATE); + if (st) + e->tcp_state = st; + } + } +} + +static void socket_format_ip(char *buf, size_t buflen, const uint8_t *raw, uint16_t family) +{ + if 
(family == AF_INET6) + inet_ntop(AF_INET6, raw, buf, (socklen_t)buflen); + else + inet_ntop(AF_INET, raw, buf, (socklen_t)buflen); +} + +/* + * Escape a process name string for JSON. Quote and backslash get a backslash + * escape; control characters (below 0x20) are written as \u00XX because JSON + * does not allow them unescaped inside a string. All other bytes are copied + * through unchanged. + */ +static void socket_write_json_name(FILE *out, const char *name) +{ + const char *p; + fputc('"', out); + for (p = name; *p; p++) { + if (*p == '"') + fputs("\\\"", out); + else if (*p == '\\') + fputs("\\\\", out); + else if ((unsigned char)*p < 0x20) + fprintf(out, "\\u%04x", (unsigned)(unsigned char)*p); + else + fputc(*p, out); + } + fputc('"', out); +} + +static void socket_write_entry_json(FILE *out, const socket_entry_t *e) +{ + char src_buf[INET6_ADDRSTRLEN]; + char dst_buf[INET6_ADDRSTRLEN]; + + src_buf[0] = dst_buf[0] = '\0'; + socket_format_ip(src_buf, sizeof(src_buf), e->saddr, e->family); + socket_format_ip(dst_buf, sizeof(dst_buf), e->daddr, e->family); + + fprintf(out, + " " + "{ \"src_ip\" : \"%s\", \"dst_ip\" : \"%s\", " + "\"dst_port\" : %u, \"pid\" : %u, \"name\" : ", + src_buf, dst_buf, + (unsigned)e->dport, (unsigned)e->pid); + socket_write_json_name(out, e->name); + fprintf(out, + ", \"protocol\" : %u, \"family\" : %u, " + "\"tcp\" : { \"sent_calls\" : %u, \"recv_calls\" : %u, " + "\"bytes_sent\" : %llu, \"bytes_recv\" : %llu, " + "\"close\" : %u, \"retransmit\" : %u, " + "\"ipv4_connect\" : %u, \"ipv6_connect\" : %u, " + "\"state\" : %u }, " + "\"udp\" : { \"sent_calls\" : %u, \"recv_calls\" : %u, " + "\"bytes_sent\" : %llu, \"bytes_recv\" : %llu } }", + (unsigned)e->protocol, (unsigned)e->family, + (unsigned)e->tcp_sent_calls, (unsigned)e->tcp_recv_calls, + (unsigned long long)e->tcp_bytes_sent, + (unsigned long long)e->tcp_bytes_recv, + (unsigned)e->tcp_close, (unsigned)e->tcp_retransmit, + (unsigned)e->tcp_ipv4_connect, (unsigned)e->tcp_ipv6_connect, + (unsigned)e->tcp_state, + (unsigned)e->udp_sent_calls, (unsigned)e->udp_recv_calls, + (unsigned long long)e->udp_bytes_sent, + (unsigned long long)e->udp_bytes_recv); +} + +/*
------------------------------------------------------------------------- + * Public interface. + * -------------------------------------------------------------------------*/ + +int ebpf_object_has_socket_table(struct bpf_object *obj) +{ + struct bpf_map *map; + + bpf_object__for_each_map(map, obj) { + if (!strcmp(bpf_map__name(map), "tbl_nd_socket")) + return 1; + } + + return 0; +} + +void ebpf_socket_table_tester(struct bpf_object *obj, FILE *out, int iterations) +{ + struct bpf_map *map = NULL; + struct bpf_map *m; + int fd; + uint32_t key_size, value_size, map_type; + int ncpus; + size_t stride; + uint8_t *key_buf = NULL; + uint8_t *next_key = NULL; + uint8_t *percpu_buf = NULL; + int entry_count = 0; + int first = 1; + int collection_seconds = iterations * SOCKET_SLEEP_SEC; + + bpf_object__for_each_map(m, obj) { + if (!strcmp(bpf_map__name(m), "tbl_nd_socket")) { + map = m; + break; + } + } + + if (!map) { + fprintf(out, " \"Total tables\" : 0\n"); + return; + } + + fd = bpf_map__fd(map); +#ifdef LIBBPF_MAJOR_VERSION + map_type = (uint32_t)bpf_map__type(map); + key_size = bpf_map__key_size(map); + value_size = bpf_map__value_size(map); +#else + { + const struct bpf_map_def *def = bpf_map__def(map); + map_type = (uint32_t)def->type; + key_size = def->key_size; + value_size = def->value_size; + } +#endif + + ncpus = libbpf_num_possible_cpus(); + if (ncpus <= 0) + ncpus = 1; + + /* PERCPU_HASH: each lookup returns ncpus * stride bytes. 
*/ + stride = ((size_t)value_size + 7U) & ~7U; /* round up to 8-byte boundary */ + + key_buf = calloc(key_size, 1); + next_key = calloc(key_size, 1); + percpu_buf = calloc((size_t)ncpus * stride, 1); + if (!key_buf || !next_key || !percpu_buf) + goto cleanup; + + fprintf(out, + " \"socket_connections\" : {\n" + " \"Info\" : { \"Length\" : { \"Key\" : %u, \"Value\" : %u},\n" + " \"Type\" : %u,\n" + " \"FD\" : %d,\n" + " \"ncpus\" : %d,\n" + " \"Collection Seconds\" : %d,\n" + " \"Data\" : [\n", + key_size, value_size, map_type, fd, ncpus, collection_seconds); + + sleep((unsigned int)collection_seconds); + + if (bpf_map_get_next_key(fd, NULL, next_key)) + goto write_footer; + + do { + socket_entry_t entry; + + if (bpf_map_lookup_elem(fd, next_key, percpu_buf)) + goto advance; + + socket_decode_key(&entry, next_key); + socket_aggregate_percpu(&entry, percpu_buf, stride, ncpus); + + if (!first) + fprintf(out, ",\n"); + + socket_write_entry_json(out, &entry); + first = 0; + entry_count++; + +advance: + memcpy(key_buf, next_key, key_size); + } while (!bpf_map_get_next_key(fd, key_buf, next_key)); + +write_footer: + if (!first) + fprintf(out, "\n"); + + fprintf(out, + " ]\n" + " }\n" + " },\n" + " \"Total tables\" : 1\n"); + +cleanup: + free(key_buf); + free(next_key); + free(percpu_buf); +} diff --git a/tests/tester_socket.h b/tests/tester_socket.h new file mode 100644 index 00000000..38891e1c --- /dev/null +++ b/tests/tester_socket.h @@ -0,0 +1,11 @@ +#ifndef NETDATA_LEGACY_TESTER_SOCKET +#define NETDATA_LEGACY_TESTER_SOCKET 1 + +#include + +struct bpf_object; + +int ebpf_object_has_socket_table(struct bpf_object *obj); +void ebpf_socket_table_tester(struct bpf_object *obj, FILE *out, int iterations); + +#endif /* NETDATA_LEGACY_TESTER_SOCKET */ diff --git a/tests/tester_user.c b/tests/tester_user.c index 58e8de20..c0c61885 100644 --- a/tests/tester_user.c +++ b/tests/tester_user.c @@ -19,6 +19,7 @@ // Libbpf #include "tester_user.h" #include "tester_dns.h" +#include 
"tester_socket.h" static ebpf_specify_name_t dc_optional_name[] = { {.program_name = "netdata_lookup_fast", .function_to_attach = "lookup_fast", @@ -1804,11 +1805,15 @@ static char *ebpf_tester(char *filename, ebpf_specify_name_t *names, uint32_t ma } if (!errors && maps) { - if (ctrl) { - ebpf_fill_ctrl(obj, ctrl); - } + if (ebpf_object_has_socket_table(obj)) { + ebpf_socket_table_tester(obj, stdlog, end_iteration); + } else { + if (ctrl) { + ebpf_fill_ctrl(obj, ctrl); + } - ebpf_test_maps(obj, ctrl); + ebpf_test_maps(obj, ctrl); + } } if (!errors) { From ff073ab4bfe75809451e7856dbdf33621097d40d Mon Sep 17 00:00:00 2001 From: thiagoftsm Date: Mon, 4 May 2026 18:57:53 +0000 Subject: [PATCH 3/5] tests: Add socket buffer and arena --- gotests/cgo_helpers.go | 63 +++- gotests/main.go | 14 +- gotests/main_test.go | 4 +- gotests/socket.go | 70 +++++ gotests/socket_ringbuf.go | 208 +++++++++++++ includes/netdata_socket.h | 4 + includes/netdata_socket_arena.h | 11 + includes/netdata_socket_buffer.h | 15 + kernel/Makefile | 2 + kernel/socket_arena_kern.c | 29 ++ kernel/socket_buffer_kern.c | 493 +++++++++++++++++++++++++++++++ tests/tester_socket.c | 215 +++++++++++--- tests/tester_socket.h | 7 +- tests/tester_user.c | 27 +- 14 files changed, 1105 insertions(+), 57 deletions(-) create mode 100644 gotests/socket_ringbuf.go create mode 100644 includes/netdata_socket_arena.h create mode 100644 includes/netdata_socket_buffer.h create mode 100644 kernel/socket_arena_kern.c create mode 100644 kernel/socket_buffer_kern.c diff --git a/gotests/cgo_helpers.go b/gotests/cgo_helpers.go index 172a8d5a..86eabf2e 100644 --- a/gotests/cgo_helpers.go +++ b/gotests/cgo_helpers.go @@ -25,6 +25,13 @@ struct netdata_ringbuf_stats { uint64_t bytes; }; +struct netdata_socket_ringbuf_ctx { + struct netdata_ringbuf_stats *stats; + uint64_t handle; +}; + +extern void socketRingbufSample(uint64_t handle, void *data, size_t size); + #ifdef LIBBPF_MAJOR_VERSION static int 
netdata_libbpf_probe_bpf_map_type(unsigned int map_type) { @@ -132,37 +139,75 @@ static int netdata_ring_buffer_sample_cb(void *ctx, void *data, size_t size) return 0; } -static struct netdata_ringbuf_stats *netdata_ringbuf_stats_new(void) +static int netdata_socket_ring_buffer_sample_cb(void *ctx, void *data, size_t size) +{ + struct netdata_socket_ringbuf_ctx *socket_ctx = ctx; + + if (socket_ctx) { + if (socket_ctx->stats) { + socket_ctx->stats->samples++; + socket_ctx->stats->bytes += size; + } + socketRingbufSample(socket_ctx->handle, data, size); + } + + return 0; +} + +struct netdata_ringbuf_stats *netdata_ringbuf_stats_new(void) { return calloc(1, sizeof(struct netdata_ringbuf_stats)); } -static void netdata_ringbuf_stats_free(struct netdata_ringbuf_stats *stats) +void netdata_ringbuf_stats_free(struct netdata_ringbuf_stats *stats) { free(stats); } -static uint64_t netdata_ringbuf_stats_samples(const struct netdata_ringbuf_stats *stats) +uint64_t netdata_ringbuf_stats_samples(const struct netdata_ringbuf_stats *stats) { return stats ? stats->samples : 0; } -static uint64_t netdata_ringbuf_stats_bytes(const struct netdata_ringbuf_stats *stats) +uint64_t netdata_ringbuf_stats_bytes(const struct netdata_ringbuf_stats *stats) { return stats ? 
stats->bytes : 0; } -static struct ring_buffer *netdata_ring_buffer_new(int map_fd, struct netdata_ringbuf_stats *stats) +struct ring_buffer *netdata_ring_buffer_new(int map_fd, struct netdata_ringbuf_stats *stats) { return ring_buffer__new(map_fd, netdata_ring_buffer_sample_cb, stats, NULL); } -static int netdata_ring_buffer_poll(struct ring_buffer *rb, int timeout_ms) +struct netdata_socket_ringbuf_ctx *netdata_socket_ringbuf_ctx_new(struct netdata_ringbuf_stats *stats, uint64_t handle) +{ + struct netdata_socket_ringbuf_ctx *ctx = calloc(1, sizeof(*ctx)); + + if (!ctx) + return NULL; + + ctx->stats = stats; + ctx->handle = handle; + + return ctx; +} + +void netdata_socket_ringbuf_ctx_free(struct netdata_socket_ringbuf_ctx *ctx) +{ + free(ctx); +} + +struct ring_buffer *netdata_socket_ring_buffer_new(int map_fd, struct netdata_socket_ringbuf_ctx *ctx) +{ + return ring_buffer__new(map_fd, netdata_socket_ring_buffer_sample_cb, ctx, NULL); +} + +int netdata_ring_buffer_poll(struct ring_buffer *rb, int timeout_ms) { return ring_buffer__poll(rb, timeout_ms); } -static uint64_t netdata_ring_buffer_avail_data(const struct ring_buffer *rb) +uint64_t netdata_ring_buffer_avail_data(const struct ring_buffer *rb) { struct ring *ring = ring_buffer__ring((struct ring_buffer *)rb, 0); @@ -172,7 +217,7 @@ static uint64_t netdata_ring_buffer_avail_data(const struct ring_buffer *rb) return (uint64_t)ring__avail_data_size(ring); } -static uint64_t netdata_ring_buffer_size(const struct ring_buffer *rb) +uint64_t netdata_ring_buffer_size(const struct ring_buffer *rb) { struct ring *ring = ring_buffer__ring((struct ring_buffer *)rb, 0); @@ -182,7 +227,7 @@ static uint64_t netdata_ring_buffer_size(const struct ring_buffer *rb) return (uint64_t)ring__size(ring); } -static void netdata_ring_buffer_free(struct ring_buffer *rb) +void netdata_ring_buffer_free(struct ring_buffer *rb) { ring_buffer__free(rb); } diff --git a/gotests/main.go b/gotests/main.go index 148882be..48b772c8 100644 --- 
a/gotests/main.go +++ b/gotests/main.go @@ -256,7 +256,7 @@ var ( {kernels: netdataV310 | netdataV414 | netdataV416 | netdataV418 | netdataV54 | netdataV514, flags: flagMDFlush, name: "mdflush"}, {kernels: netdataV310 | netdataV414 | netdataV416 | netdataV418 | netdataV54 | netdataV514, flags: flagMount, name: "mount"}, {kernels: netdataV310 | netdataV414 | netdataV416 | netdataV418 | netdataV54 | netdataV514, flags: flagSync, name: "msync"}, - {kernels: netdataV310 | netdataV414 | netdataV416 | netdataV418 | netdataV54 | netdataV514, flags: flagSocket, name: "socket", ctrlTable: "socket_ctrl"}, + {kernels: netdataV310 | netdataV414 | netdataV416 | netdataV418 | netdataV54 | netdataV514, bufferKernels: netdataV510 | netdataV511 | netdataV514 | netdataV515 | netdataV516 | netdataV68 | netdataV612, arenaKernels: netdataV510 | netdataV511 | netdataV514 | netdataV515 | netdataV516 | netdataV68 | netdataV612, flags: flagSocket, name: "socket", ctrlTable: "socket_ctrl"}, {kernels: netdataV310 | netdataV414 | netdataV416 | netdataV418 | netdataV54 | netdataV514, bufferKernels: netdataV510 | netdataV511 | netdataV514 | netdataV515 | netdataV516 | netdataV68 | netdataV612, arenaKernels: netdataV510 | netdataV511 | netdataV514 | netdataV515 | netdataV516 | netdataV68 | netdataV612, flags: flagDNS, name: "dns"}, {kernels: netdataV310 | netdataV414 | netdataV416 | netdataV418 | netdataV54 | netdataV514, flags: flagNFS, name: "nfs", ctrlTable: "nfs_ctrl"}, {kernels: netdataV310 | netdataV414 | netdataV416 | netdataV418 | netdataV54 | netdataV514, flags: flagNetworkViewer, name: "network_viewer", ctrlTable: "nv_ctrl"}, @@ -483,8 +483,8 @@ func helpText(exe string) string { "--content Test content stored inside hash tables.\n"+ "--iteration Number of iterations when content is read, default value is 1.\n"+ "--pid Specify the number that identifies PID that will be monitored: 0 - Real Parent PID (Default), 1 - Parent PID, 2 - All PID, and 3 - Ignore PID (ring buffer mode).\n"+ - 
"--buffer Test ring buffer versions of collectors (cachestat, dc, fd, oomkill, process, shm, swap, vfs, dns).\n"+ - "--arena Test arena versions of collectors (cachestat, dc, fd, oomkill, process, shm, swap, vfs, dns).\n\n"+ + "--buffer Test ring buffer versions of collectors (cachestat, dc, fd, oomkill, process, shm, swap, vfs, dns, socket).\n"+ + "--arena Test arena versions of collectors (cachestat, dc, fd, oomkill, process, shm, swap, vfs, dns, socket).\n\n"+ "You can also specify an unique eBPF program developed by Netdata with the following\n"+ "options:\n"+ "--btrfs Latency for btrfs.\n"+ @@ -1025,6 +1025,7 @@ func moduleHasBuffer(name string) bool { "swap": true, "vfs": true, "dns": true, + "socket": true, } return bufferModules[name] } @@ -1040,6 +1041,7 @@ func moduleHasArena(name string) bool { "swap": true, "vfs": true, "dns": true, + "socket": true, } return arenaModules[name] } @@ -1545,6 +1547,12 @@ func testMaps(w io.Writer, obj *bpfObject, ctrl string, iterations int, nprocess for m := obj.firstMap(); m != nil; m = obj.nextMap(m) { meta := m.meta() if !supportsMapKeyValueIO(meta.Type) { + if meta.Name == "socket_events" { + runSocketRingBufferTester(w, obj, iterations) + fmt.Fprint(w, " ]\n }\n },\n") + tables++ + continue + } testRingBufferMap(w, meta, iterations) fmt.Fprint(w, " ]\n }\n },\n") tables++ diff --git a/gotests/main_test.go b/gotests/main_test.go index dc780b60..9256f1d0 100644 --- a/gotests/main_test.go +++ b/gotests/main_test.go @@ -764,7 +764,7 @@ func TestModeSuffix(t *testing.T) { } func TestModuleModeLookup(t *testing.T) { - bufferArenaModules := []string{"cachestat", "dc", "fd", "oomkill", "process", "shm", "swap", "vfs", "dns"} + bufferArenaModules := []string{"cachestat", "dc", "fd", "oomkill", "process", "shm", "swap", "vfs", "dns", "socket"} for _, name := range bufferArenaModules { if !moduleHasBuffer(name) { t.Fatalf("expected %q to have buffer support", name) @@ -774,7 +774,7 @@ func TestModuleModeLookup(t *testing.T) { 
} } - plainOnlyModules := []string{"btrfs", "disk", "ext4", "hardirq", "mdflush", "mount", "nfs", "network_viewer", "softirq", "socket", "xfs", "zfs"} + plainOnlyModules := []string{"btrfs", "disk", "ext4", "hardirq", "mdflush", "mount", "nfs", "network_viewer", "softirq", "xfs", "zfs"} for _, name := range plainOnlyModules { if moduleHasBuffer(name) { t.Fatalf("expected %q to not have buffer support", name) diff --git a/gotests/socket.go b/gotests/socket.go index a6898cc0..ee054716 100644 --- a/gotests/socket.go +++ b/gotests/socket.go @@ -229,3 +229,73 @@ func socketReadEntries(w io.Writer, fd, keySize, stride, ncpus int) { fmt.Fprint(w, "\n") } } + +func socketKeyString(key []byte) string { + return string(key[:socketEventKeySize]) +} + +func hasSocketEvents(obj *bpfObject) bool { + return obj.findMapByName("socket_events") != nil +} + +func socketWriteCollectedEntries(w io.Writer, coll *socketRingbufCollector) { + if coll == nil { + return + } + + coll.mu.Lock() + defer coll.mu.Unlock() + + first := true + for _, key := range coll.order { + entry := coll.entries[key] + if entry == nil { + continue + } + if !first { + fmt.Fprint(w, ",\n") + } + socketWriteEntryJSON(w, entry) + first = false + } + + if !first { + fmt.Fprint(w, "\n") + } +} + +func runSocketRingBufferTester(w io.Writer, obj *bpfObject, iterations int) { + m := obj.findMapByName("socket_events") + if m == nil { + return + } + + meta := m.meta() + rb, errCode := newSocketRingBuffer(meta.FD) + if rb != nil { + defer rb.free() + } + + collectionSeconds := iterations * socketSleepSec + + fmt.Fprintf(w, + " \"socket_connections\" : {\n"+ + " \"Info\" : { \"Length\" : { \"Key\" : %d, \"Value\" : %d},\n"+ + " \"Type\" : %d,\n"+ + " \"FD\" : %d,\n"+ + " \"Collection Seconds\" : %d,\n"+ + " \"Data\" : [\n", + socketEventKeySize, socketEventValueSize, meta.Type, meta.FD, collectionSeconds) + + if errCode == 0 && rb != nil { + for sec := 0; sec < collectionSeconds; sec++ { + time.Sleep(time.Second) + 
rb.poll(0) + } + rb.poll(0) + } + + if rb != nil { + socketWriteCollectedEntries(w, getSocketRingbufCollector(rb.handle)) + } +} diff --git a/gotests/socket_ringbuf.go b/gotests/socket_ringbuf.go new file mode 100644 index 00000000..f3024d37 --- /dev/null +++ b/gotests/socket_ringbuf.go @@ -0,0 +1,208 @@ +package main + +/* +#cgo CFLAGS: -I../.local_libbpf -I../libbpf/include -I../libbpf/include/uapi -I../libbpf/src +#cgo LDFLAGS: -L../.local_libbpf -lbpf -lz -lelf + +#include +#include +#include +#include + +struct ring_buffer; +struct netdata_ringbuf_stats; +struct netdata_socket_ringbuf_ctx; + +int netdata_libbpf_get_error(const void *ptr); +struct netdata_socket_ringbuf_ctx *netdata_socket_ringbuf_ctx_new(struct netdata_ringbuf_stats *stats, uint64_t handle); +void netdata_socket_ringbuf_ctx_free(struct netdata_socket_ringbuf_ctx *ctx); +struct ring_buffer *netdata_socket_ring_buffer_new(int map_fd, struct netdata_socket_ringbuf_ctx *ctx); +int netdata_ring_buffer_poll(struct ring_buffer *rb, int timeout_ms); +uint64_t netdata_ringbuf_stats_samples(const struct netdata_ringbuf_stats *stats); +uint64_t netdata_ringbuf_stats_bytes(const struct netdata_ringbuf_stats *stats); +uint64_t netdata_ring_buffer_avail_data(const struct ring_buffer *rb); +uint64_t netdata_ring_buffer_size(const struct ring_buffer *rb); +void netdata_ring_buffer_free(struct ring_buffer *rb); +struct netdata_ringbuf_stats *netdata_ringbuf_stats_new(void); +void netdata_ringbuf_stats_free(struct netdata_ringbuf_stats *stats); +*/ +import "C" + +import ( + "sync" + "sync/atomic" + "unsafe" +) + +const ( + socketEventKeySize = socketIdxPidOffset + 4 + socketEventValueSize = socketValUDPBytesRecv + 8 + socketEventDataOffset = socketEventKeySize +) + +type socketRingbufCollector struct { + mu sync.Mutex + entries map[string]*socketEntry + order []string +} + +var ( + socketRingbufCollectorsMu sync.Mutex + socketRingbufNextHandle uint64 = 1 + socketRingbufCollectors = 
map[uint64]*socketRingbufCollector{} +) + +type socketRingBuffer struct { + ptr *C.struct_ring_buffer + stats *C.struct_netdata_ringbuf_stats + ctx *C.struct_netdata_socket_ringbuf_ctx + handle uint64 +} + +func registerSocketRingbufCollector(c *socketRingbufCollector) uint64 { + handle := atomic.AddUint64(&socketRingbufNextHandle, 1) + + socketRingbufCollectorsMu.Lock() + socketRingbufCollectors[handle] = c + socketRingbufCollectorsMu.Unlock() + + return handle +} + +func unregisterSocketRingbufCollector(handle uint64) { + socketRingbufCollectorsMu.Lock() + delete(socketRingbufCollectors, handle) + socketRingbufCollectorsMu.Unlock() +} + +func getSocketRingbufCollector(handle uint64) *socketRingbufCollector { + socketRingbufCollectorsMu.Lock() + defer socketRingbufCollectorsMu.Unlock() + return socketRingbufCollectors[handle] +} + +// add decodes one ring-buffer record (key bytes followed by value bytes) and +// folds it into the entry stored under the same key, creating the entry on +// first sight. Records shorter than a full key plus value are dropped so the +// fixed-offset slicing below cannot run past the end of raw. +func (c *socketRingbufCollector) add(raw []byte) { + if len(raw) < socketEventDataOffset+socketEventValueSize { + return + } + + sample := socketDecodeKey(raw[:socketEventKeySize]) + socketAggregatePerCPU(&sample, raw[socketEventDataOffset:], socketEventValueSize, 1) + + key := string(raw[:socketEventKeySize]) + + c.mu.Lock() + defer c.mu.Unlock() + + if c.entries == nil { + c.entries = make(map[string]*socketEntry) + } + + if existing, ok := c.entries[key]; ok { + socketMergeEntry(existing, &sample) + return + } + + entry := sample + c.entries[key] = &entry + c.order = append(c.order, key) +} + +func socketMergeEntry(dst, src *socketEntry) { + if dst.name == "" && src.name != "" { + dst.name = src.name + dst.protocol = src.protocol + dst.family = src.family + } + + dst.tcpSentCalls += src.tcpSentCalls + dst.tcpRecvCalls += src.tcpRecvCalls + dst.tcpBytesSent += src.tcpBytesSent + dst.tcpBytesRecv += src.tcpBytesRecv + dst.tcpClose += src.tcpClose + dst.tcpRetransmit += src.tcpRetransmit + dst.tcpIPv4Connect += src.tcpIPv4Connect + dst.tcpIPv6Connect += src.tcpIPv6Connect + dst.udpSentCalls += src.udpSentCalls + dst.udpRecvCalls += src.udpRecvCalls +
dst.udpBytesSent += src.udpBytesSent + dst.udpBytesRecv += src.udpBytesRecv + + if src.tcpState != 0 { + dst.tcpState = src.tcpState + } +} + +//export socketRingbufSample +func socketRingbufSample(handle C.uint64_t, data unsafe.Pointer, size C.size_t) { + collector := getSocketRingbufCollector(uint64(handle)) + if collector == nil || data == nil { + return + } + + raw := C.GoBytes(data, C.int(size)) + collector.add(raw) +} + +func newSocketRingBuffer(mapFD int) (*socketRingBuffer, int) { + stats := C.netdata_ringbuf_stats_new() + if stats == nil { + return nil, -int(C.ENOMEM) + } + + collector := &socketRingbufCollector{entries: make(map[string]*socketEntry)} + handle := registerSocketRingbufCollector(collector) + ctx := C.netdata_socket_ringbuf_ctx_new(stats, C.uint64_t(handle)) + if ctx == nil { + C.netdata_ringbuf_stats_free(stats) + unregisterSocketRingbufCollector(handle) + return nil, -int(C.ENOMEM) + } + + rb := C.netdata_socket_ring_buffer_new(C.int(mapFD), ctx) + if err := int(C.libbpf_get_error(unsafe.Pointer(rb))); err != 0 { + C.netdata_socket_ringbuf_ctx_free(ctx) + C.netdata_ringbuf_stats_free(stats) + unregisterSocketRingbufCollector(handle) + return nil, err + } + + return &socketRingBuffer{ptr: rb, stats: stats, ctx: ctx, handle: handle}, 0 +} + +func (rb *socketRingBuffer) free() { + if rb == nil { + return + } + + unregisterSocketRingbufCollector(rb.handle) + if rb.ptr != nil { + C.netdata_ring_buffer_free(rb.ptr) + } + if rb.ctx != nil { + C.netdata_socket_ringbuf_ctx_free(rb.ctx) + } + if rb.stats != nil { + C.netdata_ringbuf_stats_free(rb.stats) + } +} + +func (rb *socketRingBuffer) poll(timeoutMS int) int { + return int(C.netdata_ring_buffer_poll(rb.ptr, C.int(timeoutMS))) +} + +func (rb *socketRingBuffer) samples() uint64 { + return uint64(C.netdata_ringbuf_stats_samples(rb.stats)) +} + +func (rb *socketRingBuffer) bytes() uint64 { + return uint64(C.netdata_ringbuf_stats_bytes(rb.stats)) +} + +func (rb *socketRingBuffer) availData() uint64 
{ + return uint64(C.netdata_ring_buffer_avail_data(rb.ptr)) +} + +func (rb *socketRingBuffer) size() uint64 { + return uint64(C.netdata_ring_buffer_size(rb.ptr)) +} diff --git a/includes/netdata_socket.h b/includes/netdata_socket.h index 5972577e..8f83af83 100644 --- a/includes/netdata_socket.h +++ b/includes/netdata_socket.h @@ -6,6 +6,10 @@ // Conflict with CO-RE code // #include +#ifndef TASK_COMM_LEN +#define TASK_COMM_LEN 16 +#endif + /** * SOCKET */ diff --git a/includes/netdata_socket_arena.h b/includes/netdata_socket_arena.h new file mode 100644 index 00000000..8b989e5c --- /dev/null +++ b/includes/netdata_socket_arena.h @@ -0,0 +1,11 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef _NETDATA_SOCKET_ARENA_H_ +#define _NETDATA_SOCKET_ARENA_H_ 1 + +#include "netdata_socket_buffer.h" +#include "netdata_arena_common.h" + +NETDATA_ARENA_QUEUE_DECL(socket, struct netdata_socket_event_t, NETDATA_ARENA_EVENT_SLOTS); + +#endif /* _NETDATA_SOCKET_ARENA_H_ */ diff --git a/includes/netdata_socket_buffer.h b/includes/netdata_socket_buffer.h new file mode 100644 index 00000000..64642855 --- /dev/null +++ b/includes/netdata_socket_buffer.h @@ -0,0 +1,15 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef _NETDATA_SOCKET_BUFFER_H_ +#define _NETDATA_SOCKET_BUFFER_H_ 1 + +#include "netdata_socket.h" + +#define NETDATA_SOCKET_RINGBUF_SIZE (1 << 20) + +struct netdata_socket_event_t { + netdata_socket_idx_t idx; + netdata_socket_t data; +}; + +#endif /* _NETDATA_SOCKET_BUFFER_H_ */ diff --git a/kernel/Makefile b/kernel/Makefile index 5dc2ff21..605c697e 100644 --- a/kernel/Makefile +++ b/kernel/Makefile @@ -113,6 +113,7 @@ NETDATA_RINGBUF_APPS= cachestat_buffer \ dc_buffer \ dns_buffer \ fd_buffer \ + socket_buffer \ oomkill_buffer \ process_buffer \ shm_buffer \ @@ -129,6 +130,7 @@ NETDATA_ARENA_APPS= cachestat_arena \ dc_arena \ dns_arena \ fd_arena \ + socket_arena \ oomkill_arena \ process_arena \ shm_arena \ diff --git a/kernel/socket_arena_kern.c 
b/kernel/socket_arena_kern.c new file mode 100644 index 00000000..fe0580d0 --- /dev/null +++ b/kernel/socket_arena_kern.c @@ -0,0 +1,29 @@ +#define KBUILD_MODNAME "socket_arena_kern" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if (LINUX_VERSION_CODE > KERNEL_VERSION(4,11,0)) +#include +#else +#include +#endif + +#include "bpf_tracing.h" +#include "bpf_helpers.h" +#include "netdata_arena_common.h" +#include "netdata_socket_arena.h" + +struct netdata_socket_arena_state_t socket_arena_state __arena_global; + +#define NETDATA_BPF_RINGBUF_DEF(NAME, MAX_ENTRIES) NETDATA_BPF_ARENA_DEF(NAME, MAX_ENTRIES) +#define bpf_ringbuf_reserve(MAP, SIZE, FLAGS) netdata_socket_arena_reserve() +#define bpf_ringbuf_submit(EV, FLAGS) netdata_socket_arena_submit(EV) + +#include "socket_buffer_kern.c" diff --git a/kernel/socket_buffer_kern.c b/kernel/socket_buffer_kern.c new file mode 100644 index 00000000..72e735f0 --- /dev/null +++ b/kernel/socket_buffer_kern.c @@ -0,0 +1,493 @@ +#define KBUILD_MODNAME "socket_buffer_netdata" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if (LINUX_VERSION_CODE > KERNEL_VERSION(4,11,0)) +#include +#else +#include +#endif +#include "bpf_tracing.h" +#include "bpf_endian.h" +#include "bpf_helpers.h" +#include "netdata_ebpf.h" +#include "netdata_arena_common.h" +#include "netdata_socket_buffer.h" + +/************************************************************************************ + * + * Hash Table Section + * + ***********************************************************************************/ + +NETDATA_BPF_RINGBUF_DEF(socket_events, NETDATA_SOCKET_RINGBUF_SIZE); +NETDATA_BPF_PERCPU_ARRAY_DEF(tbl_global_sock, __u32, __u64, NETDATA_SOCKET_COUNTER); +NETDATA_BPF_PERCPU_HASH_DEF(tbl_nv_udp, __u64, void *, 4096); +NETDATA_BPF_HASH_DEF(tbl_lports, netdata_passive_connection_idx_t, netdata_passive_connection_t, 1024); +NETDATA_BPF_ARRAY_DEF(socket_ctrl, __u32, 
__u64, NETDATA_CONTROLLER_END); + +/************************************************************************************ + * + * Common Section + * + ***********************************************************************************/ + +static __always_inline __u16 set_idx_value(netdata_socket_idx_t *nsi, struct inet_sock *is) +{ + __u16 family; + + bpf_probe_read(&family, sizeof(u16), &is->sk.__sk_common.skc_family); + if (family == AF_INET) { + bpf_probe_read(&nsi->saddr.addr32[0], sizeof(u32), &is->inet_saddr); + bpf_probe_read(&nsi->daddr.addr32[0], sizeof(u32), &is->inet_daddr); + + if ((nsi->saddr.addr32[0] == 16777343 || nsi->daddr.addr32[0] == 16777343) || + (nsi->saddr.addr32[0] == 0 || nsi->daddr.addr32[0] == 0)) + return AF_UNSPEC; + } +#if IS_ENABLED(CONFIG_IPV6) + else if (family == AF_INET6) { + __u8 (*addr6)[16] = &is->sk.__sk_common.skc_v6_rcv_saddr.s6_addr; + bpf_probe_read(&nsi->saddr.addr8, sizeof(__u8) * 16, addr6); + + addr6 = &is->sk.__sk_common.skc_v6_daddr.s6_addr; + bpf_probe_read(&nsi->daddr.addr8, sizeof(__u8) * 16, addr6); + + if (((nsi->saddr.addr64[0] == 0) && (nsi->saddr.addr64[1] == 72057594037927936)) || + ((nsi->daddr.addr64[0] == 0) && (nsi->daddr.addr64[1] == 72057594037927936))) + return AF_UNSPEC; + + if (((nsi->saddr.addr64[0] == 0) && (nsi->saddr.addr64[1] == 0)) || + ((nsi->daddr.addr64[0] == 0) && (nsi->daddr.addr64[1] == 0))) + return AF_UNSPEC; + } +#endif + else { + return AF_UNSPEC; + } + + bpf_probe_read(&nsi->dport, sizeof(u16), &is->inet_dport); + if (nsi->dport == 0) + return AF_UNSPEC; + + __u32 tgid = 0; + nsi->pid = netdata_get_pid(&socket_ctrl, &tgid); + + return family; +} + +static __always_inline void update_socket_stats(netdata_socket_t __arena *ptr, + __u64 sent, + __u64 received, + __u32 retransmitted, + __u16 protocol) +{ + ptr->ct = bpf_ktime_get_ns(); + + if (sent) { + if (protocol == IPPROTO_TCP) { + ptr->tcp.call_tcp_sent += 1; + ptr->tcp.tcp_bytes_sent += sent; + ptr->tcp.retransmit += 
retransmitted; + } else { + ptr->udp.call_udp_sent += 1; + ptr->udp.udp_bytes_sent += sent; + } + } + + if (received) { + if (protocol == IPPROTO_TCP) { + ptr->tcp.call_tcp_received += 1; + ptr->tcp.tcp_bytes_received += received; + } else { + ptr->udp.call_udp_received += 1; + ptr->udp.udp_bytes_received += received; + } + } +} + +static __always_inline void update_socket_common(netdata_socket_t __arena *data, __u16 protocol, __u16 family) +{ + char comm[TASK_COMM_LEN]; + +#if (LINUX_VERSION_CODE > KERNEL_VERSION(4,11,0)) + bpf_get_current_comm(comm, TASK_COMM_LEN); + #pragma unroll + for (int i = 0; i < TASK_COMM_LEN; i++) + data->name[i] = comm[i]; +#else + data->name[0] = '\0'; +#endif + + data->first = bpf_ktime_get_ns(); + data->protocol = protocol; + data->family = family; +} + +static __always_inline struct netdata_socket_event_t __arena * +socket_event_reserve(struct pt_regs *ctx, __u16 *family, netdata_socket_idx_t *idx, __u16 protocol) +{ + struct inet_sock *is = inet_sk((struct sock *)PT_REGS_PARM1(ctx)); + struct netdata_socket_event_t __arena *ev; + + if (!is) + return NULL; + + *family = set_idx_value(idx, is); + if (*family == AF_UNSPEC) + return NULL; + + ev = bpf_ringbuf_reserve(&socket_events, sizeof(*ev), 0); + if (!ev) + return NULL; + + __builtin_memset(ev, 0, sizeof(*ev)); + ev->idx = *idx; + update_socket_common(&ev->data, protocol, *family); + + return ev; +} + +static __always_inline void emit_socket_event(struct pt_regs *ctx, + __u64 sent, + __u64 received, + __u32 retransmitted, + __u16 protocol, + __u32 state) +{ + netdata_socket_idx_t idx = { }; + __u16 family; + struct netdata_socket_event_t __arena *ev = socket_event_reserve(ctx, &family, &idx, protocol); + + if (!ev) + return; + + ev->data.tcp.state = state; + update_socket_stats(&ev->data, sent, received, retransmitted, protocol); + bpf_ringbuf_submit(ev, 0); +} + +static __always_inline void emit_socket_close_event(struct pt_regs *ctx) +{ + netdata_socket_idx_t idx = { }; + __u16 
family; + struct netdata_socket_event_t __arena *ev = socket_event_reserve(ctx, &family, &idx, IPPROTO_TCP); + + if (!ev) + return; + + ev->data.tcp.close = 1; + bpf_ringbuf_submit(ev, 0); +} + +static __always_inline void emit_socket_connect_event(struct pt_regs *ctx) +{ + netdata_socket_idx_t idx = { }; + __u16 family; + struct netdata_socket_event_t __arena *ev = socket_event_reserve(ctx, &family, &idx, IPPROTO_TCP); + + if (!ev) + return; + + if (family == AF_INET) + ev->data.tcp.ipv4_connect = 1; + else + ev->data.tcp.ipv6_connect = 1; + + bpf_ringbuf_submit(ev, 0); +} + +static __always_inline void emit_socket_external_origin_event(struct sock *sk, __u16 protocol) +{ + __u16 family; + netdata_socket_idx_t nv_idx = { }; + struct netdata_socket_event_t __arena *ev; + struct inet_sock *is = inet_sk(sk); + + if (!is) + return; + + family = set_idx_value(&nv_idx, is); + if (family == AF_UNSPEC) + return; + + ev = bpf_ringbuf_reserve(&socket_events, sizeof(*ev), 0); + if (!ev) + return; + + __builtin_memset(ev, 0, sizeof(*ev)); + ev->idx = nv_idx; + update_socket_common(&ev->data, protocol, family); + ev->data.external_origin = 1; + bpf_ringbuf_submit(ev, 0); +} + +#if (LINUX_VERSION_CODE < KERNEL_VERSION(5,6,0)) +static __always_inline u8 select_protocol(struct sock *sk) +{ + u8 protocol = 0; + + int gso_max_segs_offset = offsetof(struct sock, sk_gso_max_segs); + int sk_lingertime_offset = offsetof(struct sock, sk_lingertime); + + if (sk_lingertime_offset - gso_max_segs_offset == 4) +#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ + bpf_probe_read(&protocol, sizeof(u8), (void *)((long)&sk->sk_gso_max_segs) - 3); + else + bpf_probe_read(&protocol, sizeof(u8), (void *)((long)&sk->sk_wmem_queued) - 3); +#elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ + bpf_probe_read(&protocol, sizeof(u8), (void *)((long)&sk->sk_gso_max_segs) - 1); + else + bpf_probe_read(&protocol, sizeof(u8), (void *)((long)&sk->sk_wmem_queued) - 1); +#endif + + return protocol; +} +#endif // Kernel 
version 5.6.0 + +/************************************************************************************ + * + * General Socket Section + * + ***********************************************************************************/ + +SEC("kretprobe/inet_csk_accept") +int netdata_inet_csk_accept(struct pt_regs *ctx) +{ + struct sock *sk = (struct sock *)PT_REGS_RC(ctx); + if (!sk) + return 0; + + u16 protocol; +#if (LINUX_VERSION_CODE >= KERNEL_VERSION(5,6,0)) + bpf_probe_read(&protocol, sizeof(u16), &sk->sk_protocol); +#else + protocol = (u16) select_protocol(sk); +#endif + + if (protocol != IPPROTO_TCP && protocol != IPPROTO_UDP) + return 0; + + netdata_passive_connection_idx_t idx = { }; + idx.protocol = protocol; + bpf_probe_read(&idx.port, sizeof(u16), &sk->__sk_common.skc_num); + + __u64 pid_tgid = bpf_get_current_pid_tgid(); + __u32 tgid = pid_tgid >> 32; + __u32 pid = (__u32)pid_tgid; + + netdata_passive_connection_t *value = (netdata_passive_connection_t *)bpf_map_lookup_elem(&tbl_lports, &idx); + if (value) { + value->tgid = tgid; + value->pid = pid; + libnetdata_update_u64(&value->counter, 1); + } else { + netdata_passive_connection_t data = { }; + data.tgid = tgid; + data.pid = pid; + data.counter = 1; + bpf_map_update_elem(&tbl_lports, &idx, &data, BPF_ANY); + } + + emit_socket_external_origin_event(sk, protocol); + return 0; +} + +/************************************************************************************ + * + * TCP Section + * + ***********************************************************************************/ + +#if NETDATASEL < 2 +SEC("kretprobe/tcp_sendmsg") +#else +SEC("kprobe/tcp_sendmsg") +#endif +int netdata_tcp_sendmsg(struct pt_regs *ctx) +{ + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_CALLS_TCP_SENDMSG, 1); + + size_t sent; +#if NETDATASEL < 2 + int ret = (int)PT_REGS_RC(ctx); + if (ret < 0) { + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_ERROR_TCP_SENDMSG, 1); + return 0; + } + + sent = (size_t) ret; +#else + 
sent = (size_t)PT_REGS_PARM3(ctx); +#endif + + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_BYTES_TCP_SENDMSG, sent); + emit_socket_event(ctx, sent, 0, 0, IPPROTO_TCP, 0); + + return 0; +} + +SEC("kprobe/tcp_retransmit_skb") +int netdata_tcp_retransmit_skb(struct pt_regs *ctx) +{ + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_TCP_RETRANSMIT, 1); + emit_socket_event(ctx, 0, 0, 1, IPPROTO_TCP, 0); + + return 0; +} + +SEC("kprobe/tcp_set_state") +int netdata_tcp_set_state(struct pt_regs *ctx) +{ + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_CALLS_TCP_SET_STATE, 1); + int state = PT_REGS_PARM2(ctx); + + emit_socket_event(ctx, 0, 0, 1, IPPROTO_TCP, state); + return 0; +} + +// https://elixir.bootlin.com/linux/v5.6.14/source/net/ipv4/tcp.c#L1528 +SEC("kprobe/tcp_cleanup_rbuf") +int netdata_tcp_cleanup_rbuf(struct pt_regs *ctx) +{ + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_CALLS_TCP_CLEANUP_RBUF, 1); + + int copied = (int)PT_REGS_PARM2(ctx); + if (copied < 0) { + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_ERROR_TCP_CLEANUP_RBUF, 1); + return 0; + } + + __u64 received = (__u64)PT_REGS_PARM2(ctx); + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_BYTES_TCP_CLEANUP_RBUF, received); + emit_socket_event(ctx, 0, (__u64)copied, 1, IPPROTO_TCP, 0); + + return 0; +} + +SEC("kprobe/tcp_close") +int netdata_tcp_close(struct pt_regs *ctx) +{ + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_CALLS_TCP_CLOSE, 1); + struct inet_sock *is = inet_sk((struct sock *)PT_REGS_PARM1(ctx)); + if (!is) + return 0; + + emit_socket_close_event(ctx); + return 0; +} + +#if NETDATASEL < 2 +SEC("kretprobe/tcp_v4_connect") +#else +SEC("kprobe/tcp_v4_connect") +#endif +int netdata_tcp_v4_connect(struct pt_regs *ctx) +{ + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_CALLS_TCP_CONNECT_IPV4, 1); + +#if NETDATASEL < 2 + int ret = (int)PT_REGS_RC(ctx); + if (ret < 0) { + libnetdata_update_global(&tbl_global_sock, 
NETDATA_KEY_ERROR_TCP_CONNECT_IPV4, 1); + return 0; + } +#endif + + emit_socket_connect_event(ctx); + return 0; +} + +#if NETDATASEL < 2 +SEC("kretprobe/tcp_v6_connect") +#else +SEC("kprobe/tcp_v6_connect") +#endif +int netdata_tcp_v6_connect(struct pt_regs *ctx) +{ + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_CALLS_TCP_CONNECT_IPV6, 1); +#if NETDATASEL < 2 + int ret = (int)PT_REGS_RC(ctx); + if (ret < 0) { + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_ERROR_TCP_CONNECT_IPV6, 1); + return 0; + } +#endif + + emit_socket_connect_event(ctx); + return 0; +} + +/************************************************************************************ + * + * UDP Section + * + ***********************************************************************************/ + +SEC("kprobe/udp_recvmsg") +int trace_udp_recvmsg(struct pt_regs *ctx) +{ + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_CALLS_UDP_RECVMSG, 1); + + __u64 pid_tgid = bpf_get_current_pid_tgid(); + struct sock *sk = (struct sock *)PT_REGS_PARM1(ctx); + if (!sk) + return 0; + + bpf_map_update_elem(&tbl_nv_udp, &pid_tgid, &sk, BPF_ANY); + return 0; +} + +SEC("kretprobe/udp_recvmsg") +int trace_udp_ret_recvmsg(struct pt_regs *ctx) +{ + __u64 pid_tgid = bpf_get_current_pid_tgid(); + struct sock **skpp = bpf_map_lookup_elem(&tbl_nv_udp, &pid_tgid); + if (skpp == 0) + return 0; + + bpf_map_delete_elem(&tbl_nv_udp, &pid_tgid); + __u64 received = (__u64)PT_REGS_RC(ctx); + + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_BYTES_UDP_RECVMSG, received); + emit_socket_event(ctx, 0, received, 0, IPPROTO_UDP, 0); + + return 0; +} + +// https://elixir.bootlin.com/linux/v5.6.14/source/net/ipv4/udp.c#L965 +#if NETDATASEL < 2 +SEC("kretprobe/udp_sendmsg") +#else +SEC("kprobe/udp_sendmsg") +#endif +int trace_udp_sendmsg(struct pt_regs *ctx) +{ + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_CALLS_UDP_SENDMSG, 1); + + size_t sent; +#if NETDATASEL < 2 + int ret = (int)PT_REGS_RC(ctx); + if 
(ret < 0) { + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_ERROR_UDP_SENDMSG, 1); + sent = 0; + } else + sent = (size_t)ret; +#else + sent = (size_t)PT_REGS_PARM3(ctx); +#endif + + libnetdata_update_global(&tbl_global_sock, NETDATA_KEY_BYTES_UDP_SENDMSG, (__u64)sent); + emit_socket_event(ctx, sent, 0, 0, IPPROTO_UDP, 0); + + return 0; +} + +char _license[] SEC("license") = "GPL"; diff --git a/tests/tester_socket.c b/tests/tester_socket.c index db8d2000..39512fb7 100644 --- a/tests/tester_socket.c +++ b/tests/tester_socket.c @@ -10,28 +10,25 @@ #include +#include "../includes/netdata_socket_buffer.h" #include "tester_socket.h" /* * Binary layout of netdata_socket_idx_t (40 bytes). * Offsets must match kernel struct in includes/netdata_socket.h. */ -#define SOCKET_IDX_SADDR_OFFSET 0 /* union netdata_ip: 16 bytes */ -#define SOCKET_IDX_DADDR_OFFSET 16 /* union netdata_ip: 16 bytes */ -#define SOCKET_IDX_DPORT_OFFSET 32 /* __u16: 2 bytes */ -#define SOCKET_IDX_PID_OFFSET 36 /* __u32: 4 bytes (after 2B pad) */ -#define SOCKET_IDX_SIZE 40 +#define SOCKET_IDX_SADDR_OFFSET 0 /* union netdata_ip: 16 bytes */ +#define SOCKET_IDX_DADDR_OFFSET 16 /* union netdata_ip: 16 bytes */ +#define SOCKET_IDX_DPORT_OFFSET 32 /* __u16: 2 bytes */ +#define SOCKET_IDX_PID_OFFSET 36 /* __u32: 4 bytes (after 2B implicit pad) */ /* * Binary layout of netdata_socket_t (112 bytes). * Offsets must match kernel struct in includes/netdata_socket.h. 
*/ #define SOCKET_VAL_NAME_OFFSET 0 /* char[16] */ -#define SOCKET_VAL_FIRST_OFFSET 16 /* __u64 */ -#define SOCKET_VAL_CT_OFFSET 24 /* __u64 */ #define SOCKET_VAL_PROTOCOL_OFFSET 32 /* __u16 */ #define SOCKET_VAL_FAMILY_OFFSET 34 /* __u16 */ -#define SOCKET_VAL_EXTORIGIN_OFFSET 36 /* __u32 */ /* tcp sub-struct starts at offset 40 */ #define SOCKET_VAL_TCP_SENT_CALLS 40 /* __u32 */ #define SOCKET_VAL_TCP_RECV_CALLS 44 /* __u32 */ @@ -48,7 +45,6 @@ #define SOCKET_VAL_UDP_RECV_CALLS 92 /* __u32 */ #define SOCKET_VAL_UDP_BYTES_SENT 96 /* __u64 */ #define SOCKET_VAL_UDP_BYTES_RECV 104 /* __u64 */ -#define SOCKET_VAL_SIZE 112 #define SOCKET_NAME_LEN 16 #define SOCKET_SLEEP_SEC 5 @@ -233,26 +229,134 @@ static void socket_write_entry_json(FILE *out, const socket_entry_t *e) (unsigned long long)e->udp_bytes_recv); } +typedef struct { + socket_entry_t *entries; + size_t size; + size_t capacity; +} socket_event_collection_t; + +static int socket_entries_match(const socket_entry_t *a, const socket_entry_t *b) +{ + return memcmp(a->saddr, b->saddr, sizeof(a->saddr)) == 0 && + memcmp(a->daddr, b->daddr, sizeof(a->daddr)) == 0 && + a->dport == b->dport && + a->pid == b->pid; +} + +static void socket_merge_entry(socket_entry_t *dst, const socket_entry_t *src) +{ + if (dst->name[0] == '\0' && src->name[0] != '\0') { + memcpy(dst->name, src->name, sizeof(dst->name)); + dst->protocol = src->protocol; + dst->family = src->family; + } + + dst->tcp_sent_calls += src->tcp_sent_calls; + dst->tcp_recv_calls += src->tcp_recv_calls; + dst->tcp_bytes_sent += src->tcp_bytes_sent; + dst->tcp_bytes_recv += src->tcp_bytes_recv; + dst->tcp_close += src->tcp_close; + dst->tcp_retransmit += src->tcp_retransmit; + dst->tcp_ipv4_connect += src->tcp_ipv4_connect; + dst->tcp_ipv6_connect += src->tcp_ipv6_connect; + dst->udp_sent_calls += src->udp_sent_calls; + dst->udp_recv_calls += src->udp_recv_calls; + dst->udp_bytes_sent += src->udp_bytes_sent; + dst->udp_bytes_recv += src->udp_bytes_recv; + + if 
(src->tcp_state) + dst->tcp_state = src->tcp_state; +} + +static socket_entry_t *socket_collection_get(socket_event_collection_t *coll, const socket_entry_t *sample, int *created) +{ + size_t i; + + for (i = 0; i < coll->size; i++) { + if (socket_entries_match(&coll->entries[i], sample)) + { + if (created) + *created = 0; + return &coll->entries[i]; + } + } + + if (coll->size == coll->capacity) { + size_t new_capacity = coll->capacity ? coll->capacity * 2 : 32; + socket_entry_t *new_entries = realloc(coll->entries, new_capacity * sizeof(*new_entries)); + if (!new_entries) + return NULL; + + coll->entries = new_entries; + coll->capacity = new_capacity; + } + + coll->entries[coll->size] = *sample; + if (created) + *created = 1; + return &coll->entries[coll->size++]; +} + +static int socket_ringbuf_sample_cb(void *ctx, void *data, size_t size) +{ + socket_event_collection_t *coll = ctx; + socket_entry_t sample = { }; + struct netdata_socket_event_t *ev = data; + + (void)size; + + if (!coll || !ev) + return 0; + + socket_decode_key(&sample, (const uint8_t *)&ev->idx); + socket_aggregate_percpu(&sample, (const uint8_t *)&ev->data, sizeof(ev->data), 1); + + int created = 0; + socket_entry_t *entry = socket_collection_get(coll, &sample, &created); + if (entry && !created) + socket_merge_entry(entry, &sample); + + return 0; +} + +static void socket_collection_free(socket_event_collection_t *coll) +{ + free(coll->entries); + coll->entries = NULL; + coll->size = 0; + coll->capacity = 0; +} + /* ------------------------------------------------------------------------- * Public interface. 
* -------------------------------------------------------------------------*/ -int ebpf_object_has_socket_table(struct bpf_object *obj) +struct bpf_map *ebpf_find_socket_table(struct bpf_object *obj) { struct bpf_map *map; bpf_object__for_each_map(map, obj) { if (!strcmp(bpf_map__name(map), "tbl_nd_socket")) - return 1; + return map; } - return 0; + return NULL; +} + +struct bpf_map *ebpf_find_socket_events(struct bpf_object *obj) +{ + struct bpf_map *map; + + bpf_object__for_each_map(map, obj) { + if (!strcmp(bpf_map__name(map), "socket_events")) + return map; + } + + return NULL; } -void ebpf_socket_table_tester(struct bpf_object *obj, FILE *out, int iterations) +void ebpf_socket_table_tester(struct bpf_map *map, FILE *out, int iterations) { - struct bpf_map *map = NULL; - struct bpf_map *m; int fd; uint32_t key_size, value_size, map_type; int ncpus; @@ -260,22 +364,9 @@ void ebpf_socket_table_tester(struct bpf_object *obj, FILE *out, int iterations) uint8_t *key_buf = NULL; uint8_t *next_key = NULL; uint8_t *percpu_buf = NULL; - int entry_count = 0; - int first = 1; + int first = 1; int collection_seconds = iterations * SOCKET_SLEEP_SEC; - bpf_object__for_each_map(m, obj) { - if (!strcmp(bpf_map__name(m), "tbl_nd_socket")) { - map = m; - break; - } - } - - if (!map) { - fprintf(out, " \"Total tables\" : 0\n"); - return; - } - fd = bpf_map__fd(map); #ifdef LIBBPF_MAJOR_VERSION map_type = (uint32_t)bpf_map__type(map); @@ -332,7 +423,6 @@ void ebpf_socket_table_tester(struct bpf_object *obj, FILE *out, int iterations) socket_write_entry_json(out, &entry); first = 0; - entry_count++; advance: memcpy(key_buf, next_key, key_size); @@ -342,14 +432,71 @@ void ebpf_socket_table_tester(struct bpf_object *obj, FILE *out, int iterations) if (!first) fprintf(out, "\n"); + cleanup: + free(key_buf); + free(next_key); + free(percpu_buf); +} + +void ebpf_socket_ringbuf_tester(struct bpf_map *map, FILE *out, int iterations) +{ + int fd; + uint32_t type; + int collection_seconds = 
iterations * SOCKET_SLEEP_SEC; + socket_event_collection_t coll = { }; + struct ring_buffer *rb = NULL; + uint32_t value_size = (uint32_t)sizeof(netdata_socket_t); + + fd = bpf_map__fd(map); +#ifdef LIBBPF_MAJOR_VERSION + type = (uint32_t)bpf_map__type(map); +#else + { + const struct bpf_map_def *def = bpf_map__def(map); + type = (uint32_t)def->type; + } +#endif + + fprintf(out, + " \"socket_connections\" : {\n" + " \"Info\" : { \"Length\" : { \"Key\" : %u, \"Value\" : %u},\n" + " \"Type\" : %u,\n" + " \"FD\" : %d,\n" + " \"Collection Seconds\" : %d,\n" + " \"Data\" : [\n", + (unsigned)sizeof(netdata_socket_idx_t), value_size, + type, fd, collection_seconds); + + rb = ring_buffer__new(fd, socket_ringbuf_sample_cb, &coll, NULL); + if (rb) { + int sec; + + for (sec = 0; sec < collection_seconds; sec++) { + sleep(1); + ring_buffer__poll(rb, 0); + } + + ring_buffer__poll(rb, 0); + } + + if (coll.size > 0) { + size_t i; + + for (i = 0; i < coll.size; i++) { + if (i) + fprintf(out, ",\n"); + socket_write_entry_json(out, &coll.entries[i]); + } + fprintf(out, "\n"); + } + fprintf(out, " ]\n" " }\n" " },\n" " \"Total tables\" : 1\n"); -cleanup: - free(key_buf); - free(next_key); - free(percpu_buf); + if (rb) + ring_buffer__free(rb); + socket_collection_free(&coll); } diff --git a/tests/tester_socket.h b/tests/tester_socket.h index 38891e1c..74dbf091 100644 --- a/tests/tester_socket.h +++ b/tests/tester_socket.h @@ -4,8 +4,11 @@ #include struct bpf_object; +struct bpf_map; -int ebpf_object_has_socket_table(struct bpf_object *obj); -void ebpf_socket_table_tester(struct bpf_object *obj, FILE *out, int iterations); +struct bpf_map *ebpf_find_socket_table(struct bpf_object *obj); +struct bpf_map *ebpf_find_socket_events(struct bpf_object *obj); +void ebpf_socket_table_tester(struct bpf_map *map, FILE *out, int iterations); +void ebpf_socket_ringbuf_tester(struct bpf_map *map, FILE *out, int iterations); #endif /* NETDATA_LEGACY_TESTER_SOCKET */ diff --git a/tests/tester_user.c 
b/tests/tester_user.c index c0c61885..90c00348 100644 --- a/tests/tester_user.c +++ b/tests/tester_user.c @@ -105,6 +105,8 @@ ebpf_module_t ebpf_modules[] = { { .kernels = NETDATA_V3_10 | NETDATA_V4_14 | NETDATA_V4_16 | NETDATA_V4_18 | NETDATA_V5_4 | NETDATA_V5_14, .flags = NETDATA_FLAG_SYNC, .name = "msync", .update_names = NULL, .ctrl_table = NULL }, { .kernels = NETDATA_V3_10 | NETDATA_V4_14 | NETDATA_V4_16 | NETDATA_V4_18 | NETDATA_V5_4 | NETDATA_V5_14, + .buffer_kernels = NETDATA_V5_10 | NETDATA_V5_11 | NETDATA_V5_14 | NETDATA_V5_15 | NETDATA_V5_16 | NETDATA_V6_8 | NETDATA_V6_12, + .arena_kernels = NETDATA_V5_10 | NETDATA_V5_11 | NETDATA_V5_14 | NETDATA_V5_15 | NETDATA_V5_16 | NETDATA_V6_8 | NETDATA_V6_12, .flags = NETDATA_FLAG_SOCKET, .name = "socket", .update_names = NULL, .ctrl_table = "socket_ctrl" }, { .kernels = NETDATA_V3_10 | NETDATA_V4_14 | NETDATA_V4_16 | NETDATA_V4_18 | NETDATA_V5_4 | NETDATA_V5_14, .buffer_kernels = NETDATA_V5_10 | NETDATA_V5_11 | NETDATA_V5_14 | NETDATA_V5_15 | NETDATA_V5_16 | NETDATA_V6_8 | NETDATA_V6_12, @@ -1513,7 +1515,7 @@ static void ebpf_controller_json(ebpf_table_data_t *values, int fd) value + zero, value, zero); } -static void ebpf_test_ringbuf_map(const char *name, int fd, uint32_t type, uint32_t key_size, uint32_t value_size) +static void ebpf_test_ringbuf_map(struct bpf_map *map, const char *name, int fd, uint32_t type, uint32_t key_size, uint32_t value_size) { char error_buffer[128]; const char *mode = ebpf_map_is_user_ringbuf(type) ? 
"user_ringbuf_producer" : "ringbuf_consumer"; @@ -1524,6 +1526,11 @@ static void ebpf_test_ringbuf_map(const char *name, int fd, uint32_t type, uint3 int setup_error = 0; int i; + if (!strcmp(name, "socket_events")) { + ebpf_socket_ringbuf_tester(map, stdlog, end_iteration); + return; + } + fprintf(stdlog, " \"%s\" : {\n" " \"Info\" : { \"Length\" : { \"Key\" : %u, \"Value\" : %u},\n" @@ -1631,7 +1638,7 @@ static void ebpf_test_maps(struct bpf_object *obj, char *ctrl) value_size = def->value_size; #endif if (!ebpf_map_supports_key_value_io(type)) { - ebpf_test_ringbuf_map(name, fd, type, key_size, value_size); + ebpf_test_ringbuf_map(map, name, fd, type, key_size, value_size); fprintf(stdlog, " ]\n" " }\n" " },\n"); @@ -1805,8 +1812,13 @@ static char *ebpf_tester(char *filename, ebpf_specify_name_t *names, uint32_t ma } if (!errors && maps) { - if (ebpf_object_has_socket_table(obj)) { - ebpf_socket_table_tester(obj, stdlog, end_iteration); + struct bpf_map *socket_events = ebpf_find_socket_events(obj); + struct bpf_map *socket_table = ebpf_find_socket_table(obj); + + if (socket_events) { + ebpf_socket_ringbuf_tester(socket_events, stdlog, end_iteration); + } else if (socket_table) { + ebpf_socket_table_tester(socket_table, stdlog, end_iteration); } else { if (ctrl) { ebpf_fill_ctrl(obj, ctrl); @@ -1844,7 +1856,7 @@ static char *ebpf_tester(char *filename, ebpf_specify_name_t *names, uint32_t ma static int ebpf_module_has_buffer(const char *name) { static const char *buffer_modules[] = { - "cachestat", "dc", "fd", "oomkill", "process", "shm", "swap", "vfs", "dns", NULL + "cachestat", "dc", "fd", "oomkill", "process", "shm", "swap", "vfs", "dns", "socket", NULL }; int i; for (i = 0; buffer_modules[i]; i++) { @@ -1866,6 +1878,7 @@ static int ebpf_module_has_arena(const char *name) "swap", "vfs", "dns", + "socket", NULL }; size_t i; @@ -2005,8 +2018,8 @@ static void ebpf_help() "--content Test content stored inside hash tables.\n" "--iteration Number of iterations when 
content is read, default value is 1.\n" "--pid Specify the number that identifies PID that will be monitored: 0 - Real Parent PID (Default), 1 - Parent PID, 2 - All PID, and 3 - Ignore PID (ring buffer mode).\n" - "--buffer Test ring buffer versions of collectors (cachestat, dc, fd, oomkill, process, shm, swap, vfs, dns).\n" - "--arena Test arena versions of collectors (cachestat, dc, fd, oomkill, process, shm, swap, vfs, dns).\n\n" + "--buffer Test ring buffer versions of collectors (cachestat, dc, fd, oomkill, process, shm, swap, vfs, dns, socket).\n" + "--arena Test arena versions of collectors (cachestat, dc, fd, oomkill, process, shm, swap, vfs, dns, socket).\n\n" "You can also specify an unique eBPF program developed by Netdata with the following\n" "options:\n" "--btrfs Latency for btrfs.\n" From 17c0cf5483b94c6a73f6f666d8bb354f85976aac Mon Sep 17 00:00:00 2001 From: thiagoftsm Date: Mon, 4 May 2026 21:01:36 +0000 Subject: [PATCH 4/5] tests: Fix issues with socket --- gotests/socket.go | 25 +++++++++--- gotests/socket_ringbuf.go | 60 +++++++++++++++++++++++++++ tests/tester_socket.c | 86 +++++++++++++++++++++++++++++++++------ 3 files changed, 152 insertions(+), 19 deletions(-) diff --git a/gotests/socket.go b/gotests/socket.go index ee054716..95013f2c 100644 --- a/gotests/socket.go +++ b/gotests/socket.go @@ -271,12 +271,17 @@ func runSocketRingBufferTester(w io.Writer, obj *bpfObject, iterations int) { } meta := m.meta() - rb, errCode := newSocketRingBuffer(meta.FD) - if rb != nil { - defer rb.free() - } - collectionSeconds := iterations * socketSleepSec + collector := &socketRingbufCollector{entries: make(map[string]*socketEntry)} + var rb *socketRingBuffer + errCode := 0 + + if meta.Type == bpfMapTypeRingBuf { + rb, errCode = newSocketRingBuffer(meta.FD) + if rb != nil { + defer rb.free() + } + } fmt.Fprintf(w, " \"socket_connections\" : {\n"+ @@ -293,9 +298,17 @@ func runSocketRingBufferTester(w io.Writer, obj *bpfObject, iterations int) { rb.poll(0) } 
rb.poll(0) + socketWriteCollectedEntries(w, getSocketRingbufCollector(rb.handle)) + } else { + for sec := 0; sec < collectionSeconds; sec++ { + time.Sleep(time.Second) + } + if collectSocketArenaEntries(meta.FD, collector) == 0 { + socketWriteCollectedEntries(w, collector) + } } if rb != nil { - socketWriteCollectedEntries(w, getSocketRingbufCollector(rb.handle)) + return } } diff --git a/gotests/socket_ringbuf.go b/gotests/socket_ringbuf.go index f3024d37..3b800ecc 100644 --- a/gotests/socket_ringbuf.go +++ b/gotests/socket_ringbuf.go @@ -29,8 +29,10 @@ void netdata_ringbuf_stats_free(struct netdata_ringbuf_stats *stats); import "C" import ( + "encoding/binary" "sync" "sync/atomic" + "syscall" "unsafe" ) @@ -38,6 +40,10 @@ const ( socketEventKeySize = socketIdxPidOffset + 4 socketEventValueSize = socketValUDPBytesRecv + 8 socketEventDataOffset = socketEventKeySize + socketArenaMapPages = 256 + socketArenaSlotCount = 1024 + socketArenaSlotSize = socketEventKeySize + socketEventValueSize + socketArenaStateHeader = 4 ) type socketRingbufCollector struct { @@ -108,6 +114,60 @@ func (c *socketRingbufCollector) add(raw []byte) { c.order = append(c.order, key) } +func socketArenaSlotEmpty(raw []byte) bool { + for _, b := range raw { + if b != 0 { + return false + } + } + return true +} + +func collectSocketArenaEntries(mapFD int, collector *socketRingbufCollector) int { + pageSize := syscall.Getpagesize() + arenaSize := socketArenaMapPages * pageSize + mapped, err := syscall.Mmap(mapFD, 0, arenaSize, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED) + if err != nil { + if errno, ok := err.(syscall.Errno); ok { + return -int(errno) + } + return -1 + } + defer syscall.Munmap(mapped) + + if len(mapped) < socketArenaStateHeader { + return -1 + } + + head := binary.LittleEndian.Uint32(mapped[:socketArenaStateHeader]) + if head == 0 { + return 0 + } + + start := uint32(0) + if head > socketArenaSlotCount { + start = head - socketArenaSlotCount + } + + for i := start; i 
< head; i++ { + slot := int(i % socketArenaSlotCount) + base := socketArenaStateHeader + slot*socketArenaSlotSize + end := base + socketArenaSlotSize + if end > len(mapped) { + break + } + + raw := mapped[base:end] + if socketArenaSlotEmpty(raw) { + continue + } + + collector.add(raw) + } + + return 0 +} + func socketMergeEntry(dst, src *socketEntry) { if dst.name == "" && src.name != "" { dst.name = src.name diff --git a/tests/tester_socket.c b/tests/tester_socket.c index 39512fb7..c6868e66 100644 --- a/tests/tester_socket.c +++ b/tests/tester_socket.c @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -46,6 +47,9 @@ #define SOCKET_VAL_UDP_BYTES_SENT 96 /* __u64 */ #define SOCKET_VAL_UDP_BYTES_RECV 104 /* __u64 */ +#define SOCKET_ARENA_MAP_PAGES 256 +#define SOCKET_ARENA_SLOT_COUNT 1024 + #define SOCKET_NAME_LEN 16 #define SOCKET_SLEEP_SEC 5 @@ -235,6 +239,11 @@ typedef struct { size_t capacity; } socket_event_collection_t; +typedef struct { + uint32_t head; + struct netdata_socket_event_t events[SOCKET_ARENA_SLOT_COUNT]; +} socket_arena_state_t; + static int socket_entries_match(const socket_entry_t *a, const socket_entry_t *b) { return memcmp(a->saddr, b->saddr, sizeof(a->saddr)) == 0 && @@ -297,25 +306,17 @@ static socket_entry_t *socket_collection_get(socket_event_collection_t *coll, co return &coll->entries[coll->size++]; } +static void socket_collect_event(socket_event_collection_t *coll, const struct netdata_socket_event_t *ev); + static int socket_ringbuf_sample_cb(void *ctx, void *data, size_t size) { socket_event_collection_t *coll = ctx; - socket_entry_t sample = { }; - struct netdata_socket_event_t *ev = data; (void)size; - - if (!coll || !ev) + if (!coll || !data) return 0; - socket_decode_key(&sample, (const uint8_t *)&ev->idx); - socket_aggregate_percpu(&sample, (const uint8_t *)&ev->data, sizeof(ev->data), 1); - - int created = 0; - socket_entry_t *entry = socket_collection_get(coll, &sample, &created); - if (entry && 
!created) - socket_merge_entry(entry, &sample); - + socket_collect_event(coll, (const struct netdata_socket_event_t *)data); return 0; } @@ -327,6 +328,47 @@ static void socket_collection_free(socket_event_collection_t *coll) coll->capacity = 0; } +static void socket_collect_event(socket_event_collection_t *coll, const struct netdata_socket_event_t *ev) +{ + socket_entry_t sample = { }; + int created = 0; + socket_entry_t *entry; + + if (!coll || !ev) + return; + + socket_decode_key(&sample, (const uint8_t *)&ev->idx); + socket_aggregate_percpu(&sample, (const uint8_t *)&ev->data, sizeof(ev->data), 1); + + entry = socket_collection_get(coll, &sample, &created); + if (entry && !created) + socket_merge_entry(entry, &sample); +} + +static void socket_collect_arena_state(socket_event_collection_t *coll, const socket_arena_state_t *state) +{ + uint32_t head; + uint32_t start; + uint32_t i; + + if (!coll || !state) + return; + + head = state->head; + if (head == 0) + return; + + start = (head > SOCKET_ARENA_SLOT_COUNT) ? (head - SOCKET_ARENA_SLOT_COUNT) : 0; + for (i = start; i < head; i++) { + const struct netdata_socket_event_t *ev = &state->events[i % SOCKET_ARENA_SLOT_COUNT]; + + if (ev->data.first == 0 && ev->data.ct == 0 && ev->idx.pid == 0) + continue; + + socket_collect_event(coll, ev); + } +} + /* ------------------------------------------------------------------------- * Public interface. 
* -------------------------------------------------------------------------*/ @@ -445,6 +487,8 @@ void ebpf_socket_ringbuf_tester(struct bpf_map *map, FILE *out, int iterations) int collection_seconds = iterations * SOCKET_SLEEP_SEC; socket_event_collection_t coll = { }; struct ring_buffer *rb = NULL; + socket_arena_state_t *arena = NULL; + size_t arena_size = (size_t)sysconf(_SC_PAGESIZE) * SOCKET_ARENA_MAP_PAGES; uint32_t value_size = (uint32_t)sizeof(netdata_socket_t); fd = bpf_map__fd(map); @@ -467,7 +511,14 @@ void ebpf_socket_ringbuf_tester(struct bpf_map *map, FILE *out, int iterations) (unsigned)sizeof(netdata_socket_idx_t), value_size, type, fd, collection_seconds); - rb = ring_buffer__new(fd, socket_ringbuf_sample_cb, &coll, NULL); + if (type == BPF_MAP_TYPE_RINGBUF) { + rb = ring_buffer__new(fd, socket_ringbuf_sample_cb, &coll, NULL); + } else { + arena = mmap(NULL, arena_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (arena == MAP_FAILED) + arena = NULL; + } + if (rb) { int sec; @@ -477,6 +528,13 @@ void ebpf_socket_ringbuf_tester(struct bpf_map *map, FILE *out, int iterations) } ring_buffer__poll(rb, 0); + } else if (arena) { + int sec; + + for (sec = 0; sec < collection_seconds; sec++) + sleep(1); + + socket_collect_arena_state(&coll, arena); } if (coll.size > 0) { @@ -498,5 +556,7 @@ void ebpf_socket_ringbuf_tester(struct bpf_map *map, FILE *out, int iterations) if (rb) ring_buffer__free(rb); + if (arena) + munmap(arena, arena_size); socket_collection_free(&coll); } From 914263b1c3079ed72732356b6a4aed0475de5327 Mon Sep 17 00:00:00 2001 From: thiagoftsm Date: Tue, 5 May 2026 02:20:07 +0000 Subject: [PATCH 5/5] tests: Create compatibility between testers --- gotests/cgo_helpers.go | 27 +++++++++++++++++++ gotests/main.go | 16 +++++++----- gotests/socket.go | 2 +- gotests/socket_ringbuf.go | 21 +++++++-------- includes/netdata_arena_common.h | 2 +- tests/tester_socket.c | 46 ++++++++++++++++++++++++++------- tests/tester_user.c | 9 +++---- 7 
files changed, 88 insertions(+), 35 deletions(-) diff --git a/gotests/cgo_helpers.go b/gotests/cgo_helpers.go index 86eabf2e..4b817648 100644 --- a/gotests/cgo_helpers.go +++ b/gotests/cgo_helpers.go @@ -55,6 +55,11 @@ static int netdata_libbpf_num_possible_cpus(void) return libbpf_num_possible_cpus(); } +static long netdata_sc_pagesize(void) +{ + return sysconf(_SC_PAGESIZE); +} + static int netdata_open_capture_socket(int program_fd) { struct sockaddr_ll bind_addr = { 0 }; @@ -274,6 +279,7 @@ const ( bpfMapTypePerCPUArray = uint32(C.BPF_MAP_TYPE_PERCPU_ARRAY) bpfMapTypeRingBuf = uint32(C.BPF_MAP_TYPE_RINGBUF) bpfMapTypeUserRingBuf = uint32(C.BPF_MAP_TYPE_USER_RINGBUF) + bpfMapTypeArena = uint32(C.BPF_MAP_TYPE_ARENA) ) type bpfObject struct { @@ -471,6 +477,27 @@ func (m *bpfMap) name() string { return C.GoString(C.bpf_map__name(m.ptr)) } +// initialValue returns a pointer to the arena data section for BPF_MAP_TYPE_ARENA maps. +// libbpf places the data section at the END of the arena mmap: +// base + (total_sz - roundup(data_sz, PAGE_SIZE)) +// The caller must NOT munmap this pointer; libbpf owns the mapping. 
+func (m *bpfMap) initialValue() unsafe.Pointer { + var size C.size_t + base := unsafe.Pointer(C.bpf_map__initial_value(m.ptr, &size)) + if base == nil || size == 0 { + return nil + } + + pageSize := uintptr(C.netdata_sc_pagesize()) + totalSz := uintptr(C.bpf_map__max_entries(m.ptr)) * pageSize + dataSz := uintptr(size) + roundedDataSz := (dataSz + pageSize - 1) &^ (pageSize - 1) + if roundedDataSz > totalSz { + return nil + } + return unsafe.Add(base, totalSz-roundedDataSz) +} + func probeMapTypeSupport(mapType uint32) int { return int(C.netdata_libbpf_probe_bpf_map_type(C.uint(mapType))) } diff --git a/gotests/main.go b/gotests/main.go index 48b772c8..3bd56737 100644 --- a/gotests/main.go +++ b/gotests/main.go @@ -839,6 +839,8 @@ func mapTypeName(mapType uint32) string { return "ringbuf" case bpfMapTypeUserRingBuf: return "user_ringbuf" + case bpfMapTypeArena: + return "arena" default: return fmt.Sprintf("type_%d", mapType) } @@ -1344,7 +1346,7 @@ func isPerCPUMapType(mapType uint32) bool { } func isRingBufferMapType(mapType uint32) bool { - return mapType == bpfMapTypeRingBuf || mapType == bpfMapTypeUserRingBuf + return mapType == bpfMapTypeRingBuf || mapType == bpfMapTypeUserRingBuf || mapType == bpfMapTypeArena } func isUserRingBufferMapType(mapType uint32) bool { @@ -1546,13 +1548,13 @@ func testMaps(w io.Writer, obj *bpfObject, ctrl string, iterations int, nprocess tables := 0 for m := obj.firstMap(); m != nil; m = obj.nextMap(m) { meta := m.meta() + if meta.Name == "socket_events" { + runSocketRingBufferTester(w, obj, iterations) + fmt.Fprint(w, " ]\n }\n },\n") + tables++ + continue + } if !supportsMapKeyValueIO(meta.Type) { - if meta.Name == "socket_events" { - runSocketRingBufferTester(w, obj, iterations) - fmt.Fprint(w, " ]\n }\n },\n") - tables++ - continue - } testRingBufferMap(w, meta, iterations) fmt.Fprint(w, " ]\n }\n },\n") tables++ diff --git a/gotests/socket.go b/gotests/socket.go index 95013f2c..4657b23e 100644 --- a/gotests/socket.go +++ 
b/gotests/socket.go @@ -303,7 +303,7 @@ func runSocketRingBufferTester(w io.Writer, obj *bpfObject, iterations int) { for sec := 0; sec < collectionSeconds; sec++ { time.Sleep(time.Second) } - if collectSocketArenaEntries(meta.FD, collector) == 0 { + if collectSocketArenaEntries(m.initialValue(), collector) == 0 { socketWriteCollectedEntries(w, collector) } } diff --git a/gotests/socket_ringbuf.go b/gotests/socket_ringbuf.go index 3b800ecc..e566c98c 100644 --- a/gotests/socket_ringbuf.go +++ b/gotests/socket_ringbuf.go @@ -43,7 +43,7 @@ const ( socketArenaMapPages = 256 socketArenaSlotCount = 1024 socketArenaSlotSize = socketEventKeySize + socketEventValueSize - socketArenaStateHeader = 4 + socketArenaStateHeader = 8 // 4-byte head + 4-byte pad before 8-byte-aligned events[] ) type socketRingbufCollector struct { @@ -123,23 +123,22 @@ func socketArenaSlotEmpty(raw []byte) bool { return true } -func collectSocketArenaEntries(mapFD int, collector *socketRingbufCollector) int { - pageSize := syscall.Getpagesize() - arenaSize := socketArenaMapPages * pageSize - mapped, err := syscall.Mmap(mapFD, 0, arenaSize, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED) - if err != nil { - if errno, ok := err.(syscall.Errno); ok { - return -int(errno) - } +// collectSocketArenaEntries reads socket events from a BPF arena using the +// libbpf-managed pointer returned by bpf_map__initial_value. 
+func collectSocketArenaEntries(arenaPtr unsafe.Pointer, collector *socketRingbufCollector) int { + if arenaPtr == nil { return -1 } - defer syscall.Munmap(mapped) + + pageSize := syscall.Getpagesize() + arenaSize := socketArenaMapPages * pageSize + mapped := unsafe.Slice((*byte)(arenaPtr), arenaSize) if len(mapped) < socketArenaStateHeader { return -1 } - head := binary.LittleEndian.Uint32(mapped[:socketArenaStateHeader]) + head := binary.LittleEndian.Uint32(mapped[:4]) if head == 0 { return 0 } diff --git a/includes/netdata_arena_common.h b/includes/netdata_arena_common.h index 6624fe65..3cde8376 100644 --- a/includes/netdata_arena_common.h +++ b/includes/netdata_arena_common.h @@ -5,7 +5,7 @@ #if defined(__BPF_FEATURE_ADDR_SPACE_CAST) #define __arena __attribute__((address_space(1))) -#define __arena_global __attribute__((address_space(1))) +#define __arena_global __attribute__((address_space(1))) SEC(".addr_space.1") #else #define __arena #define __arena_global SEC(".addr_space.1") diff --git a/tests/tester_socket.c b/tests/tester_socket.c index c6868e66..07dcc56e 100644 --- a/tests/tester_socket.c +++ b/tests/tester_socket.c @@ -488,6 +488,7 @@ void ebpf_socket_ringbuf_tester(struct bpf_map *map, FILE *out, int iterations) socket_event_collection_t coll = { }; struct ring_buffer *rb = NULL; socket_arena_state_t *arena = NULL; + int own_arena = 0; size_t arena_size = (size_t)sysconf(_SC_PAGESIZE) * SOCKET_ARENA_MAP_PAGES; uint32_t value_size = (uint32_t)sizeof(netdata_socket_t); @@ -514,9 +515,38 @@ void ebpf_socket_ringbuf_tester(struct bpf_map *map, FILE *out, int iterations) if (type == BPF_MAP_TYPE_RINGBUF) { rb = ring_buffer__new(fd, socket_ringbuf_sample_cb, &coll, NULL); } else { - arena = mmap(NULL, arena_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - if (arena == MAP_FAILED) - arena = NULL; +#ifdef LIBBPF_MAJOR_VERSION + { + /* libbpf places the data section at the END of the arena mapping: + * state_ptr = base + (total_sz - roundup(data_sz, 
PAGE_SIZE)) */ + size_t page_size = arena_size / SOCKET_ARENA_MAP_PAGES; + size_t isize = 0; + void *iptr = bpf_map__initial_value(map, &isize); + fprintf(stderr, "[socket_arena_diag] iptr=%p isize=%zu page_size=%zu arena_size=%zu\n", + iptr, isize, page_size, arena_size); + if (iptr && isize > 0) { + size_t rounded = (isize + page_size - 1) & ~(page_size - 1); + size_t data_off = arena_size - rounded; + arena = (socket_arena_state_t *)((char *)iptr + data_off); + fprintf(stderr, "[socket_arena_diag] isize>0 data_off=%zu arena_state=%p head=%u\n", + data_off, (void *)arena, arena->head); + } else if (iptr) { + /* addr_space_cast path: arena globals at offset 0 */ + arena = (socket_arena_state_t *)iptr; + fprintf(stderr, "[socket_arena_diag] isize=0 arena_state=%p head@0=%u\n", + (void *)arena, arena->head); + } + } + if (!arena) { +#endif + arena = (socket_arena_state_t *)mmap(NULL, arena_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (arena == MAP_FAILED) + arena = NULL; + else + own_arena = 1; +#ifdef LIBBPF_MAJOR_VERSION + } +#endif } if (rb) { @@ -534,7 +564,9 @@ void ebpf_socket_ringbuf_tester(struct bpf_map *map, FILE *out, int iterations) for (sec = 0; sec < collection_seconds; sec++) sleep(1); + fprintf(stderr, "[socket_arena_diag] arena->head=%u before collect\n", arena->head); socket_collect_arena_state(&coll, arena); + fprintf(stderr, "[socket_arena_diag] coll.size=%zu after collect\n", coll.size); } if (coll.size > 0) { @@ -548,15 +580,9 @@ void ebpf_socket_ringbuf_tester(struct bpf_map *map, FILE *out, int iterations) fprintf(out, "\n"); } - fprintf(out, - " ]\n" - " }\n" - " },\n" - " \"Total tables\" : 1\n"); - if (rb) ring_buffer__free(rb); - if (arena) + if (arena && own_arena) munmap(arena, arena_size); socket_collection_free(&coll); } diff --git a/tests/tester_user.c b/tests/tester_user.c index 90c00348..a0c13fe7 100644 --- a/tests/tester_user.c +++ b/tests/tester_user.c @@ -215,7 +215,7 @@ static int ebpf_map_is_percpu(uint32_t type) 
static int ebpf_map_is_ringbuf(uint32_t type) { - return type == BPF_MAP_TYPE_RINGBUF || type == BPF_MAP_TYPE_USER_RINGBUF; + return type == BPF_MAP_TYPE_RINGBUF || type == BPF_MAP_TYPE_USER_RINGBUF || type == BPF_MAP_TYPE_ARENA; } static int ebpf_map_is_user_ringbuf(uint32_t type) @@ -661,6 +661,8 @@ static const char *ebpf_map_type_name(int map_type) return "ringbuf"; case BPF_MAP_TYPE_USER_RINGBUF: return "user_ringbuf"; + case BPF_MAP_TYPE_ARENA: + return "arena"; default: return "unknown"; } @@ -1812,12 +1814,9 @@ static char *ebpf_tester(char *filename, ebpf_specify_name_t *names, uint32_t ma } if (!errors && maps) { - struct bpf_map *socket_events = ebpf_find_socket_events(obj); struct bpf_map *socket_table = ebpf_find_socket_table(obj); - if (socket_events) { - ebpf_socket_ringbuf_tester(socket_events, stdlog, end_iteration); - } else if (socket_table) { + if (socket_table) { ebpf_socket_table_tester(socket_table, stdlog, end_iteration); } else { if (ctrl) {