From b54382fcb8a8af06cbebf02c74ac54f00faebdfd Mon Sep 17 00:00:00 2001 From: Philippe Leduc Date: Fri, 17 Apr 2026 15:54:19 +0200 Subject: [PATCH 1/2] Add cross-process drop/loss counters and SharedRegion::stats() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Expose per-ring `dropped_count` / `lost_count` atomics in SubRingHeader (placed inside existing write_pos cache-line padding — no sizeof change, no new coherency traffic). Publisher bumps dropped_count on the self- repair / drop path; Subscriber bumps lost_count at each loss site. New SharedRegion::stats() returns a RegionStats snapshot: per-ring state, in_flight, write_pos, dropped_count, lost_count plus aggregates (total_writes, total_drops, total_losses, live_rings, approximate pool_free). Safe under live traffic. Python bindings expose RingStats, RegionStats, and SharedRegion.stats() so external monitoring can read counters without a CLI. --- ARCHITECTURE.md | 26 ++++++++ include/kickmsg/Region.h | 44 +++++++++++++ include/kickmsg/types.h | 16 +++-- py_bindings/kickmsg.pyi | 46 +++++++++++++ py_bindings/src/kickmsg_py.cc | 49 +++++++++++++- src/Publisher.cc | 1 + src/Region.cc | 65 ++++++++++++++++++- src/Subscriber.cc | 16 ++++- tests/unit/region-t.cc | 117 ++++++++++++++++++++++++++++++++++ 9 files changed, 370 insertions(+), 10 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 2a62ff8..26f3152 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -264,6 +264,15 @@ the two new fields: one cache line for `schema_state` (a eight cache lines for `schema_data` (512 B). Pre-v4 binaries are rejected at `open()` by the existing version check. +Version bumped `4 → 5` for the cross-process runtime counters +added to `SubRingHeader` (`dropped_count`, `lost_count` — see the +Subscriber Ring section for their layout and semantics). 
The +fields fit inside the existing `write_pos` cache-line padding, so +`sizeof(SubRingHeader)` is unchanged, but the in-memory meaning of +the bytes following `has_waiter` changed — v4 and v5 cannot share +a region. Pre-v5 binaries are rejected at `open()` by the existing +version check. + ## Treiber Free Stack @@ -325,12 +334,29 @@ Ring[0] - **has_waiter** (atomic uint32): set by the subscriber before blocking on `futex_wait`, cleared after. Publishers skip the `futex_wake_all` syscall when no subscriber is sleeping. +- **dropped_count** (atomic uint64): cumulative count of publisher + delivery drops on this ring — incremented whenever the two-phase + commit CAS lock fails and the publisher falls through to the + self-repair / drop path. Cross-process visible; the per-Publisher + `dropped()` accessor reports the same events but counted per + instance. +- **lost_count** (atomic uint64): cumulative count of subscriber + losses on this ring — bumped in step with `lost_` at every loss site (drain-ahead + skip, stale-overwrite, invalid-slot, unpinnable, seqlock miss). + Cross-process visible; `Subscriber::lost()` reports the same events + per instance. - **Sequence number** is monotonically increasing (`pos + 1`), used as a seqlock for data consistency validation and as a commit barrier between publishers (see Publish Flow below). - Stale entries (sequence < subscriber's expected) are detected and reported as lost messages. +`write_pos`, `has_waiter`, `dropped_count`, `lost_count` all share one +cache line: the hot path already owns this line when incrementing +`write_pos`, so bumping the counters on a rare drop/loss adds no new +coherency traffic. Different rings target different lines (128 B +stride), so publishers/subscribers on distinct rings never contend. + ### Subscriber join and visibility window A subscriber joins by CAS-ing a Free ring to Live. 
The CAS expects diff --git a/include/kickmsg/Region.h b/include/kickmsg/Region.h index 72f3dea..c7d719a 100644 --- a/include/kickmsg/Region.h +++ b/include/kickmsg/Region.h @@ -3,12 +3,41 @@ #include #include +#include #include "kickmsg/types.h" #include "kickmsg/os/SharedMemory.h" namespace kickmsg { + /// Runtime snapshot of a single subscriber ring. + /// Values are relaxed/acquire-loaded, so the snapshot is internally + /// consistent per-ring but may race mildly across rings — fine for a + /// diagnostic view; not intended as a strongly-consistent read. + struct RingStats + { + uint32_t state; ///< ring::State as a raw int (0=Free, 1=Live, 2=Draining) + uint32_t in_flight; ///< Publishers currently admitted to this ring + uint64_t write_pos; ///< Monotonic claim counter (rough throughput proxy) + uint64_t dropped_count; ///< Cumulative publisher drops on this ring + uint64_t lost_count; ///< Cumulative subscriber losses on this ring + }; + + /// Aggregate region snapshot returned by SharedRegion::stats(). + /// Safe to call under live traffic: all reads are relaxed/acquire, + /// no writes. + struct RegionStats + { + std::vector rings; ///< One entry per subscriber-ring slot (length == max_subs) + uint64_t total_writes; ///< Sum of write_pos over Live rings + uint64_t total_drops; ///< Sum of dropped_count across all rings + uint64_t total_losses; ///< Sum of lost_count across all rings + uint64_t live_rings; ///< Number of rings currently Live + uint64_t pool_free; ///< Approximate free-slot count (walks Treiber stack — racy under churn) + uint64_t pool_size; ///< Total pool capacity (static) + }; + + class SharedRegion { public: @@ -134,6 +163,21 @@ namespace kickmsg /// Returns the number of rings reset. std::size_t reset_retired_rings(); + /// Runtime counter snapshot — safe under live traffic. + /// + /// Reads the cross-process per-ring counters (`write_pos`, + /// `dropped_count`, `lost_count`) plus ring state and an approximate + /// pool-free count. 
Intended for external monitoring and the CLI's + /// `stats` / `watch` subcommands. + /// + /// Cheap (no syscalls, no locks, a handful of atomic loads) but not a + /// strongly-consistent view: individual per-ring values are consistent + /// with themselves (sequential loads on one variable), but different + /// rings may be read at slightly different instants. The free-stack + /// walk for `pool_free` is bounded by `pool_size` so it can't loop + /// forever under racing pushes/pops. + RegionStats stats() const; + /// Reclaim orphaned slots (refcount > 0 but not referenced by any ring entry). /// These are caused by publisher crashes between allocate and publish, or by /// skipped drain on subscriber teardown timeout. diff --git a/include/kickmsg/types.h b/include/kickmsg/types.h index 59a3b18..e9933b4 100644 --- a/include/kickmsg/types.h +++ b/include/kickmsg/types.h @@ -20,7 +20,7 @@ namespace kickmsg "Kickmsg requires lock-free 32-bit atomics."); constexpr uint64_t MAGIC = 0x4B49434B4D534721ULL; // "KICKMSG!" - constexpr uint32_t VERSION = 4; + constexpr uint32_t VERSION = 5; constexpr uint32_t INVALID_SLOT = UINT32_MAX; constexpr uint64_t LOCKED_SEQUENCE = UINT64_MAX; constexpr std::size_t CACHE_LINE = 64; @@ -240,15 +240,23 @@ namespace kickmsg /// Per-subscriber ring header in shared memory. /// state_flight packs ring state and in_flight publisher count into one /// atomic, enabling single-CAS admission without cross-variable fences. - /// write_pos and has_waiter share a cache line: the publisher writes - /// write_pos then reads has_waiter, the subscriber reads write_pos then - /// writes has_waiter — both access the same line, one cache miss each. + /// write_pos, has_waiter, dropped_count, lost_count share a cache line: + /// the hot path already owns this line when incrementing write_pos, so + /// the extra fetch_add on a drop/loss path introduces no new cache- + /// coherency traffic. 
Writers on different rings target different + /// lines (128 B stride), so no cross-ring false sharing either. struct SubRingHeader { alignas(CACHE_LINE) std::atomic state_flight; ///< Packed [in_flight:30 | state:2] alignas(CACHE_LINE) std::atomic write_pos; ///< Monotonically increasing position counter std::atomic has_waiter; ///< Set by subscriber before futex_wait + std::atomic dropped_count; ///< Cumulative publisher drops on this ring (all publishers) + std::atomic lost_count; ///< Cumulative subscriber losses on this ring (all subscribers) }; + static_assert(sizeof(SubRingHeader) == 2 * CACHE_LINE, + "SubRingHeader must stay 2 cache lines — the counter fields fit in " + "the existing write_pos line padding; expanding this struct requires " + "reconsidering ring-stride math in Region.cc"); /// Slot header: prepended to each payload buffer in the pool. /// Packed to guarantee binary layout across compilers. diff --git a/py_bindings/kickmsg.pyi b/py_bindings/kickmsg.pyi index fd6d000..87a493b 100644 --- a/py_bindings/kickmsg.pyi +++ b/py_bindings/kickmsg.pyi @@ -118,6 +118,49 @@ class HealthReport: def __repr__(self) -> str: ... +class RingStats: + @property + def state(self) -> int: + """Ring state as raw int: 0=Free, 1=Live, 2=Draining.""" + + @property + def in_flight(self) -> int: ... + + @property + def write_pos(self) -> int: ... + + @property + def dropped_count(self) -> int: ... + + @property + def lost_count(self) -> int: ... + + def __repr__(self) -> str: ... + +class RegionStats: + @property + def rings(self) -> list[RingStats]: ... + + @property + def total_writes(self) -> int: ... + + @property + def total_drops(self) -> int: ... + + @property + def total_losses(self) -> int: ... + + @property + def live_rings(self) -> int: ... + + @property + def pool_free(self) -> int: ... + + @property + def pool_size(self) -> int: ... + + def __repr__(self) -> str: ... 
+ class SharedRegion: @staticmethod def create(name: str, type: ChannelType, cfg: Config, creator: str = '') -> SharedRegion: ... @@ -142,6 +185,9 @@ class SharedRegion: def diagnose(self) -> HealthReport: ... + def stats(self) -> RegionStats: + """Runtime counter snapshot (per-ring + aggregate). Safe under live traffic.""" + def repair_locked_entries(self) -> int: ... def reset_retired_rings(self) -> int: ... diff --git a/py_bindings/src/kickmsg_py.cc b/py_bindings/src/kickmsg_py.cc index 877c2ca..6fc95ee 100644 --- a/py_bindings/src/kickmsg_py.cc +++ b/py_bindings/src/kickmsg_py.cc @@ -7,7 +7,8 @@ /// Config — channel::Config /// SchemaInfo — payload schema descriptor /// HealthReport — SharedRegion::diagnose() result -/// SharedRegion — factory methods + schema/health/repair +/// RingStats / RegionStats — SharedRegion::stats() result +/// SharedRegion — factory methods + schema/health/repair/stats /// Publisher — send(bytes) + allocate() → AllocatedSlot /// AllocatedSlot — writable zero-copy handle + .publish() /// Subscriber — try_receive / receive (GIL release) / *_view @@ -64,6 +65,7 @@ #include #include #include +#include #include "kickmsg/Node.h" #include "kickmsg/Publisher.h" @@ -339,6 +341,48 @@ namespace kickmsg ", schema_stuck=" + (r.schema_stuck ? "True" : "False") + ")"; }); + // ------------------------------------------------------------------- + // RingStats / RegionStats — runtime counter snapshot via stats() + // ------------------------------------------------------------------- + + nb::class_(m, "RingStats") + .def_ro("state", &RingStats::state) + .def_ro("in_flight", &RingStats::in_flight) + .def_ro("write_pos", &RingStats::write_pos) + .def_ro("dropped_count", &RingStats::dropped_count) + .def_ro("lost_count", &RingStats::lost_count) + .def("__repr__", [](RingStats const& r) + { + char const* state_name = + r.state == ring::Free ? "Free" : + r.state == ring::Live ? "Live" : + r.state == ring::Draining ? 
"Draining" : "?"; + return std::string{"RingStats(state="} + state_name + + ", in_flight=" + std::to_string(r.in_flight) + + ", write_pos=" + std::to_string(r.write_pos) + + ", dropped=" + std::to_string(r.dropped_count) + + ", lost=" + std::to_string(r.lost_count) + ")"; + }); + + nb::class_(m, "RegionStats") + .def_ro("rings", &RegionStats::rings) + .def_ro("total_writes", &RegionStats::total_writes) + .def_ro("total_drops", &RegionStats::total_drops) + .def_ro("total_losses", &RegionStats::total_losses) + .def_ro("live_rings", &RegionStats::live_rings) + .def_ro("pool_free", &RegionStats::pool_free) + .def_ro("pool_size", &RegionStats::pool_size) + .def("__repr__", [](RegionStats const& s) + { + return std::string{"RegionStats(live_rings="} + + std::to_string(s.live_rings) + + ", total_writes=" + std::to_string(s.total_writes) + + ", total_drops=" + std::to_string(s.total_drops) + + ", total_losses=" + std::to_string(s.total_losses) + + ", pool_free=" + std::to_string(s.pool_free) + + "/" + std::to_string(s.pool_size) + ")"; + }); + // ------------------------------------------------------------------- // SharedRegion // ------------------------------------------------------------------- @@ -364,6 +408,9 @@ namespace kickmsg .def("try_claim_schema", &SharedRegion::try_claim_schema, "info"_a) .def("reset_schema_claim", &SharedRegion::reset_schema_claim) .def("diagnose", &SharedRegion::diagnose) + .def("stats", &SharedRegion::stats, + "Runtime counter snapshot (per-ring + aggregate). 
" + "Safe under live traffic.") .def("repair_locked_entries", &SharedRegion::repair_locked_entries) .def("reset_retired_rings", &SharedRegion::reset_retired_rings) .def("reclaim_orphaned_slots",&SharedRegion::reclaim_orphaned_slots) diff --git a/src/Publisher.cc b/src/Publisher.cc index 19f1102..abf44ce 100644 --- a/src/Publisher.cc +++ b/src/Publisher.cc @@ -185,6 +185,7 @@ namespace kickmsg } ++dropped_; + ring->dropped_count.fetch_add(1, std::memory_order_relaxed); ++excess; ring->state_flight.fetch_sub(ring::IN_FLIGHT_ONE, std::memory_order_release); diff --git a/src/Region.cc b/src/Region.cc index f8afb02..96a9a31 100644 --- a/src/Region.cc +++ b/src/Region.cc @@ -120,9 +120,11 @@ namespace kickmsg for (uint32_t i = 0; i < cfg.max_subscribers; ++i) { auto* ring = sub_ring_at(base(), h, i); - ring->state_flight = ring::make_packed(ring::Free); - ring->write_pos = 0; - ring->has_waiter = 0; + ring->state_flight = ring::make_packed(ring::Free); + ring->write_pos = 0; + ring->has_waiter = 0; + ring->dropped_count = 0; + ring->lost_count = 0; } // Write magic LAST with release: create_or_open() polls magic with @@ -449,6 +451,63 @@ namespace kickmsg std::memory_order_relaxed); } + RegionStats SharedRegion::stats() const + { + auto const* b = base(); + auto const* h = header(); + + RegionStats out{}; + out.pool_size = h->pool_size; + out.rings.reserve(h->max_subs); + + for (uint64_t i = 0; i < h->max_subs; ++i) + { + // sub_ring_at needs a non-const base*/header*, but the operation + // is read-only — const_cast is safe here. 
+ auto* ring = sub_ring_at(const_cast(b), + h, static_cast(i)); + uint32_t packed = ring->state_flight.load(std::memory_order_acquire); + + RingStats rs{}; + rs.state = static_cast(ring::get_state(packed)); + rs.in_flight = ring::get_in_flight(packed); + rs.write_pos = ring->write_pos.load(std::memory_order_acquire); + rs.dropped_count = ring->dropped_count.load(std::memory_order_relaxed); + rs.lost_count = ring->lost_count.load(std::memory_order_relaxed); + + if (rs.state == ring::Live) + { + ++out.live_rings; + out.total_writes += rs.write_pos; + } + out.total_drops += rs.dropped_count; + out.total_losses += rs.lost_count; + + out.rings.push_back(rs); + } + + // Approximate free-slot count: walk the Treiber stack from the head, + // bounded by pool_size so a concurrent push/pop storm can't fool us + // into an unbounded loop. Under churn we can undercount (a slot + // being popped mid-walk) or overcount (a slot's next_free pointing + // to a just-pushed node we've already counted) — acceptable for a + // diagnostic view. 
+ uint64_t top = h->free_top.load(std::memory_order_acquire); + uint32_t idx = tagged_idx(top); + uint64_t count = 0; + uint64_t const limit = h->pool_size; // hard cap: the walk takes at most pool_size hops, so pool_free never exceeds pool_size + while (idx != INVALID_SLOT and count < limit) + { + if (idx >= h->pool_size) break; + auto* slot = slot_at(const_cast(b), h, idx); + idx = slot->next_free.load(std::memory_order_relaxed); + ++count; + } + out.pool_free = count; + + return out; + } + std::size_t SharedRegion::reclaim_orphaned_slots() { auto* b = base(); diff --git a/src/Subscriber.cc b/src/Subscriber.cc index 076b3d2..e47f37f 100644 --- a/src/Subscriber.cc +++ b/src/Subscriber.cc @@ -168,7 +168,9 @@ namespace kickmsg uint64_t capacity = header_->sub_ring_capacity; if (wp - read_pos_ > capacity) { - lost_ += (wp - read_pos_) - capacity; + uint64_t skipped = (wp - read_pos_) - capacity; + lost_ += skipped; + ring->lost_count.fetch_add(skipped, std::memory_order_relaxed); read_pos_ = wp - capacity; } @@ -189,6 +191,7 @@ namespace kickmsg } // Entry was overwritten (seq > expected): advance and retry. ++lost_; + ring->lost_count.fetch_add(1, std::memory_order_relaxed); ++read_pos_; continue; } @@ -199,6 +202,7 @@ namespace kickmsg if (slot_idx >= header_->pool_size or payload_len > header_->slot_data_size) { ++lost_; + ring->lost_count.fetch_add(1, std::memory_order_relaxed); ++read_pos_; continue; } @@ -224,6 +228,7 @@ namespace kickmsg { // refcount == 0: slot already freed, count as lost. 
++lost_; + ring->lost_count.fetch_add(1, std::memory_order_relaxed); ++read_pos_; continue; } @@ -240,6 +245,7 @@ namespace kickmsg treiber_push(header_->free_top, slot, slot_idx); } ++lost_; + ring->lost_count.fetch_add(1, std::memory_order_relaxed); ++read_pos_; continue; } @@ -308,7 +314,9 @@ namespace kickmsg uint64_t capacity = header_->sub_ring_capacity; if (wp - read_pos_ > capacity) { - lost_ += (wp - read_pos_) - capacity; + uint64_t skipped = (wp - read_pos_) - capacity; + lost_ += skipped; + ring->lost_count.fetch_add(skipped, std::memory_order_relaxed); read_pos_ = wp - capacity; } @@ -324,6 +332,7 @@ namespace kickmsg return std::nullopt; } ++lost_; + ring->lost_count.fetch_add(1, std::memory_order_relaxed); ++read_pos_; continue; } @@ -334,6 +343,7 @@ namespace kickmsg if (slot_idx >= header_->pool_size or payload_len > header_->slot_data_size) { ++lost_; + ring->lost_count.fetch_add(1, std::memory_order_relaxed); ++read_pos_; continue; } @@ -355,6 +365,7 @@ namespace kickmsg if (not pinned) { ++lost_; + ring->lost_count.fetch_add(1, std::memory_order_relaxed); ++read_pos_; continue; } @@ -370,6 +381,7 @@ namespace kickmsg treiber_push(header_->free_top, slot, slot_idx); } ++lost_; + ring->lost_count.fetch_add(1, std::memory_order_relaxed); ++read_pos_; continue; } diff --git a/tests/unit/region-t.cc b/tests/unit/region-t.cc index 4415f3e..392ed04 100644 --- a/tests/unit/region-t.cc +++ b/tests/unit/region-t.cc @@ -918,3 +918,120 @@ TEST_F(RegionTest, SchemaDoesNotAffectConfigHash) ASSERT_TRUE(got.has_value()); EXPECT_STREQ(got->name, "creator/Type"); } + +// ----------------------------------------------------------------------------- +// stats() — cross-process counter snapshot +// ----------------------------------------------------------------------------- + +TEST_F(RegionTest, StatsOnFreshRegionReportsZeros) +{ + auto cfg = default_cfg(); + auto region = kickmsg::SharedRegion::create( + SHM_NAME, kickmsg::channel::PubSub, cfg, "stats"); + auto s 
= region.stats(); + + EXPECT_EQ(s.rings.size(), cfg.max_subscribers); + EXPECT_EQ(s.live_rings, 0u); + EXPECT_EQ(s.total_writes, 0u); + EXPECT_EQ(s.total_drops, 0u); + EXPECT_EQ(s.total_losses, 0u); + EXPECT_EQ(s.pool_size, cfg.pool_size); + // Fresh region: every slot is on the free stack. + EXPECT_EQ(s.pool_free, cfg.pool_size); + + for (auto const& r : s.rings) + { + EXPECT_EQ(r.state, kickmsg::ring::Free); + EXPECT_EQ(r.in_flight, 0u); + EXPECT_EQ(r.write_pos, 0u); + EXPECT_EQ(r.dropped_count, 0u); + EXPECT_EQ(r.lost_count, 0u); + } +} + +TEST_F(RegionTest, StatsWritePosAdvancesWithPublishes) +{ + auto cfg = default_cfg(); + auto region = kickmsg::SharedRegion::create( + SHM_NAME, kickmsg::channel::PubSub, cfg, "stats"); + + kickmsg::Subscriber sub(region); + kickmsg::Publisher pub(region); + + constexpr int N = 5; + uint32_t payload = 0xC0FFEE; + for (int i = 0; i < N; ++i) + { + ASSERT_GE(pub.send(&payload, sizeof(payload)), 0); + } + + auto s = region.stats(); + EXPECT_EQ(s.live_rings, 1u); + EXPECT_EQ(s.total_writes, static_cast(N)); + + // Exactly one ring should be Live and carry write_pos == N. + std::size_t live_seen = 0; + for (auto const& r : s.rings) + { + if (r.state == kickmsg::ring::Live) + { + ++live_seen; + EXPECT_EQ(r.write_pos, static_cast(N)); + } + } + EXPECT_EQ(live_seen, 1u); +} + +TEST_F(RegionTest, StatsLostCountMatchesSubscriberLostOnOverflow) +{ + auto cfg = default_cfg(); // sub_ring_capacity = 8 + auto region = kickmsg::SharedRegion::create( + SHM_NAME, kickmsg::channel::PubSub, cfg, "stats"); + + kickmsg::Subscriber sub(region); + kickmsg::Publisher pub(region); + + // Publish more than the ring can hold without draining — forces the + // subscriber's drain-ahead path to bump lost_count on its next read. 
+ uint32_t payload = 0; + std::size_t const to_publish = cfg.sub_ring_capacity * 3; + for (std::size_t i = 0; i < to_publish; ++i) + { + payload = static_cast(i); + ASSERT_GE(pub.send(&payload, sizeof(payload)), 0); + } + + // Drive the subscriber: the first try_receive hits the drain-ahead + // branch and jumps read_pos forward, recording the skipped count. + while (sub.try_receive()) { /* drain */ } + + EXPECT_GT(sub.lost(), 0u); + + auto s = region.stats(); + // Exactly one ring is Live — its lost_count equals the subscriber's. + uint64_t ring_lost = 0; + for (auto const& r : s.rings) + { + ring_lost += r.lost_count; + } + EXPECT_EQ(ring_lost, sub.lost()); + EXPECT_EQ(s.total_losses, sub.lost()); +} + +TEST_F(RegionTest, StatsPoolFreeTracksAllocations) +{ + auto cfg = default_cfg(); + auto region = kickmsg::SharedRegion::create( + SHM_NAME, kickmsg::channel::PubSub, cfg, "stats"); + + kickmsg::Subscriber sub(region); + kickmsg::Publisher pub(region); + + // Hold a slot mid-publish (allocate without publish). + auto* ptr = pub.allocate(8); + ASSERT_NE(ptr, nullptr); + + auto s = region.stats(); + // One slot is popped from the free stack and not yet returned. 
+ EXPECT_EQ(s.pool_free, cfg.pool_size - 1); +} From d1696f877512419d86aca4be0844989575d0d4cd Mon Sep 17 00:00:00 2001 From: Philippe Leduc Date: Fri, 17 Apr 2026 16:17:34 +0200 Subject: [PATCH 2/2] Add registry to record SHM locs --- ARCHITECTURE.md | 83 ++++++++++ CMakeLists.txt | 33 ++-- benchmarks/bench.cc | 2 +- examples/hello_pubsub.cc | 2 +- include/kickmsg/Hash.h | 19 +++ include/kickmsg/Node.h | 30 +++- include/kickmsg/Registry.h | 134 ++++++++++++++++ include/kickmsg/os/Process.h | 17 +++ include/kickmsg/os/Time.h | 3 + py_bindings/kickmsg.pyi | 55 ++++++- py_bindings/src/kickmsg_py.cc | 61 +++++++- src/Node.cc | 89 ++++++++++- src/Region.cc | 9 +- src/Registry.cc | 246 ++++++++++++++++++++++++++++++ src/Subscriber.cc | 2 +- src/os/darwin/SharedMemory.cc | 166 +------------------- src/os/darwin/Time.cc | 18 +-- src/os/linux/SharedMemory.cc | 144 +----------------- src/os/linux/Time.cc | 17 +-- src/os/posix/Process.cc | 28 ++++ src/os/posix/SharedMemory.cc | 155 +++++++++++++++++++ src/os/posix/Time.cc | 32 ++++ src/os/windows/Process.cc | 33 ++++ src/os/windows/Time.cc | 7 +- src/types.cc | 23 ++- tests/CMakeLists.txt | 1 + tests/crash_test.cc | 4 +- tests/stress/churn.cc | 2 +- tests/stress/edge_cases.cc | 2 +- tests/stress/live_repair.cc | 2 +- tests/stress/pool_exhaustion.cc | 2 +- tests/stress/treiber.cc | 2 +- tests/unit/node-t.cc | 10 +- tests/unit/region-t.cc | 10 +- tests/unit/registry-t.cc | 261 ++++++++++++++++++++++++++++++++ tests/unit/subscriber-t.cc | 2 +- 36 files changed, 1304 insertions(+), 402 deletions(-) create mode 100644 include/kickmsg/Registry.h create mode 100644 include/kickmsg/os/Process.h create mode 100644 src/Registry.cc create mode 100644 src/os/posix/Process.cc create mode 100644 src/os/posix/SharedMemory.cc create mode 100644 src/os/posix/Time.cc create mode 100644 src/os/windows/Process.cc create mode 100644 tests/unit/registry-t.cc diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 26f3152..a01a16d 100644 --- 
a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -1227,6 +1227,89 @@ Mailbox paths include the owner's node name because they are personal reply channels -- the sender must know who to reply to. +## Registry & Discovery + +Kickmsg's core IPC path is decentralized: no broker, no daemon, no +central registry. Channels are plain shared-memory regions that +producers and consumers find by name. That works well until an +operator asks *"what's running on this box?"* — answering that is the job of +the **participant registry**. + +### What it is + +A single dedicated shared-memory region per namespace, at +`/{namespace}_registry`, holding a fixed-size array of participant +entries. Each entry records one `(Node, channel, role)` membership: + +- `pid` — OS process id of the owner +- `node_name` — the `Node` that registered +- `shm_name` — the channel region this participant is attached to +- `channel_type` — PubSub / Broadcast +- `role` — Publisher / Subscriber / Both (broadcast, or publish and subscribe on one topic) +- `created_at_ns` — registration timestamp +- `state` — atomic `Free` / `Claiming` / `Active` + +A `Node` lazily opens-or-creates the registry on its first +`advertise` / `subscribe` / `join_broadcast` / `create_mailbox` / +`open_mailbox` call, scans for the first `Free` slot, CAS-claims it +(`Free → Claiming`), writes the fields, then release-stores +`Active`. The `Node`'s destructor deregisters every slot it claimed. + +The key property is **cross-platform parity**: Linux `/dev/shm` is +filesystem-visible, but shared memory on macOS and Windows is not — we can't use +`readdir` to list topics there. Routing discovery through a regular +`SharedMemory` region means one code path, identical behaviour on +all three targets. + +### State machine + +``` +Free (0) ── CAS ──► Claiming (1) ── release-store ──► Active (2) + ▲ │ + │ │ + └────────── deregister: store-release ◄──────────────┘ + (or sweep_stale: CAS(Active → Free)) +``` + +Snapshots acquire-load `state` per slot; only `Active` entries are +returned. 
The `Claiming` state is the publication fence for the +field bytes — a reader observing `Active` is guaranteed to see all +the field writes that happened-before the release-store. + +### Role upgrade + +A `Node` that both advertises and subscribes to the same topic is +recorded as a single entry with `role = Both` rather than two +separate entries. `Node::touch_registry` detects the existing slot +and upgrades via deregister + re-register. Upgrades happen at +connect time only — zero hot-path cost. + +### Liveness + +The registry does not track heartbeats. A crashed process that +never ran its `Node` destructor leaves its entries stuck at +`Active` until reclaimed. Two recovery paths: + +- **Query-time filter**: diagnostic tools probe each entry's pid via + `process_exists()` and hide dead entries from the user without + touching the registry. Non-invasive; safe under live traffic. +- **`Registry::sweep_stale()`**: iterates active entries and + CAS-resets any slot whose owner pid is gone. Opt-in cleanup for + an operator or supervisor sweep; not automatic. + +The two paths are split deliberately so a concurrent read never rewrites the registry +under another process's feet. + +### Sizing + +Default capacity is 1024 slots × 256 B = 256 KB per namespace. A robot +telemetry Node typically holds a few hundred topics; the default +gives a single such Node roughly 3–10× headroom. Exhaustion +is non-fatal: `register_participant` returns `INVALID_SLOT` and the +Node continues to work without a discovery row — registration is a +diagnostic nicety, not a correctness dependency. 
+ + ## Design Tradeoffs ### Silent data loss on slow subscribers diff --git a/CMakeLists.txt b/CMakeLists.txt index 5176465..f6ef559 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,22 +23,32 @@ if(WIN32) src/os/windows/Time.cc src/os/windows/Futex.cc src/os/windows/SharedMemory.cc + src/os/windows/Process.cc ) set(OS_LIBRARIES synchronization) -elseif(APPLE) - set(OS_SOURCES - src/os/darwin/Time.cc - src/os/darwin/Futex.cc - src/os/darwin/SharedMemory.cc - ) - set(OS_LIBRARIES pthread) else() + # Linux and macOS share most of the POSIX OS layer; only sleep(), + # create(), and the futex backend diverge. set(OS_SOURCES - src/os/linux/Time.cc - src/os/linux/Futex.cc - src/os/linux/SharedMemory.cc + src/os/posix/Time.cc + src/os/posix/SharedMemory.cc + src/os/posix/Process.cc ) - set(OS_LIBRARIES rt pthread) + if(APPLE) + list(APPEND OS_SOURCES + src/os/darwin/Time.cc + src/os/darwin/SharedMemory.cc + src/os/darwin/Futex.cc + ) + set(OS_LIBRARIES pthread) + else() + list(APPEND OS_SOURCES + src/os/linux/Time.cc + src/os/linux/SharedMemory.cc + src/os/linux/Futex.cc + ) + set(OS_LIBRARIES rt pthread) + endif() endif() # --- Library --- @@ -49,6 +59,7 @@ add_library(kickmsg STATIC src/Publisher.cc src/Subscriber.cc src/Region.cc + src/Registry.cc src/Node.cc ${OS_SOURCES} ) diff --git a/benchmarks/bench.cc b/benchmarks/bench.cc index 1ed48e9..3c242b3 100644 --- a/benchmarks/bench.cc +++ b/benchmarks/bench.cc @@ -139,7 +139,7 @@ static void run_latency(BenchConfig const& bc, bool zerocopy) while (pub.send(payload.data(), payload.size()) < 0) { - kickmsg::sleep(0ns); + kickmsg::yield(); } if (zerocopy) diff --git a/examples/hello_pubsub.cc b/examples/hello_pubsub.cc index 8fbd13b..361810d 100644 --- a/examples/hello_pubsub.cc +++ b/examples/hello_pubsub.cc @@ -6,7 +6,7 @@ /// - "display" subscribes to "temperature" and prints them /// /// Single-process for simplicity; in production, each node lives in -/// its own process sharing the same prefix. 
+/// its own process sharing the same namespace. #include #include diff --git a/include/kickmsg/Hash.h b/include/kickmsg/Hash.h index ed2be79..ed8edfc 100644 --- a/include/kickmsg/Hash.h +++ b/include/kickmsg/Hash.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace kickmsg { @@ -39,6 +40,24 @@ namespace kickmsg /// descriptor-string hashing is by far the most common use. uint64_t fnv1a_64(std::string_view s) noexcept; + /// 64-bit FNV-1a of a trivially-copyable scalar or POD. Lets + /// callers chain fields without spelling out `&v, sizeof(v)`: + /// h = fnv1a_64(cfg.max_subscribers, h); + /// h = fnv1a_64(cfg.pool_size, h); + /// Pointers, nullptr, and string_view are excluded so the + /// dedicated overloads keep their priority. + template + auto fnv1a_64(T const& value, uint64_t seed = FNV1A_64_OFFSET_BASIS) noexcept + -> std::enable_if_t< + std::is_trivially_copyable_v + and not std::is_pointer_v + and not std::is_null_pointer_v + and not std::is_same_v, + uint64_t> + { + return fnv1a_64(&value, sizeof(T), seed); + } + /// Convenience: pack a 64-bit FNV-1a of `descriptor` into the /// leading eight bytes of a 64-byte identity slot, zero-padding /// the remaining 56 bytes. Intended as a drop-in for filling diff --git a/include/kickmsg/Node.h b/include/kickmsg/Node.h index c12e86d..f7ceac0 100644 --- a/include/kickmsg/Node.h +++ b/include/kickmsg/Node.h @@ -8,6 +8,7 @@ #include "kickmsg/Region.h" #include "kickmsg/Publisher.h" #include "kickmsg/Subscriber.h" +#include "kickmsg/Registry.h" namespace kickmsg { @@ -28,15 +29,19 @@ namespace kickmsg class Node { public: - // Name components (node name, namespace/prefix, topic, channel, + // Name components (node name, namespace, topic, channel, // owner, tag) are sanitized into a POSIX-shm-compatible form: // leading '/' is stripped, interior '/' becomes '.', and any char // outside [A-Za-z0-9._-] becomes '_'. 
This lets callers pass // ROS-style paths like "/robot/arm/joint1" directly — the region - // ends up at "/_robot.arm.joint1" in /dev/shm, still + // ends up at "/_robot.arm.joint1" in /dev/shm, still // human-readable (no hashing). A component that sanitizes to the // empty string throws std::invalid_argument. - Node(std::string const& name, std::string const& prefix = "kickmsg"); + Node(std::string const& name, std::string const& kmsg_namespace = "kickmsg"); + + /// Deregisters every participant entry this Node holds in the + /// namespace's registry. + ~Node(); // Explicit non-copyable / move-only. Node already holds SharedRegion // values (move-only), so it's non-copyable de facto; declaring it @@ -123,8 +128,8 @@ namespace kickmsg /// else got there first (read back with topic_schema()). bool try_claim_topic_schema(char const* topic, SchemaInfo const& info); - std::string const& name() const { return name_; } - std::string const& prefix() const { return prefix_; } + std::string const& name() const { return name_; } + std::string const& kmsg_namespace() const { return namespace_; } private: std::string make_topic_name(char const* topic) const; @@ -137,8 +142,17 @@ namespace kickmsg SharedRegion* find_region(std::string const& shm_name); SharedRegion const* find_region(std::string const& shm_name) const; + Registry& lazy_registry(); + + /// Register `shm_name` with `role`, or upgrade the existing entry + /// to `Both` if this Node already has one with the complementary + /// role. Registry failures are logged and swallowed. + void touch_registry(std::string const& shm_name, + channel::Type channel_type, + registry::Role role); + std::string name_; - std::string prefix_; + std::string namespace_; // Keyed by SHM name for O(1) lookup. 
/// Constants and enums that define the shared-memory registry format.
namespace registry
{
    /// Layout version written into RegistryHeader::version by the creator.
    /// Registry::open_or_create() throws when an existing registry carries
    /// a different value.
    constexpr uint32_t VERSION = 1;
    /// Initialisation marker. The creator stores it last (release) and
    /// openers poll it (acquire) before trusting the other header fields.
    /// The bytes spell "KICKREG!" when the value is stored little-endian.
    constexpr uint64_t MAGIC = 0x214745524B43494BULL; // "KICKREG!"
    /// Number of participant slots when the caller does not pass a capacity.
    constexpr uint32_t DEFAULT_CAPACITY = 1024;
    /// Byte size of ParticipantEntry::shm_name, terminator included; longer
    /// names are truncated to SHM_NAME_MAX - 1 characters on registration.
    constexpr std::size_t SHM_NAME_MAX = 128;
    /// Byte size of ParticipantEntry::node_name, terminator included; longer
    /// names are truncated to NODE_NAME_MAX - 1 characters on registration.
    constexpr std::size_t NODE_NAME_MAX = 64;

