Closed
109 changes: 109 additions & 0 deletions ARCHITECTURE.md
@@ -264,6 +264,15 @@ the two new fields: one cache line for `schema_state` (a
eight cache lines for `schema_data` (512 B). Pre-v4 binaries are
rejected at `open()` by the existing version check.

Version bumped `4 → 5` for the cross-process runtime counters
added to `SubRingHeader` (`dropped_count`, `lost_count` — see the
Subscriber Ring section for their layout and semantics). The
fields fit inside the existing `write_pos` cache-line padding, so
`sizeof(SubRingHeader)` is unchanged, but the in-memory meaning of
the bytes following `has_waiter` changed — v4 and v5 cannot share
a region. Pre-v5 binaries are rejected at `open()` by the existing
version check.


## Treiber Free Stack

@@ -325,12 +334,29 @@ Ring[0]
- **has_waiter** (atomic uint32): set by the subscriber before blocking
on `futex_wait`, cleared after. Publishers skip the `futex_wake_all`
syscall when no subscriber is sleeping.
- **dropped_count** (atomic uint64): cumulative count of publisher
delivery drops on this ring — incremented whenever the two-phase
commit CAS lock fails and the publisher falls through to the
self-repair / drop path. Cross-process visible; the per-Publisher
`dropped()` accessor reports the same events but counted per
instance.
- **lost_count** (atomic uint64): cumulative count of subscriber
losses on this ring — bumped on every `++lost_` site (drain-ahead
skip, stale-overwrite, invalid-slot, unpinnable, seqlock miss).
Cross-process visible; `Subscriber::lost()` reports the same events
per instance.
- **Sequence number** is monotonically increasing (`pos + 1`), used as a
seqlock for data consistency validation and as a commit barrier between
publishers (see Publish Flow below).
- Stale entries (sequence < subscriber's expected) are detected and
reported as lost messages.

`write_pos`, `has_waiter`, `dropped_count`, `lost_count` all share one
cache line: the hot path already owns this line when incrementing
`write_pos`, so bumping the counters on a rare drop/loss adds no new
coherency traffic. Different rings target different lines (128 B
stride), so publishers/subscribers on distinct rings never contend.
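
The layout argument above can be checked at compile time. The following is a minimal sketch, not the actual `SubRingHeader`: the struct name `RingHotLine`, the helper `record_drop`, the field order, and the 64 B cache-line size are all assumptions made for illustration, and the real header carries additional fields.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>

// Hypothetical model of the hot cache line of a subscriber ring header.
// Only the four fields named above are modelled; the field order and the
// 64 B cache-line size are assumptions, not taken from the real header.
struct alignas(128) RingHotLine
{
    std::atomic<uint64_t> write_pos{0};      // bumped on every publish
    std::atomic<uint32_t> has_waiter{0};     // set while the subscriber sleeps
    std::atomic<uint64_t> dropped_count{0};  // rare path: publisher drop
    std::atomic<uint64_t> lost_count{0};     // rare path: subscriber loss
};

// alignas(128) pads every ring to a 128 B stride.
static_assert(sizeof(RingHotLine) == 128, "one ring per 128 B stride");

// All four fields end inside the first 64 bytes, i.e. on one cache line.
static_assert(offsetof(RingHotLine, lost_count) + sizeof(std::atomic<uint64_t>) <= 64,
              "counters share the write_pos cache line");

// Rare path: a relaxed increment is assumed to be sufficient for a
// diagnostic counter that carries no synchronization duty.
inline void record_drop(RingHotLine& ring) noexcept
{
    ring.dropped_count.fetch_add(1, std::memory_order_relaxed);
}
```

With an array of `RingHotLine`, consecutive rings sit exactly 128 bytes apart, so a counter bump on one ring never touches the line of its neighbour.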

### Subscriber join and visibility window

A subscriber joins by CAS-ing a Free ring to Live. The CAS expects
@@ -1201,6 +1227,89 @@ Mailbox paths include the owner's node name because they are personal
reply channels -- the sender must know who to reply to.


## Registry & Discovery

Kickmsg's core IPC path is decentralized: no broker, no daemon, no
central registry. Channels are plain shared-memory regions that
producers and consumers find by name. That works well until an
operator asks *"what's running on this box?"* — which is the job of
the **participant registry**.

### What it is

A single dedicated shared-memory region per prefix, at
`/{prefix}_registry`, holding a fixed-size array of participant
entries. Each entry records one `(Node, channel, role)` membership:

- `pid` — OS process id of the owner
- `node_name` — the `Node` that registered
- `shm_name` — the channel region this participant is attached to
- `channel_type` — PubSub / Broadcast
- `role` — Publisher / Subscriber / Both (broadcast)
- `created_at_ns` — registration timestamp
- `state` — atomic `Free` / `Claiming` / `Active`

A `Node` lazily opens-or-creates the registry on its first
`advertise` / `subscribe` / `join_broadcast` / `create_mailbox` /
`open_mailbox` call, scans for the first `Free` slot, CAS-claims it
(`Free → Claiming`), writes the fields, then release-stores
`Active`. The `Node`'s destructor deregisters every slot it claimed.

The key property is **cross-platform parity**: on Linux, shared-memory
objects are visible in the filesystem under `/dev/shm`, but on macOS
and Windows they are not, so `readdir` cannot list topics there.
Routing discovery through a regular `SharedMemory` region means one
code path and identical behaviour on all three targets.

### State machine

```
Free (0) ── CAS ──► Claiming (1) ── release-store ──► Active (2)
  ▲                                                    │
  │                                                    │
  └────────── deregister: store-release ◄──────────────┘
              (or sweep_stale: CAS(Active → Free))
```

Snapshots acquire-load `state` per slot; only `Active` entries are
returned. `Claiming` gives the writer exclusive ownership of the slot
while it fills in the fields, and the release-store of `Active`
publishes them: a reader that acquire-loads `Active` is guaranteed to
see all the field writes that happened-before the release-store.
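
The claim, publish, and snapshot steps can be sketched with `std::atomic`. This is an illustrative single-process model under stated assumptions, not the library's `Registry`: the `Entry` struct, the function signatures, the `std::array` storage, and the use of a bare pid as the only field are all hypothetical, and the real entries live in shared memory and carry more fields.

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <vector>

enum class SlotState : uint32_t { Free = 0, Claiming = 1, Active = 2 };

// Hypothetical, reduced entry: one plain field guarded by the state word.
struct Entry
{
    std::atomic<SlotState> state{SlotState::Free};
    uint64_t pid{0};
};

constexpr uint32_t INVALID_SLOT = UINT32_MAX;

// Scan for the first Free slot, claim it, fill it in, then publish it.
template <std::size_t N>
uint32_t register_participant(std::array<Entry, N>& slots, uint64_t pid)
{
    for (uint32_t i = 0; i < N; ++i)
    {
        SlotState expected = SlotState::Free;
        if (slots[i].state.compare_exchange_strong(expected, SlotState::Claiming,
                                                   std::memory_order_acquire,
                                                   std::memory_order_relaxed))
        {
            slots[i].pid = pid;  // exclusive: nobody else can win this slot
            slots[i].state.store(SlotState::Active, std::memory_order_release);
            return i;
        }
    }
    return INVALID_SLOT;  // registry full: non-fatal for the caller
}

template <std::size_t N>
void deregister(std::array<Entry, N>& slots, uint32_t index)
{
    slots[index].state.store(SlotState::Free, std::memory_order_release);
}

// Snapshot: the acquire-load pairs with the release-store of Active.
template <std::size_t N>
std::vector<uint64_t> snapshot(std::array<Entry, N> const& slots)
{
    std::vector<uint64_t> pids;
    for (auto const& entry : slots)
    {
        if (entry.state.load(std::memory_order_acquire) == SlotState::Active)
        {
            pids.push_back(entry.pid);
        }
    }
    return pids;
}
```

A slot freed by `deregister` is immediately eligible for the next `register_participant` scan. The sketch does not guard a snapshot against a slot being freed and re-claimed while its fields are being read.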

### Role upgrade

A `Node` that both advertises and subscribes to the same topic is
recorded as a single entry with `role = Both` rather than two
separate entries. `Node::touch_registry` detects the existing slot
and upgrades via deregister + re-register. Upgrades happen at
connect time only — zero hot-path cost.

### Liveness

The registry does not track heartbeats. A crashed process that
never ran its `Node` destructor leaves its entries stuck at
`Active` until reclaimed. Two recovery paths:

- **Query-time filter**: diagnostic tools probe each entry's pid via
`process_exists()` and hide dead entries from the user without
touching the registry. Non-invasive; safe under live traffic.
- **`Registry::sweep_stale()`**: iterates active entries and
CAS-resets any slot whose owner pid is gone. Opt-in cleanup for
an operator or supervisor sweep; not automatic.

The two paths are split deliberately so that a read-only query never
rewrites the registry under another process's feet.
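
The two recovery paths can be sketched as follows. This is a POSIX-only illustration under stated assumptions: `process_exists()` is modelled here with `kill(pid, 0)`, which may differ from the library's implementation, and the `Slot` struct, the `live_pids` helper, and both function signatures are hypothetical. The liveness predicate is passed in as a parameter so the read-only and the mutating paths can be exercised separately.

```cpp
#include <atomic>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <vector>

#include <signal.h>
#include <sys/types.h>

// POSIX stand-in for process_exists(): signal 0 runs the existence and
// permission checks without delivering anything. EPERM means the process
// exists but belongs to another user.
inline bool process_exists(pid_t pid)
{
    if (pid <= 0)
    {
        return false;
    }
    return ::kill(pid, 0) == 0 or errno == EPERM;
}

constexpr uint32_t SLOT_FREE   = 0;
constexpr uint32_t SLOT_ACTIVE = 2;

// Hypothetical, reduced registry slot.
struct Slot
{
    std::atomic<uint32_t> state{SLOT_FREE};
    pid_t pid{0};
};

// Query-time filter: hides dead owners, never writes to the slots.
template <typename Alive>
std::vector<pid_t> live_pids(Slot const* slots, std::size_t count, Alive alive)
{
    std::vector<pid_t> out;
    for (std::size_t i = 0; i < count; ++i)
    {
        if (slots[i].state.load(std::memory_order_acquire) == SLOT_ACTIVE
            and alive(slots[i].pid))
        {
            out.push_back(slots[i].pid);
        }
    }
    return out;
}

// Opt-in cleanup: CAS-reset Active slots whose owner is gone.
template <typename Alive>
std::size_t sweep_stale(Slot* slots, std::size_t count, Alive alive)
{
    std::size_t reclaimed = 0;
    for (std::size_t i = 0; i < count; ++i)
    {
        if (slots[i].state.load(std::memory_order_acquire) != SLOT_ACTIVE
            or alive(slots[i].pid))
        {
            continue;
        }
        uint32_t expected = SLOT_ACTIVE;
        if (slots[i].state.compare_exchange_strong(expected, SLOT_FREE,
                                                   std::memory_order_acq_rel,
                                                   std::memory_order_relaxed))
        {
            ++reclaimed;
        }
    }
    return reclaimed;
}
```

In production use the predicate would be `process_exists` itself. The sketch ignores the window in which a slot is deregistered and re-claimed by a live process between the liveness check and the CAS.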

### Sizing

Default capacity is 1024 slots × 256 B = 256 KB per prefix. A robot
telemetry Node typically holds a few hundred topics; the registry is
sized for an order of magnitude of headroom above that. Exhaustion
is non-fatal: `register_participant` returns `INVALID_SLOT` and the
Node continues to work without a discovery row — registration is a
diagnostic nicety, not a correctness dependency.


## Design Tradeoffs

### Silent data loss on slow subscribers
33 changes: 22 additions & 11 deletions CMakeLists.txt
@@ -23,22 +23,32 @@ if(WIN32)
src/os/windows/Time.cc
src/os/windows/Futex.cc
src/os/windows/SharedMemory.cc
src/os/windows/Process.cc
)
set(OS_LIBRARIES synchronization)
elseif(APPLE)
set(OS_SOURCES
src/os/darwin/Time.cc
src/os/darwin/Futex.cc
src/os/darwin/SharedMemory.cc
)
set(OS_LIBRARIES pthread)
else()
# Linux and macOS share most of the POSIX OS layer; only sleep(),
# create(), and the futex backend diverge.
set(OS_SOURCES
src/os/linux/Time.cc
src/os/linux/Futex.cc
src/os/linux/SharedMemory.cc
src/os/posix/Time.cc
src/os/posix/SharedMemory.cc
src/os/posix/Process.cc
)
set(OS_LIBRARIES rt pthread)
if(APPLE)
list(APPEND OS_SOURCES
src/os/darwin/Time.cc
src/os/darwin/SharedMemory.cc
src/os/darwin/Futex.cc
)
set(OS_LIBRARIES pthread)
else()
list(APPEND OS_SOURCES
src/os/linux/Time.cc
src/os/linux/SharedMemory.cc
src/os/linux/Futex.cc
)
set(OS_LIBRARIES rt pthread)
endif()
endif()

# --- Library ---
@@ -49,6 +59,7 @@ add_library(kickmsg STATIC
src/Publisher.cc
src/Subscriber.cc
src/Region.cc
src/Registry.cc
src/Node.cc
${OS_SOURCES}
)
2 changes: 1 addition & 1 deletion benchmarks/bench.cc
@@ -139,7 +139,7 @@ static void run_latency(BenchConfig const& bc, bool zerocopy)

while (pub.send(payload.data(), payload.size()) < 0)
{
kickmsg::sleep(0ns);
kickmsg::yield();
}

if (zerocopy)
2 changes: 1 addition & 1 deletion examples/hello_pubsub.cc
@@ -6,7 +6,7 @@
/// - "display" subscribes to "temperature" and prints them
///
/// Single-process for simplicity; in production, each node lives in
/// its own process sharing the same prefix.
/// its own process sharing the same namespace.

#include <cstdint>
#include <cstring>
19 changes: 19 additions & 0 deletions include/kickmsg/Hash.h
@@ -5,6 +5,7 @@
#include <cstddef>
#include <cstdint>
#include <string_view>
#include <type_traits>

namespace kickmsg
{
@@ -39,6 +40,24 @@ namespace kickmsg
    /// descriptor-string hashing is by far the most common use.
    uint64_t fnv1a_64(std::string_view s) noexcept;

    /// 64-bit FNV-1a of a trivially-copyable scalar or POD. Lets
    /// callers chain fields without spelling out `&v, sizeof(v)`:
    ///     h = fnv1a_64(cfg.max_subscribers, h);
    ///     h = fnv1a_64(cfg.pool_size, h);
    /// Pointers, nullptr, and string_view are excluded so the
    /// dedicated overloads keep their priority.
    template <typename T>
    auto fnv1a_64(T const& value, uint64_t seed = FNV1A_64_OFFSET_BASIS) noexcept
        -> std::enable_if_t<
            std::is_trivially_copyable_v<T>
            and not std::is_pointer_v<T>
            and not std::is_null_pointer_v<T>
            and not std::is_same_v<T, std::string_view>,
            uint64_t>
    {
        return fnv1a_64(&value, sizeof(T), seed);
    }

    /// Convenience: pack a 64-bit FNV-1a of `descriptor` into the
    /// leading eight bytes of a 64-byte identity slot, zero-padding
    /// the remaining 56 bytes. Intended as a drop-in for filling
30 changes: 24 additions & 6 deletions include/kickmsg/Node.h
@@ -8,6 +8,7 @@
#include "kickmsg/Region.h"
#include "kickmsg/Publisher.h"
#include "kickmsg/Subscriber.h"
#include "kickmsg/Registry.h"

namespace kickmsg
{
@@ -28,15 +29,19 @@ namespace kickmsg
class Node
{
public:
// Name components (node name, namespace/prefix, topic, channel,
// Name components (node name, namespace, topic, channel,
// owner, tag) are sanitized into a POSIX-shm-compatible form:
// leading '/' is stripped, interior '/' becomes '.', and any char
// outside [A-Za-z0-9._-] becomes '_'. This lets callers pass
// ROS-style paths like "/robot/arm/joint1" directly — the region
// ends up at "/<prefix>_robot.arm.joint1" in /dev/shm, still
// ends up at "/<namespace>_robot.arm.joint1" in /dev/shm, still
// human-readable (no hashing). A component that sanitizes to the
// empty string throws std::invalid_argument.
Node(std::string const& name, std::string const& prefix = "kickmsg");
Node(std::string const& name, std::string const& kmsg_namespace = "kickmsg");

/// Deregisters every participant entry this Node holds in the
/// namespace's registry.
~Node();

// Explicit non-copyable / move-only. Node already holds SharedRegion
// values (move-only), so it's non-copyable de facto; declaring it
@@ -123,8 +128,8 @@ namespace kickmsg
/// else got there first (read back with topic_schema()).
bool try_claim_topic_schema(char const* topic, SchemaInfo const& info);

std::string const& name() const { return name_; }
std::string const& prefix() const { return prefix_; }
std::string const& name() const { return name_; }
std::string const& kmsg_namespace() const { return namespace_; }

private:
std::string make_topic_name(char const* topic) const;
@@ -137,8 +142,17 @@ namespace kickmsg
SharedRegion* find_region(std::string const& shm_name);
SharedRegion const* find_region(std::string const& shm_name) const;

Registry& lazy_registry();

/// Register `shm_name` with `role`, or upgrade the existing entry
/// to `Both` if this Node already has one with the complementary
/// role. Registry failures are logged and swallowed.
void touch_registry(std::string const& shm_name,
channel::Type channel_type,
registry::Role role);

std::string name_;
std::string prefix_;
std::string namespace_;
// Keyed by SHM name for O(1) lookup. A telemetry node on a
// humanoid robot can easily hold 100-300 topics (joints × (meas,
// target) + cameras + IMUs + force sensors + hands), so O(N)
@@ -148,6 +162,10 @@ namespace kickmsg
// guarantees reference stability for elements (the mmap addresses
// used by Publisher/Subscriber don't move on rehash).
std::unordered_map<std::string, SharedRegion> regions_;

struct RegistrySlot { uint32_t slot_index; registry::Role role; };
std::unordered_map<std::string, RegistrySlot> registry_slots_;
std::optional<Registry> registry_;
};
}

44 changes: 44 additions & 0 deletions include/kickmsg/Region.h
@@ -3,12 +3,41 @@

#include <optional>
#include <string>
#include <vector>

#include "kickmsg/types.h"
#include "kickmsg/os/SharedMemory.h"

namespace kickmsg
{
    /// Runtime snapshot of a single subscriber ring.
    /// Each field is read with its own relaxed/acquire load, so every
    /// value is individually coherent, but fields (within one ring and
    /// across rings) may be sampled at slightly different instants. Fine
    /// for a diagnostic view; not intended as a strongly-consistent read.
    struct RingStats
    {
        uint32_t state;          ///< ring::State as a raw int (0=Free, 1=Live, 2=Draining)
        uint32_t in_flight;      ///< Publishers currently admitted to this ring
        uint64_t write_pos;      ///< Monotonic claim counter (rough throughput proxy)
        uint64_t dropped_count;  ///< Cumulative publisher drops on this ring
        uint64_t lost_count;     ///< Cumulative subscriber losses on this ring
    };

    /// Aggregate region snapshot returned by SharedRegion::stats().
    /// Safe to call under live traffic: all reads are relaxed/acquire,
    /// no writes.
    struct RegionStats
    {
        std::vector<RingStats> rings;  ///< One entry per subscriber-ring slot (length == max_subs)
        uint64_t total_writes;         ///< Sum of write_pos over Live rings
        uint64_t total_drops;          ///< Sum of dropped_count across all rings
        uint64_t total_losses;         ///< Sum of lost_count across all rings
        uint64_t live_rings;           ///< Number of rings currently Live
        uint64_t pool_free;            ///< Approximate free-slot count (walks Treiber stack; racy under churn)
        uint64_t pool_size;            ///< Total pool capacity (static)
    };


    class SharedRegion
    {
    public:
@@ -134,6 +163,21 @@ namespace kickmsg
/// Returns the number of rings reset.
std::size_t reset_retired_rings();

/// Runtime counter snapshot — safe under live traffic.
///
/// Reads the cross-process per-ring counters (`write_pos`,
/// `dropped_count`, `lost_count`) plus ring state and an approximate
/// pool-free count. Intended for external monitoring and the CLI's
/// `stats` / `watch` subcommands.
///
/// Cheap (no syscalls, no locks, a handful of atomic loads) but not a
/// strongly-consistent view: individual per-ring values are consistent
/// with themselves (sequential loads on one variable), but different
/// rings may be read at slightly different instants. The free-stack
/// walk for `pool_free` is bounded by `pool_size` so it can't loop
/// forever under racing pushes/pops.
RegionStats stats() const;

/// Reclaim orphaned slots (refcount > 0 but not referenced by any ring entry).
/// These are caused by publisher crashes between allocate and publish, or by
/// skipped drain on subscriber teardown timeout.