From 9a8a3a60485edcba59f9a21884c9e328ee6f5fac Mon Sep 17 00:00:00 2001 From: Philippe Leduc Date: Tue, 2 Jun 2026 14:46:37 +0200 Subject: [PATCH 1/3] Add SharedRegion::attach_create/attach_open for caller-provided memory backing, with validation against corrupt headers and a documented threat model Compose shm names via a platform-aware helper that hashes on macOS (PSHMNAMLEN) and throws past NAME_MAX on Linux --- .github/workflows/ci.yml | 9 +- examples/CMakeLists.txt | 2 + examples/hello_inject.cc | 74 ++++++++++ include/kickmsg/Naming.h | 12 ++ include/kickmsg/Region.h | 73 +++++++++- src/Naming.cc | 43 ++++++ src/Node.cc | 17 ++- src/Region.cc | 235 +++++++++++++++++++++++++++++-- src/Registry.cc | 5 +- tests/unit/naming-t.cc | 84 +++++++++++ tests/unit/region-t.cc | 295 +++++++++++++++++++++++++++++++++++++++ 11 files changed, 820 insertions(+), 29 deletions(-) create mode 100644 examples/hello_inject.cc diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b77e61b..5a8e317 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,6 +32,11 @@ jobs: label: benchmarks configure_args: "--with=benchmarks" + # macOS (Apple Silicon) + - os: macos-14 + label: macos-tests + configure_args: "--with=unit_tests --with=examples" + # Windows - os: windows-latest label: windows @@ -40,8 +45,8 @@ jobs: - uses: actions/checkout@v4 - uses: ./.github/actions/setup-conan - - name: Configure and build (Linux) - if: runner.os == 'Linux' + - name: Configure and build (POSIX) + if: runner.os != 'Windows' run: | scripts/configure.sh build ${{ matrix.configure_args }} scripts/setup_build.sh build --build-type ${{ env.BUILD_TYPE }} diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 4c0ef4f..331e241 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -2,6 +2,7 @@ add_executable(hello_pubsub hello_pubsub.cc) add_executable(hello_zerocopy hello_zerocopy.cc) add_executable(hello_broadcast hello_broadcast.cc) add_executable(hello_lowlevel hello_lowlevel.cc) +add_executable(hello_inject hello_inject.cc) add_executable(hello_diagnose hello_diagnose.cc) add_executable(hello_schema hello_schema.cc) add_executable(hello_schema_late_publisher hello_schema_late_publisher.cc) @@ -10,6 +11,7 @@ target_link_libraries(hello_pubsub PRIVATE kickmsg) target_link_libraries(hello_zerocopy PRIVATE kickmsg) target_link_libraries(hello_broadcast PRIVATE kickmsg) target_link_libraries(hello_lowlevel PRIVATE kickmsg) +target_link_libraries(hello_inject PRIVATE kickmsg) target_link_libraries(hello_diagnose PRIVATE kickmsg) target_link_libraries(hello_schema PRIVATE kickmsg) target_link_libraries(hello_schema_late_publisher PRIVATE kickmsg) diff --git a/examples/hello_inject.cc b/examples/hello_inject.cc new file mode 100644 index 0000000..1cb4d3e --- /dev/null +++ b/examples/hello_inject.cc @@ -0,0 +1,74 @@ +/// @file hello_inject.cc +/// @brief Back a kickmsg region with caller-provided memory. +/// +/// Demonstrates SharedRegion::attach_create / attach_open: the library +/// stamps the region into a buffer the caller already owns. Use this +/// when the memory comes from somewhere other than POSIX shm — a +/// different shared-memory provider, hugepages, a hardware-mapped +/// region, or (as here) an in-process aligned heap buffer. +/// +/// The caller owns the buffer's lifetime; unlink() is a no-op for +/// injected regions. + +#include +#include +#include +#include +#include + +#include +#include + +int main() +{ + kickmsg::channel::Config cfg; + cfg.max_subscribers = 2; + cfg.sub_ring_capacity = 8; + cfg.pool_size = 16; + cfg.max_payload_size = 128; + + std::size_t const size = kickmsg::SharedRegion::required_size(cfg, "inject_example"); + + void* raw = nullptr; + if (::posix_memalign(&raw, kickmsg::CACHE_LINE, size) != 0) + { + std::cerr << "posix_memalign failed\n"; + return 1; + } + std::unique_ptr buffer{raw, &::free}; + + auto region = kickmsg::SharedRegion::attach_create( + buffer.get(), size, kickmsg::channel::PubSub, cfg, + "inject_example", "demo-inject"); + + kickmsg::Subscriber sub(region); + kickmsg::Publisher pub(region); + + for (uint32_t i = 0; i < 5; ++i) + { + if (pub.send(&i, sizeof(i)) < 0) + { + std::cerr << "Failed to send message " << i << "\n"; + } + } + + while (auto sample = sub.try_receive()) + { + uint32_t value = 0; + std::memcpy(&value, sample->data(), sizeof(value)); + std::cout << "Received: " << value << "\n"; + } + + // A second attach to the SAME buffer (e.g. another component in the + // same process that was handed the address by the memory provider) + // can use attach_open to validate magic/version and read info(). + auto reader = kickmsg::SharedRegion::attach_open( + buffer.get(), size, "demo-inject-reader"); + auto info = reader.info(); + std::cout << "Reader sees creator='" << info.creator_name + << "', label='" << info.shm_name + << "', pool_size=" << info.pool_size << "\n"; + + std::cout << "Done.\n"; + return 0; +} diff --git a/include/kickmsg/Naming.h b/include/kickmsg/Naming.h index 5647bbf..aca6978 100644 --- a/include/kickmsg/Naming.h +++ b/include/kickmsg/Naming.h @@ -1,11 +1,16 @@ #ifndef KICKMSG_NAMING_H #define KICKMSG_NAMING_H +#include #include #include namespace kickmsg { + /// Lowercase 16-char hex representation of a 64-bit value, zero-padded. + std::string to_hex(uint64_t v); + + /// Sanitize a user-supplied name component (namespace / topic / channel / /// owner / tag) into something POSIX shm_open will accept. /// @@ -29,6 +34,13 @@ namespace kickmsg /// like "/prefix_" that silently collide across callers. \p what is /// interpolated into the exception message ("namespace", "topic", etc.). std::string sanitize_shm_component(std::string_view s, char const* what); + + /// Compose the final shm name from a sanitized namespace and suffix. + /// macOS: "/" + hex(fnv1a64(ns)) + hex(fnv1a64(suffix)), capped at + /// PSHMNAMLEN - 1. Linux: "/" + ns + "_" + suffix, throws + /// std::system_error(ENAMETOOLONG) past NAME_MAX. + std::string compose_shm_name(std::string_view sanitized_namespace, + std::string_view sanitized_suffix); } #endif diff --git a/include/kickmsg/Region.h b/include/kickmsg/Region.h index e58c91a..0ae907b 100644 --- a/include/kickmsg/Region.h +++ b/include/kickmsg/Region.h @@ -65,8 +65,36 @@ namespace kickmsg SharedRegion(SharedRegion const&) = delete; SharedRegion& operator=(SharedRegion const&) = delete; - SharedRegion(SharedRegion&&) noexcept = default; - SharedRegion& operator=(SharedRegion&&) noexcept = default; + + // Hand-written move ops so the moved-from object's base_/size_ + // are reset to a default-constructed state. A defaulted move + // would leave them aliasing the destination's live memory — + // base() on the moved-from object would silently return a + // dangling-looking-live pointer instead of nullptr. + SharedRegion(SharedRegion&& other) noexcept + : shm_{std::move(other.shm_)} + , name_{std::move(other.name_)} + , base_{other.base_} + , size_{other.size_} + { + other.base_ = nullptr; + other.size_ = 0; + } + + SharedRegion& operator=(SharedRegion&& other) noexcept + { + if (this != &other) + { + shm_ = std::move(other.shm_); + name_ = std::move(other.name_); + base_ = other.base_; + size_ = other.size_; + other.base_ = nullptr; + other.size_ = 0; + } + return *this; + } + ~SharedRegion() = default; static SharedRegion create(char const* name, channel::Type type, @@ -85,13 +113,44 @@ namespace kickmsg channel::Config const& cfg, char const* creator_name = ""); + /// Number of bytes the caller must provide to back a region with + /// this config and creator name. The address passed to + /// attach_create() must be at least CACHE_LINE aligned and span + /// at least this many bytes. + static std::size_t required_size(channel::Config const& cfg, + char const* creator_name = ""); + + /// Stamp a fresh region into caller-provided memory. The library + /// does not take ownership: the caller's buffer must outlive the + /// returned SharedRegion and any Publisher/Subscriber attached to + /// it. unlink() is a no-op on the returned region. `label`, if + /// non-empty, is surfaced via info().shm_name for logging. + /// + /// Throws if address is not CACHE_LINE aligned or size is less + /// than required_size(cfg, creator_name). + static SharedRegion attach_create(void* address, std::size_t size, + channel::Type type, + channel::Config const& cfg, + char const* creator_name = "", + char const* label = ""); + + /// Attach to caller-provided memory that already contains a valid + /// region (validates MAGIC + VERSION, and that size is at least + /// the embedded total_size). No ownership taken; unlink() is a + /// no-op. `label` is surfaced via info().shm_name for logging. + /// + /// Throws if address is not CACHE_LINE aligned, magic/version do + /// not match, or size is smaller than the embedded total_size. + static SharedRegion attach_open(void* address, std::size_t size, + char const* label = ""); + void unlink(); - void* base() { return shm_.address(); } - void const* base() const { return shm_.address(); } + void* base() { return base_; } + void const* base() const { return base_; } - Header* header() { return static_cast(shm_.address()); } - Header const* header() const { return static_cast
(shm_.address()); } + Header* header() { return static_cast(base_); } + Header const* header() const { return static_cast
(base_); } channel::Type channel_type() const { return header()->channel_type; } @@ -228,6 +287,8 @@ namespace kickmsg SharedMemory shm_; std::string name_; + void* base_{nullptr}; + std::size_t size_{0}; }; } diff --git a/src/Naming.cc b/src/Naming.cc index 2b91ac4..7be4dc0 100644 --- a/src/Naming.cc +++ b/src/Naming.cc @@ -1,11 +1,29 @@ #include "kickmsg/Naming.h" +#include "kickmsg/Hash.h" #include +#include +#include +#include +#include #include #include +#include + +#if defined(__APPLE__) || defined(__DARWIN__) + #include +#endif namespace kickmsg { + std::string to_hex(uint64_t v) + { + char buf[17]; + std::snprintf(buf, sizeof(buf), "%016llx", + static_cast(v)); + return std::string(buf, 16); + } + std::string sanitize_shm_component(std::string_view s, char const* what) { std::string out; @@ -41,4 +59,29 @@ namespace kickmsg } return out; } + + std::string compose_shm_name(std::string_view ns, std::string_view suffix) + { +#if defined(__APPLE__) || defined(__DARWIN__) + // PSHMNAMLEN = 31 incl. NUL: cap visible name at PSHMNAMLEN - 1 chars. + std::string out = "/"; + out += to_hex(hash::fnv1a_64(ns)); + out += to_hex(hash::fnv1a_64(suffix)); + out.resize(PSHMNAMLEN - 1); + return out; +#else + // shm_open accepts '/' + up to NAME_MAX bytes for the filename portion on tmpfs. + std::string out = "/"; + out += ns; + out += '_'; + out += suffix; + // -1 because / is not taken into account + if (out.size() - 1 > static_cast(NAME_MAX)) + { + throw std::system_error(ENAMETOOLONG, std::generic_category(), + "kickmsg::compose_shm_name: shm name exceeds NAME_MAX"); + } + return out; +#endif + } } diff --git a/src/Node.cc b/src/Node.cc index 91be54f..897c792 100644 --- a/src/Node.cc +++ b/src/Node.cc @@ -318,21 +318,24 @@ namespace kickmsg std::string Node::make_topic_name(char const* topic) const { // namespace_ is pre-sanitized in the ctor; topic is user-supplied on - // each call and may be a ROS-style "/a/b/c" path. - return "/" + namespace_ + "_" + sanitize_shm_component(topic, "topic"); + // each call and may be a ROS-style "/a/b/c" path. compose_shm_name + // handles the platform shm-name limit (hash on macOS, readable on + // Linux, throw on overflow). + return compose_shm_name(namespace_, + sanitize_shm_component(topic, "topic")); } std::string Node::make_broadcast_name(char const* channel) const { - return "/" + namespace_ + "_broadcast_" - + sanitize_shm_component(channel, "channel"); + return compose_shm_name(namespace_, + "broadcast_" + sanitize_shm_component(channel, "channel")); } std::string Node::make_mailbox_name(char const* owner, char const* tag) const { - return "/" + namespace_ + "_" - + sanitize_shm_component(owner, "mailbox owner") + "_mbx_" - + sanitize_shm_component(tag, "mailbox tag"); + return compose_shm_name(namespace_, + sanitize_shm_component(owner, "mailbox owner") + "_mbx_" + + sanitize_shm_component(tag, "mailbox tag")); } SharedRegion* Node::find_region(std::string const& shm_name) diff --git a/src/Region.cc b/src/Region.cc index 5e25e4c..ed8e01c 100644 --- a/src/Region.cc +++ b/src/Region.cc @@ -1,3 +1,4 @@ +#include #include #include #include @@ -127,6 +128,208 @@ namespace kickmsg h->magic.store(MAGIC, std::memory_order_release); } + namespace + { + /// Validate that an already-attached Header has internally + /// consistent geometry. + /// + /// Threat model. attach_open() lets the caller hand kickmsg + /// arbitrary bytes. validate_opened()'s magic/version/total_size + /// checks filter obvious garbage, but every other Header field + /// (offsets, strides, counts, lengths) is then used unchecked by + /// Publisher, Subscriber, info(), diagnose(), stats() and the + /// repair paths to compute pointers into the buffer. A buffer + /// whose first 24 bytes happen to satisfy the trio but whose + /// geometry fields are junk (corruption, crash residue, hostile + /// input, a region produced by a different kickmsg build) would + /// cause those callers to compute wild pointers — at best SEGV + /// on first publish, at worst silent OOB reads via info()'s + /// std::string::assign(tail, creator_name_len) or sub_ring_at() + /// stride math. + /// + /// This function applies the invariants compute_layout() and + /// stamp_new_region() always satisfy: a region kickmsg itself + /// stamps ALWAYS passes. Only corrupt or hostile input can fail + /// it. Cheap (a handful of integer compares, no syscalls), runs + /// once at attach, never on the hot path. + /// + /// shm-backed open() runs this too — defense in depth in case a + /// peer process writes garbage into the segment after stamping, + /// or a stale segment is reused across version bumps. + void validate_header_geometry(Header const* h) + { + // Pre: validate_opened() has already ensured + // - buffer_size >= sizeof(Header) + // - h->magic == MAGIC, h->version == VERSION + // - buffer_size >= h->total_size + // Every offset comparison below uses h->total_size as the + // trusted upper bound; we re-check it here in case total_size + // itself is junk smaller than the struct. + if (h->total_size < sizeof(Header)) + { + throw std::runtime_error( + "Header geometry: total_size smaller than Header"); + } + + // No zero counts or strides — divide-by-zero protection for + // the bound checks below depends on these, and stamp_new_region + // never produces a zero here. + if (h->max_subs == 0 or h->pool_size == 0 + or h->slot_data_size == 0 or h->sub_ring_capacity == 0 + or h->slot_stride == 0 or h->sub_ring_stride == 0) + { + throw std::runtime_error( + "Header geometry: zero-cardinality field"); + } + + if (not is_power_of_two(h->sub_ring_capacity)) + { + throw std::runtime_error( + "Header geometry: sub_ring_capacity not a power of 2"); + } + if (h->sub_ring_mask != h->sub_ring_capacity - 1) + { + throw std::runtime_error( + "Header geometry: sub_ring_mask inconsistent with capacity"); + } + + // creator_name tail bytes live at offset sizeof(Header). + if (h->creator_name_len > h->total_size - sizeof(Header)) + { + throw std::runtime_error( + "Header geometry: creator_name_len exceeds region"); + } + + // Sub-rings span [sub_rings_offset, pool_offset); pool spans + // [pool_offset, total_size). + if (h->sub_rings_offset < sizeof(Header) + or h->sub_rings_offset >= h->pool_offset + or h->pool_offset >= h->total_size) + { + throw std::runtime_error( + "Header geometry: ring/pool offsets out of range"); + } + + // Bound sub_ring_capacity by total_size before multiplying so + // the min_ring_stride product can't overflow on a junk value. + if (h->sub_ring_capacity > h->total_size / sizeof(Entry)) + { + throw std::runtime_error( + "Header geometry: sub_ring_capacity exceeds region"); + } + std::size_t const min_ring_stride = + sizeof(SubRingHeader) + h->sub_ring_capacity * sizeof(Entry); + if (h->sub_ring_stride < min_ring_stride) + { + throw std::runtime_error( + "Header geometry: sub_ring_stride too small"); + } + std::size_t const min_slot_stride = + sizeof(SlotHeader) + h->slot_data_size; + if (h->slot_stride < min_slot_stride) + { + throw std::runtime_error( + "Header geometry: slot_stride too small"); + } + + // max_subs * sub_ring_stride must fit in the rings region. + // Division-based bound avoids mul-overflow on a junk max_subs. + std::size_t const rings_space = h->pool_offset - h->sub_rings_offset; + if (h->max_subs > rings_space / h->sub_ring_stride) + { + throw std::runtime_error( + "Header geometry: subscriber rings overflow pool_offset"); + } + + // pool_size * slot_stride must fit in the pool region. + std::size_t const pool_space = h->total_size - h->pool_offset; + if (h->pool_size > pool_space / h->slot_stride) + { + throw std::runtime_error( + "Header geometry: slot pool overflow total_size"); + } + } + + // Validate an already-mapped region: throws on a buffer too small + // to even hold a Header, bad magic, bad version, buffer too small + // for the embedded total_size, or geometry fields that would make + // downstream pointer math wild. + void validate_opened(void* address, std::size_t size) + { + if (size < sizeof(Header)) + { + throw std::runtime_error( + "Buffer smaller than region Header"); + } + auto* h = static_cast(address); + if (h->magic.load(std::memory_order_acquire) != MAGIC) + { + throw std::runtime_error("Invalid shared memory (bad magic)"); + } + if (h->version != VERSION) + { + throw std::runtime_error("Version mismatch"); + } + if (size < h->total_size) + { + throw std::runtime_error( + "Buffer smaller than embedded region total_size"); + } + validate_header_geometry(h); + } + } + + std::size_t SharedRegion::required_size(channel::Config const& cfg, + char const* creator_name) + { + validate_config(channel::PubSub, cfg); + return compute_layout(cfg, creator_name).total_size; + } + + SharedRegion SharedRegion::attach_create(void* address, std::size_t size, + channel::Type type, + channel::Config const& cfg, + char const* creator_name, + char const* label) + { + validate_config(type, cfg); + if (reinterpret_cast(address) % CACHE_LINE != 0) + { + throw std::runtime_error("attach_create: address not CACHE_LINE aligned"); + } + RegionLayout layout = compute_layout(cfg, creator_name); + if (size < layout.total_size) + { + throw std::runtime_error("attach_create: buffer smaller than required_size"); + } + + SharedRegion region; + region.base_ = address; + region.size_ = size; + region.name_ = label; + region.stamp_new_region(type, cfg, creator_name, + layout.total_size, layout.sub_rings_offset, + layout.pool_offset, layout.ring_stride, + layout.slot_stride, layout.creator_len); + return region; + } + + SharedRegion SharedRegion::attach_open(void* address, std::size_t size, + char const* label) + { + if (reinterpret_cast(address) % CACHE_LINE != 0) + { + throw std::runtime_error("attach_open: address not CACHE_LINE aligned"); + } + + SharedRegion region; + region.base_ = address; + region.size_ = size; + region.name_ = label; + validate_opened(region.base_, region.size_); + return region; + } + SharedRegion SharedRegion::create(char const* name, channel::Type type, channel::Config const& cfg, char const* creator_name) @@ -137,6 +340,8 @@ namespace kickmsg SharedRegion region; region.name_ = name; region.shm_.create(name, layout.total_size); + region.base_ = region.shm_.address(); + region.size_ = layout.total_size; region.stamp_new_region(type, cfg, creator_name, layout.total_size, layout.sub_rings_offset, layout.pool_offset, layout.ring_stride, @@ -149,17 +354,9 @@ namespace kickmsg SharedRegion region; region.name_ = name; region.shm_.open(name); - - auto* h = region.header(); - if (h->magic.load(std::memory_order_acquire) != MAGIC) - { - throw std::runtime_error("Invalid shared memory (bad magic)"); - } - if (h->version != VERSION) - { - throw std::runtime_error("Version mismatch"); - } - + region.base_ = region.shm_.address(); + region.size_ = region.shm_.size(); + validate_opened(region.base_, region.size_); return region; } @@ -181,6 +378,8 @@ namespace kickmsg region.name_ = name; if (region.shm_.try_create(name, layout.total_size)) { + region.base_ = region.shm_.address(); + region.size_ = layout.total_size; region.stamp_new_region(type, cfg, creator_name, layout.total_size, layout.sub_rings_offset, layout.pool_offset, layout.ring_stride, @@ -208,6 +407,8 @@ namespace kickmsg SharedRegion region; region.name_ = name; region.shm_ = std::move(shm); + region.base_ = region.shm_.address(); + region.size_ = region.shm_.size(); return region; } // SHM exists but magic/version not ready yet — creator @@ -223,7 +424,17 @@ namespace kickmsg void SharedRegion::unlink() { - if (not name_.empty()) + // Release the OS-level name backing this region. Existing + // mappings — this process and every peer that already opened + // the region — keep working until their last reference drops; + // only the region's discoverability by name is affected. Any + // holder, creator or opener, may call this. Future open-by- + // name behaviour is OS-dependent and intentionally left to the + // backend. + // + // Skipped for injected regions (shm_ never opened): the caller + // owns the memory; kickmsg has no OS-level name to release. + if (shm_.is_open() and not name_.empty()) { SharedMemory::unlink(name_); } diff --git a/src/Registry.cc b/src/Registry.cc index 4425bf0..705dbf0 100644 --- a/src/Registry.cc +++ b/src/Registry.cc @@ -19,8 +19,9 @@ namespace kickmsg std::string Registry::make_shm_name(std::string const& kmsg_namespace) { - return "/" + sanitize_shm_component(kmsg_namespace, "namespace") - + "_registry"; + return compose_shm_name( + sanitize_shm_component(kmsg_namespace, "namespace"), + "registry"); } RegistryHeader* Registry::header() diff --git a/tests/unit/naming-t.cc b/tests/unit/naming-t.cc index aff92ba..5d18bb3 100644 --- a/tests/unit/naming-t.cc +++ b/tests/unit/naming-t.cc @@ -1,3 +1,4 @@ +#include #include #include @@ -5,6 +6,10 @@ #include "kickmsg/Naming.h" +#if defined(__APPLE__) || defined(__DARWIN__) + #include +#endif + using kickmsg::sanitize_shm_component; // --- Pass-through for already-valid inputs ------------------------------- @@ -117,3 +122,82 @@ TEST(SanitizeShmComponent, WhatIsIncludedInErrorMessage) EXPECT_NE(msg.find("namespace"), std::string::npos) << msg; } } + +// --- compose_shm_name ---------------------------------------------------- + +using kickmsg::compose_shm_name; + +TEST(ComposeShmName, FitsPlatformLimit) +{ + auto name = compose_shm_name("ns", "topic"); + ASSERT_FALSE(name.empty()); + EXPECT_EQ(name[0], '/'); +#if defined(__APPLE__) || defined(__DARWIN__) + EXPECT_LE(name.size(), static_cast(PSHMNAMLEN) - 1); +#else + EXPECT_LE(name.size() - 1, static_cast(NAME_MAX)); +#endif +} + +TEST(ComposeShmName, Deterministic) +{ + // Same inputs must always produce the same shm name — peers in + // different processes need to agree on the name. + auto a = compose_shm_name("demo", "temperature"); + auto b = compose_shm_name("demo", "temperature"); + EXPECT_EQ(a, b); +} + +TEST(ComposeShmName, DistinctNamespacesProduceDistinctNames) +{ + // Two namespaces with the same suffix must NOT collide — that would + // let a process in namespace "alpha" stomp on namespace "beta"'s + // region. + auto a = compose_shm_name("alpha", "temperature"); + auto b = compose_shm_name("beta", "temperature"); + EXPECT_NE(a, b); +} + +TEST(ComposeShmName, DistinctSuffixesProduceDistinctNames) +{ + auto a = compose_shm_name("demo", "temperature"); + auto b = compose_shm_name("demo", "pressure"); + EXPECT_NE(a, b); +} + +TEST(ComposeShmName, FitsEvenWhenInputsAreLong) +{ + std::string long_ns(80, 'a'); + std::string long_sx(120, 'b'); +#if defined(__APPLE__) || defined(__DARWIN__) + auto name = compose_shm_name(long_ns, long_sx); + EXPECT_LE(name.size(), static_cast(PSHMNAMLEN) - 1); +#else + EXPECT_NO_THROW(compose_shm_name(long_ns, long_sx)); +#endif +} + +#if !defined(__APPLE__) && !defined(__DARWIN__) +TEST(ComposeShmName, ThrowsOnLinuxWhenExceedingNameMax) +{ + // Linux: pre-composed name longer than NAME_MAX must throw clearly + // (system_error/ENAMETOOLONG) instead of silently truncating or + // failing inside shm_open with an opaque OS error. + std::string ns(200, 'n'); + std::string sx(200, 's'); + EXPECT_THROW(compose_shm_name(ns, sx), std::system_error); +} +#endif + +#if defined(__APPLE__) || defined(__DARWIN__) +TEST(ComposeShmName, HashFormOnMacOS) +{ + // On macOS the composed name is hash-only (no embedded plaintext). + // Pin the shape so a future refactor doesn't accidentally regress + // back to a readable form that overflows PSHMNAMLEN. + auto name = compose_shm_name("ns", "topic"); + EXPECT_EQ(name[0], '/'); + EXPECT_EQ(name.find("ns"), std::string::npos); + EXPECT_EQ(name.find("topic"), std::string::npos); +} +#endif diff --git a/tests/unit/region-t.cc b/tests/unit/region-t.cc index b065ba0..6ae7bde 100644 --- a/tests/unit/region-t.cc +++ b/tests/unit/region-t.cc @@ -1031,3 +1031,298 @@ TEST_F(RegionTest, StatsPoolFreeTracksAllocations) // One slot is popped from the free stack and not yet returned. EXPECT_EQ(s.pool_free, cfg.pool_size - 1); } + +// ----------------------------------------------------------------------------- +// attach_create / attach_open — caller-provided memory +// ----------------------------------------------------------------------------- + +class InjectedRegionTest : public ::testing::Test +{ +public: + kickmsg::channel::Config default_cfg() + { + kickmsg::channel::Config cfg; + cfg.max_subscribers = 2; + cfg.sub_ring_capacity = 8; + cfg.pool_size = 16; + cfg.max_payload_size = 64; + return cfg; + } + + // Aligned heap buffer sized to fit a region with `cfg`. + struct Buffer + { + std::unique_ptr mem{nullptr, &::free}; + std::size_t size{0}; + void* get() { return mem.get(); } + }; + + Buffer make_buffer(kickmsg::channel::Config const& cfg, char const* creator = "") + { + Buffer b; + b.size = kickmsg::SharedRegion::required_size(cfg, creator); + void* raw = nullptr; + EXPECT_EQ(::posix_memalign(&raw, kickmsg::CACHE_LINE, b.size), 0); + b.mem.reset(raw); + return b; + } +}; + +TEST_F(InjectedRegionTest, RequiredSizeMatchesShmBackedTotalSize) +{ + auto cfg = default_cfg(); + kickmsg::SharedMemory::unlink("/kickmsg_test_inject_size"); + auto shm = kickmsg::SharedRegion::create( + "/kickmsg_test_inject_size", kickmsg::channel::PubSub, cfg, "x"); + EXPECT_EQ(kickmsg::SharedRegion::required_size(cfg, "x"), + shm.header()->total_size); + shm.unlink(); +} + +TEST_F(InjectedRegionTest, AttachCreateRoundtrip) +{ + auto cfg = default_cfg(); + auto buf = make_buffer(cfg, "inject"); + + auto region = kickmsg::SharedRegion::attach_create( + buf.get(), buf.size, kickmsg::channel::PubSub, cfg, "inject", "label"); + + EXPECT_EQ(region.header()->magic, kickmsg::MAGIC); + EXPECT_EQ(region.header()->version, kickmsg::VERSION); + EXPECT_EQ(region.info().shm_name, "label"); + EXPECT_EQ(region.info().creator_name, "inject"); + + kickmsg::Subscriber sub(region); + kickmsg::Publisher pub(region); + + for (uint32_t i = 0; i < 5; ++i) + { + ASSERT_GE(pub.send(&i, sizeof(i)), 0); + } + + int received = 0; + while (auto s = sub.try_receive()) + { + uint32_t got = 0; + std::memcpy(&got, s->data(), sizeof(got)); + EXPECT_EQ(got, static_cast(received)); + ++received; + } + EXPECT_EQ(received, 5); +} + +TEST_F(InjectedRegionTest, AttachOpenSeesStampedRegion) +{ + auto cfg = default_cfg(); + auto buf = make_buffer(cfg, "creator"); + + { + auto creator = kickmsg::SharedRegion::attach_create( + buf.get(), buf.size, kickmsg::channel::PubSub, cfg, "creator"); + + kickmsg::Publisher pub(creator); + uint32_t val = 0xC0FFEE; + ASSERT_GE(pub.send(&val, sizeof(val)), 0); + } + + // A second handle attaches to the same buffer and validates. + auto reader = kickmsg::SharedRegion::attach_open(buf.get(), buf.size, "ro"); + EXPECT_EQ(reader.info().shm_name, "ro"); + EXPECT_EQ(reader.info().creator_name, "creator"); + EXPECT_EQ(reader.header()->pool_size, cfg.pool_size); +} + +TEST_F(InjectedRegionTest, AttachCreateRejectsMisalignedAddress) +{ + auto cfg = default_cfg(); + auto buf = make_buffer(cfg, "x"); + auto* bad = static_cast(buf.get()) + 1; // off by one — not aligned + + EXPECT_THROW( + kickmsg::SharedRegion::attach_create( + bad, buf.size - 1, kickmsg::channel::PubSub, cfg, "x"), + std::runtime_error); +} + +TEST_F(InjectedRegionTest, AttachCreateRejectsUndersizedBuffer) +{ + auto cfg = default_cfg(); + auto buf = make_buffer(cfg, "x"); + + EXPECT_THROW( + kickmsg::SharedRegion::attach_create( + buf.get(), buf.size - 1, kickmsg::channel::PubSub, cfg, "x"), + std::runtime_error); +} + +TEST_F(InjectedRegionTest, AttachOpenRejectsZeroedBuffer) +{ + auto cfg = default_cfg(); + auto buf = make_buffer(cfg, "x"); + std::memset(buf.get(), 0, buf.size); + + EXPECT_THROW( + kickmsg::SharedRegion::attach_open(buf.get(), buf.size), + std::runtime_error); +} + +TEST_F(InjectedRegionTest, UnlinkOnInjectedRegionIsNoOp) +{ + auto cfg = default_cfg(); + auto buf = make_buffer(cfg, "x"); + auto region = kickmsg::SharedRegion::attach_create( + buf.get(), buf.size, kickmsg::channel::PubSub, cfg, "x", "should-not-be-unlinked"); + + // Must not call shm_unlink on the label, which would fail if it tried — + // the label is not a path. Just checks that the call returns cleanly. + EXPECT_NO_THROW(region.unlink()); + // And the region remains usable after a no-op unlink. + EXPECT_EQ(region.header()->magic, kickmsg::MAGIC); +} + +TEST_F(InjectedRegionTest, AttachOpenRejectsBufferSmallerThanHeader) +{ + // A buffer smaller than sizeof(Header) must be rejected BEFORE any + // dereference of magic/version/total_size — otherwise the load is + // an out-of-bounds read on hostile or accidentally-small input. + alignas(kickmsg::CACHE_LINE) std::byte tiny[kickmsg::CACHE_LINE]{}; + static_assert(sizeof(tiny) < sizeof(kickmsg::Header)); + + EXPECT_THROW( + kickmsg::SharedRegion::attach_open(tiny, sizeof(tiny)), + std::runtime_error); +} + +TEST_F(InjectedRegionTest, MoveLeavesSourceWithNullBase) +{ + auto cfg = default_cfg(); + auto buf = make_buffer(cfg, "x"); + auto src = kickmsg::SharedRegion::attach_create( + buf.get(), buf.size, kickmsg::channel::PubSub, cfg, "x"); + + void* live_base = src.base(); + ASSERT_NE(live_base, nullptr); + + auto dst = std::move(src); + EXPECT_EQ(dst.base(), live_base); + // After move, the source must NOT still alias the destination's + // live memory — otherwise base()/header() on the moved-from object + // returns a dangling-looking-live pointer instead of nullptr. + EXPECT_EQ(src.base(), nullptr); +} + +TEST_F(InjectedRegionTest, MoveAssignLeavesSourceWithNullBase) +{ + auto cfg = default_cfg(); + auto buf = make_buffer(cfg, "x"); + auto src = kickmsg::SharedRegion::attach_create( + buf.get(), buf.size, kickmsg::channel::PubSub, cfg, "x"); + + void* live_base = src.base(); + ASSERT_NE(live_base, nullptr); + + kickmsg::SharedRegion dst; + dst = std::move(src); + EXPECT_EQ(dst.base(), live_base); + EXPECT_EQ(src.base(), nullptr); +} + +// Threat-model tests for validate_header_geometry: a kickmsg-stamped +// buffer always passes; deliberately corrupting any geometry field must +// fail attach_open with a runtime_error, never let downstream code +// compute wild pointers. +class CorruptedHeaderTest : public InjectedRegionTest +{ +public: + // Make a valid stamped buffer the test can then deface. + Buffer make_stamped(kickmsg::channel::Config const& cfg) + { + auto buf = make_buffer(cfg, "x"); + auto r = kickmsg::SharedRegion::attach_create( + buf.get(), buf.size, kickmsg::channel::PubSub, cfg, "x"); + (void)r; // RAII drops; the bytes in `buf` stay stamped + return buf; + } + + kickmsg::Header* hdr(Buffer& b) { return static_cast(b.get()); } +}; + +TEST_F(CorruptedHeaderTest, RejectsZeroMaxSubs) +{ + auto buf = make_stamped(default_cfg()); + hdr(buf)->max_subs = 0; + EXPECT_THROW(kickmsg::SharedRegion::attach_open(buf.get(), buf.size), + std::runtime_error); +} + +TEST_F(CorruptedHeaderTest, RejectsNonPowerOfTwoRingCapacity) +{ + auto buf = make_stamped(default_cfg()); + hdr(buf)->sub_ring_capacity = 7; + EXPECT_THROW(kickmsg::SharedRegion::attach_open(buf.get(), buf.size), + std::runtime_error); +} + +TEST_F(CorruptedHeaderTest, RejectsInconsistentRingMask) +{ + auto buf = make_stamped(default_cfg()); + hdr(buf)->sub_ring_mask = 3; // capacity is 8, mask should be 7 + EXPECT_THROW(kickmsg::SharedRegion::attach_open(buf.get(), buf.size), + std::runtime_error); +} + +TEST_F(CorruptedHeaderTest, RejectsCreatorNameLenOverflow) +{ + auto buf = make_stamped(default_cfg()); + hdr(buf)->creator_name_len = UINT16_MAX; + EXPECT_THROW(kickmsg::SharedRegion::attach_open(buf.get(), buf.size), + std::runtime_error); +} + +TEST_F(CorruptedHeaderTest, RejectsPoolOffsetPastTotalSize) +{ + auto buf = make_stamped(default_cfg()); + hdr(buf)->pool_offset = UINT64_MAX; + EXPECT_THROW(kickmsg::SharedRegion::attach_open(buf.get(), buf.size), + std::runtime_error); +} + +TEST_F(CorruptedHeaderTest, RejectsTotalSizeSmallerThanHeader) +{ + auto buf = make_stamped(default_cfg()); + hdr(buf)->total_size = sizeof(kickmsg::Header) - 1; + EXPECT_THROW(kickmsg::SharedRegion::attach_open(buf.get(), buf.size), + std::runtime_error); +} + +TEST_F(CorruptedHeaderTest, RejectsRingsOverflowingPoolOffset) +{ + auto buf = make_stamped(default_cfg()); + // max_subs * sub_ring_stride must fit in [sub_rings_offset, pool_offset). + hdr(buf)->max_subs = UINT64_MAX; + EXPECT_THROW(kickmsg::SharedRegion::attach_open(buf.get(), buf.size), + std::runtime_error); +} + +TEST_F(CorruptedHeaderTest, RejectsPoolOverflowingTotalSize) +{ + auto buf = make_stamped(default_cfg()); + hdr(buf)->pool_size = UINT64_MAX; + EXPECT_THROW(kickmsg::SharedRegion::attach_open(buf.get(), buf.size), + std::runtime_error); +} + +TEST_F(CorruptedHeaderTest, RejectsTinySlotStride) +{ + auto buf = make_stamped(default_cfg()); + hdr(buf)->slot_stride = 1; // smaller than sizeof(SlotHeader) + EXPECT_THROW(kickmsg::SharedRegion::attach_open(buf.get(), buf.size), + std::runtime_error); +} + +TEST_F(CorruptedHeaderTest, StampedBufferStillValidates) +{ + auto buf = make_stamped(default_cfg()); + // Sanity: an unmodified stamped buffer must pass. + EXPECT_NO_THROW(kickmsg::SharedRegion::attach_open(buf.get(), buf.size)); +} From 1f752a662480f5ef5e52127221630b1d28eb1f53 Mon Sep 17 00:00:00 2001 From: Philippe Leduc Date: Tue, 2 Jun 2026 15:07:37 +0200 Subject: [PATCH 2/3] Cleanup headers --- AGENTS.md | 147 +++++++++++++++++ ARCHITECTURE.md | 97 ++++++++--- examples/hello_broadcast.cc | 4 +- examples/hello_diagnose.cc | 2 - examples/hello_inject.cc | 45 +++++- examples/hello_lowlevel.cc | 2 - examples/hello_pubsub.cc | 3 - examples/hello_schema.cc | 2 - examples/hello_schema_late_publisher.cc | 5 - examples/hello_zerocopy.cc | 2 - include/kickmsg/Naming.h | 13 +- include/kickmsg/Publisher.h | 1 - include/kickmsg/Region.h | 39 +++-- include/kickmsg/Subscriber.h | 3 - include/kickmsg/types.h | 13 +- scripts/validate.sh | 18 ++- src/Naming.cc | 23 ++- src/Node.cc | 22 +++ src/Publisher.cc | 18 ++- src/Region.cc | 178 +++++++++++++++----- src/Registry.cc | 12 +- src/Subscriber.cc | 62 ++++++- src/os/darwin/Futex.cc | 1 - src/os/darwin/SharedMemory.cc | 2 - src/os/darwin/Time.cc | 1 - src/os/linux/SharedMemory.cc | 1 - src/os/linux/Time.cc | 1 - src/os/posix/SharedMemory.cc | 10 +- src/os/posix/Time.cc | 1 - src/os/windows/SharedMemory.cc | 2 - tests/CMakeLists.txt | 10 ++ tests/crash_test.cc | 3 - tests/endurance.sh | 8 +- tests/registry_stress_test.cc | 4 - tests/stress/main.cc | 9 +- tests/unit/naming-t.cc | 16 +- tests/unit/node-t.cc | 37 ++++- tests/unit/publisher-t.cc | 1 - tests/unit/region-t.cc | 205 +++++++++++++++++++++++- tests/unit/registry-t.cc | 72 +++++++-- tests/unit/subscriber-t.cc | 16 +- tests/unit/types-t.cc | 1 - 42 files changed, 931 insertions(+), 181 deletions(-) create mode 100644 AGENTS.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..ef9907e --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,147 @@ +# Agent guide -- kickmsg + +Project-wide directives for any LLM working in this repository. Read this +before proposing changes. These override generic defaults. kickmsg is a +lock-free, shared-memory pub/sub library for inter-process messaging on +POSIX and Windows; correctness under concurrency, crashes, and corrupt +peer memory is the whole point. + +## 1. Don't be lazy + +If a fix takes 2--3 lines and visibly improves the code, write the 2--3 lines. +Do not dismiss real concerns with "premature optimization", "good enough", +"out of scope", or "we can revisit later" when the work in front of you is +small and the benefit is concrete. If a reviewer points at a problem, they +already paid the cost of finding it -- apply the fix instead of arguing why it +doesn't matter. When the right fix is genuinely larger than the local change, +do the local fix *and* call out the broader work as a follow-up. + +## 2. Keep It Simple, Stupid (KISS) + +Pick the simplest design that solves the problem. Avoid helpers called from +exactly one place (inline them), mirrors of state another component owns +(query the authority), dead defensive checks on values a constructor already +guarantees, and speculative abstractions with no concrete consumer. Three +similar lines beat a premature abstraction; factor only when the same shape +repeats 3+ times under the same constraints. + +## 3. Maintainability first, performance second + +Maintainable code that is also performant beats clever code that is hard to +follow -- but "maintainable first" does not mean "ignore performance". The +publish and receive paths are hot: per-publish heap allocations, avoidable +atomics, false sharing, and syscalls on the fast path are real costs. When the +maintainable design and the performant one diverge by a few lines (a scratch +buffer, a conditional wake), apply both. + +Order of operations: (1) correct, (2) clear, (3) fast on hot paths. + +## 4. Don't reinvent the wheel + +Grep before writing new infrastructure. What already exists: + +- Shared memory: `kickmsg/os/SharedMemory.h` (create/open/try_*/unlink). +- Region + channel: `kickmsg/Region.h` (`SharedRegion`, `channel::Config`, + recovery primitives, `validate_header_geometry`). +- Hashing: `kickmsg/Hash.h` -- `kickmsg::hash::fnv1a_64`. Don't add a new hash. +- Shm naming: `kickmsg/Naming.h` -- `sanitize_shm_component`, + `compose_shm_name`, `to_hex`. All shm names go through these. +- Lock-free helpers: `kickmsg/types.h` -- `treiber_push`/`treiber_pop`, + `tagged_pack`/`tagged_idx`, `slot_at`/`sub_ring_at`/`ring_entries`, + `align_up`, `is_power_of_two`. +- OS: `kickmsg/os/Time.h` (`since_epoch`, `sleep`), `kickmsg/os/Process.h` + (`current_pid`, `process_starttime`), `kickmsg/os/Futex.h` + (`futex_wait`/`futex_wake_all`). +- Tests: GoogleTest + gmock unit tests (`tests/unit/`), the stress harness + (`tests/stress/common.h`), fork+SIGKILL crash tests (`tests/crash_test.cc`). + +"I didn't know it existed" is not an answer -- grep, find, or Explore first. + +## 5. Coding style + +There is no `.clang-format` yet; the rules below describe the established +style. Match the surrounding file. + +### 5.1 Formatting + +- **Allman braces** -- opening brace on its own line for every block. +- **4-space indent, no tabs.** Case labels indented inside `switch`. +- **East const** -- `T const&`, `T const*`, `Header const*`. Not `const T&`. +- **Pointer alignment left** -- `T* p`, not `T *p`. +- **Member naming** -- `snake_case` with a trailing underscore for non-public + data members (`shm_`, `name_`, `base_`). Public/struct fields: no underscore. +- **Type naming** -- `PascalCase` for classes, structs, enums, aliases. +- **Namespace** -- single `namespace kickmsg`, contents indented, no closing + `// namespace` comment. +- **Constructor initializers** -- one per line, `,` leading the next line. + +### 5.2 Language rules + +- **Header guards, never `#pragma once`.** Format `KICKMSG__H` (e.g. + `kickmsg/os/SharedMemory.h` -> `KICKMSG_OS_SHARED_MEMORY_H`). Mirror an + adjacent header. +- **No ternary operator** in code you add or modify. Rewrite as `if`/`else`, + early return, or an `if`-assigned variable. Leave pre-existing ternaries + unless explicitly cleaning them up. +- **Prefer `not` / `and` / `or`** over `!` / `&&` / `||` -- the codebase uses + the keyword operators throughout. +- **`override` mandatory** on overriding virtuals. +- **`inline` only with a real rationale** (one-line accessors, `constexpr`, + templates, header-only by design) -- not merely to dodge a `.cc` file. + +### 5.3 Concurrency and shared memory (kickmsg-specific) + +- **Explicit `memory_order` on every atomic op.** No bare `.load()`/`.store()`. + When an op is `relaxed`, a one-line comment must say why it is safe (which + other fence or release-store covers it). The release-store of `MAGIC` is the + sole publication fence for a freshly-stamped region -- do not "fix" a + deliberate relaxed store to release in isolation. +- **Tolerate corrupt / hostile peer bytes.** The region is mapped by + independent processes and can be handed to `attach_open` by a caller. Any + field read from shared memory that drives pointer math (offsets, strides, + indices, lengths, counts) must be bounds-checked before use -- see + `validate_header_geometry`. A crashed peer can leave partial state. +- **Preserve the recovery contract.** `repair_locked_entries` and `stats` / + `diagnose` are safe under live traffic; `reset_retired_rings`, + `reclaim_orphaned_slots`, `reset_schema_claim`, `sweep_stale` are post-crash + only. Keep that distinction in the code and the docs; don't add a + live-traffic caller of a post-crash primitive. +- **ABI is versioned.** The shm layout (`Header`, `Entry`, `SubRingHeader`, + `SlotHeader`, `SchemaInfo`) is guarded by `MAGIC` + `VERSION`. Any layout + change requires a `VERSION` bump and updating the `static_assert`s in + `types.h`. + +### 5.4 Comments + +Default to writing nothing. Only add a comment when: + +- A hidden constraint or invariant is being maintained. +- The code works around a specific bug or platform quirk. +- A reader would otherwise be surprised by the behavior (especially a memory + ordering or crash-recovery subtlety -- those are worth a tight line). + +Never restate what the code does. Never reference the current PR / issue / +"this fixes...". One short line beats a paragraph; reserve multi-line blocks +for an ASCII diagram or numbered list that genuinely helps (e.g. the publish +flow in ARCHITECTURE.md). + +### 5.5 Includes + +- **Library headers**: IWYU strictly -- every type named in the public API + comes from an include in that header. +- **`.cc` files**: group standard headers and internal headers, separated by a + blank line. Match the file's existing grouping. +- **Tests, examples, callers**: rely on transitive includes from the public + headers you use. Don't re-include ``, ``, ``, + ``, etc. when they arrive via `kickmsg/Publisher.h`, + `kickmsg/Region.h`, `kickmsg/types.h`, and friends. + +### 5.6 Other defaults + +- **ASCII only in source** (code, comments, identifiers, docs): `--` for an + em-dash, `'` for an apostrophe, `...` for an ellipsis, plain ASCII quotes. + Exception: user-facing strings that genuinely need otherwise. Legacy files + predating this rule still contain em-dashes; leave them unless you are + already editing that line, and write new comments ASCII. +- **No emojis** in code, comments, commit messages, or docs unless asked. +- **No new docs files** (`.md`, `README`) unless the user asks. diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 83fe791..815c5dd 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -289,6 +289,18 @@ free_top [gen:17 | idx:3] - **Push** (release a slot): CAS `free_top` from `[gen|head]` to `[gen+1|slot]` after setting `slot.next = head`. +**Accepted ABA bound.** The generation is 32-bit, so the ABA window is not +formally closed: it requires a thread to read `free_top`, stall, and then +have exactly 2^32 push/pop operations complete (returning the head to the +same `[gen|idx]`) before it resumes its CAS. At realistic pool churn that is +on the order of tens of seconds of uninterrupted contention landing inside a +single preemption of one thread -- astronomically unlikely, and no real +workload approaches it. We accept this rather than pay for a 128-bit DWCAS +(`cmpxchg16b` / `LDXP`-`STXP`), which is not lock-free in the `std::atomic` +sense on every target and costs on the hot alloc/free path. If a future +target makes the window reachable, widen `free_top` to a 128-bit tagged +pointer (a `VERSION` bump). + ## Subscriber Ring @@ -510,18 +522,18 @@ Publisher | in_flight == 0. | '-- if has_waiter: Conditional wake: skip the syscall - futex_wake_all(write_pos) when no subscriber is blocking. - has_waiter uses relaxed ordering on - both sides (publisher load, subscriber - store). This can race: the publisher - may not see has_waiter=1 if the - subscriber just set it. But the race - is benign — futex_wait(write_pos, cur) - checks *addr != expected atomically - in the kernel; if write_pos already - advanced, it returns immediately. - Worst case: one unnecessary round-trip - through futex (not a lost wake). + futex_wake_all(write_pos) when no subscriber is blocking. A + seq_cst fence precedes the has_waiter + load and pairs with one before the + subscriber's futex_wait, ordering the + write_pos fetch_add ahead of the load. + This is a Dekker StoreLoad pair: with + relaxed ordering alone a weakly-ordered + CPU could read has_waiter == 0 stale + and drop the wake to an already-parked + subscriber (lost wakeup until timeout). + x86's locked RMW fences implicitly; + ARM does not, hence the explicit fence. 5. Batch excess: fetch_sub(excess) on slot refcount. One atomic RMW for all non-delivered rings, instead of N @@ -1000,12 +1012,17 @@ auto report = region.diagnose(); // report.locked_entries: entries stuck at LOCKED_SEQUENCE // report.retired_rings: Free rings with stale in_flight > 0 // report.draining_rings: Draining rings with in_flight > 0 (usually transient) +// report.dead_rings: Live/Draining rings whose owner process is gone // report.live_rings: active subscriber rings // Safe under live traffic — repairs poisoned ring entries. // Can be called freely on a health-check timer. std::size_t repaired = region.repair_locked_entries(); +// Reclaims rings whose owning subscriber process has died. Safe under +// live traffic: only provably-dead owners (pid + start time) are touched. +std::size_t dead = region.reclaim_dead_rings(); + // Resets retired rings (Free | in_flight>0) so new subscribers can // claim them. Only safe after confirming the crashed publisher is gone. // Deliberate post-crash action, not a routine maintenance call. @@ -1023,10 +1040,21 @@ periodically; persistent nonzero counts signal recovery is needed. `INVALID_SLOT`. Safe under live traffic (benign double-store if a slow publisher commits at the same time). Can run on a timer. +**`reclaim_dead_rings()`** — reclaims `Live`/`Draining` rings whose +owning subscriber process has died, identified by the `owner_pid` + +`owner_starttime` recorded on the ring at claim time and checked +against the OS (same liveness test as `Registry::sweep_stale`). Safe +under live traffic: a slow-but-alive subscriber is never touched. +`in_flight` is preserved (a mid-commit publisher must still +`fetch_sub`), so a reclaimed ring with leftover `in_flight` lands in +the retired state for `reset_retired_rings()` to finish. + **`reset_retired_rings()`** — resets stuck rings (`Free | in_flight>0` → `Free | in_flight=0`). Only safe after confirming the crashed publisher is gone. Unlike `repair_locked_entries()`, this is a -deliberate post-crash action. +deliberate post-crash action. For a dead *subscriber* prefer +`reclaim_dead_rings()`, which is liveness-checked and cannot underflow +`in_flight` against a slow publisher. **`reclaim_orphaned_slots()`** — walks all rings to build a referenced-slot set, then frees any unreferenced slot with @@ -1038,14 +1066,45 @@ quiesced and no outstanding `SampleView` objects. ``` 1. diagnose() → persistent nonzero counts (check twice, gap > commit_timeout) 2. repair_locked_entries() — safe under live traffic -3. reset_retired_rings() — after confirming crashed publisher is gone -4. (optional) pause all publishers -5. reclaim_orphaned_slots() — requires quiescence -6. resume publishers +3. reclaim_dead_rings() — safe under live traffic (dead subscribers) +4. reset_retired_rings() — after confirming crashed publisher is gone +5. (optional) pause all publishers +6. reclaim_orphaned_slots() — requires quiescence +7. resume publishers ``` -Steps 4–6 are only needed if the pool is exhausted from leaked slots. -In most cases, steps 2–3 restore the channel to full operation. +Steps 5–7 are only needed if the pool is exhausted from leaked slots. +In most cases, steps 2–4 restore the channel to full operation. + +### Recovery limits and residual windows + +The recovery primitives close the common crash classes but have a few +documented edges, all bounded: + +- **`reclaim_orphaned_slots()` is not crash-safe mid-pass.** It resets a + slot's refcount to 0 and pushes it to the free stack as two separate + stores. If the *recovering* process is killed between them, the slot is + neither referenced nor free-listed (refcount 0), and a re-run skips it + (the `refcount > 0` guard no longer matches) — a permanent leak of that + one slot. Run recovery from a supervisor that is not itself under the + OOM/kill pressure that triggered recovery. + +- **`reset_retired_rings()` trusts the operator.** It zeroes `in_flight`; + if called while a misclassified slow-but-alive publisher still owes a + `fetch_sub`, that decrement underflows the packed counter into the state + bits. It is a post-crash-only action. For a dead *subscriber*, + `reclaim_dead_rings()` is the liveness-checked alternative and avoids + this entirely. + +- **Claim-window residual (rings and registry).** Liveness recovery keys on + an owner identity (`owner_pid` for rings, `pid` for registry entries) + recorded *after* the claiming CAS. A process that crashes in the few + instructions between winning the CAS and recording its pid leaves the + slot owned by pid 0 — unattributable, so neither `reclaim_dead_rings()` + nor `sweep_stale()` reclaims it. The window is a handful of instructions + and closing it fully would need a separate intent log; in practice the + slot is lost only on a crash landing in that exact gap, and the pool / + registry tolerate many such losses before exhaustion. ## ABA Safety diff --git a/examples/hello_broadcast.cc b/examples/hello_broadcast.cc index 04eda6d..3c0fb47 100644 --- a/examples/hello_broadcast.cc +++ b/examples/hello_broadcast.cc @@ -7,13 +7,11 @@ /// - Multiple concurrent participants on the same channel /// - Each node sees messages from all other nodes (not its own) -#include #include -#include #include -#include #include +#include using namespace kickmsg; diff --git a/examples/hello_diagnose.cc b/examples/hello_diagnose.cc index 9adc1c9..31c4889 100644 --- a/examples/hello_diagnose.cc +++ b/examples/hello_diagnose.cc @@ -8,8 +8,6 @@ /// 4. Repair with repair_locked_entries() /// 5. Verify the channel is fully operational again -#include -#include #include #include diff --git a/examples/hello_inject.cc b/examples/hello_inject.cc index 1cb4d3e..89df105 100644 --- a/examples/hello_inject.cc +++ b/examples/hello_inject.cc @@ -10,15 +10,45 @@ /// The caller owns the buffer's lifetime; unlink() is a no-op for /// injected regions. -#include #include -#include #include #include #include #include +#if defined(_WIN32) + #include // _aligned_malloc / _aligned_free +#endif + +namespace +{ + // posix_memalign is POSIX-only; Windows uses _aligned_malloc, whose + // memory MUST be released with _aligned_free (not free()). + void* aligned_alloc_compat(std::size_t align, std::size_t size) + { +#if defined(_WIN32) + return _aligned_malloc(size, align); +#else + void* p = nullptr; + if (::posix_memalign(&p, align, size) != 0) + { + return nullptr; + } + return p; +#endif + } + + void aligned_free_compat(void* p) + { +#if defined(_WIN32) + _aligned_free(p); +#else + ::free(p); +#endif + } +} + int main() { kickmsg::channel::Config cfg; @@ -29,13 +59,13 @@ int main() std::size_t const size = kickmsg::SharedRegion::required_size(cfg, "inject_example"); - void* raw = nullptr; - if (::posix_memalign(&raw, kickmsg::CACHE_LINE, size) != 0) + void* raw = aligned_alloc_compat(kickmsg::CACHE_LINE, size); + if (raw == nullptr) { - std::cerr << "posix_memalign failed\n"; + std::cerr << "aligned allocation failed\n"; return 1; } - std::unique_ptr buffer{raw, &::free}; + std::unique_ptr buffer{raw, &aligned_free_compat}; auto region = kickmsg::SharedRegion::attach_create( buffer.get(), size, kickmsg::channel::PubSub, cfg, @@ -62,8 +92,7 @@ int main() // A second attach to the SAME buffer (e.g. another component in the // same process that was handed the address by the memory provider) // can use attach_open to validate magic/version and read info(). - auto reader = kickmsg::SharedRegion::attach_open( - buffer.get(), size, "demo-inject-reader"); + auto reader = kickmsg::SharedRegion::attach_open(buffer.get(), size, "demo-inject-reader"); auto info = reader.info(); std::cout << "Reader sees creator='" << info.creator_name << "', label='" << info.shm_name diff --git a/examples/hello_lowlevel.cc b/examples/hello_lowlevel.cc index 422d58d..c154964 100644 --- a/examples/hello_lowlevel.cc +++ b/examples/hello_lowlevel.cc @@ -10,8 +10,6 @@ /// (custom naming, config tuning, multi-process open/create patterns). /// For most use cases, prefer the Node API (see hello_pubsub.cc). -#include -#include #include #include diff --git a/examples/hello_pubsub.cc b/examples/hello_pubsub.cc index 361810d..d6c581e 100644 --- a/examples/hello_pubsub.cc +++ b/examples/hello_pubsub.cc @@ -8,10 +8,7 @@ /// Single-process for simplicity; in production, each node lives in /// its own process sharing the same namespace. -#include -#include #include -#include #include diff --git a/examples/hello_schema.cc b/examples/hello_schema.cc index ca1f5e0..6619a49 100644 --- a/examples/hello_schema.cc +++ b/examples/hello_schema.cc @@ -24,9 +24,7 @@ /// check works the same way across process boundaries because the /// descriptor lives in shared memory. -#include #include -#include #include #include diff --git a/examples/hello_schema_late_publisher.cc b/examples/hello_schema_late_publisher.cc index 01281cb..ce1cc80 100644 --- a/examples/hello_schema_late_publisher.cc +++ b/examples/hello_schema_late_publisher.cc @@ -24,13 +24,8 @@ /// is entirely policy. Replace it with whatever fits the deployment /// (timeout, exponential backoff, futex, etc). -#include -#include -#include #include -#include #include -#include #include #include diff --git a/examples/hello_zerocopy.cc b/examples/hello_zerocopy.cc index 5d144d0..71b5aa2 100644 --- a/examples/hello_zerocopy.cc +++ b/examples/hello_zerocopy.cc @@ -6,8 +6,6 @@ /// is alive, preventing reuse. Ideal for large payloads (camera frames, /// point clouds) where memcpy overhead matters. -#include -#include #include #include diff --git a/include/kickmsg/Naming.h b/include/kickmsg/Naming.h index aca6978..36596f5 100644 --- a/include/kickmsg/Naming.h +++ b/include/kickmsg/Naming.h @@ -37,8 +37,17 @@ namespace kickmsg /// Compose the final shm name from a sanitized namespace and suffix. /// macOS: "/" + hex(fnv1a64(ns)) + hex(fnv1a64(suffix)), capped at - /// PSHMNAMLEN - 1. Linux: "/" + ns + "_" + suffix, throws - /// std::system_error(ENAMETOOLONG) past NAME_MAX. + /// PSHMNAMLEN - 1. Linux / Windows: readable "/" + ns + "_" + suffix, + /// throws std::system_error(ENAMETOOLONG) past the platform limit + /// (Linux NAME_MAX, Windows MAX_PATH). + /// + /// macOS caveat: PSHMNAMLEN (31) leaves no room for a readable name, so + /// the result is a hash and the suffix hash is truncated to fit. Two + /// distinct (namespace, suffix) pairs can therefore collide onto the + /// same shm object — distinct topics would then share one region. + /// Linux names are exact and never collide. Collisions are astronomically + /// unlikely but not impossible; if it matters, keep names short enough to + /// stay readable (Linux) or verify topology out of band. std::string compose_shm_name(std::string_view sanitized_namespace, std::string_view sanitized_suffix); } diff --git a/include/kickmsg/Publisher.h b/include/kickmsg/Publisher.h index c57455a..9556e9d 100644 --- a/include/kickmsg/Publisher.h +++ b/include/kickmsg/Publisher.h @@ -3,7 +3,6 @@ #include "kickmsg/types.h" #include "kickmsg/Region.h" -#include "kickmsg/os/Futex.h" namespace kickmsg { diff --git a/include/kickmsg/Region.h b/include/kickmsg/Region.h index 0ae907b..06abd2b 100644 --- a/include/kickmsg/Region.h +++ b/include/kickmsg/Region.h @@ -1,8 +1,6 @@ #ifndef KICKMSG_REGION_H #define KICKMSG_REGION_H -#include -#include #include #include "kickmsg/types.h" @@ -197,8 +195,11 @@ namespace kickmsg /// action, not a routine maintenance call. bool reset_schema_claim(); - /// Lightweight read-only health check. Safe under live traffic. - /// Counts locked entries and ring states without any writes. + /// Read-only health check. Safe under live traffic; does NOT mutate + /// the region. Counts locked entries and ring states, and probes + /// per-ring owner liveness (a bounded number of cheap OS calls, one + /// per occupied ring -- intended for a periodic health timer, not a + /// hot path). /// /// Supervisor policy: /// - locked_entries > 0: crash residue, call repair_locked_entries() @@ -206,19 +207,22 @@ namespace kickmsg /// confirming the crashed publisher is gone /// - draining_rings > 0: usually transient (subscriber tearing down), /// persistent counts may indicate a stuck teardown + /// - dead_rings > 0: a subscriber holding a Live/Draining ring died; + /// call reclaim_dead_rings() to recover the ring slot /// - live_rings: normal occupancy - /// - schema_stuck: a claimant crashed between Unset → Claiming and - /// the Set release-store. Every future try_claim_schema() will - /// return false after its bounded wait until an operator calls - /// reset_schema_claim() (only safe after confirming the original - /// claimant is gone). + /// - schema_stuck: a claimant is in the Claiming state. This is a + /// point-in-time read, so a healthy in-progress try_claim_schema() + /// can transiently set it -- treat it as advisory and act + /// (reset_schema_claim()) only if it persists AND the claimant is + /// confirmed gone. struct HealthReport { uint32_t locked_entries; ///< Entries stuck at LOCKED_SEQUENCE uint32_t retired_rings; ///< Free rings with stale in_flight > 0 uint32_t draining_rings; ///< Draining rings with in_flight > 0 + uint32_t dead_rings; ///< Live/Draining rings whose owner process is gone uint32_t live_rings; ///< Active subscriber rings - bool schema_stuck; ///< schema_state wedged at Claiming (crashed claimant) + bool schema_stuck; ///< schema_state at Claiming (advisory; may be a live claim) }; HealthReport diagnose(); @@ -242,6 +246,21 @@ namespace kickmsg /// Returns the number of rings reset. std::size_t reset_retired_rings(); + /// Reclaim rings whose owning subscriber process has died (a Live or + /// Draining ring left behind by a crash). The owner pid + start time + /// recorded at claim time are checked against the OS; only rings with + /// a provably-dead owner are reclaimed, so this is safe under live + /// traffic -- a slow-but-alive subscriber is never touched. in_flight + /// is preserved (a mid-commit publisher must still fetch_sub), so a + /// reclaimed ring may land in the retired state for reset_retired_rings() + /// to finish; committed slot refs are recovered by + /// reclaim_orphaned_slots(). Returns the number of rings reclaimed. + /// + /// Residual: a subscriber that crashes in the few instructions between + /// winning the claim CAS and recording its pid leaves owner_pid == 0, + /// which this cannot attribute and so will not reclaim. + std::size_t reclaim_dead_rings(); + /// Runtime counter snapshot — safe under live traffic. /// /// Reads the cross-process per-ring counters (`write_pos`, diff --git a/include/kickmsg/Subscriber.h b/include/kickmsg/Subscriber.h index fd495fc..5db3cf0 100644 --- a/include/kickmsg/Subscriber.h +++ b/include/kickmsg/Subscriber.h @@ -1,12 +1,9 @@ #ifndef KICKMSG_SUBSCRIBER_H #define KICKMSG_SUBSCRIBER_H -#include -#include #include "kickmsg/types.h" #include "kickmsg/Region.h" -#include "kickmsg/os/Futex.h" namespace kickmsg { diff --git a/include/kickmsg/types.h b/include/kickmsg/types.h index e9933b4..19c39f8 100644 --- a/include/kickmsg/types.h +++ b/include/kickmsg/types.h @@ -20,7 +20,7 @@ namespace kickmsg "Kickmsg requires lock-free 32-bit atomics."); constexpr uint64_t MAGIC = 0x4B49434B4D534721ULL; // "KICKMSG!" - constexpr uint32_t VERSION = 5; + constexpr uint32_t VERSION = 6; constexpr uint32_t INVALID_SLOT = UINT32_MAX; constexpr uint64_t LOCKED_SEQUENCE = UINT64_MAX; constexpr std::size_t CACHE_LINE = 64; @@ -248,15 +248,20 @@ namespace kickmsg struct SubRingHeader { alignas(CACHE_LINE) std::atomic state_flight; ///< Packed [in_flight:30 | state:2] + std::atomic owner_pid; ///< Claiming subscriber's pid; 0 when Free (liveness recovery) + std::atomic owner_starttime; ///< owner_pid's start time; pid-reuse guard for reclaim_dead_rings alignas(CACHE_LINE) std::atomic write_pos; ///< Monotonically increasing position counter std::atomic has_waiter; ///< Set by subscriber before futex_wait std::atomic dropped_count; ///< Cumulative publisher drops on this ring (all publishers) std::atomic lost_count; ///< Cumulative subscriber losses on this ring (all subscribers) }; + // owner_pid/owner_starttime live in state_flight's cache-line padding, so + // the struct stays 2 lines and the ring-stride math is unchanged. They are + // cold (written once on claim, read only by reclaim_dead_rings), so sharing + // the line with the hot state_flight costs nothing in steady state. static_assert(sizeof(SubRingHeader) == 2 * CACHE_LINE, - "SubRingHeader must stay 2 cache lines — the counter fields fit in " - "the existing write_pos line padding; expanding this struct requires " - "reconsidering ring-stride math in Region.cc"); + "SubRingHeader must stay 2 cache lines — expanding it past the " + "write_pos line padding requires reconsidering ring-stride math in Region.cc"); /// Slot header: prepended to each payload buffer in the pool. /// Packed to guarantee binary layout across compilers. diff --git a/scripts/validate.sh b/scripts/validate.sh index e337f05..66047cb 100755 --- a/scripts/validate.sh +++ b/scripts/validate.sh @@ -58,14 +58,16 @@ step "Stress tests — single pass" "$BUILD_PATH/kickmsg_stress_test" success "Stress tests passed" -# Repeat the stress suite several times with shuffled order. The whole -# point of running on ARM64 is that memory-ordering bugs appear -# intermittently — a single run that happens to pass tells us nothing. -# Ten passes with shuffled ordering is the 80/20 of catching real races -# without blowing out the wall-clock budget. -step "Stress tests — shuffled repeat x10 (ARM64 memory-ordering probe)" -"$BUILD_PATH/kickmsg_stress_test" --gtest_repeat=10 --gtest_shuffle -success "Shuffled stress repeat passed" +# Repeat the stress suite several times. The whole point of running on +# ARM64 is that memory-ordering bugs appear intermittently -- a single +# run that happens to pass tells us nothing. kickmsg_stress_test is not +# a GoogleTest binary (it has its own main and already randomizes timing +# within each pass), so repetition is a shell loop, not --gtest_repeat. +step "Stress tests — repeat x10 (ARM64 memory-ordering probe)" +for _ in $(seq 1 10); do + "$BUILD_PATH/kickmsg_stress_test" +done +success "Stress repeat x10 passed" step "Crash / recovery tests" "$BUILD_PATH/kickmsg_crash_test" diff --git a/src/Naming.cc b/src/Naming.cc index 7be4dc0..dfdecbf 100644 --- a/src/Naming.cc +++ b/src/Naming.cc @@ -4,14 +4,17 @@ #include #include #include -#include #include #include -#include #include #if defined(__APPLE__) || defined(__DARWIN__) #include +#elif defined(_WIN32) + #ifndef WIN32_LEAN_AND_MEAN + #define WIN32_LEAN_AND_MEAN + #endif + #include // MAX_PATH #endif namespace kickmsg @@ -70,16 +73,24 @@ namespace kickmsg out.resize(PSHMNAMLEN - 1); return out; #else - // shm_open accepts '/' + up to NAME_MAX bytes for the filename portion on tmpfs. + // Readable "/ns_suffix" name. Linux shm_open counts NAME_MAX bytes for + // the filename portion (the leading '/' is not part of it); Windows + // CreateFileMapping caps the whole object name at MAX_PATH. std::string out = "/"; out += ns; out += '_'; out += suffix; - // -1 because / is not taken into account - if (out.size() - 1 > static_cast(NAME_MAX)) + #if defined(_WIN32) + std::size_t const len = out.size(); + std::size_t const limit = static_cast(MAX_PATH); + #else + std::size_t const len = out.size() - 1; + std::size_t const limit = static_cast(NAME_MAX); + #endif + if (len > limit) { throw std::system_error(ENAMETOOLONG, std::generic_category(), - "kickmsg::compose_shm_name: shm name exceeds NAME_MAX"); + "kickmsg::compose_shm_name: shm name exceeds platform limit"); } return out; #endif diff --git a/src/Node.cc b/src/Node.cc index 897c792..6b9da80 100644 --- a/src/Node.cc +++ b/src/Node.cc @@ -141,6 +141,17 @@ namespace kickmsg { auto shm_name = make_topic_name(topic); auto topic_path = with_leading_slash(topic); + // Guard the create: without it, a second advertise() of the same + // topic re-evaluates SharedRegion::create() (shm_open O_TRUNC + + // memset) on the already-mapped live segment before emplace + // discards the duplicate — wiping the header/rings/pool under any + // existing Publisher and remote peers. + if (auto* r = find_region(shm_name)) + { + touch_registry(shm_name, topic_path, channel::PubSub, + registry::Pubsub, registry::Publisher); + return Publisher(*r); + } auto [it, _] = regions_.emplace( shm_name, SharedRegion::create(shm_name.c_str(), channel::PubSub, cfg, name_.c_str())); @@ -226,6 +237,17 @@ namespace kickmsg mbx_cfg.max_subscribers = 1; auto shm_name = make_mailbox_name(name_.c_str(), tag); auto topic_path = mailbox_topic(name_.c_str(), tag); + // Guard the create (see advertise): a second create_mailbox() of + // the same tag must not O_TRUNC+memset the live segment. Returning + // a handle to the existing region makes the duplicate claim fail + // loudly in the Subscriber ctor (the single ring is already Live) + // instead of silently wiping the mailbox. + if (auto* r = find_region(shm_name)) + { + touch_registry(shm_name, topic_path, channel::PubSub, + registry::Mailbox, registry::Subscriber); + return Subscriber(*r); + } auto [it, _] = regions_.emplace( shm_name, SharedRegion::create(shm_name.c_str(), channel::PubSub, mbx_cfg, name_.c_str())); diff --git a/src/Publisher.cc b/src/Publisher.cc index 9626e97..068f873 100644 --- a/src/Publisher.cc +++ b/src/Publisher.cc @@ -1,6 +1,6 @@ #include "kickmsg/Publisher.h" - -#include +#include "kickmsg/os/Futex.h" +#include "kickmsg/os/Time.h" namespace kickmsg { @@ -208,7 +208,13 @@ namespace kickmsg ring->state_flight.fetch_sub(ring::IN_FLIGHT_ONE, std::memory_order_release); - // Conditional wake: skip the syscall when no subscriber is blocking. + // seq_cst fence orders the write_pos fetch_add before the + // has_waiter load: without it a weakly-ordered CPU can read + // has_waiter == 0 stale and skip the wake to a subscriber already + // parked in futex_wait (a lost wakeup until its timeout). Pairs + // with the subscriber's fence. x86's locked RMW already fences, + // which is why this never surfaced on x86. + std::atomic_thread_fence(std::memory_order_seq_cst); if (ring->has_waiter.load(std::memory_order_relaxed)) { futex_wake_all(ring->write_pos); @@ -278,6 +284,12 @@ namespace kickmsg void Publisher::release_slot(uint32_t idx) { + // idx is read from a ring entry a peer wrote; a crashed or hostile + // publisher can leave it out of range (this also covers INVALID_SLOT). + if (idx >= header_->pool_size) + { + return; + } auto* s = slot_at(base_, header_, idx); uint32_t prev = s->refcount.fetch_sub(1, std::memory_order_acq_rel); if (prev == 1) diff --git a/src/Region.cc b/src/Region.cc index ed8e01c..dd367a5 100644 --- a/src/Region.cc +++ b/src/Region.cc @@ -1,8 +1,5 @@ -#include -#include #include #include -#include #include "kickmsg/Region.h" #include "kickmsg/os/Process.h" @@ -49,15 +46,39 @@ namespace kickmsg RegionLayout compute_layout(channel::Config const& cfg, char const* creator_name) { + if (creator_name == nullptr) + { + throw std::runtime_error("creator_name must not be null"); + } + std::size_t name_len = std::strlen(creator_name); + // creator_name_len is a uint16_t header field; a silent cast + // would truncate the name and desync required_size/attach_create. + if (name_len > UINT16_MAX) + { + throw std::runtime_error("creator_name exceeds 65535 bytes"); + } + RegionLayout layout; - layout.creator_len = static_cast(std::strlen(creator_name)); + layout.creator_len = static_cast(name_len); layout.header_size = align_up(sizeof(Header) + layout.creator_len, CACHE_LINE); layout.ring_stride = align_up( sizeof(SubRingHeader) + cfg.sub_ring_capacity * sizeof(Entry), CACHE_LINE); layout.slot_stride = align_up(sizeof(SlotHeader) + cfg.max_payload_size, CACHE_LINE); layout.sub_rings_offset = layout.header_size; - layout.pool_offset = layout.sub_rings_offset + cfg.max_subscribers * layout.ring_stride; - layout.total_size = layout.pool_offset + cfg.pool_size * layout.slot_stride; + + // Overflow guards: a cfg with huge counts must not wrap total_size + // into a small value that maps a tiny region while publishers and + // subscribers stride off the end. + if (cfg.max_subscribers > (SIZE_MAX - layout.sub_rings_offset) / layout.ring_stride) + { + throw std::runtime_error("Config too large: subscriber rings overflow"); + } + layout.pool_offset = layout.sub_rings_offset + cfg.max_subscribers * layout.ring_stride; + if (cfg.pool_size > (SIZE_MAX - layout.pool_offset) / layout.slot_stride) + { + throw std::runtime_error("Config too large: slot pool overflow"); + } + layout.total_size = layout.pool_offset + cfg.pool_size * layout.slot_stride; return layout; } } @@ -133,38 +154,16 @@ namespace kickmsg /// Validate that an already-attached Header has internally /// consistent geometry. /// - /// Threat model. attach_open() lets the caller hand kickmsg - /// arbitrary bytes. validate_opened()'s magic/version/total_size - /// checks filter obvious garbage, but every other Header field - /// (offsets, strides, counts, lengths) is then used unchecked by - /// Publisher, Subscriber, info(), diagnose(), stats() and the - /// repair paths to compute pointers into the buffer. A buffer - /// whose first 24 bytes happen to satisfy the trio but whose - /// geometry fields are junk (corruption, crash residue, hostile - /// input, a region produced by a different kickmsg build) would - /// cause those callers to compute wild pointers — at best SEGV - /// on first publish, at worst silent OOB reads via info()'s - /// std::string::assign(tail, creator_name_len) or sub_ring_at() - /// stride math. - /// - /// This function applies the invariants compute_layout() and - /// stamp_new_region() always satisfy: a region kickmsg itself - /// stamps ALWAYS passes. Only corrupt or hostile input can fail - /// it. Cheap (a handful of integer compares, no syscalls), runs - /// once at attach, never on the hot path. - /// - /// shm-backed open() runs this too — defense in depth in case a - /// peer process writes garbage into the segment after stamping, - /// or a stale segment is reused across version bumps. + /// Reject a Header whose geometry fields are not self-consistent. + /// attach_open() trusts caller-supplied bytes, and every offset / + /// stride / count / length below drives later pointer math in + /// Publisher, Subscriber, info() and the repair paths -- junk here + /// means wild pointers. A region kickmsg itself stamped always + /// passes; only corrupt or hostile input fails. open() runs it too + /// as defense in depth. Caller has already checked magic, version, + /// and size >= total_size. void validate_header_geometry(Header const* h) { - // Pre: validate_opened() has already ensured - // - buffer_size >= sizeof(Header) - // - h->magic == MAGIC, h->version == VERSION - // - buffer_size >= h->total_size - // Every offset comparison below uses h->total_size as the - // trusted upper bound; we re-check it here in case total_size - // itself is junk smaller than the struct. if (h->total_size < sizeof(Header)) { throw std::runtime_error( @@ -193,13 +192,6 @@ namespace kickmsg "Header geometry: sub_ring_mask inconsistent with capacity"); } - // creator_name tail bytes live at offset sizeof(Header). - if (h->creator_name_len > h->total_size - sizeof(Header)) - { - throw std::runtime_error( - "Header geometry: creator_name_len exceeds region"); - } - // Sub-rings span [sub_rings_offset, pool_offset); pool spans // [pool_offset, total_size). if (h->sub_rings_offset < sizeof(Header) @@ -210,6 +202,14 @@ namespace kickmsg "Header geometry: ring/pool offsets out of range"); } + // creator_name tail lives in [sizeof(Header), sub_rings_offset); + // bound it there so info() can't read into the ring/pool area. + if (h->creator_name_len > h->sub_rings_offset - sizeof(Header)) + { + throw std::runtime_error( + "Header geometry: creator_name_len exceeds tail"); + } + // Bound sub_ring_capacity by total_size before multiplying so // the min_ring_stride product can't overflow on a junk value. if (h->sub_ring_capacity > h->total_size / sizeof(Entry)) @@ -277,6 +277,31 @@ namespace kickmsg } validate_header_geometry(h); } + + // True if the ring's recorded owner is provably gone. owner_pid == 0 + // means unowned or a claim in progress -> not dead. Mirrors + // Registry::sweep_stale: a matching boot-relative starttime confirms + // the same process; if either starttime is 0 the platform can't + // disambiguate, so trust pid-alive. + bool ring_owner_dead(SubRingHeader const* ring) + { + uint64_t pid = ring->owner_pid.load(std::memory_order_acquire); + if (pid == 0) + { + return false; + } + if (not process_exists(pid)) + { + return true; + } + uint64_t stored = ring->owner_starttime.load(std::memory_order_relaxed); + uint64_t live = process_starttime(pid); + if (stored == 0 or live == 0) + { + return false; + } + return stored != live; + } } std::size_t SharedRegion::required_size(channel::Config const& cfg, @@ -409,6 +434,11 @@ namespace kickmsg region.shm_ = std::move(shm); region.base_ = region.shm_.address(); region.size_ = region.shm_.size(); + // config_hash covers the cfg fields but NOT total_size, + // offsets, or strides — validate the geometry like + // open()/attach_open() so a corrupt or partially-stamped + // creator can't hand us junk that later pointer math trusts. + validate_opened(region.base_, region.size_); return region; } // SHM exists but magic/version not ready yet — creator @@ -494,6 +524,15 @@ namespace kickmsg { ++report.draining_rings; } + + // A Live or Draining ring whose owner process is gone is an + // orphan no other count surfaces (a dead Live ring otherwise + // reads as healthy). reclaim_dead_rings() recovers it. + if ((state == ring::Live or state == ring::Draining) + and ring_owner_dead(ring)) + { + ++report.dead_rings; + } } return report; @@ -577,6 +616,8 @@ namespace kickmsg { ring->state_flight.store(ring::make_packed(ring::Free), std::memory_order_release); + ring->owner_pid.store(0, std::memory_order_relaxed); + ring->owner_starttime.store(0, std::memory_order_relaxed); ++reset; } } @@ -584,6 +625,53 @@ namespace kickmsg return reset; } + std::size_t SharedRegion::reclaim_dead_rings() + { + auto* b = base(); + auto* h = header(); + std::size_t reclaimed = 0; + + for (uint64_t i = 0; i < h->max_subs; ++i) + { + auto* ring = sub_ring_at(b, h, static_cast(i)); + uint32_t packed = ring->state_flight.load(std::memory_order_acquire); + ring::State state = ring::get_state(packed); + + if (state != ring::Live and state != ring::Draining) + { + continue; + } + if (not ring_owner_dead(ring)) + { + continue; + } + + // Owner is gone. Flip the state bits to Free but PRESERVE + // in_flight: a publisher mid-commit to this ring will still + // fetch_sub, and zeroing in_flight here would underflow it into + // the state bits. A leftover Free | in_flight>0 is then a job + // for reset_retired_rings() once the publisher is confirmed gone; + // committed entries' slot refs are recovered by + // reclaim_orphaned_slots(). Clear owner last (Free + stale owner + // is harmless; the next claimer overwrites it). + uint32_t old = packed; + while (true) + { + uint32_t desired = (old & ~ring::STATE_MASK) | ring::Free; + if (ring->state_flight.compare_exchange_weak(old, desired, + std::memory_order_release, std::memory_order_acquire)) + { + break; + } + } + ring->owner_pid.store(0, std::memory_order_relaxed); + ring->owner_starttime.store(0, std::memory_order_relaxed); + ++reclaimed; + } + + return reclaimed; + } + std::optional SharedRegion::schema() const { auto const* h = header(); @@ -595,6 +683,10 @@ namespace kickmsg } SchemaInfo out; std::memcpy(&out, &h->schema_data, sizeof(SchemaInfo)); + // name is a C string consumers stream with operator<<; a hostile or + // corrupt region may leave it unterminated. Force a terminator so a + // reader can't run off the array. + out.name[sizeof(out.name) - 1] = '\0'; return out; } diff --git a/src/Registry.cc b/src/Registry.cc index 705dbf0..4cfcb79 100644 --- a/src/Registry.cc +++ b/src/Registry.cc @@ -1,7 +1,6 @@ #include "kickmsg/Registry.h" #include -#include #include #include @@ -78,6 +77,17 @@ namespace kickmsg throw std::runtime_error( "Registry version mismatch on " + name); } + // capacity is read from shared memory and drives every + // entries[0..capacity) walk; a corrupt value would send + // snapshot()/sweep_stale() off the mapping. Bound it + // (overflow-safe) against the actual segment size. + std::size_t avail = shm.size() - sizeof(RegistryHeader); + if (shm.size() < sizeof(RegistryHeader) + or h->capacity > avail / sizeof(ParticipantEntry)) + { + throw std::runtime_error( + "Registry capacity exceeds segment on " + name); + } Registry out; out.name_ = name; out.shm_ = std::move(shm); diff --git a/src/Subscriber.cc b/src/Subscriber.cc index c901bf6..493564b 100644 --- a/src/Subscriber.cc +++ b/src/Subscriber.cc @@ -1,7 +1,8 @@ -#include #include #include "kickmsg/Subscriber.h" +#include "kickmsg/os/Futex.h" +#include "kickmsg/os/Process.h" #include "kickmsg/os/Time.h" namespace kickmsg @@ -34,6 +35,16 @@ namespace kickmsg ring::make_packed(ring::Live), std::memory_order_acq_rel)) { + // Record owner liveness so reclaim_dead_rings() can recover + // this ring if we crash without releasing it. starttime + // first; owner_pid (release) last, so a sweeper that reads a + // non-zero pid also sees the matching starttime. owner_pid + // stays 0 until here, so a sweep racing the claim sees 0 and + // skips (treats it as a claim in progress). + uint64_t pid = current_pid(); + ring->owner_starttime.store(process_starttime(pid), + std::memory_order_relaxed); + ring->owner_pid.store(pid, std::memory_order_release); ring_idx_ = i; start_pos_ = wp; read_pos_ = start_pos_; @@ -112,6 +123,13 @@ namespace kickmsg } } + // Clear owner AFTER reaching Free. Order matters: clearing earlier + // would leave a crash window where a Draining ring shows owner == 0 + // and reclaim_dead_rings skips it. The worst case here is a brief + // Free + stale-owner, which the next claimer simply overwrites. + ring->owner_pid.store(0, std::memory_order_relaxed); + ring->owner_starttime.store(0, std::memory_order_relaxed); + ring_idx_ = UINT32_MAX; } @@ -155,6 +173,12 @@ namespace kickmsg std::optional Subscriber::try_receive() { + // Moved-from Subscriber: ring_idx_ is the UINT32_MAX sentinel, so + // sub_ring_at would compute a wild pointer. + if (ring_idx_ == UINT32_MAX) + { + return std::nullopt; + } auto* ring = sub_ring_at(base_, header_, ring_idx_); for (int retries = 0; retries < 64; ++retries) @@ -214,7 +238,10 @@ namespace kickmsg auto* slot = slot_at(base_, header_, slot_idx); uint32_t rc = slot->refcount.load(std::memory_order_acquire); bool pinned = false; - while (rc > 0) + // rc == UINT32_MAX is unreachable for a healthy slot (refcount is + // bounded by max_subs + live views); treat it as corrupt residue + // so rc + 1 can't wrap to 0 and make a pinned slot look freeable. + while (rc > 0 and rc != UINT32_MAX) { if (slot->refcount.compare_exchange_weak(rc, rc + 1, std::memory_order_acq_rel, std::memory_order_acquire)) @@ -226,7 +253,7 @@ namespace kickmsg if (not pinned) { - // refcount == 0: slot already freed, count as lost. + // refcount == 0 (or corrupt): slot not pinnable, count as lost. ++lost_; ring->lost_count.fetch_add(1, std::memory_order_relaxed); ++read_pos_; @@ -267,6 +294,12 @@ namespace kickmsg std::optional Subscriber::receive(nanoseconds timeout) { + // Moved-from Subscriber: ring_idx_ is the UINT32_MAX sentinel, so + // sub_ring_at would compute a wild pointer. + if (ring_idx_ == UINT32_MAX) + { + return std::nullopt; + } auto* ring = sub_ring_at(base_, header_, ring_idx_); nanoseconds start = kickmsg::monotonic_ns(); @@ -293,6 +326,10 @@ namespace kickmsg return std::nullopt; } ring->has_waiter.store(1, std::memory_order_relaxed); + // Pairs with the publisher's seq_cst fence: orders this store + // before futex_wait's kernel read of write_pos so a concurrent + // publish can't be missed on a weakly-ordered CPU. + std::atomic_thread_fence(std::memory_order_seq_cst); futex_wait(ring->write_pos, cur, remaining); ring->has_waiter.store(0, std::memory_order_relaxed); } @@ -301,6 +338,12 @@ namespace kickmsg std::optional Subscriber::try_receive_view() { + // Moved-from Subscriber: ring_idx_ is the UINT32_MAX sentinel, so + // sub_ring_at would compute a wild pointer. + if (ring_idx_ == UINT32_MAX) + { + return std::nullopt; + } auto* ring = sub_ring_at(base_, header_, ring_idx_); for (int retries = 0; retries < 64; ++retries) @@ -352,7 +395,8 @@ namespace kickmsg auto* slot = slot_at(base_, header_, slot_idx); uint32_t rc = slot->refcount.load(std::memory_order_acquire); bool pinned = false; - while (rc > 0) + // rc == UINT32_MAX is corrupt residue; skip so rc + 1 can't wrap. + while (rc > 0 and rc != UINT32_MAX) { if (slot->refcount.compare_exchange_weak(rc, rc + 1, std::memory_order_acq_rel, std::memory_order_acquire)) @@ -394,6 +438,12 @@ namespace kickmsg std::optional Subscriber::receive_view(nanoseconds timeout) { + // Moved-from Subscriber: ring_idx_ is the UINT32_MAX sentinel, so + // sub_ring_at would compute a wild pointer. + if (ring_idx_ == UINT32_MAX) + { + return std::nullopt; + } auto* ring = sub_ring_at(base_, header_, ring_idx_); nanoseconds start = kickmsg::monotonic_ns(); @@ -420,6 +470,10 @@ namespace kickmsg return std::nullopt; } ring->has_waiter.store(1, std::memory_order_relaxed); + // Pairs with the publisher's seq_cst fence: orders this store + // before futex_wait's kernel read of write_pos so a concurrent + // publish can't be missed on a weakly-ordered CPU. + std::atomic_thread_fence(std::memory_order_seq_cst); futex_wait(ring->write_pos, cur, remaining); ring->has_waiter.store(0, std::memory_order_relaxed); } diff --git a/src/os/darwin/Futex.cc b/src/os/darwin/Futex.cc index 2e1c1f1..5563260 100644 --- a/src/os/darwin/Futex.cc +++ b/src/os/darwin/Futex.cc @@ -1,7 +1,6 @@ #include "kickmsg/os/Futex.h" #include -#include // macOS kernel ulock API. // Used by libc++ (std::atomic::wait) and libdispatch since macOS 10.12. diff --git a/src/os/darwin/SharedMemory.cc b/src/os/darwin/SharedMemory.cc index 5efeae4..d7bf780 100644 --- a/src/os/darwin/SharedMemory.cc +++ b/src/os/darwin/SharedMemory.cc @@ -4,8 +4,6 @@ #include #include -#include -#include #include #include diff --git a/src/os/darwin/Time.cc b/src/os/darwin/Time.cc index d7348b4..324c1ab 100644 --- a/src/os/darwin/Time.cc +++ b/src/os/darwin/Time.cc @@ -4,7 +4,6 @@ #include #include -#include #include namespace kickmsg diff --git a/src/os/linux/SharedMemory.cc b/src/os/linux/SharedMemory.cc index 3503a37..6a64296 100644 --- a/src/os/linux/SharedMemory.cc +++ b/src/os/linux/SharedMemory.cc @@ -4,7 +4,6 @@ #include #include -#include #include #include #include diff --git a/src/os/linux/Time.cc b/src/os/linux/Time.cc index efb8b97..4e2ed4c 100644 --- a/src/os/linux/Time.cc +++ b/src/os/linux/Time.cc @@ -4,7 +4,6 @@ #include #include -#include #include namespace kickmsg diff --git a/src/os/posix/SharedMemory.cc b/src/os/posix/SharedMemory.cc index b98a85e..08a1bde 100644 --- a/src/os/posix/SharedMemory.cc +++ b/src/os/posix/SharedMemory.cc @@ -4,7 +4,6 @@ #include #include -#include #include #include #include @@ -122,6 +121,15 @@ namespace kickmsg } size_ = static_cast(st.st_size); + if (size_ == 0) + { + // Creator reached shm_open(O_CREAT) but not ftruncate() yet. + // mmap(., 0, .) would fail EINVAL; report not-ready so the + // create_or_open / spin_open retry loops keep spinning. + ::close(fd_); + fd_ = INVALID_SHM_HANDLE; + return false; + } address_ = ::mmap(nullptr, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0); if (address_ == MAP_FAILED) { diff --git a/src/os/posix/Time.cc b/src/os/posix/Time.cc index 69c0906..e8d0421 100644 --- a/src/os/posix/Time.cc +++ b/src/os/posix/Time.cc @@ -5,7 +5,6 @@ #include #include #include -#include #include namespace kickmsg diff --git a/src/os/windows/SharedMemory.cc b/src/os/windows/SharedMemory.cc index 5727b4f..728d31f 100644 --- a/src/os/windows/SharedMemory.cc +++ b/src/os/windows/SharedMemory.cc @@ -1,7 +1,5 @@ #include "kickmsg/os/SharedMemory.h" -#include -#include #include namespace kickmsg diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index fe81f25..1db3550 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -28,6 +28,12 @@ target_link_libraries(kickmsg_stress_test PRIVATE kickmsg) set_target_properties(kickmsg_stress_test PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) +# Registered so ctest (and the TSAN job) actually exercise the lock-free +# contention paths -- the Treiber free-stack and pool exhaustion races only +# show up here, not in the unit suite. Generous timeout: TSAN scales the +# iteration counts up ~100x. +add_test(NAME stress COMMAND kickmsg_stress_test WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) +set_tests_properties(stress PROPERTIES TIMEOUT 900) # Crash test uses fork/waitpid/SIGKILL — POSIX only if(NOT WIN32) @@ -36,10 +42,14 @@ if(NOT WIN32) set_target_properties(kickmsg_crash_test PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) + add_test(NAME crash COMMAND kickmsg_crash_test WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) + set_tests_properties(crash PROPERTIES TIMEOUT 300) add_executable(kickmsg_registry_stress_test registry_stress_test.cc) target_link_libraries(kickmsg_registry_stress_test PRIVATE kickmsg) set_target_properties(kickmsg_registry_stress_test PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) + add_test(NAME registry_stress COMMAND kickmsg_registry_stress_test WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) + set_tests_properties(registry_stress PROPERTIES TIMEOUT 300) endif() diff --git a/tests/crash_test.cc b/tests/crash_test.cc index 0847ff4..b4fccd3 100644 --- a/tests/crash_test.cc +++ b/tests/crash_test.cc @@ -15,17 +15,14 @@ /// sequences) and Case-B (retired rings) residue coexist at multiple /// ring positions. -#include #include #include #include #include #include #include -#include #include #include -#include #include "kickmsg/os/Time.h" #include "kickmsg/Publisher.h" diff --git a/tests/endurance.sh b/tests/endurance.sh index 1c2c9aa..72a36cb 100755 --- a/tests/endurance.sh +++ b/tests/endurance.sh @@ -15,9 +15,13 @@ echo "" while [ "$(date +%s)" -lt "$END_TIME" ]; do OUTPUT=$("$BINARY" 2>&1) RUNS=$((RUNS + 1)) + if [ "$RUNS" -eq 1 ]; then + echo "$OUTPUT" | grep -iE "harness built" || true + echo "" + fi SUMMARY=$(echo "$OUTPUT" | grep "Summary:" | tail -1) - RUN_PASS=$(echo "$SUMMARY" | grep -oP '\d+ passed' | grep -oP '\d+') - RUN_FAIL=$(echo "$SUMMARY" | grep -oP '\d+ failed' | grep -oP '\d+') + RUN_PASS=$(echo "$SUMMARY" | grep -oE '[0-9]+ passed' | grep -oE '[0-9]+') + RUN_FAIL=$(echo "$SUMMARY" | grep -oE '[0-9]+ failed' | grep -oE '[0-9]+') RUN_REORDER=$(echo "$OUTPUT" | { grep -c "REORDER" || true; }) PASS=$((PASS + RUN_PASS)) FAIL=$((FAIL + RUN_FAIL)) diff --git a/tests/registry_stress_test.cc b/tests/registry_stress_test.cc index 5a318bb..4199379 100644 --- a/tests/registry_stress_test.cc +++ b/tests/registry_stress_test.cc @@ -15,11 +15,8 @@ /// Exits 0 on success, non-zero on any assertion failure. #include -#include #include #include -#include -#include #include #include #include @@ -27,7 +24,6 @@ #include "kickmsg/Registry.h" #include "kickmsg/os/Process.h" -#include "kickmsg/os/Time.h" using namespace kickmsg; using namespace std::chrono_literals; diff --git a/tests/stress/main.cc b/tests/stress/main.cc index 8cbaacd..9f4ccf2 100644 --- a/tests/stress/main.cc +++ b/tests/stress/main.cc @@ -1,4 +1,5 @@ #include "common.h" +#include "kickmsg/version.h" // Forward declarations bool run_treiber_stress(); @@ -13,7 +14,13 @@ bool run_subscriber_saturation(); int main() { - std::printf("=== Kickmsg Lock-Free Stress Tests ===\n\n"); + std::printf("=== Kickmsg Lock-Free Stress Tests ===\n"); + // Build stamp: confirm which binary is running. __DATE__/__TIME__ is this + // harness TU's compile time; shm ABI version confirms the layout in use. + std::printf("kickmsg %s | shm ABI v%u | harness built %s %s\n\n", + KICKMSG_VERSION_STRING, + static_cast(kickmsg::VERSION), + __DATE__, __TIME__); TestRunner runner; diff --git a/tests/unit/naming-t.cc b/tests/unit/naming-t.cc index 5d18bb3..424878c 100644 --- a/tests/unit/naming-t.cc +++ b/tests/unit/naming-t.cc @@ -8,6 +8,11 @@ #if defined(__APPLE__) || defined(__DARWIN__) #include +#elif defined(_WIN32) + #ifndef WIN32_LEAN_AND_MEAN + #define WIN32_LEAN_AND_MEAN + #endif + #include // MAX_PATH #endif using kickmsg::sanitize_shm_component; @@ -134,6 +139,8 @@ TEST(ComposeShmName, FitsPlatformLimit) EXPECT_EQ(name[0], '/'); #if defined(__APPLE__) || defined(__DARWIN__) EXPECT_LE(name.size(), static_cast(PSHMNAMLEN) - 1); +#elif defined(_WIN32) + EXPECT_LE(name.size(), static_cast(MAX_PATH)); #else EXPECT_LE(name.size() - 1, static_cast(NAME_MAX)); #endif @@ -178,11 +185,12 @@ TEST(ComposeShmName, FitsEvenWhenInputsAreLong) } #if !defined(__APPLE__) && !defined(__DARWIN__) -TEST(ComposeShmName, ThrowsOnLinuxWhenExceedingNameMax) +TEST(ComposeShmName, ThrowsWhenExceedingPlatformLimit) { - // Linux: pre-composed name longer than NAME_MAX must throw clearly - // (system_error/ENAMETOOLONG) instead of silently truncating or - // failing inside shm_open with an opaque OS error. + // Non-macOS (readable names): a pre-composed name longer than the + // platform limit (Linux NAME_MAX, Windows MAX_PATH) must throw clearly + // instead of truncating or failing with an opaque OS error. 400+ chars + // exceeds both. macOS hashes to a fixed length, so it never throws here. std::string ns(200, 'n'); std::string sx(200, 's'); EXPECT_THROW(compose_shm_name(ns, sx), std::system_error); diff --git a/tests/unit/node-t.cc b/tests/unit/node-t.cc index 97cf9cf..848c9c8 100644 --- a/tests/unit/node-t.cc +++ b/tests/unit/node-t.cc @@ -1,5 +1,3 @@ -#include -#include #include @@ -58,6 +56,41 @@ TEST_F(NodeTest, AdvertiseAndSubscribe) EXPECT_EQ(got, 42u); } +TEST_F(NodeTest, AdvertiseTwiceDoesNotWipeLiveRegion) +{ + // Regression: a second advertise() of the same topic must NOT re-run + // SharedRegion::create() (O_TRUNC + memset) on the live segment. A + // subscriber that joined after the first advertise must keep working + // across the second advertise. + track("/test_dup"); + + kickmsg::Node node("node", "test"); + auto pub1 = node.advertise("dup", small_cfg()); + + kickmsg::Node sub_node("subnode", "test"); + auto sub = sub_node.subscribe("dup"); // claims a ring; region Live + + // Second advertise on the same node — must reuse, not wipe. + auto pub2 = node.advertise("dup", small_cfg()); + + uint32_t val = 7; + ASSERT_GE(pub2.send(&val, sizeof(val)), 0); + + // If the second advertise had memset the region, sub's ring claim and + // the header magic would be gone and this receive would fail. + auto sample = sub.try_receive(); + ASSERT_TRUE(sample.has_value()); + uint32_t got = 0; + std::memcpy(&got, sample->data(), sizeof(got)); + EXPECT_EQ(got, 7u); + + // The original publisher handle still targets the same intact region. + val = 8; + ASSERT_GE(pub1.send(&val, sizeof(val)), 0); + auto s2 = sub.try_receive(); + ASSERT_TRUE(s2.has_value()); +} + TEST_F(NodeTest, NamingConventions) { kickmsg::Node node("mynode", "app"); diff --git a/tests/unit/publisher-t.cc b/tests/unit/publisher-t.cc index 6c50693..d9dd7b3 100644 --- a/tests/unit/publisher-t.cc +++ b/tests/unit/publisher-t.cc @@ -3,7 +3,6 @@ #include "kickmsg/Publisher.h" #include "kickmsg/Subscriber.h" -#include class PublisherTest : public ::testing::Test { diff --git a/tests/unit/region-t.cc b/tests/unit/region-t.cc index 6ae7bde..718ea62 100644 --- a/tests/unit/region-t.cc +++ b/tests/unit/region-t.cc @@ -5,12 +5,47 @@ #include "kickmsg/Subscriber.h" #include -#include #include -#include #include "kickmsg/os/Process.h" +#ifndef _WIN32 + #include + #include + #include +#else + #include // _aligned_malloc / _aligned_free +#endif + +namespace +{ + // CACHE_LINE-aligned heap buffer for the injected-region tests. + // posix_memalign is POSIX-only; Windows uses _aligned_malloc, whose + // memory MUST be released with _aligned_free (not free()). + void* aligned_buffer_alloc(std::size_t align, std::size_t size) + { +#if defined(_WIN32) + return _aligned_malloc(size, align); +#else + void* p = nullptr; + if (::posix_memalign(&p, align, size) != 0) + { + return nullptr; + } + return p; +#endif + } + + void aligned_buffer_free(void* p) + { +#if defined(_WIN32) + _aligned_free(p); +#else + ::free(p); +#endif + } +} + class RegionTest : public ::testing::Test { public: @@ -72,6 +107,25 @@ TEST_F(RegionTest, OpenNonexistentThrows) EXPECT_THROW(kickmsg::SharedRegion::open("/kickmsg_nonexistent_42"), std::runtime_error); } +#ifndef _WIN32 +TEST(SharedMemoryTest, TryOpenOnSizeZeroSegmentReturnsFalse) +{ + // A creator that did shm_open(O_CREAT) but not yet ftruncate() leaves a + // size-0 object. try_open must report not-ready (so create_or_open / + // spin_open retry) rather than mmap(., 0, .) -> EINVAL -> throw. + char const* name = "/kickmsg_test_size0"; + ::shm_unlink(name); + int fd = ::shm_open(name, O_RDWR | O_CREAT, 0666); + ASSERT_GE(fd, 0); + + kickmsg::SharedMemory sm; + EXPECT_FALSE(sm.try_open(name)); + + ::close(fd); + ::shm_unlink(name); +} +#endif + TEST_F(RegionTest, CreateOrOpenFirstCreates) { auto cfg = default_cfg(); @@ -109,6 +163,55 @@ TEST_F(RegionTest, CreateOrOpenConfigMismatchThrows) std::runtime_error); } +TEST_F(RegionTest, CreateOrOpenValidatesGeometryOnOpenBranch) +{ + auto cfg = default_cfg(); + auto creator = kickmsg::SharedRegion::create_or_open( + SHM_NAME, kickmsg::channel::PubSub, cfg, "creator"); + + // Corrupt a geometry field that config_hash does NOT cover (pool_offset + // is computed layout, not a cfg field). A second create_or_open hits + // the open branch with a matching config_hash, so only the geometry + // validation can catch it. + creator.header()->pool_offset = UINT64_MAX; + + EXPECT_THROW( + kickmsg::SharedRegion::create_or_open( + SHM_NAME, kickmsg::channel::PubSub, cfg, "opener"), + std::runtime_error); +} + +TEST_F(RegionTest, CreateRejectsOverlongCreatorName) +{ + std::string huge(70000, 'x'); // exceeds the uint16_t creator_name_len + EXPECT_THROW( + kickmsg::SharedRegion::create( + SHM_NAME, kickmsg::channel::PubSub, default_cfg(), huge.c_str()), + std::runtime_error); +} + +TEST_F(RegionTest, RequiredSizeRejectsOverflowingConfig) +{ + auto cfg = default_cfg(); + cfg.pool_size = SIZE_MAX / 2; // pool_size * slot_stride wraps total_size + EXPECT_THROW(kickmsg::SharedRegion::required_size(cfg), std::runtime_error); +} + +TEST_F(RegionTest, SchemaNameForcedNulTerminatedOnRead) +{ + auto region = kickmsg::SharedRegion::create( + SHM_NAME, kickmsg::channel::PubSub, default_cfg()); + auto* h = region.header(); + + // Corrupt: fill name with non-zero bytes and publish Set directly. + std::memset(h->schema_data.name, 'A', sizeof(h->schema_data.name)); + h->schema_state.store(kickmsg::schema::Set, std::memory_order_release); + + auto got = region.schema(); + ASSERT_TRUE(got.has_value()); + EXPECT_EQ(got->name[sizeof(got->name) - 1], '\0'); +} + TEST_F(RegionTest, HeaderStoresCreatorMetadata) { auto cfg = default_cfg(); @@ -510,6 +613,66 @@ TEST_F(RegionTest, ResetRetiredRingsLeavesDrainingUntouched) EXPECT_EQ(kickmsg::ring::get_in_flight(packed1), 1u); } +TEST_F(RegionTest, ReclaimDeadRingsRecoversCrashedOwnerRing) +{ + auto cfg = default_cfg(); + auto region = kickmsg::SharedRegion::create(SHM_NAME, kickmsg::channel::PubSub, cfg); + auto* h = region.header(); + + // Ring 0: a subscriber crashed holding it Live (guaranteed-dead pid, + // same sentinel the registry sweep test uses). + auto* ring0 = kickmsg::sub_ring_at(region.base(), h, 0); + ring0->owner_starttime.store(0, std::memory_order_relaxed); + ring0->owner_pid.store(0x7fffffff, std::memory_order_release); + ring0->state_flight.store(kickmsg::ring::make_packed(kickmsg::ring::Live), + std::memory_order_release); + + // Ring 1: a LIVE owner (this process) must never be reclaimed. + auto* ring1 = kickmsg::sub_ring_at(region.base(), h, 1); + ring1->owner_starttime.store( + kickmsg::process_starttime(kickmsg::current_pid()), std::memory_order_relaxed); + ring1->owner_pid.store(kickmsg::current_pid(), std::memory_order_release); + ring1->state_flight.store(kickmsg::ring::make_packed(kickmsg::ring::Live), + std::memory_order_release); + + EXPECT_EQ(region.diagnose().dead_rings, 1u); + EXPECT_EQ(region.reclaim_dead_rings(), 1u); + + // Ring 0 reclaimed to Free with owner cleared. + uint32_t p0 = ring0->state_flight.load(std::memory_order_acquire); + EXPECT_EQ(kickmsg::ring::get_state(p0), kickmsg::ring::Free); + EXPECT_EQ(ring0->owner_pid.load(std::memory_order_acquire), 0u); + + // Ring 1 (live owner) untouched. + uint32_t p1 = ring1->state_flight.load(std::memory_order_acquire); + EXPECT_EQ(kickmsg::ring::get_state(p1), kickmsg::ring::Live); + + // Idempotent. + EXPECT_EQ(region.reclaim_dead_rings(), 0u); + EXPECT_EQ(region.diagnose().dead_rings, 0u); +} + +TEST_F(RegionTest, ReclaimDeadRingsPreservesInFlight) +{ + auto cfg = default_cfg(); + auto region = kickmsg::SharedRegion::create(SHM_NAME, kickmsg::channel::PubSub, cfg); + auto* h = region.header(); + + // Dead owner holding a Live ring with a publisher still admitted: the + // reclaim must flip state to Free but keep in_flight (so the mid-commit + // publisher's fetch_sub can't underflow into the state bits). + auto* ring = kickmsg::sub_ring_at(region.base(), h, 0); + ring->owner_pid.store(0x7fffffff, std::memory_order_release); + ring->state_flight.store(kickmsg::ring::make_packed(kickmsg::ring::Live, 1), + std::memory_order_release); + + EXPECT_EQ(region.reclaim_dead_rings(), 1u); + + uint32_t p = ring->state_flight.load(std::memory_order_acquire); + EXPECT_EQ(kickmsg::ring::get_state(p), kickmsg::ring::Free); + EXPECT_EQ(kickmsg::ring::get_in_flight(p), 1u); // preserved +} + TEST_F(RegionTest, CollectGarbageDoesNotReclaimLiveSlots) { kickmsg::channel::Config cfg; @@ -1052,8 +1215,8 @@ class InjectedRegionTest : public ::testing::Test // Aligned heap buffer sized to fit a region with `cfg`. struct Buffer { - std::unique_ptr mem{nullptr, &::free}; - std::size_t size{0}; + std::unique_ptr mem{nullptr, &aligned_buffer_free}; + std::size_t size{0}; void* get() { return mem.get(); } }; @@ -1061,8 +1224,8 @@ class InjectedRegionTest : public ::testing::Test { Buffer b; b.size = kickmsg::SharedRegion::required_size(cfg, creator); - void* raw = nullptr; - EXPECT_EQ(::posix_memalign(&raw, kickmsg::CACHE_LINE, b.size), 0); + void* raw = aligned_buffer_alloc(kickmsg::CACHE_LINE, b.size); + EXPECT_NE(raw, nullptr); b.mem.reset(raw); return b; } @@ -1320,9 +1483,39 @@ TEST_F(CorruptedHeaderTest, RejectsTinySlotStride) std::runtime_error); } +TEST_F(CorruptedHeaderTest, RejectsTinyRingStride) +{ + auto buf = make_stamped(default_cfg()); + hdr(buf)->sub_ring_stride = 1; // smaller than a SubRingHeader + entries + EXPECT_THROW(kickmsg::SharedRegion::attach_open(buf.get(), buf.size), + std::runtime_error); +} + +TEST_F(CorruptedHeaderTest, RejectsRingCapacityOverflowingRegion) +{ + auto buf = make_stamped(default_cfg()); + // Huge power-of-two capacity with a consistent mask: passes the + // power-of-two and mask checks, must trip the pre-multiply overflow guard. + hdr(buf)->sub_ring_capacity = uint64_t{1} << 60; + hdr(buf)->sub_ring_mask = (uint64_t{1} << 60) - 1; + EXPECT_THROW(kickmsg::SharedRegion::attach_open(buf.get(), buf.size), + std::runtime_error); +} + TEST_F(CorruptedHeaderTest, StampedBufferStillValidates) { auto buf = make_stamped(default_cfg()); // Sanity: an unmodified stamped buffer must pass. EXPECT_NO_THROW(kickmsg::SharedRegion::attach_open(buf.get(), buf.size)); } + +TEST_F(CorruptedHeaderTest, RejectsCreatorNameLenPastTail) +{ + auto buf = make_stamped(default_cfg()); + // Within total_size but past the creator-name tail (would let info() + // read into the subscriber rings / pool). + hdr(buf)->creator_name_len = + static_cast(hdr(buf)->sub_rings_offset); + EXPECT_THROW(kickmsg::SharedRegion::attach_open(buf.get(), buf.size), + std::runtime_error); +} diff --git a/tests/unit/registry-t.cc b/tests/unit/registry-t.cc index 0bd5389..f3258ed 100644 --- a/tests/unit/registry-t.cc +++ b/tests/unit/registry-t.cc @@ -1,9 +1,9 @@ -#include #include #include #include +#include "kickmsg/Naming.h" #include "kickmsg/Node.h" #include "kickmsg/Registry.h" @@ -12,6 +12,32 @@ class RegistryTest : public ::testing::Test protected: static constexpr char const* KMSG_NAMESPACE = "kickmsg_regtest"; + // Mirror Node::make_topic_name / make_broadcast_name so test + // expectations match what Node actually composes on every platform + // (readable on Linux, hashed on macOS to fit PSHMNAMLEN). + static std::string topic_shm(char const* topic) + { + return kickmsg::compose_shm_name( + kickmsg::sanitize_shm_component(KMSG_NAMESPACE, "namespace"), + kickmsg::sanitize_shm_component(topic, "topic")); + } + + static std::string broadcast_shm(char const* channel) + { + return kickmsg::compose_shm_name( + kickmsg::sanitize_shm_component(KMSG_NAMESPACE, "namespace"), + "broadcast_" + kickmsg::sanitize_shm_component(channel, "channel")); + } + + // Mirror Registry::make_shm_name (private) so tests stay aligned with + // whatever the Registry actually composes per platform. + static std::string registry_shm() + { + return kickmsg::compose_shm_name( + kickmsg::sanitize_shm_component(KMSG_NAMESPACE, "namespace"), + "registry"); + } + void SetUp() override { kickmsg::Registry::unlink(KMSG_NAMESPACE); @@ -38,7 +64,7 @@ class RegistryTest : public ::testing::Test TEST_F(RegistryTest, OpenOrCreateIsIdempotent) { auto r1 = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); - EXPECT_EQ(r1.name(), std::string{"/"} + KMSG_NAMESPACE + "_registry"); + EXPECT_EQ(r1.name(), registry_shm()); // Second call opens the existing region — same name, same capacity. auto r2 = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); @@ -136,6 +162,22 @@ TEST_F(RegistryTest, VersionMismatchOnSmallerExistingRegionThrows) EXPECT_EQ(opened.capacity(), 8u); } +TEST_F(RegistryTest, OpenRejectsCorruptCapacity) +{ + // Establish an 8-slot registry, then corrupt capacity in the raw segment + // to far exceed the mapping. A fresh open must reject it instead of + // letting snapshot()/sweep_stale() walk entries past the mapped pages. + auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE, 8); + + kickmsg::SharedMemory raw; + raw.open(registry_shm()); + auto* h = static_cast(raw.address()); + h->capacity = 0xFFFFFFFF; + + EXPECT_THROW(kickmsg::Registry::open_or_create(KMSG_NAMESPACE, 8), + std::runtime_error); +} + TEST_F(RegistryTest, SweepStaleRemovesDeadPidEntries) { auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); @@ -173,7 +215,7 @@ TEST_F(RegistryTest, SweepStaleReclaimsWedgedClaimingSlot) kickmsg::INVALID_SLOT); // Open the registry SHM directly to install a wedged Claiming slot. - auto shm_name = std::string{"/"} + KMSG_NAMESPACE + "_registry"; + auto shm_name = registry_shm(); kickmsg::SharedMemory raw; raw.open(shm_name); auto* entries = reinterpret_cast( @@ -197,7 +239,7 @@ TEST_F(RegistryTest, SweepStaleSkipsClaimingSlotsWithoutPid) // first field write. Reclaiming would race with its stores. Must skip. auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); - auto shm_name = std::string{"/"} + KMSG_NAMESPACE + "_registry"; + auto shm_name = registry_shm(); kickmsg::SharedMemory raw; raw.open(shm_name); auto* entries = reinterpret_cast( @@ -231,7 +273,7 @@ TEST_F(RegistryTest, NodeAdvertiseRegistersPublisher) { kickmsg::Node n("pub_node", KMSG_NAMESPACE); auto pub = n.advertise("topicX", cfg); - track("/" + std::string{KMSG_NAMESPACE} + "_topicX"); + track(topic_shm("topicX")); auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); auto snap = reg.snapshot(); @@ -239,7 +281,7 @@ TEST_F(RegistryTest, NodeAdvertiseRegistersPublisher) EXPECT_EQ(snap[0].node_name, "pub_node"); EXPECT_EQ(snap[0].role, kickmsg::registry::Publisher); EXPECT_EQ(snap[0].shm_name, - std::string{"/"} + KMSG_NAMESPACE + "_topicX"); + topic_shm("topicX")); } // Node went out of scope — entry should be gone. @@ -257,7 +299,7 @@ TEST_F(RegistryTest, NodeBroadcastRegistersBoth) kickmsg::Node n("bcast_node", KMSG_NAMESPACE); auto bh = n.join_broadcast("chanX", cfg); - track("/" + std::string{KMSG_NAMESPACE} + "_broadcast_chanX"); + track(broadcast_shm("chanX")); auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); auto snap = reg.snapshot(); @@ -279,7 +321,7 @@ TEST_F(RegistryTest, NodeAdvertiseThenSubscribeUpgradesToBoth) kickmsg::Node n("dual_node", KMSG_NAMESPACE); auto pub = n.advertise("dualtopic", cfg); auto sub = n.subscribe("dualtopic"); - track("/" + std::string{KMSG_NAMESPACE} + "_dualtopic"); + track(topic_shm("dualtopic")); auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); auto snap = reg.snapshot(); @@ -298,7 +340,7 @@ TEST_F(RegistryTest, MultipleNodesEachAppearOnce) kickmsg::Node pub("pub_a", KMSG_NAMESPACE); auto p = pub.advertise("shared", cfg); - track("/" + std::string{KMSG_NAMESPACE} + "_shared"); + track(topic_shm("shared")); kickmsg::Node s1("sub_a", KMSG_NAMESPACE); auto s1_h = s1.subscribe("shared"); @@ -333,7 +375,7 @@ TEST_F(RegistryTest, ListTopicsGroupsByShmName) kickmsg::Node pub("pub_a", KMSG_NAMESPACE); auto p = pub.advertise("telemetry", cfg); - track("/" + std::string{KMSG_NAMESPACE} + "_telemetry"); + track(topic_shm("telemetry")); kickmsg::Node s1("sub_a", KMSG_NAMESPACE); auto s1_h = s1.subscribe("telemetry"); @@ -345,7 +387,7 @@ TEST_F(RegistryTest, ListTopicsGroupsByShmName) ASSERT_EQ(topics.size(), 1u); auto const& t = topics[0]; - EXPECT_EQ(t.shm_name, std::string{"/"} + KMSG_NAMESPACE + "_telemetry"); + EXPECT_EQ(t.shm_name, topic_shm("telemetry")); EXPECT_EQ(t.channel_type, kickmsg::channel::PubSub); EXPECT_EQ(t.producers.size(), 1u); EXPECT_EQ(t.consumers.size(), 2u); @@ -364,7 +406,7 @@ TEST_F(RegistryTest, ListTopicsBroadcastRoleBothInEveryLane) kickmsg::Node node("bcast", KMSG_NAMESPACE); auto bh = node.join_broadcast("events", cfg); - track("/" + std::string{KMSG_NAMESPACE} + "_broadcast_events"); + track(broadcast_shm("events")); auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); auto topics = reg.list_topics(); @@ -386,11 +428,11 @@ TEST_F(RegistryTest, ListTopicsSortedByShmName) kickmsg::Node node("n", KMSG_NAMESPACE); auto pc = node.advertise("c_topic", cfg); - track("/" + std::string{KMSG_NAMESPACE} + "_c_topic"); + track(topic_shm("c_topic")); auto pa = node.advertise("a_topic", cfg); - track("/" + std::string{KMSG_NAMESPACE} + "_a_topic"); + track(topic_shm("a_topic")); auto pb = node.advertise("b_topic", cfg); - track("/" + std::string{KMSG_NAMESPACE} + "_b_topic"); + track(topic_shm("b_topic")); auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); auto topics = reg.list_topics(); diff --git a/tests/unit/subscriber-t.cc b/tests/unit/subscriber-t.cc index bb46de6..5da4545 100644 --- a/tests/unit/subscriber-t.cc +++ b/tests/unit/subscriber-t.cc @@ -4,7 +4,6 @@ #include "kickmsg/Publisher.h" #include "kickmsg/Subscriber.h" -#include #include using namespace kickmsg; @@ -45,6 +44,21 @@ TEST_F(SubscriberTest, ReceiveEmptyReturnsNullopt) EXPECT_FALSE(sub.try_receive_view().has_value()); } +TEST_F(SubscriberTest, MovedFromReceiveReturnsNulloptNotCrash) +{ + auto cfg = default_cfg(); + auto region = kickmsg::SharedRegion::create(SHM_NAME, kickmsg::channel::PubSub, cfg); + kickmsg::Subscriber src(region); + kickmsg::Subscriber dst(std::move(src)); + + // src is moved-from (ring_idx_ == UINT32_MAX). All four receive paths + // must return nullopt instead of dereferencing a wild ring pointer. + EXPECT_FALSE(src.try_receive().has_value()); + EXPECT_FALSE(src.try_receive_view().has_value()); + EXPECT_FALSE(src.receive(std::chrono::milliseconds{0}).has_value()); + EXPECT_FALSE(src.receive_view(std::chrono::milliseconds{0}).has_value()); +} + TEST_F(SubscriberTest, ZeroCopyReceive) { auto cfg = default_cfg(); diff --git a/tests/unit/types-t.cc b/tests/unit/types-t.cc index 0b92f89..489818e 100644 --- a/tests/unit/types-t.cc +++ b/tests/unit/types-t.cc @@ -1,6 +1,5 @@ #include -#include #include #include "kickmsg/types.h" From 35e6880121874fd1879ad6bb6bdaa71fd3cf7302 Mon Sep 17 00:00:00 2001 From: Philippe Leduc Date: Tue, 2 Jun 2026 18:20:06 +0200 Subject: [PATCH 3/3] disable stress test under the CI for now --- tests/CMakeLists.txt | 13 +++++++------ tests/stress/common.h | 17 ++++++++++++++++- tests/stress/main.cc | 16 ++++++++-------- tests/stress/mpmc.cc | 14 +++++++------- 4 files changed, 38 insertions(+), 22 deletions(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 1db3550..e5d935a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -28,12 +28,13 @@ target_link_libraries(kickmsg_stress_test PRIVATE kickmsg) set_target_properties(kickmsg_stress_test PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) -# Registered so ctest (and the TSAN job) actually exercise the lock-free -# contention paths -- the Treiber free-stack and pool exhaustion races only -# show up here, not in the unit suite. Generous timeout: TSAN scales the -# iteration counts up ~100x. -add_test(NAME stress COMMAND kickmsg_stress_test WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) -set_tests_properties(stress PROPERTIES TIMEOUT 900) +# Deliberately NOT registered with add_test. The suite oversubscribes (up to +# 32 threads) and the publisher spin-waits do not yield, so on the 2-4 core +# CI runners the spinners starve the very threads they wait on and the suite +# fails to finish within any sane timeout (observed: >900s vs ~10s on a +# 24-core host). Run it manually via scripts/validate.sh or tests/endurance.sh. +# TODO: re-register once the spin-wait backoff/yield lands (tracked: the +# busy-spin starvation under oversubscription is a real bug, not a test flake). # Crash test uses fork/waitpid/SIGKILL — POSIX only if(NOT WIN32) diff --git a/tests/stress/common.h b/tests/stress/common.h index f958644..c2ac8b0 100644 --- a/tests/stress/common.h +++ b/tests/stress/common.h @@ -352,8 +352,20 @@ struct TestRunner int pass = 0; int fail = 0; - void run(bool result) + // Takes the scenario as a callable (not a pre-computed bool) so we can + // print its name BEFORE running and time it. The pre-run flush means a + // scenario that hangs leaves its "[ RUN ]" line on screen -- the last + // one without a matching "[ OK ]" is the culprit. + template + void run(char const* name, Fn&& fn) { + std::printf("[ RUN ] %s\n", name); + std::fflush(stdout); + auto const start = kickmsg::monotonic_ns(); + bool const result = fn(); + auto const ms = duration_cast( + kickmsg::elapsed_time(start)).count(); + char const* tag = "[ OK ]"; if (result) { ++pass; @@ -361,7 +373,10 @@ struct TestRunner else { ++fail; + tag = "[ FAIL ]"; } + std::printf("%s %s (%lld ms)\n", tag, name, static_cast(ms)); + std::fflush(stdout); } int summary() diff --git a/tests/stress/main.cc b/tests/stress/main.cc index 9f4ccf2..5a0452b 100644 --- a/tests/stress/main.cc +++ b/tests/stress/main.cc @@ -24,15 +24,15 @@ int main() TestRunner runner; - runner.run(run_treiber_stress()); - runner.run(run_subscriber_churn()); - runner.run(run_gc_recovery()); - runner.run(run_fairness_test()); + runner.run("treiber_stress", run_treiber_stress); + runner.run("subscriber_churn", run_subscriber_churn); + runner.run("gc_recovery", run_gc_recovery); + runner.run("fairness", run_fairness_test); run_all_mpmc(runner); - runner.run(run_pool_exhaustion()); - runner.run(run_live_repair()); - runner.run(run_single_slot_ring()); - runner.run(run_subscriber_saturation()); + runner.run("pool_exhaustion", run_pool_exhaustion); + runner.run("live_repair", run_live_repair); + runner.run("single_slot_ring", run_single_slot_ring); + runner.run("subscriber_saturation", run_subscriber_saturation); return runner.summary(); } diff --git a/tests/stress/mpmc.cc b/tests/stress/mpmc.cc index a76c808..05d2701 100644 --- a/tests/stress/mpmc.cc +++ b/tests/stress/mpmc.cc @@ -165,7 +165,7 @@ void run_all_mpmc(TestRunner& runner) tc.pool_size = 256; tc.ring_capacity = 64; tc.max_subs = 8; - runner.run(run_stress_test(tc)); + runner.run("mpmc 2p/4s", [&]{ return run_stress_test(tc); }); } { @@ -176,7 +176,7 @@ void run_all_mpmc(TestRunner& runner) tc.pool_size = 128; tc.ring_capacity = 32; tc.max_subs = 16; - runner.run(run_stress_test(tc)); + runner.run("mpmc 8p/8s", [&]{ return run_stress_test(tc); }); } { @@ -187,7 +187,7 @@ void run_all_mpmc(TestRunner& runner) tc.pool_size = 64; tc.ring_capacity = 16; tc.max_subs = 2; - runner.run(run_stress_test(tc)); + runner.run("mpmc 1p/1s", [&]{ return run_stress_test(tc); }); } // High contention: many pubs, small pool, heavy overflow @@ -199,7 +199,7 @@ void run_all_mpmc(TestRunner& runner) tc.pool_size = 32; tc.ring_capacity = 8; tc.max_subs = 16; - runner.run(run_stress_test(tc)); + runner.run("mpmc 16p/16s hi-contention", [&]{ return run_stress_test(tc); }); } // Zero-copy receive tests -- exercises SampleView pin CAS, @@ -213,7 +213,7 @@ void run_all_mpmc(TestRunner& runner) tc.ring_capacity = 64; tc.max_subs = 8; tc.use_zerocopy = true; - runner.run(run_stress_test(tc)); + runner.run("mpmc 2p/4s zerocopy", [&]{ return run_stress_test(tc); }); } { @@ -225,7 +225,7 @@ void run_all_mpmc(TestRunner& runner) tc.ring_capacity = 32; tc.max_subs = 16; tc.use_zerocopy = true; - runner.run(run_stress_test(tc)); + runner.run("mpmc 8p/8s zerocopy", [&]{ return run_stress_test(tc); }); } // High contention zero-copy @@ -238,6 +238,6 @@ void run_all_mpmc(TestRunner& runner) tc.ring_capacity = 8; tc.max_subs = 16; tc.use_zerocopy = true; - runner.run(run_stress_test(tc)); + runner.run("mpmc 16p/16s zerocopy hi-contention", [&]{ return run_stress_test(tc); }); } }