Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/hello_zerocopy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ int main()
// Publish a few "frames"
for (uint32_t i = 0; i < 3; ++i)
{
void* ptr = pub.allocate(sizeof(ImageHeader));
auto [ptr, max_size] = pub.allocate();
if (ptr == nullptr)
{
std::cerr << "Pool exhausted at frame " << i << "\n";
Expand All @@ -53,7 +53,7 @@ int main()

ImageHeader hdr{640, 480, 3, i};
std::memcpy(ptr, &hdr, sizeof(hdr));
pub.publish();
pub.publish(sizeof(hdr));

std::cout << "Published frame " << i << " (640x480x3)\n";
}
Expand Down
13 changes: 7 additions & 6 deletions examples/python/hello_camera_zerocopy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
AllocatedSlot / SampleView buffer-protocol path was designed for.

Shows how to:
1. Reserve a slot directly in shared memory via Publisher.allocate(size),
then obtain a writable memoryview with `memoryview(slot)` and fill it
in-place (no intermediate buffer). In a real pipeline you'd:
1. Reserve a slot directly in shared memory via Publisher.allocate(),
which returns an AllocatedSlot sized to max_payload_size. Obtain a
writable memoryview with `memoryview(slot)` and fill it in-place
(no intermediate buffer). In a real pipeline you'd:
- numpy.copyto(np.asarray(slot).reshape(H, W, 3), frame), or
- DMA from a V4L2 buffer into the slot, or
- render directly into the slot.
Then call `slot.publish()` to commit.
Then call `slot.publish(n)` with the number of bytes actually written.
2. Receive it zero-copy on the other side via Subscriber.try_receive_view
and `memoryview(view)` — read-only, no copy either. The memoryview
pins the SampleView alive so the slot stays valid until every
Expand Down Expand Up @@ -53,12 +54,12 @@ def main() -> int:
for i in range(3):
# Zero-copy capture: reserve a slot, write directly into SHM via
# a memoryview, publish. Nothing is copied on the publish side.
slot = pub.allocate(frame_bytes)
slot = pub.allocate()
if slot is None:
print(f"frame {i}: pool exhausted, dropping")
continue
fill_frame(memoryview(slot), height, width)
slot.publish()
slot.publish(frame_bytes)
print(f"Published frame {i} ({width}x{height} RGB, {frame_bytes} B zero-copy)")

for i in range(3):
Expand Down
23 changes: 17 additions & 6 deletions include/kickmsg/Publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@

namespace kickmsg
{
/// Reservation returned by Publisher::allocate(): a writable pointer
/// into shared memory plus the maximum number of bytes the caller may
/// write through it. data == nullptr signals pool exhaustion.
struct Allocation
{
void* data;
std::size_t max_size;
};

class Publisher
{
public:
Expand All @@ -15,7 +24,6 @@ namespace kickmsg
, header_{region.header()}
, commit_timeout_{microseconds{header_->commit_timeout_us}}
, pending_slot_{INVALID_SLOT}
, pending_len_{0}
{
}

Expand All @@ -29,7 +37,6 @@ namespace kickmsg
, header_{other.header_}
, commit_timeout_{other.commit_timeout_}
, pending_slot_{other.pending_slot_}
, pending_len_{other.pending_len_}
, dropped_{other.dropped_}
{
other.pending_slot_ = INVALID_SLOT;
Expand All @@ -44,15 +51,20 @@ namespace kickmsg
header_ = other.header_;
commit_timeout_ = other.commit_timeout_;
pending_slot_ = other.pending_slot_;
pending_len_ = other.pending_len_;
dropped_ = other.dropped_;
other.pending_slot_ = INVALID_SLOT;
}
return *this;
}

void* allocate(std::size_t len);
std::size_t publish();
/// Reserve a slot. Returns {data, max_size}; data is nullptr if
/// the pool is exhausted.
Allocation allocate();

/// Commit the currently reserved slot, recording `len` as the
/// payload size. No bounds check: caller guarantees
/// `len <= max_size` from the preceding allocate().
std::size_t publish(std::size_t len);

/// Allocate, copy, and publish in one call.
/// Returns bytes written on success, -EMSGSIZE if too large, -EAGAIN if pool exhausted.
Expand All @@ -71,7 +83,6 @@ namespace kickmsg
Header* header_;
microseconds commit_timeout_;
uint32_t pending_slot_;
uint32_t pending_len_;
uint64_t dropped_{0};
};
}
Expand Down
86 changes: 49 additions & 37 deletions py_bindings/src/kickmsg_py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,21 @@
///
/// Zero-copy contract (lifetime-safe via the Python buffer protocol):
///
/// slot = pub.allocate(N) → AllocatedSlot. memoryview(slot) is a
/// writable view into the SHM slot. The
/// memoryview pins the slot, which pins
/// the Publisher, which pins the mmap —
/// so retained memoryviews stay valid
/// (at the mmap level) as long as Python
/// holds them.
/// slot.publish() → commits. NEW memoryview(slot) after
/// this raises BufferError. Memoryviews
/// obtained BEFORE publish remain pointer-
/// valid but writing through them after
/// publish would corrupt in-flight
/// subscribers — user contract: don't.
/// slot = pub.allocate() → AllocatedSlot sized to max_payload_size.
/// memoryview(slot) is a writable view into
/// the SHM slot. The memoryview pins the
/// slot, which pins the Publisher, which
/// pins the mmap — so retained memoryviews
/// stay valid (at the mmap level) as long
/// as Python holds them.
/// slot.publish(n) → commits, recording `n` bytes as the
/// payload size. NEW memoryview(slot)
/// after this raises BufferError.
/// Memoryviews obtained BEFORE publish
/// remain pointer-valid but writing
/// through them after publish would
/// corrupt in-flight subscribers — user
/// contract: don't.
///
/// view = sub.try_receive_view() → SampleView. memoryview(view) is a
/// read-only view into the SHM slot. The
Expand Down Expand Up @@ -85,10 +87,10 @@ using namespace nb::literals;
namespace kickmsg
{
// Python-only wrapper around a Publisher reservation. Holds the slot
// pointer + length returned by Publisher::allocate(), exposes the
// writable buffer protocol so `memoryview(slot)` points directly into
// the shared-memory slot (zero-copy), and has a .publish() method
// that commits via the Publisher.
// pointer and max payload size returned by Publisher::allocate(),
// exposes the writable buffer protocol so `memoryview(slot)` points
// directly into the shared-memory slot (zero-copy), and has a
// .publish(n) method that commits `n` bytes via the Publisher.
//
// Lifetime: the Py_buffer obtained through buffer protocol pins this
// AllocatedSlot alive (view->obj = self; Py_INCREF), which in turn
Expand All @@ -101,11 +103,11 @@ namespace kickmsg
{
Publisher* publisher;
void* ptr;
std::size_t len;
std::size_t max_size;
bool published;

PyAllocatedSlot(Publisher& p, void* data, std::size_t n)
: publisher{&p}, ptr{data}, len{n}, published{false}
PyAllocatedSlot(Publisher& p, void* data, std::size_t cap)
: publisher{&p}, ptr{data}, max_size{cap}, published{false}
{
}
};
Expand Down Expand Up @@ -176,7 +178,7 @@ namespace
view->buf = slot->ptr;
view->obj = self;
Py_INCREF(self);
view->len = static_cast<Py_ssize_t>(slot->len);
view->len = static_cast<Py_ssize_t>(slot->max_size);
view->itemsize = 1;
view->readonly = 0; // writable
view->ndim = 1;
Expand Down Expand Up @@ -633,26 +635,36 @@ namespace kickmsg
nb::class_<PyAllocatedSlot>(m, "AllocatedSlot",
nb::type_slots(as_slots))
.def("publish",
[](PyAllocatedSlot& s) -> std::size_t
[](PyAllocatedSlot& s, std::size_t len) -> std::size_t
{
if (s.published)
{
throw nb::value_error(
"AllocatedSlot.publish() called more than once");
}
if (len > s.max_size)
{
throw nb::value_error(
"publish(len) exceeds slot max_size");
}
s.published = true;
return s.publisher->publish();
return s.publisher->publish(len);
},
"Commit the reserved slot. Returns the number of rings "
"the sample was delivered to. After this call, any NEW "
"memoryview(slot) fails with BufferError.")
"len"_a,
"Commit the reserved slot, recording `len` bytes as the "
"payload size. Returns the number of rings the sample was "
"delivered to. After this call, any NEW memoryview(slot) "
"fails with BufferError.")
.def("__len__",
[](PyAllocatedSlot const& s) -> std::size_t { return s.len; })
[](PyAllocatedSlot const& s) -> std::size_t { return s.max_size; })
.def_prop_ro("max_size",
[](PyAllocatedSlot const& s) -> std::size_t { return s.max_size; })
.def_prop_ro("published",
[](PyAllocatedSlot const& s) -> bool { return s.published; })
.def("__repr__", [](PyAllocatedSlot const& s)
{
return std::string{"AllocatedSlot(len="} + std::to_string(s.len) +
return std::string{"AllocatedSlot(max_size="} +
std::to_string(s.max_size) +
", published=" + (s.published ? "True" : "False") + ")";
});

