Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 176 additions & 5 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,14 @@ region.
first, here's what they wrote, you decide." The library never
throws on schema grounds.

Version bumped `3 → 4` because the `Header` binary layout grew by
the two new fields: one cache line for `schema_state` (a
`uint32_t` atomic padded to 64 B by `alignas(CACHE_LINE)`) plus
eight cache lines for `schema_data` (512 B). Pre-v4 binaries are
rejected at `open()` by the existing version check.
Version bumped `4 → 5` for the cross-process runtime counters
added to `SubRingHeader` (`dropped_count`, `lost_count` — see the
Subscriber Ring section for their layout and semantics). The
fields fit inside the existing `write_pos` cache-line padding, so
`sizeof(SubRingHeader)` is unchanged, but the in-memory meaning of
the bytes following `has_waiter` changed — v4 and v5 cannot share
a region. Pre-v5 binaries are rejected at `open()` by the existing
version check.


## Treiber Free Stack
Expand Down Expand Up @@ -325,12 +328,29 @@ Ring[0]
- **has_waiter** (atomic uint32): set by the subscriber before blocking
on `futex_wait`, cleared after. Publishers skip the `futex_wake_all`
syscall when no subscriber is sleeping.
- **dropped_count** (atomic uint64): cumulative count of publisher
delivery drops on this ring — incremented whenever the two-phase
commit CAS lock fails and the publisher falls through to the self-
repair / drop path. Cross-process visible; the per-Publisher
`dropped()` accessor reports the same events but counted per
instance.
- **lost_count** (atomic uint64): cumulative count of subscriber
losses on this ring — bumped on every `++lost_` site (drain-ahead
skip, stale-overwrite, invalid-slot, unpinnable, seqlock miss).
Cross-process visible; `Subscriber::lost()` reports the same events
per instance.
- **Sequence number** is monotonically increasing (`pos + 1`), used as a
seqlock for data consistency validation and as a commit barrier between
publishers (see Publish Flow below).
- Stale entries (sequence < subscriber's expected) are detected and
reported as lost messages.

`write_pos`, `has_waiter`, `dropped_count`, `lost_count` all share one
cache line: the hot path already owns this line when incrementing
`write_pos`, so bumping the counters on a rare drop/loss adds no new
coherency traffic. Different rings target different lines (128 B
stride), so publishers/subscribers on distinct rings never contend.

### Subscriber join and visibility window

A subscriber joins by CAS-ing a Free ring to Live. The CAS expects
Expand Down Expand Up @@ -1201,6 +1221,157 @@ Mailbox paths include the owner's node name because they are personal
reply channels -- the sender must know who to reply to.


## Registry & Discovery

Kickmsg's core IPC path is decentralized: no broker, no daemon, no
central registry. Channels are plain shared-memory regions that
producers and consumers find by name. That works well until an
operator asks *"what's running on this box?"* — which is the job of
the **participant registry**.

### What it is

A single dedicated shared-memory region per namespace, at
`/{namespace}_registry`, holding a fixed-size array of participant
entries. Each entry records one `(Node, channel, role)` membership.
All scalar fields are `std::atomic<T>` so that the seqlock protocol
between the writer and a concurrent new-tenant claim never involves
plain non-atomic accesses on the same bytes:

- `state` — atomic `Free` / `Claiming` / `Active` / `Reclaiming`
- `generation` — atomic counter bumped on every claim and release (seqlock)
- `pid` — atomic; OS process id of the owner
- `pid_starttime` — atomic; opaque OS-specific process start time
- `channel_type` — atomic; PubSub / Broadcast
- `role` — atomic; Publisher / Subscriber / Both
- `kind` — atomic; Pubsub / Broadcast / Mailbox (registry-level intent tag)
- `topic_name` — user-facing path (leading `/`, interior `/` preserved)
- `shm_name` — implementation-level POSIX path
- `node_name` — the `Node` that registered
- `created_at_ns` — atomic; registration timestamp

A `Node` lazily opens-or-creates the registry on its first
`advertise` / `subscribe` / `join_broadcast` / `create_mailbox` /
`open_mailbox` call, scans for the first `Free` slot, CAS-claims it
(`Free → Claiming`), writes the fields, then release-stores
`Active`. The `Node`'s destructor deregisters every slot it claimed.

The key property is **cross-platform parity**: Linux `/dev/shm` is
filesystem-visible, but macOS and Windows are not — we can't use
`readdir` to list topics there. Routing discovery through a regular
`SharedMemory` region means one code path, identical behaviour on
all three targets.

### State machine

```
Free (0) ── CAS ──► Claiming (1) ── release-store ──► Active (2)
▲ │
│ │
├────────── deregister: store-release ◄──────────────┤
│ │
│ sweep_stale: │
└── CAS(Reclaiming → Free) ◄── CAS(Active | Claiming → Reclaiming)
```

Snapshots acquire-load `state` per slot; only `Active` entries are
returned. The `Claiming` state is the publication fence for the
field bytes — a reader observing `Active` is guaranteed to see all
the field writes that happened-before the release-store.

`Reclaiming` is the exclusive lock held by `sweep_stale` while it
verifies the dead-pid condition and finalizes the slot to `Free`.
It blocks concurrent registrants (they need `Free → Claiming`) and
prevents ABA on the state CAS: without it, a full `dereg + register`
cycle on another CPU could restore the slot to `Active` between the
sweeper's pid check and its CAS, causing the sweeper to stomp a live
tenant's registration. `sweep_stale` releases `Reclaiming` back to
the pre-CAS value if the re-verified pid differs from what it
observed (ABA detected), so the live tenant is restored.

### Role upgrade

A `Node` that both advertises and subscribes to the same topic is
recorded as a single entry with `role = Both` rather than two
separate entries. `Node::touch_registry` detects the existing slot
and upgrades via deregister + re-register. Upgrades happen at
connect time only — zero hot-path cost.

### Liveness

The registry does not track heartbeats. A crashed process that
never ran its `Node` destructor leaves its entries stuck at
`Active` (or `Claiming`, if it died mid-register) until reclaimed.
Two recovery paths:

- **Query-time filter**: diagnostic tools probe each entry's pid via
`process_exists()` and hide dead entries from the user without
touching the registry. Non-invasive; safe under live traffic.
- **`Registry::sweep_stale()`**: CAS-resets any `Active` or `Claiming`
slot whose `pid` is dead. Opt-in cleanup for an operator or
supervisor sweep; also called opportunistically from
`register_participant` when the registry is full, so long-running
deployments don't silently drop new registrations as crashed-process
residue accumulates.

The `Claiming` reclaim branch skips slots where `pid == 0`: that state
is the brief window between the `Free→Claiming` CAS and the
registrant's first field store. Claiming the slot in that window
would stomp a live registrant. Cost: an early crash (before the pid
store) leaks one slot until the region is unlinked.

**PID-reuse mitigation.** `pid_starttime` is captured at register
time: `/proc/<pid>/stat` on Linux, `sysctl(KERN_PROC_PID)` on Darwin,
`GetProcessTimes()` on Windows. Sweep compares the stored starttime
against the live process's starttime; a PID-reuse after wraparound
yields a different value and we reclaim. If the OS returns 0 (the
process is gone by the time we query), the check degrades to
PID-alone.

### Sizing

Default capacity is 4096 slots × 512 B = 2 MB per namespace. A robot
telemetry Node typically holds a few hundred topics; the registry is
sized for an order of magnitude of headroom above that. Exhaustion
is non-fatal: `register_participant` returns `INVALID_SLOT` and the
Node continues to work without a discovery row — registration is a
diagnostic nicety, not a correctness dependency.

### Implicit invariants

Load-bearing assumptions for anyone editing the registry:

- **Field order is ABI.** `sizeof(ParticipantEntry) == 512` and
`offsetof(…, _padding) == 368` are statically asserted; any field
reorder or resize must update the padding and bump `registry::VERSION`.
- **State publication fence.** `state.store(Active, release)` in
`register_participant` is the one fence that publishes all field
writes that preceded it. `pid` has its own earlier release-store so
`sweep_stale` can acquire-load it while state is still `Claiming`
(before the Active fence). Any new field that needs to be visible
during `Claiming` must use its own release/acquire pair.
- **Generation bump on every mutation.** `generation` is bumped by
`register_participant` *and* by `deregister` *and* by
`sweep_stale`'s reclaim path. A snapshot's seqlock recheck detects
only mutations that bump gen — adding a future write-path that
modifies fields without bumping gen will cause torn reads.
- **`touch_registry` must never throw.** `Node::advertise` and friends
treat registration as best-effort. A failure is logged once per
`Node` (latched via `registry_disabled_`) and subsequent calls
become no-ops. Don't change this contract without also changing
`Node::advertise`'s error handling.
- **Role upgrade has a brief visibility gap.** `touch_registry`
upgrades Publisher/Subscriber → Both via `deregister` + re-register
(no atomic "change role in place" primitive). A concurrent
`snapshot()` may see this Node absent for ~1 µs during the swap.
Callers treating an empty `list_topics` result as "definitely
absent" are wrong; the right reading is "absent or in a brief gap."
- **`Kind` and ring state enums are stringified in two places.** The
native binding's `__repr__` methods and the Python `_KIND_NAME` /
ring-state maps in `diagnostics.py` must stay in sync when a new
enum value is added.


## Design Tradeoffs

### Silent data loss on slow subscribers
Expand Down
35 changes: 24 additions & 11 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,34 @@ if(WIN32)
src/os/windows/Time.cc
src/os/windows/Futex.cc
src/os/windows/SharedMemory.cc
src/os/windows/Process.cc
)
set(OS_LIBRARIES synchronization)
elseif(APPLE)
set(OS_SOURCES
src/os/darwin/Time.cc
src/os/darwin/Futex.cc
src/os/darwin/SharedMemory.cc
)
set(OS_LIBRARIES pthread)
else()
# Linux and macOS share most of the POSIX OS layer; only sleep(),
# create(), and the futex backend diverge.
set(OS_SOURCES
src/os/linux/Time.cc
src/os/linux/Futex.cc
src/os/linux/SharedMemory.cc
src/os/posix/Time.cc
src/os/posix/SharedMemory.cc
src/os/posix/Process.cc
)
set(OS_LIBRARIES rt pthread)
if(APPLE)
list(APPEND OS_SOURCES
src/os/darwin/Time.cc
src/os/darwin/SharedMemory.cc
src/os/darwin/Futex.cc
src/os/darwin/Process.cc
)
set(OS_LIBRARIES pthread)
else()
list(APPEND OS_SOURCES
src/os/linux/Time.cc
src/os/linux/SharedMemory.cc
src/os/linux/Futex.cc
src/os/linux/Process.cc
)
set(OS_LIBRARIES rt pthread)
endif()
endif()