    /// What a participant does on a channel. `Both` has the numeric value
    /// `Publisher | Subscriber`.
    enum Role : uint32_t
    {
        Publisher = 1,
        Subscriber = 2,
        Both = 3, ///< Node is both producer and consumer on this channel
    };

    /// Per-slot state. `Free` is 0, so a zero-filled registry starts with
    /// every slot free. `Claiming` covers the window between the CAS that
    /// claims a slot and the release-store of `Active` that publishes its
    /// fields — readers skip any slot that is not `Active`.
    enum SlotState : uint32_t
    {
        Free = 0,
        Claiming = 1,
        Active = 2,
    };
}
/// Plain copyable snapshot of one participant, as returned by
/// Registry::snapshot(). Holds copies of the in-SHM entry's fields, so it
/// stays valid after the slot is released or reused.
struct Participant
{
    uint64_t pid;            ///< current_pid() of the process that registered the entry
    uint64_t created_at_ns;  ///< kickmsg::since_epoch() in nanoseconds at registration time
    uint32_t channel_type;   ///< channel::Type of the region, stored as its integer value
    uint32_t role;           ///< registry::Role, stored as its integer value
    std::string shm_name;    ///< SHM name of the region (at most SHM_NAME_MAX - 1 chars are stored)
    std::string node_name;   ///< Name of the registering Node (at most NODE_NAME_MAX - 1 chars are stored)
};
Does not filter by process + /// liveness — callers use `process_exists()` if they need that. + std::vector snapshot() const; + + /// CAS-resets `Active` slots whose `pid` no longer exists. + /// Returns the number of slots freed. + uint32_t sweep_stale(); + + std::string const& name() const { return name_; } + uint32_t capacity() const; + + private: + static std::size_t region_size(uint32_t capacity); + static std::string make_shm_name(std::string const& kmsg_namespace); + void init_as_creator(uint32_t capacity); + + RegistryHeader* header(); + RegistryHeader const* header() const; + ParticipantEntry* entries(); + ParticipantEntry const* entries() const; + + SharedMemory shm_; + std::string name_; + }; +} + +#endif diff --git a/include/kickmsg/os/Process.h b/include/kickmsg/os/Process.h new file mode 100644 index 0000000..daa0391 --- /dev/null +++ b/include/kickmsg/os/Process.h @@ -0,0 +1,17 @@ +#ifndef KICKMSG_OS_PROCESS_H +#define KICKMSG_OS_PROCESS_H + +#include + +namespace kickmsg +{ + /// PID of the current process. + uint64_t current_pid() noexcept; + + /// Return true if a process with \p pid currently exists on this host. + /// Inherently racy: the process may exit between the probe and any + /// action taken on the result. + bool process_exists(uint64_t pid) noexcept; +} + +#endif diff --git a/include/kickmsg/os/Time.h b/include/kickmsg/os/Time.h index 9cdec5c..cd68a3d 100644 --- a/include/kickmsg/os/Time.h +++ b/include/kickmsg/os/Time.h @@ -9,6 +9,9 @@ namespace kickmsg void sleep(nanoseconds ns); + /// Release the current timeslice back to the scheduler. 
+ void yield(); + nanoseconds since_epoch(); nanoseconds elapsed_time(nanoseconds start); diff --git a/py_bindings/kickmsg.pyi b/py_bindings/kickmsg.pyi index 87a493b..b736372 100644 --- a/py_bindings/kickmsg.pyi +++ b/py_bindings/kickmsg.pyi @@ -201,6 +201,57 @@ class SharedRegion: def unlink_shm(name: str) -> None: """Unlink a shared-memory entry by name (no-op if absent).""" +class Role(enum.Enum): + Publisher = 1 + + Subscriber = 2 + + Both = 3 + +class Participant: + @property + def pid(self) -> int: ... + + @property + def created_at_ns(self) -> int: ... + + @property + def channel_type(self) -> int: ... + + @property + def role(self) -> int: ... + + @property + def shm_name(self) -> str: ... + + @property + def node_name(self) -> str: ... + + def __repr__(self) -> str: ... + +class Registry: + @staticmethod + def open_or_create(namespace: str, capacity: int = ...) -> Registry: + """Open the registry SHM for `namespace`, creating it if absent.""" + + @staticmethod + def unlink(namespace: str) -> None: + """Remove the registry SHM for `namespace` from the filesystem.""" + + def snapshot(self) -> list[Participant]: + """Copy all currently Active participant entries. Does not filter by process liveness.""" + + def sweep_stale(self) -> int: + """Reclaim slots owned by processes that no longer exist. Returns the number of slots freed.""" + + @property + def name(self) -> str: ... + + @property + def capacity(self) -> int: ... + + def __repr__(self) -> str: ... + class SampleView: def __buffer__(self, flags, /): """ @@ -324,7 +375,7 @@ class BroadcastHandle: def __repr__(self) -> str: ... class Node: - def __init__(self, name: str, prefix: str = 'kickmsg') -> None: ... + def __init__(self, name: str, namespace: str = 'kickmsg') -> None: ... def advertise(self, topic: str, cfg: Config = ...) -> Publisher: ... @@ -354,6 +405,6 @@ class Node: def name(self) -> str: ... @property - def prefix(self) -> str: ... + def namespace(self) -> str: ... 
def __repr__(self) -> str: ... diff --git a/py_bindings/src/kickmsg_py.cc b/py_bindings/src/kickmsg_py.cc index 6fc95ee..8dd1562 100644 --- a/py_bindings/src/kickmsg_py.cc +++ b/py_bindings/src/kickmsg_py.cc @@ -14,6 +14,9 @@ /// Subscriber — try_receive / receive (GIL release) / *_view /// SampleView — read-only zero-copy sample (buffer protocol) /// BroadcastHandle — NamedTuple-like (pub, sub) +/// Role — registry::Role enum (Publisher/Subscriber/Both) +/// Participant — registry snapshot entry +/// Registry — per-namespace participant discovery /// Node — high-level topic / broadcast / mailbox /// schema (submodule) /// Diff — enum (bitmask) @@ -70,6 +73,7 @@ #include "kickmsg/Node.h" #include "kickmsg/Publisher.h" #include "kickmsg/Region.h" +#include "kickmsg/Registry.h" #include "kickmsg/Subscriber.h" #include "kickmsg/Hash.h" #include "kickmsg/types.h" @@ -426,6 +430,55 @@ namespace kickmsg m.def("unlink_shm", [](std::string const& name) { SharedMemory::unlink(name); }, "name"_a, "Unlink a shared-memory entry by name (no-op if absent)."); + // ------------------------------------------------------------------- + // Registry — per-namespace participant directory + // ------------------------------------------------------------------- + + nb::enum_(m, "Role") + .value("Publisher", registry::Publisher) + .value("Subscriber", registry::Subscriber) + .value("Both", registry::Both); + + nb::class_(m, "Participant") + .def_ro("pid", &Participant::pid) + .def_ro("created_at_ns", &Participant::created_at_ns) + .def_ro("channel_type", &Participant::channel_type) + .def_ro("role", &Participant::role) + .def_ro("shm_name", &Participant::shm_name) + .def_ro("node_name", &Participant::node_name) + .def("__repr__", [](Participant const& p) + { + char const* role_name = + p.role == registry::Publisher ? "Publisher" : + p.role == registry::Subscriber ? "Subscriber" : + p.role == registry::Both ? 
"Both" : "?"; + return std::string{"Participant(shm='"} + p.shm_name + + "', node='" + p.node_name + + "', pid=" + std::to_string(p.pid) + + ", role=" + role_name + ")"; + }); + + nb::class_(m, "Registry") + .def_static("open_or_create", &Registry::open_or_create, + "namespace"_a, "capacity"_a = registry::DEFAULT_CAPACITY, + nb::rv_policy::move, + "Open the registry SHM for `namespace`, creating it if absent.") + .def_static("unlink", &Registry::unlink, "namespace"_a, + "Remove the registry SHM for `namespace` from the filesystem.") + .def("snapshot", &Registry::snapshot, + "Copy all currently Active participant entries. Does not " + "filter by process liveness.") + .def("sweep_stale", &Registry::sweep_stale, + "Reclaim slots owned by processes that no longer exist. " + "Returns the number of slots freed.") + .def_prop_ro("name", &Registry::name) + .def_prop_ro("capacity", &Registry::capacity) + .def("__repr__", [](Registry const& r) + { + return std::string{"Registry(name='"} + r.name() + + "', capacity=" + std::to_string(r.capacity()) + ")"; + }); + // SampleRef (the C++ byte-copy sample) is not bound directly — // try_receive() / receive() auto-convert it to `bytes` at the // Python boundary. Users who want ring-position information @@ -730,7 +783,7 @@ namespace kickmsg // keep_alive<0, 1>: the return value (0) pins the Node (1 = self). 
nb::class_(m, "Node") .def(nb::init(), - "name"_a, "prefix"_a = std::string{"kickmsg"}) + "name"_a, "namespace"_a = std::string{"kickmsg"}) .def("advertise", [](Node& n, char const* topic, channel::Config const& cfg) { return n.advertise(topic, cfg); }, @@ -773,12 +826,12 @@ namespace kickmsg .def("topic_schema", &Node::topic_schema, "topic"_a) .def("try_claim_topic_schema", &Node::try_claim_topic_schema, "topic"_a, "info"_a) - .def_prop_ro("name", &Node::name) - .def_prop_ro("prefix", &Node::prefix) + .def_prop_ro("name", &Node::name) + .def_prop_ro("namespace", &Node::kmsg_namespace) .def("__repr__", [](Node const& n) { return std::string{"Node(name='"} + n.name() + - "', prefix='" + n.prefix() + "')"; + "', namespace='" + n.kmsg_namespace() + "')"; }); } } diff --git a/src/Node.cc b/src/Node.cc index cec7dca..6ec5659 100644 --- a/src/Node.cc +++ b/src/Node.cc @@ -1,13 +1,76 @@ #include "kickmsg/Node.h" +#include + #include "kickmsg/Naming.h" namespace kickmsg { - Node::Node(std::string const& name, std::string const& prefix) + Node::Node(std::string const& name, std::string const& kmsg_namespace) : name_{sanitize_shm_component(name, "node")} - , prefix_{sanitize_shm_component(prefix, "namespace")} + , namespace_{sanitize_shm_component(kmsg_namespace, "namespace")} + { + } + + Node::~Node() + { + if (registry_.has_value()) + { + for (auto const& [_, rs] : registry_slots_) + { + registry_->deregister(rs.slot_index); + } + } + registry_slots_.clear(); + } + + Registry& Node::lazy_registry() { + if (not registry_.has_value()) + { + registry_.emplace(Registry::open_or_create(namespace_)); + } + return *registry_; + } + + void Node::touch_registry(std::string const& shm_name, + channel::Type channel_type, + registry::Role role) + { + try + { + auto& reg = lazy_registry(); + auto it = registry_slots_.find(shm_name); + if (it != registry_slots_.end()) + { + if (it->second.role != role and it->second.role != registry::Both) + { + reg.deregister(it->second.slot_index); + 
uint32_t slot = reg.register_participant( + shm_name, channel_type, registry::Both, name_); + if (slot == INVALID_SLOT) + { + registry_slots_.erase(it); + return; + } + it->second = RegistrySlot{slot, registry::Both}; + } + return; + } + + uint32_t slot = reg.register_participant( + shm_name, channel_type, role, name_); + if (slot != INVALID_SLOT) + { + registry_slots_[shm_name] = RegistrySlot{slot, role}; + } + } + catch (std::exception const& e) + { + std::fprintf(stderr, + "kickmsg: registry unavailable for namespace '%s': %s\n", + namespace_.c_str(), e.what()); + } } Publisher Node::advertise(char const* topic, channel::Config const& cfg) @@ -16,6 +79,7 @@ namespace kickmsg auto [it, _] = regions_.emplace( shm_name, SharedRegion::create(shm_name.c_str(), channel::PubSub, cfg, name_.c_str())); + touch_registry(shm_name, channel::PubSub, registry::Publisher); return Publisher(it->second); } @@ -24,10 +88,12 @@ namespace kickmsg auto shm_name = make_topic_name(topic); if (auto* r = find_region(shm_name)) { + touch_registry(shm_name, channel::PubSub, registry::Subscriber); return Subscriber(*r); } auto [it, _] = regions_.emplace( shm_name, SharedRegion::open(shm_name.c_str())); + touch_registry(shm_name, channel::PubSub, registry::Subscriber); return Subscriber(it->second); } @@ -36,12 +102,14 @@ namespace kickmsg auto shm_name = make_topic_name(topic); if (auto* r = find_region(shm_name)) { + touch_registry(shm_name, channel::PubSub, registry::Publisher); return Publisher(*r); } auto [it, _] = regions_.emplace( shm_name, SharedRegion::create_or_open( shm_name.c_str(), channel::PubSub, cfg, name_.c_str())); + touch_registry(shm_name, channel::PubSub, registry::Publisher); return Publisher(it->second); } @@ -50,12 +118,14 @@ namespace kickmsg auto shm_name = make_topic_name(topic); if (auto* r = find_region(shm_name)) { + touch_registry(shm_name, channel::PubSub, registry::Subscriber); return Subscriber(*r); } auto [it, _] = regions_.emplace( shm_name, 
SharedRegion::create_or_open( shm_name.c_str(), channel::PubSub, cfg, name_.c_str())); + touch_registry(shm_name, channel::PubSub, registry::Subscriber); return Subscriber(it->second); } @@ -64,12 +134,14 @@ namespace kickmsg auto shm_name = make_broadcast_name(channel); if (auto* r = find_region(shm_name)) { + touch_registry(shm_name, channel::Broadcast, registry::Both); return BroadcastHandle{Publisher{*r}, Subscriber{*r}}; } auto [it, _] = regions_.emplace( shm_name, SharedRegion::create_or_open( shm_name.c_str(), channel::Broadcast, cfg, name_.c_str())); + touch_registry(shm_name, channel::Broadcast, registry::Both); return BroadcastHandle{Publisher{it->second}, Subscriber{it->second}}; } @@ -81,6 +153,8 @@ namespace kickmsg auto [it, _] = regions_.emplace( shm_name, SharedRegion::create(shm_name.c_str(), channel::PubSub, mbx_cfg, name_.c_str())); + // Mailbox owner is the one who receives — Subscriber role. + touch_registry(shm_name, channel::PubSub, registry::Subscriber); return Subscriber(it->second); } @@ -89,10 +163,13 @@ namespace kickmsg auto shm_name = make_mailbox_name(owner_node, tag); if (auto* r = find_region(shm_name)) { + touch_registry(shm_name, channel::PubSub, registry::Publisher); return Publisher(*r); } auto [it, _] = regions_.emplace( shm_name, SharedRegion::open(shm_name.c_str())); + // Mailbox sender is the Publisher side. + touch_registry(shm_name, channel::PubSub, registry::Publisher); return Publisher(it->second); } @@ -138,20 +215,20 @@ namespace kickmsg std::string Node::make_topic_name(char const* topic) const { - // prefix_ is pre-sanitized in the ctor; topic is user-supplied on + // namespace_ is pre-sanitized in the ctor; topic is user-supplied on // each call and may be a ROS-style "/a/b/c" path. 
- return "/" + prefix_ + "_" + sanitize_shm_component(topic, "topic"); + return "/" + namespace_ + "_" + sanitize_shm_component(topic, "topic"); } std::string Node::make_broadcast_name(char const* channel) const { - return "/" + prefix_ + "_broadcast_" + return "/" + namespace_ + "_broadcast_" + sanitize_shm_component(channel, "channel"); } std::string Node::make_mailbox_name(char const* owner, char const* tag) const { - return "/" + prefix_ + "_" + return "/" + namespace_ + "_" + sanitize_shm_component(owner, "mailbox owner") + "_mbx_" + sanitize_shm_component(tag, "mailbox tag"); } diff --git a/src/Region.cc b/src/Region.cc index 96a9a31..9a5313f 100644 --- a/src/Region.cc +++ b/src/Region.cc @@ -2,14 +2,9 @@ #include #include #include -#ifdef _WIN32 -#include -#define getpid _getpid -#else -#include -#endif #include "kickmsg/Region.h" +#include "kickmsg/os/Process.h" #include "kickmsg/os/Time.h" namespace kickmsg @@ -89,7 +84,7 @@ namespace kickmsg h->sub_ring_stride = ring_stride; h->commit_timeout_us = static_cast(cfg.commit_timeout.count()); h->config_hash = compute_config_hash(type, cfg); - h->creator_pid = static_cast(getpid()); + h->creator_pid = kickmsg::current_pid(); h->created_at_ns = static_cast(kickmsg::since_epoch().count()); h->creator_name_len = creator_len; std::memcpy(header_creator_name(h), creator_name, creator_len); diff --git a/src/Registry.cc b/src/Registry.cc new file mode 100644 index 0000000..e39b705 --- /dev/null +++ b/src/Registry.cc @@ -0,0 +1,246 @@ +#include "kickmsg/Registry.h" + +#include +#include +#include + +#include "kickmsg/Naming.h" +#include "kickmsg/os/Process.h" +#include "kickmsg/os/Time.h" + +namespace kickmsg +{ + std::size_t Registry::region_size(uint32_t capacity) + { + return sizeof(RegistryHeader) + + static_cast(capacity) * sizeof(ParticipantEntry); + } + + std::string Registry::make_shm_name(std::string const& kmsg_namespace) + { + return "/" + sanitize_shm_component(kmsg_namespace, "namespace") + + "_registry"; 
+ } + + RegistryHeader* Registry::header() + { + return static_cast(shm_.address()); + } + + RegistryHeader const* Registry::header() const + { + return static_cast(shm_.address()); + } + + ParticipantEntry* Registry::entries() + { + return reinterpret_cast( + static_cast(shm_.address()) + sizeof(RegistryHeader)); + } + + ParticipantEntry const* Registry::entries() const + { + return reinterpret_cast( + static_cast(shm_.address()) + sizeof(RegistryHeader)); + } + + uint32_t Registry::capacity() const + { + return header()->capacity; + } + + void Registry::init_as_creator(uint32_t capacity) + { + std::memset(shm_.address(), 0, region_size(capacity)); + + auto* h = header(); + h->version = registry::VERSION; + h->capacity = capacity; + + // MAGIC published last — readers spin on it with acquire. + h->magic.store(registry::MAGIC, std::memory_order_release); + } + + Registry Registry::open_or_create(std::string const& kmsg_namespace, uint32_t capacity) + { + if (capacity == 0) + { + throw std::invalid_argument("Registry capacity must be > 0"); + } + + std::string name = make_shm_name(kmsg_namespace); + std::size_t bytes = region_size(capacity); + + Registry r; + r.name_ = name; + if (r.shm_.try_create(name, bytes)) + { + r.init_as_creator(capacity); + return r; + } + r.name_.clear(); + + // Creator lost — spin-wait for MAGIC to guarantee we don't read + // half-initialized bytes. 
+ for (int i = 0; i < 200; ++i) + { + SharedMemory shm; + if (shm.try_open(name)) + { + auto const* h = static_cast(shm.address()); + if (h->magic.load(std::memory_order_acquire) == registry::MAGIC) + { + if (h->version != registry::VERSION) + { + throw std::runtime_error( + "Registry version mismatch on " + name); + } + Registry out; + out.name_ = name; + out.shm_ = std::move(shm); + return out; + } + } + kickmsg::sleep(10ms); + } + + throw std::runtime_error("Timed out waiting for registry init: " + name); + } + + void Registry::unlink(std::string const& kmsg_namespace) + { + SharedMemory::unlink(make_shm_name(kmsg_namespace)); + } + + uint32_t Registry::register_participant(std::string const& shm_name, + channel::Type channel_type, + registry::Role role, + std::string const& node_name) + { + auto* h = header(); + auto* es = entries(); + uint32_t cap = h->capacity; + + for (uint32_t i = 0; i < cap; ++i) + { + uint32_t expected = registry::Free; + if (not es[i].state.compare_exchange_strong( + expected, registry::Claiming, + std::memory_order_acq_rel, + std::memory_order_relaxed)) + { + continue; + } + + // Field stores are relaxed; the release-store of Active below + // is the publication fence for readers. 
+ es[i].channel_type = static_cast(channel_type); + es[i].role = static_cast(role); + es[i]._padding1 = 0; + es[i].pid = current_pid(); + es[i].created_at_ns = static_cast( + kickmsg::since_epoch().count()); + + std::memset(es[i].shm_name, 0, sizeof(es[i].shm_name)); + std::size_t n = std::min(shm_name.size(), sizeof(es[i].shm_name) - 1); + std::memcpy(es[i].shm_name, shm_name.data(), n); + + std::memset(es[i].node_name, 0, sizeof(es[i].node_name)); + n = std::min(node_name.size(), sizeof(es[i].node_name) - 1); + std::memcpy(es[i].node_name, node_name.data(), n); + + std::memset(es[i]._padding2, 0, sizeof(es[i]._padding2)); + + es[i].state.store(registry::Active, std::memory_order_release); + return i; + } + + return INVALID_SLOT; + } + + void Registry::deregister(uint32_t slot_index) + { + if (slot_index == INVALID_SLOT) + { + return; + } + auto* h = header(); + auto* es = entries(); + if (slot_index >= h->capacity) + { + return; + } + es[slot_index].state.store(registry::Free, std::memory_order_release); + } + + std::vector Registry::snapshot() const + { + auto const* h = header(); + auto const* es = entries(); + uint32_t cap = h->capacity; + + std::vector out; + out.reserve(cap); + for (uint32_t i = 0; i < cap; ++i) + { + uint32_t s = es[i].state.load(std::memory_order_acquire); + if (s != registry::Active) + { + continue; + } + + Participant p{}; + p.pid = es[i].pid; + p.created_at_ns = es[i].created_at_ns; + p.channel_type = es[i].channel_type; + p.role = es[i].role; + p.shm_name.assign( + es[i].shm_name, + ::strnlen(es[i].shm_name, sizeof(es[i].shm_name))); + p.node_name.assign( + es[i].node_name, + ::strnlen(es[i].node_name, sizeof(es[i].node_name))); + + // Seqlock-style recheck: if the slot was dereg'd and reclaimed + // while we copied, our fields may mix two tenants. 
+ uint32_t s2 = es[i].state.load(std::memory_order_acquire); + if (s2 != registry::Active) + { + continue; + } + out.push_back(std::move(p)); + } + return out; + } + + uint32_t Registry::sweep_stale() + { + auto* h = header(); + auto* es = entries(); + uint32_t cap = h->capacity; + + uint32_t freed = 0; + for (uint32_t i = 0; i < cap; ++i) + { + uint32_t s = es[i].state.load(std::memory_order_acquire); + if (s != registry::Active) + { + continue; + } + uint64_t pid = es[i].pid; + if (process_exists(pid)) + { + continue; + } + uint32_t expected = registry::Active; + if (es[i].state.compare_exchange_strong( + expected, registry::Free, + std::memory_order_acq_rel, + std::memory_order_relaxed)) + { + ++freed; + } + } + return freed; + } +} diff --git a/src/Subscriber.cc b/src/Subscriber.cc index e47f37f..644bd9c 100644 --- a/src/Subscriber.cc +++ b/src/Subscriber.cc @@ -85,7 +85,7 @@ namespace kickmsg ++drain_timeouts_; break; } - kickmsg::sleep(0ns); + kickmsg::yield(); } if (quiesced) diff --git a/src/os/darwin/SharedMemory.cc b/src/os/darwin/SharedMemory.cc index 4c48089..a4798c3 100644 --- a/src/os/darwin/SharedMemory.cc +++ b/src/os/darwin/SharedMemory.cc @@ -1,5 +1,5 @@ -// macOS uses POSIX shared memory (same as Linux). -// shm_open / ftruncate / mmap are available on all supported macOS versions. +// macOS-specific SharedMemory::create(). Other methods live in +// src/os/posix/SharedMemory.cc. 
#include "kickmsg/os/SharedMemory.h" #include @@ -8,173 +8,21 @@ #include #include #include -#include -#include namespace kickmsg { - static void throw_system_error(char const* context) - { - throw std::system_error(errno, std::system_category(), context); - } - - SharedMemory::SharedMemory(SharedMemory&& other) noexcept - : size_{other.size_} - , address_{other.address_} - , fd_{other.fd_} - { - other.size_ = 0; - other.address_ = nullptr; - other.fd_ = INVALID_SHM_HANDLE; - } - - SharedMemory& SharedMemory::operator=(SharedMemory&& other) noexcept - { - if (this != &other) - { - close(); - size_ = other.size_; - address_ = other.address_; - fd_ = other.fd_; - other.size_ = 0; - other.address_ = nullptr; - other.fd_ = INVALID_SHM_HANDLE; - } - return *this; - } - - SharedMemory::~SharedMemory() - { - close(); - } - - void SharedMemory::map(std::size_t size) - { - if (::ftruncate(fd_, static_cast(size)) < 0) - { - ::close(fd_); - fd_ = INVALID_SHM_HANDLE; - throw_system_error("SharedMemory: ftruncate()"); - } - - address_ = ::mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0); - if (address_ == MAP_FAILED) - { - address_ = nullptr; - ::close(fd_); - fd_ = INVALID_SHM_HANDLE; - throw_system_error("SharedMemory: mmap()"); - } - - size_ = size; - } - void SharedMemory::create(std::string const& name, std::size_t size) { - // macOS has two shm_open / ftruncate quirks that the original - // `O_CREAT | O_TRUNC` Linux pattern trips over: - // 1. shm_open(O_CREAT|O_TRUNC) on an existing SHM object - // returns EINVAL — Linux accepts it, Darwin rejects it. - // 2. ftruncate() can only be called once per SHM object; a - // second call on the same object returns EINVAL. - // Unlink-then-exclusive-create sidesteps both: the subsequent - // shm_open sees a name that either didn't exist or was just - // detached, and the following ftruncate is always the first - // sizing call on a fresh object. 
- // - // This function is called by SharedRegion::create() (the strict - // factory where the caller intends exclusive ownership). The - // race-prone caller SharedRegion::create_or_open() was refactored - // to NOT re-enter this function after its try_create probe — it - // stamps the header directly on the probe's mapping. + // Darwin's shm_open(O_CREAT|O_TRUNC) returns EINVAL on an existing + // object, and ftruncate can only be called once per object. Unlink + // first, then exclusive-create, to sidestep both. ::shm_unlink(name.c_str()); fd_ = ::shm_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, 0666); if (fd_ < 0) { - throw_system_error("SharedMemory: shm_open(create)"); + throw std::system_error(errno, std::system_category(), + "SharedMemory: shm_open(create)"); } map(size); } - - bool SharedMemory::try_create(std::string const& name, std::size_t size) - { - // Keep the fd and do the full setup (ftruncate + mmap) inline. - // We must NOT close the fd and call create() — create() would - // shm_unlink the name (to sidestep Darwin's O_TRUNC quirk) and - // recreate a different object, racing any concurrent caller that - // observed the original name between our close and create's CAS. 
- fd_ = ::shm_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, 0666); - if (fd_ < 0) - { - if (errno == EEXIST) - { - fd_ = INVALID_SHM_HANDLE; - return false; - } - throw_system_error("SharedMemory: shm_open(try_create)"); - } - map(size); - return true; - } - - void SharedMemory::open(std::string const& name) - { - if (not try_open(name)) - { - throw_system_error("SharedMemory: shm_open(open)"); - } - } - - bool SharedMemory::try_open(std::string const& name) - { - fd_ = ::shm_open(name.c_str(), O_RDWR, 0); - if (fd_ < 0) - { - if (errno == ENOENT) - { - fd_ = INVALID_SHM_HANDLE; - return false; - } - throw_system_error("SharedMemory: shm_open(try_open)"); - } - - struct stat st{}; - if (::fstat(fd_, &st) < 0) - { - ::close(fd_); - fd_ = INVALID_SHM_HANDLE; - throw_system_error("SharedMemory: fstat()"); - } - - size_ = static_cast(st.st_size); - address_ = ::mmap(nullptr, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0); - if (address_ == MAP_FAILED) - { - address_ = nullptr; - ::close(fd_); - fd_ = INVALID_SHM_HANDLE; - throw_system_error("SharedMemory: mmap()"); - } - return true; - } - - void SharedMemory::close() - { - if (address_ != nullptr) - { - ::munmap(address_, size_); - address_ = nullptr; - } - if (fd_ != INVALID_SHM_HANDLE) - { - ::close(fd_); - fd_ = INVALID_SHM_HANDLE; - } - size_ = 0; - } - - void SharedMemory::unlink(std::string const& name) - { - ::shm_unlink(name.c_str()); - } } diff --git a/src/os/darwin/Time.cc b/src/os/darwin/Time.cc index 0f67177..d7348b4 100644 --- a/src/os/darwin/Time.cc +++ b/src/os/darwin/Time.cc @@ -1,5 +1,5 @@ -// macOS uses POSIX clock_gettime (available since macOS 10.12). -// clock_nanosleep is NOT available on macOS — use nanosleep instead. +// macOS-specific sleep(). clock_nanosleep is unavailable; nanosleep is the +// POSIX fallback. Other Time entry points live in src/os/posix/Time.cc. 
#include "kickmsg/os/Time.h" #include @@ -18,7 +18,7 @@ namespace kickmsg while (true) { timespec required = remaining; - int result = nanosleep(&required, &remaining); + int result = ::nanosleep(&required, &remaining); if (result == 0) { return; @@ -30,16 +30,4 @@ namespace kickmsg throw std::system_error(errno, std::system_category(), "nanosleep()"); } } - - nanoseconds since_epoch() - { - timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - return seconds{ts.tv_sec} + nanoseconds{ts.tv_nsec}; - } - - nanoseconds elapsed_time(nanoseconds start) - { - return since_epoch() - start; - } } diff --git a/src/os/linux/SharedMemory.cc b/src/os/linux/SharedMemory.cc index d2ac21b..3503a37 100644 --- a/src/os/linux/SharedMemory.cc +++ b/src/os/linux/SharedMemory.cc @@ -1,3 +1,5 @@ +// Linux-specific SharedMemory::create(). Other methods live in +// src/os/posix/SharedMemory.cc. #include "kickmsg/os/SharedMemory.h" #include @@ -6,155 +8,17 @@ #include #include #include -#include -#include namespace kickmsg { - static void throw_system_error(char const* context) - { - throw std::system_error(errno, std::system_category(), context); - } - - SharedMemory::SharedMemory(SharedMemory&& other) noexcept - : size_{other.size_} - , address_{other.address_} - , fd_{other.fd_} - { - other.size_ = 0; - other.address_ = nullptr; - other.fd_ = INVALID_SHM_HANDLE; - } - - SharedMemory& SharedMemory::operator=(SharedMemory&& other) noexcept - { - if (this != &other) - { - close(); - size_ = other.size_; - address_ = other.address_; - fd_ = other.fd_; - other.size_ = 0; - other.address_ = nullptr; - other.fd_ = INVALID_SHM_HANDLE; - } - return *this; - } - - SharedMemory::~SharedMemory() - { - close(); - } - - void SharedMemory::map(std::size_t size) - { - if (::ftruncate(fd_, static_cast(size)) < 0) - { - ::close(fd_); - fd_ = -1; - throw_system_error("SharedMemory.cc: ftruncate()"); - } - - address_ = ::mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0); - if (address_ == 
MAP_FAILED) - { - address_ = nullptr; - ::close(fd_); - fd_ = -1; - throw_system_error("SharedMemory.cc: mmap()"); - } - - size_ = size; - } - void SharedMemory::create(std::string const& name, std::size_t size) { fd_ = ::shm_open(name.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666); if (fd_ < 0) { - throw_system_error("SharedMemory.cc: shm_open(create)"); + throw std::system_error(errno, std::system_category(), + "SharedMemory: shm_open(create)"); } map(size); } - - bool SharedMemory::try_create(std::string const& name, std::size_t size) - { - // Keep the fd and do the full setup (ftruncate + mmap) inline. - // SharedRegion::create_or_open consumes the resulting mapping - // directly — there's no reason to close here and re-enter create(), - // and the old round-trip pattern caused a subtle race on Darwin. - fd_ = ::shm_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, 0666); - if (fd_ < 0) - { - if (errno == EEXIST) - { - fd_ = INVALID_SHM_HANDLE; - return false; - } - throw_system_error("SharedMemory.cc: shm_open(try_create)"); - } - map(size); - return true; - } - - void SharedMemory::open(std::string const& name) - { - if (not try_open(name)) - { - throw_system_error("SharedMemory: shm_open(open)"); - } - } - - bool SharedMemory::try_open(std::string const& name) - { - fd_ = ::shm_open(name.c_str(), O_RDWR, 0); - if (fd_ < 0) - { - if (errno == ENOENT) - { - fd_ = INVALID_SHM_HANDLE; - return false; - } - throw_system_error("SharedMemory: shm_open(try_open)"); - } - - struct stat st{}; - if (::fstat(fd_, &st) < 0) - { - ::close(fd_); - fd_ = INVALID_SHM_HANDLE; - throw_system_error("SharedMemory: fstat()"); - } - - size_ = static_cast(st.st_size); - address_ = ::mmap(nullptr, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0); - if (address_ == MAP_FAILED) - { - address_ = nullptr; - ::close(fd_); - fd_ = INVALID_SHM_HANDLE; - throw_system_error("SharedMemory: mmap()"); - } - return true; - } - - void SharedMemory::close() - { - if (address_ != nullptr) - { - 
::munmap(address_, size_); - address_ = nullptr; - } - if (fd_ != INVALID_SHM_HANDLE) - { - ::close(fd_); - fd_ = -1; - } - size_ = 0; - } - - void SharedMemory::unlink(std::string const& name) - { - ::shm_unlink(name.c_str()); - } } diff --git a/src/os/linux/Time.cc b/src/os/linux/Time.cc index 89d323a..efb8b97 100644 --- a/src/os/linux/Time.cc +++ b/src/os/linux/Time.cc @@ -1,10 +1,11 @@ +// Linux-specific sleep(). Other Time entry points live in +// src/os/posix/Time.cc. #include "kickmsg/os/Time.h" #include #include #include #include -#include namespace kickmsg { @@ -17,7 +18,7 @@ namespace kickmsg while (true) { timespec required = remaining; - int result = clock_nanosleep(CLOCK_MONOTONIC, 0, &required, &remaining); + int result = ::clock_nanosleep(CLOCK_MONOTONIC, 0, &required, &remaining); if (result == 0) { return; @@ -29,16 +30,4 @@ namespace kickmsg throw std::system_error(result, std::system_category(), "clock_nanosleep()"); } } - - nanoseconds since_epoch() - { - timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - return seconds{ts.tv_sec} + nanoseconds{ts.tv_nsec}; - } - - nanoseconds elapsed_time(nanoseconds start) - { - return since_epoch() - start; - } } diff --git a/src/os/posix/Process.cc b/src/os/posix/Process.cc new file mode 100644 index 0000000..4eeb77a --- /dev/null +++ b/src/os/posix/Process.cc @@ -0,0 +1,28 @@ +#include "kickmsg/os/Process.h" + +#include +#include +#include +#include + +namespace kickmsg +{ + uint64_t current_pid() noexcept + { + return static_cast(::getpid()); + } + + bool process_exists(uint64_t pid) noexcept + { + if (pid == 0) + { + return false; + } + if (::kill(static_cast(pid), 0) == 0) + { + return true; + } + // EPERM means the process exists but we can't signal it. 
+ return errno == EPERM; + } +} diff --git a/src/os/posix/SharedMemory.cc b/src/os/posix/SharedMemory.cc new file mode 100644 index 0000000..b98a85e --- /dev/null +++ b/src/os/posix/SharedMemory.cc @@ -0,0 +1,155 @@ +// Parts of SharedMemory that are identical on Linux and macOS. +// Platform-specific create() lives in src/os/{linux,darwin}/SharedMemory.cc. +#include "kickmsg/os/SharedMemory.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace kickmsg +{ + namespace + { + [[noreturn]] void throw_system_error(char const* context) + { + throw std::system_error(errno, std::system_category(), context); + } + } + + SharedMemory::SharedMemory(SharedMemory&& other) noexcept + : size_{other.size_} + , address_{other.address_} + , fd_{other.fd_} + { + other.size_ = 0; + other.address_ = nullptr; + other.fd_ = INVALID_SHM_HANDLE; + } + + SharedMemory& SharedMemory::operator=(SharedMemory&& other) noexcept + { + if (this != &other) + { + close(); + size_ = other.size_; + address_ = other.address_; + fd_ = other.fd_; + other.size_ = 0; + other.address_ = nullptr; + other.fd_ = INVALID_SHM_HANDLE; + } + return *this; + } + + SharedMemory::~SharedMemory() + { + close(); + } + + void SharedMemory::map(std::size_t size) + { + if (::ftruncate(fd_, static_cast(size)) < 0) + { + ::close(fd_); + fd_ = INVALID_SHM_HANDLE; + throw_system_error("SharedMemory: ftruncate()"); + } + + address_ = ::mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0); + if (address_ == MAP_FAILED) + { + address_ = nullptr; + ::close(fd_); + fd_ = INVALID_SHM_HANDLE; + throw_system_error("SharedMemory: mmap()"); + } + + size_ = size; + } + + bool SharedMemory::try_create(std::string const& name, std::size_t size) + { + // Keep the fd and do the full setup (ftruncate + mmap) inline. 
+ // SharedRegion::create_or_open consumes the resulting mapping + // directly — there's no reason to close here and re-enter create(), + // and the old round-trip pattern caused a subtle race on Darwin. + fd_ = ::shm_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, 0666); + if (fd_ < 0) + { + if (errno == EEXIST) + { + fd_ = INVALID_SHM_HANDLE; + return false; + } + throw_system_error("SharedMemory: shm_open(try_create)"); + } + map(size); + return true; + } + + void SharedMemory::open(std::string const& name) + { + if (not try_open(name)) + { + throw_system_error("SharedMemory: shm_open(open)"); + } + } + + bool SharedMemory::try_open(std::string const& name) + { + fd_ = ::shm_open(name.c_str(), O_RDWR, 0); + if (fd_ < 0) + { + if (errno == ENOENT) + { + fd_ = INVALID_SHM_HANDLE; + return false; + } + throw_system_error("SharedMemory: shm_open(try_open)"); + } + + struct stat st{}; + if (::fstat(fd_, &st) < 0) + { + ::close(fd_); + fd_ = INVALID_SHM_HANDLE; + throw_system_error("SharedMemory: fstat()"); + } + + size_ = static_cast(st.st_size); + address_ = ::mmap(nullptr, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0); + if (address_ == MAP_FAILED) + { + address_ = nullptr; + ::close(fd_); + fd_ = INVALID_SHM_HANDLE; + throw_system_error("SharedMemory: mmap()"); + } + return true; + } + + void SharedMemory::close() + { + if (address_ != nullptr) + { + ::munmap(address_, size_); + address_ = nullptr; + } + if (fd_ != INVALID_SHM_HANDLE) + { + ::close(fd_); + fd_ = INVALID_SHM_HANDLE; + } + size_ = 0; + } + + void SharedMemory::unlink(std::string const& name) + { + ::shm_unlink(name.c_str()); + } +} diff --git a/src/os/posix/Time.cc b/src/os/posix/Time.cc new file mode 100644 index 0000000..61441e9 --- /dev/null +++ b/src/os/posix/Time.cc @@ -0,0 +1,32 @@ +// Parts of the Time API that are identical on Linux and macOS. +// Per-platform sleep() lives in src/os/{linux,darwin}/Time.cc. 
+#include "kickmsg/os/Time.h" + +#include +#include +#include +#include +#include + +namespace kickmsg +{ + void yield() + { + ::sched_yield(); + } + + nanoseconds since_epoch() + { + timespec ts; + if (::clock_gettime(CLOCK_MONOTONIC, &ts) != 0) + { + throw std::system_error(errno, std::system_category(), "clock_gettime()"); + } + return seconds{ts.tv_sec} + nanoseconds{ts.tv_nsec}; + } + + nanoseconds elapsed_time(nanoseconds start) + { + return since_epoch() - start; + } +} diff --git a/src/os/windows/Process.cc b/src/os/windows/Process.cc new file mode 100644 index 0000000..3cbb0a3 --- /dev/null +++ b/src/os/windows/Process.cc @@ -0,0 +1,33 @@ +#include "kickmsg/os/Process.h" + +#define WIN32_LEAN_AND_MEAN +#include + +namespace kickmsg +{ + uint64_t current_pid() noexcept + { + return static_cast(::GetCurrentProcessId()); + } + + bool process_exists(uint64_t pid) noexcept + { + if (pid == 0) + { + return false; + } + // PROCESS_QUERY_LIMITED_INFORMATION is granted even for processes + // running under different integrity levels / sessions, which is + // what we want for a cross-user discovery tool. + HANDLE h = ::OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, + FALSE, + static_cast(pid)); + if (h != nullptr) + { + ::CloseHandle(h); + return true; + } + // ERROR_ACCESS_DENIED: process exists but we can't open it. 
+ return ::GetLastError() == ERROR_ACCESS_DENIED; + } +} diff --git a/src/os/windows/Time.cc b/src/os/windows/Time.cc index 4042403..d035cbc 100644 --- a/src/os/windows/Time.cc +++ b/src/os/windows/Time.cc @@ -10,12 +10,17 @@ namespace kickmsg auto ms = duration_cast(ns); if (ms.count() <= 0) { - SwitchToThread(); + yield(); return; } Sleep(static_cast(ms.count())); } + void yield() + { + ::SwitchToThread(); + } + nanoseconds since_epoch() { static LARGE_INTEGER freq{}; diff --git a/src/types.cc b/src/types.cc index 1b86c62..b932c64 100644 --- a/src/types.cc +++ b/src/types.cc @@ -37,22 +37,17 @@ namespace kickmsg return reinterpret_cast(h) + sizeof(Header); } - // FNV-1a hash of config fields, used to detect parameter mismatches - // when opening an existing region. Chained through hash::fnv1a_64() - // so the hashed byte sequence (and therefore the resulting value) is - // identical to a single fnv1a_64 over the concatenation of the same - // raw field bytes in the same order — do NOT reorder these fields - // without bumping VERSION, since existing regions on disk are hashed - // with this ordering. + // FNV-1a over the config fields, detecting parameter mismatches at + // open time. Field order is part of the on-disk hash — do NOT + // reorder without bumping VERSION. 
uint64_t compute_config_hash(channel::Type type, channel::Config const& cfg) { - uint64_t h; - h = hash::fnv1a_64(&type, sizeof(type)); - h = hash::fnv1a_64(&cfg.max_subscribers, sizeof(cfg.max_subscribers), h); - h = hash::fnv1a_64(&cfg.sub_ring_capacity, sizeof(cfg.sub_ring_capacity), h); - h = hash::fnv1a_64(&cfg.pool_size, sizeof(cfg.pool_size), h); - h = hash::fnv1a_64(&cfg.max_payload_size, sizeof(cfg.max_payload_size), h); - h = hash::fnv1a_64(&cfg.commit_timeout, sizeof(cfg.commit_timeout), h); + uint64_t h = hash::fnv1a_64(type); + h = hash::fnv1a_64(cfg.max_subscribers, h); + h = hash::fnv1a_64(cfg.sub_ring_capacity, h); + h = hash::fnv1a_64(cfg.pool_size, h); + h = hash::fnv1a_64(cfg.max_payload_size, h); + h = hash::fnv1a_64(cfg.commit_timeout.count(), h); return h; } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 0adee45..0c3786e 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -5,6 +5,7 @@ add_executable(kickmsg_unit unit/subscriber-t.cc unit/naming-t.cc unit/node-t.cc + unit/registry-t.cc ) target_link_libraries(kickmsg_unit PRIVATE kickmsg GTest::gmock_main) set_target_properties(kickmsg_unit diff --git a/tests/crash_test.cc b/tests/crash_test.cc index 1726eb8..8c09fd2 100644 --- a/tests/crash_test.cc +++ b/tests/crash_test.cc @@ -49,7 +49,7 @@ static void child_publisher_main(int /*round*/) auto* ptr = pub.allocate(sizeof(CrashPayload)); if (ptr == nullptr) { - kickmsg::sleep(0ns); + kickmsg::yield(); continue; } @@ -182,7 +182,7 @@ static RoundResult run_one_round(int round) msg.checksum = compute_checksum(msg); while (pub.send(&msg, sizeof(msg)) < 0) { - kickmsg::sleep(0ns); + kickmsg::yield(); } } _exit(0); diff --git a/tests/stress/churn.cc b/tests/stress/churn.cc index 672a54b..68ad3db 100644 --- a/tests/stress/churn.cc +++ b/tests/stress/churn.cc @@ -33,7 +33,7 @@ bool run_subscriber_churn() while (pub.send(&msg, sizeof(msg)) < 0) { - kickmsg::sleep(0ns); + kickmsg::yield(); } } pub_done = true; diff --git 
a/tests/stress/edge_cases.cc b/tests/stress/edge_cases.cc index efe4114..756f794 100644 --- a/tests/stress/edge_cases.cc +++ b/tests/stress/edge_cases.cc @@ -210,7 +210,7 @@ bool run_subscriber_saturation() std::fprintf(stderr, " [FATAL] send() returned %d\n", rc); std::abort(); } - kickmsg::sleep(0ns); + kickmsg::yield(); } } } diff --git a/tests/stress/live_repair.cc b/tests/stress/live_repair.cc index 55c4d0b..f393d2a 100644 --- a/tests/stress/live_repair.cc +++ b/tests/stress/live_repair.cc @@ -50,7 +50,7 @@ bool run_live_repair() } else if (rc == -EAGAIN) { - kickmsg::sleep(0ns); + kickmsg::yield(); } } }; diff --git a/tests/stress/pool_exhaustion.cc b/tests/stress/pool_exhaustion.cc index 729f5c0..d22dd45 100644 --- a/tests/stress/pool_exhaustion.cc +++ b/tests/stress/pool_exhaustion.cc @@ -46,7 +46,7 @@ bool run_pool_exhaustion() std::abort(); } eagain_count.fetch_add(1, std::memory_order_relaxed); - kickmsg::sleep(0ns); + kickmsg::yield(); } } }; diff --git a/tests/stress/treiber.cc b/tests/stress/treiber.cc index 0bccfb0..6314a6f 100644 --- a/tests/stress/treiber.cc +++ b/tests/stress/treiber.cc @@ -30,7 +30,7 @@ bool run_treiber_stress() if (idx == kickmsg::INVALID_SLOT) { contention_hits.fetch_add(1); - kickmsg::sleep(0ns); + kickmsg::yield(); --i; continue; } diff --git a/tests/unit/node-t.cc b/tests/unit/node-t.cc index 5b63302..d0ba578 100644 --- a/tests/unit/node-t.cc +++ b/tests/unit/node-t.cc @@ -62,7 +62,7 @@ TEST_F(NodeTest, NamingConventions) { kickmsg::Node node("mynode", "app"); EXPECT_EQ(node.name(), "mynode"); - EXPECT_EQ(node.prefix(), "app"); + EXPECT_EQ(node.kmsg_namespace(), "app"); } TEST_F(NodeTest, JoinBroadcastTwoNodes) @@ -314,10 +314,10 @@ TEST_F(NodeTest, RosStyleTopicNamesAreSanitizedIntoShmPath) ASSERT_TRUE(got.has_value()); EXPECT_EQ(std::memcmp(got->data(), &val, sizeof(val)), 0); - // Also confirm Node::name() / prefix() return the sanitized form so - // callers can log/introspect the actual identifiers in use. 
- EXPECT_EQ(pub_node.prefix(), "test.ns"); - EXPECT_EQ(pub_node.name(), "drv"); + // Also confirm Node::name() / kmsg_namespace() return the sanitized + // form so callers can log/introspect the actual identifiers in use. + EXPECT_EQ(pub_node.kmsg_namespace(), "test.ns"); + EXPECT_EQ(pub_node.name(), "drv"); } TEST_F(NodeTest, EmptyTopicNameThrows) diff --git a/tests/unit/region-t.cc b/tests/unit/region-t.cc index 392ed04..06e105a 100644 --- a/tests/unit/region-t.cc +++ b/tests/unit/region-t.cc @@ -8,12 +8,8 @@ #include #include #include -#ifdef _WIN32 -#include -#define getpid _getpid -#else -#include -#endif + +#include "kickmsg/os/Process.h" class RegionTest : public ::testing::Test { @@ -120,7 +116,7 @@ TEST_F(RegionTest, HeaderStoresCreatorMetadata) SHM_NAME, kickmsg::channel::PubSub, cfg, "my_node"); auto* hdr = region.header(); - EXPECT_EQ(hdr->creator_pid, static_cast(getpid())); + EXPECT_EQ(hdr->creator_pid, kickmsg::current_pid()); EXPECT_GT(hdr->created_at_ns, 0u); EXPECT_NE(hdr->config_hash, 0u); } diff --git a/tests/unit/registry-t.cc b/tests/unit/registry-t.cc new file mode 100644 index 0000000..d95e5e4 --- /dev/null +++ b/tests/unit/registry-t.cc @@ -0,0 +1,261 @@ +#include +#include +#include + +#include + +#include "kickmsg/Node.h" +#include "kickmsg/Registry.h" + +class RegistryTest : public ::testing::Test +{ +protected: + static constexpr char const* KMSG_NAMESPACE = "kickmsg_regtest"; + + void SetUp() override + { + kickmsg::Registry::unlink(KMSG_NAMESPACE); + } + + void TearDown() override + { + kickmsg::Registry::unlink(KMSG_NAMESPACE); + for (auto const& name : shm_to_unlink_) + { + kickmsg::SharedMemory::unlink(name); + } + } + + void track(std::string name) + { + shm_to_unlink_.push_back(std::move(name)); + } + +private: + std::vector shm_to_unlink_; +}; + +TEST_F(RegistryTest, OpenOrCreateIsIdempotent) +{ + auto r1 = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); + EXPECT_EQ(r1.name(), std::string{"/"} + KMSG_NAMESPACE + "_registry"); 
+ + // Second call opens the existing region — same name, same capacity. + auto r2 = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); + EXPECT_EQ(r1.capacity(), r2.capacity()); + EXPECT_EQ(r1.name(), r2.name()); +} + +TEST_F(RegistryTest, RegisterAndSnapshotRoundTrip) +{ + auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); + + uint32_t s1 = reg.register_participant( + "/test_topic_a", kickmsg::channel::PubSub, + kickmsg::registry::Publisher, "node_alpha"); + ASSERT_NE(s1, kickmsg::INVALID_SLOT); + + uint32_t s2 = reg.register_participant( + "/test_topic_a", kickmsg::channel::PubSub, + kickmsg::registry::Subscriber, "node_beta"); + ASSERT_NE(s2, kickmsg::INVALID_SLOT); + EXPECT_NE(s1, s2); + + auto snap = reg.snapshot(); + ASSERT_EQ(snap.size(), 2u); + + // Collect into a set so we don't depend on iteration order. + std::unordered_set roles_by_node; + for (auto const& p : snap) + { + EXPECT_EQ(p.shm_name, "/test_topic_a"); + EXPECT_EQ(p.channel_type, kickmsg::channel::PubSub); + roles_by_node.insert(p.node_name + ":" + std::to_string(p.role)); + } + EXPECT_TRUE(roles_by_node.count("node_alpha:1")); // Publisher = 1 + EXPECT_TRUE(roles_by_node.count("node_beta:2")); // Subscriber = 2 + + reg.deregister(s1); + auto after = reg.snapshot(); + ASSERT_EQ(after.size(), 1u); + EXPECT_EQ(after[0].node_name, "node_beta"); +} + +TEST_F(RegistryTest, DeregisterInvalidSlotIsNoop) +{ + auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); + // Should not crash or throw. + reg.deregister(kickmsg::INVALID_SLOT); + reg.deregister(99999); // Past capacity — silently ignored. + EXPECT_EQ(reg.snapshot().size(), 0u); +} + +TEST_F(RegistryTest, CapacityExhaustionReturnsInvalidSlot) +{ + // Small capacity so we can fill it quickly. 
+ constexpr uint32_t CAP = 4; + auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE, CAP); + EXPECT_EQ(reg.capacity(), CAP); + + std::vector slots; + for (uint32_t i = 0; i < CAP; ++i) + { + uint32_t s = reg.register_participant( + "/test_topic_" + std::to_string(i), + kickmsg::channel::PubSub, + kickmsg::registry::Publisher, "node"); + ASSERT_NE(s, kickmsg::INVALID_SLOT); + slots.push_back(s); + } + + // One more push tips it over. + uint32_t full = reg.register_participant( + "/overflow", kickmsg::channel::PubSub, + kickmsg::registry::Publisher, "node"); + EXPECT_EQ(full, kickmsg::INVALID_SLOT); + + // Free a slot and try again — should succeed. + reg.deregister(slots[0]); + uint32_t reclaimed = reg.register_participant( + "/after_free", kickmsg::channel::PubSub, + kickmsg::registry::Subscriber, "node2"); + EXPECT_NE(reclaimed, kickmsg::INVALID_SLOT); +} + +TEST_F(RegistryTest, VersionMismatchOnSmallerExistingRegionThrows) +{ + // Validate the open path still works when the region already exists: + // open_or_create should happily attach to an existing compatible + // region of a different capacity (capacity is only used on create). + auto created = kickmsg::Registry::open_or_create(KMSG_NAMESPACE, 8); + EXPECT_EQ(created.capacity(), 8u); + + auto opened = kickmsg::Registry::open_or_create(KMSG_NAMESPACE, 1024); + // Capacity from the existing region, not the requested one. + EXPECT_EQ(opened.capacity(), 8u); +} + +TEST_F(RegistryTest, SweepStaleRemovesDeadPidEntries) +{ + auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); + + // Live entry — current process pid. + uint32_t alive = reg.register_participant( + "/live_topic", kickmsg::channel::PubSub, + kickmsg::registry::Publisher, "alive"); + ASSERT_NE(alive, kickmsg::INVALID_SLOT); + + // Live entry for this process (via another participant). 
+ reg.register_participant( + "/live_topic2", kickmsg::channel::PubSub, + kickmsg::registry::Subscriber, "alive2"); + + EXPECT_EQ(reg.snapshot().size(), 2u); + + // No sweep needed yet — both pids alive. + EXPECT_EQ(reg.sweep_stale(), 0u); + EXPECT_EQ(reg.snapshot().size(), 2u); +} + +// ----------------------------------------------------------------------------- +// Node integration — Node advertise/subscribe/etc should populate the registry +// ----------------------------------------------------------------------------- + +TEST_F(RegistryTest, NodeAdvertiseRegistersPublisher) +{ + kickmsg::channel::Config cfg; + cfg.max_subscribers = 2; + cfg.sub_ring_capacity = 4; + cfg.pool_size = 8; + cfg.max_payload_size = 32; + + { + kickmsg::Node n("pub_node", KMSG_NAMESPACE); + auto pub = n.advertise("topicX", cfg); + track("/" + std::string{KMSG_NAMESPACE} + "_topicX"); + + auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); + auto snap = reg.snapshot(); + ASSERT_EQ(snap.size(), 1u); + EXPECT_EQ(snap[0].node_name, "pub_node"); + EXPECT_EQ(snap[0].role, kickmsg::registry::Publisher); + EXPECT_EQ(snap[0].shm_name, + std::string{"/"} + KMSG_NAMESPACE + "_topicX"); + } + + // Node went out of scope — entry should be gone. 
+ auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); + EXPECT_EQ(reg.snapshot().size(), 0u); +} + +TEST_F(RegistryTest, NodeBroadcastRegistersBoth) +{ + kickmsg::channel::Config cfg; + cfg.max_subscribers = 2; + cfg.sub_ring_capacity = 4; + cfg.pool_size = 8; + cfg.max_payload_size = 32; + + kickmsg::Node n("bcast_node", KMSG_NAMESPACE); + auto bh = n.join_broadcast("chanX", cfg); + track("/" + std::string{KMSG_NAMESPACE} + "_broadcast_chanX"); + + auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); + auto snap = reg.snapshot(); + ASSERT_EQ(snap.size(), 1u); + EXPECT_EQ(snap[0].role, kickmsg::registry::Both); + EXPECT_EQ(snap[0].channel_type, kickmsg::channel::Broadcast); +} + +TEST_F(RegistryTest, NodeAdvertiseThenSubscribeUpgradesToBoth) +{ + // A Node that both advertises and subscribes to the same topic should + // appear once in the registry with role=Both (not two entries). + kickmsg::channel::Config cfg; + cfg.max_subscribers = 2; + cfg.sub_ring_capacity = 4; + cfg.pool_size = 8; + cfg.max_payload_size = 32; + + kickmsg::Node n("dual_node", KMSG_NAMESPACE); + auto pub = n.advertise("dualtopic", cfg); + auto sub = n.subscribe("dualtopic"); + track("/" + std::string{KMSG_NAMESPACE} + "_dualtopic"); + + auto reg = kickmsg::Registry::open_or_create(KMSG_NAMESPACE); + auto snap = reg.snapshot(); + ASSERT_EQ(snap.size(), 1u); + EXPECT_EQ(snap[0].role, kickmsg::registry::Both); + EXPECT_EQ(snap[0].node_name, "dual_node"); +} + +TEST_F(RegistryTest, MultipleNodesEachAppearOnce) +{ + kickmsg::channel::Config cfg; + cfg.max_subscribers = 4; + cfg.sub_ring_capacity = 4; + cfg.pool_size = 8; + cfg.max_payload_size = 32; + + kickmsg::Node pub("pub_a", KMSG_NAMESPACE); + auto p = pub.advertise("shared", cfg); + track("/" + std::string{KMSG_NAMESPACE} + "_shared"); + + kickmsg::Node s1("sub_a", KMSG_NAMESPACE); + auto s1_h = s1.subscribe("shared"); + kickmsg::Node s2("sub_b", KMSG_NAMESPACE); + auto s2_h = s2.subscribe("shared"); + + auto reg = 
kickmsg::Registry::open_or_create(KMSG_NAMESPACE); + auto snap = reg.snapshot(); + EXPECT_EQ(snap.size(), 3u); + + std::unordered_set nodes; + for (auto const& part : snap) + { + nodes.insert(part.node_name); + } + EXPECT_TRUE(nodes.count("pub_a")); + EXPECT_TRUE(nodes.count("sub_a")); + EXPECT_TRUE(nodes.count("sub_b")); +} diff --git a/tests/unit/subscriber-t.cc b/tests/unit/subscriber-t.cc index 2634a1a..927bef8 100644 --- a/tests/unit/subscriber-t.cc +++ b/tests/unit/subscriber-t.cc @@ -442,7 +442,7 @@ TEST_F(SubscriberTest, SlowPublisherNoCorruption) // Wait for signal to start publishing while (not pub_start.load(std::memory_order_acquire)) { - kickmsg::sleep(0ns); + kickmsg::yield(); } // Publish several messages slowly (each takes > commit_timeout)