Expand Down Expand Up @@ -701,26 +713,26 @@ namespace kickmsg
"the message exceeds max_payload_size, BlockingIOError if "
"the slot pool is exhausted, OSError on other failures.")
.def("allocate",
[](Publisher& p, std::size_t len) -> std::optional<PyAllocatedSlot>
[](Publisher& p) -> std::optional<PyAllocatedSlot>
{
void* ptr = p.allocate(len);
if (ptr == nullptr)
auto a = p.allocate();
if (a.data == nullptr)
{
return std::nullopt;
}
return PyAllocatedSlot{p, ptr, len};
return PyAllocatedSlot{p, a.data, a.max_size};
},
"len"_a,
// keep_alive<0, 1>: the returned AllocatedSlot (arg 0)
// must pin the Publisher (arg 1 = self). Memoryviews
// obtained from the slot in turn pin the AllocatedSlot
// (via Py_buffer::obj), so the full chain is
// memoryview → AllocatedSlot → Publisher → SharedRegion.
nb::keep_alive<0, 1>(),
"Reserve a slot of `len` bytes and return an AllocatedSlot. "
"Use memoryview(slot) or numpy.asarray(slot) to fill it "
"in place (zero-copy), then call slot.publish(). Returns "
"None if the pool is exhausted.")
"Reserve a slot sized to max_payload_size and return an "
"AllocatedSlot. Use memoryview(slot) or numpy.asarray(slot) "
"to write up to slot.max_size bytes in place (zero-copy), "
"then call slot.publish(n) with the actual number of bytes "
"written. Returns None if the pool is exhausted.")
.def_prop_ro("dropped", &Publisher::dropped,
"Per-ring delivery drops (CAS contention or pool exhaustion).")
.def("__repr__", [](Publisher const& p)
Expand Down
27 changes: 9 additions & 18 deletions src/Publisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,35 @@ namespace kickmsg
auto* slot = slot_at(base_, header_, pending_slot_);
treiber_push(header_->free_top, slot, pending_slot_);
pending_slot_ = INVALID_SLOT;
pending_len_ = 0;
}
}