# --- Library ---
Expand All @@ -49,6 +61,7 @@ add_library(kickmsg STATIC
src/Publisher.cc
src/Subscriber.cc
src/Region.cc
src/Registry.cc
src/Node.cc
${OS_SOURCES}
)
Expand Down
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,49 @@ region.reset_retired_rings();
region.reclaim_orphaned_slots();
```

## CLI (`kickmsg`)

Installing the Python wheel puts a `kickmsg` command on `$PATH` that
inspects running channels via a shared participant registry (one per
namespace, backed by a SHM region at `/{namespace}_registry`). Works
identically on Linux, macOS, and Windows — no `/dev/shm` filesystem
walk required.

```bash
kickmsg list # topic-centric enumeration
kickmsg list -o name,pub,sub,stall # ps-style column selection
kickmsg info <shm> # static header metadata
kickmsg stats <shm> # runtime counters (write_pos / dropped / lost)
kickmsg watch <shm> # top-like live view with msg/s rates
kickmsg diagnose <shm> # wraps SharedRegion::diagnose()
kickmsg repair <shm> [--locked] # run repair primitives
kickmsg schema <shm> # focused schema descriptor view
kickmsg schema-diff <a> <b> # field-by-field schema comparison
```

All subcommands accept `--json` for scripting.

### Programmatic use (GUIs, exporters)

The same data the CLI renders is available as typed dataclasses through
`kickmsg.diagnostics`, so a GUI can consume it without shelling out:

```python
from kickmsg import diagnostics as diag

for topic in diag.list_topics(namespace="kickmsg"):
print(topic.shm_name, len(topic.producers), len(topic.consumers))

stats = diag.stats("/kickmsg_telemetry")
for ring in stats.rings:
if ring.state == "live":
print(ring.write_pos, ring.dropped_count, ring.lost_count)

# Live updates (generator — caller drives the loop)
for frame in diag.watch("/kickmsg_telemetry", interval=1.0):
gui.update(frame.stats, frame.rates_msg_per_sec)
```

## Building

### Prerequisites
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ static void run_latency(BenchConfig const& bc, bool zerocopy)

while (pub.send(payload.data(), payload.size()) < 0)
{
kickmsg::sleep(0ns);
kickmsg::yield();
}

if (zerocopy)
Expand Down
2 changes: 1 addition & 1 deletion examples/hello_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
/// - "display" subscribes to "temperature" and prints them
///
/// Single-process for simplicity; in production, each node lives in
/// its own process sharing the same prefix.
/// its own process sharing the same namespace.

#include <cstdint>
#include <cstring>
Expand Down
Loading
Loading