void* Publisher::allocate(std::size_t len)
Allocation Publisher::allocate()
{
if (len > header_->slot_data_size)
{
return nullptr;
}

// Release any previously allocated but unpublished slot.
release_pending();

uint32_t slot_idx = treiber_pop(header_->free_top, base_, header_);
if (slot_idx == INVALID_SLOT)
{
return nullptr;
return Allocation{nullptr, 0};
}

pending_slot_ = slot_idx;
pending_len_ = static_cast<uint32_t>(len);

auto* slot = slot_at(base_, header_, slot_idx);
return slot_data(slot);
return Allocation{slot_data(slot), header_->slot_data_size};
}

std::size_t Publisher::publish()
std::size_t Publisher::publish(std::size_t len)
{
if (pending_slot_ == INVALID_SLOT)
{
return 0;
}

uint32_t slot_idx = pending_slot_;
uint32_t len = pending_len_;
pending_slot_ = INVALID_SLOT;
pending_len_ = 0;

auto* slot = slot_at(base_, header_, slot_idx);
uint64_t capacity = header_->sub_ring_capacity;
Expand Down Expand Up @@ -207,7 +198,7 @@ namespace kickmsg
// We exclusively own this entry. No other publisher can CAS from
// LOCKED_SEQUENCE since they expect prev_seq.
e.slot_idx.store(slot_idx, std::memory_order_relaxed);
e.payload_len.store(len, std::memory_order_relaxed);
e.payload_len.store(static_cast<uint32_t>(len), std::memory_order_relaxed);

// Release-store commits the entry: subscribers and future publishers
// at this position will see all preceding stores.
Expand Down Expand Up @@ -249,14 +240,14 @@ namespace kickmsg
return -EMSGSIZE;
}

auto* ptr = allocate(len);
if (ptr == nullptr)
auto a = allocate();
if (a.data == nullptr)
{
return -EAGAIN;
}

std::memcpy(ptr, data, len);
publish();
std::memcpy(a.data, data, len);
publish(len);
return static_cast<int32_t>(len);
}

Expand Down
16 changes: 8 additions & 8 deletions tests/crash_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ static void child_publisher_main(int /*round*/)

for (uint32_t i = 0; ; ++i)
{
auto* ptr = pub.allocate(sizeof(CrashPayload));
if (ptr == nullptr)
auto a = pub.allocate();
if (a.data == nullptr)
{
kickmsg::yield();
continue;
Expand All @@ -68,9 +68,9 @@ static void child_publisher_main(int /*round*/)
msg.magic = CrashPayload::MAGIC;
msg.seq = i;
msg.checksum = compute_checksum(msg);
std::memcpy(ptr, &msg, sizeof(msg));
std::memcpy(a.data, &msg, sizeof(msg));

pub.publish();
pub.publish(sizeof(msg));
}
}

Expand Down Expand Up @@ -369,8 +369,8 @@ static bool test_multi_publisher_crash()
kickmsg::Publisher p(r);
for (uint32_t seq = 0; ; ++seq)
{
auto* ptr = p.allocate(sizeof(CrashPayload));
if (ptr == nullptr)
auto a = p.allocate();
if (a.data == nullptr)
{
kickmsg::yield();
continue;
Expand All @@ -379,8 +379,8 @@ static bool test_multi_publisher_crash()
msg.magic = CrashPayload::MAGIC;
msg.seq = seq;
msg.checksum = compute_checksum(msg);
std::memcpy(ptr, &msg, sizeof(msg));
p.publish();
std::memcpy(a.data, &msg, sizeof(msg));
p.publish(sizeof(msg));
}
}
}
Expand Down
Loading
Loading