diff --git a/examples/hello_zerocopy.cc b/examples/hello_zerocopy.cc index cdc82f5..5d144d0 100644 --- a/examples/hello_zerocopy.cc +++ b/examples/hello_zerocopy.cc @@ -44,7 +44,7 @@ int main() // Publish a few "frames" for (uint32_t i = 0; i < 3; ++i) { - void* ptr = pub.allocate(sizeof(ImageHeader)); + auto [ptr, max_size] = pub.allocate(); if (ptr == nullptr) { std::cerr << "Pool exhausted at frame " << i << "\n"; @@ -53,7 +53,7 @@ int main() ImageHeader hdr{640, 480, 3, i}; std::memcpy(ptr, &hdr, sizeof(hdr)); - pub.publish(); + pub.publish(sizeof(hdr)); std::cout << "Published frame " << i << " (640x480x3)\n"; } diff --git a/examples/python/hello_camera_zerocopy.py b/examples/python/hello_camera_zerocopy.py index 67a98b6..ff347ad 100644 --- a/examples/python/hello_camera_zerocopy.py +++ b/examples/python/hello_camera_zerocopy.py @@ -2,13 +2,14 @@ AllocatedSlot / SampleView buffer-protocol path was designed for. Shows how to: - 1. Reserve a slot directly in shared memory via Publisher.allocate(size), - then obtain a writable memoryview with `memoryview(slot)` and fill it - in-place (no intermediate buffer). In a real pipeline you'd: + 1. Reserve a slot directly in shared memory via Publisher.allocate(), + which returns an AllocatedSlot sized to max_payload_size. Obtain a + writable memoryview with `memoryview(slot)` and fill it in-place + (no intermediate buffer). In a real pipeline you'd: - - numpy.copyto(np.asarray(slot).reshape(H, W, 3), frame), or + - numpy.copyto(np.asarray(slot)[:H * W * 3].reshape(H, W, 3), frame), or - DMA from a V4L2 buffer into the slot, or - render directly into the slot. - Then call `slot.publish()` to commit. + Then call `slot.publish(n)` with the number of bytes actually written. 2. Receive it zero-copy on the other side via Subscriber.try_receive_view and `memoryview(view)` — read-only, no copy either. 
The memoryview pins the SampleView alive so the slot stays valid until every @@ -53,12 +54,12 @@ def main() -> int: for i in range(3): # Zero-copy capture: reserve a slot, write directly into SHM via # a memoryview, publish. Nothing is copied on the publish side. - slot = pub.allocate(frame_bytes) + slot = pub.allocate() if slot is None: print(f"frame {i}: pool exhausted, dropping") continue fill_frame(memoryview(slot), height, width) - slot.publish() + slot.publish(frame_bytes) print(f"Published frame {i} ({width}x{height} RGB, {frame_bytes} B zero-copy)") for i in range(3): diff --git a/include/kickmsg/Publisher.h b/include/kickmsg/Publisher.h index cb05df4..c57455a 100644 --- a/include/kickmsg/Publisher.h +++ b/include/kickmsg/Publisher.h @@ -7,6 +7,15 @@ namespace kickmsg { + /// Reservation returned by Publisher::allocate(): a writable pointer + /// into shared memory plus the maximum number of bytes the caller may + /// write through it. data == nullptr signals pool exhaustion. + struct Allocation + { + void* data; + std::size_t max_size; + }; + class Publisher { public: @@ -15,7 +24,6 @@ namespace kickmsg , header_{region.header()} , commit_timeout_{microseconds{header_->commit_timeout_us}} , pending_slot_{INVALID_SLOT} - , pending_len_{0} { } @@ -29,7 +37,6 @@ namespace kickmsg , header_{other.header_} , commit_timeout_{other.commit_timeout_} , pending_slot_{other.pending_slot_} - , pending_len_{other.pending_len_} , dropped_{other.dropped_} { other.pending_slot_ = INVALID_SLOT; @@ -44,15 +51,20 @@ namespace kickmsg header_ = other.header_; commit_timeout_ = other.commit_timeout_; pending_slot_ = other.pending_slot_; - pending_len_ = other.pending_len_; dropped_ = other.dropped_; other.pending_slot_ = INVALID_SLOT; } return *this; } - void* allocate(std::size_t len); - std::size_t publish(); + /// Reserve a slot, first releasing any reserved-but-unpublished one. + /// Returns {data, max_size}; data is nullptr if the pool is exhausted. 
+ Allocation allocate(); + + /// Commit the currently reserved slot, recording `len` as the payload + /// size. Returns the number of rings the sample was delivered to. + /// No bounds check: caller guarantees `len <= max_size` from the preceding allocate(). + std::size_t publish(std::size_t len); /// Allocate, copy, and publish in one call. /// Returns bytes written on success, -EMSGSIZE if too large, -EAGAIN if pool exhausted. @@ -71,7 +83,6 @@ namespace kickmsg Header* header_; microseconds commit_timeout_; uint32_t pending_slot_; - uint32_t pending_len_; uint64_t dropped_{0}; }; } diff --git a/py_bindings/src/kickmsg_py.cc b/py_bindings/src/kickmsg_py.cc index 4f02fd8..1754438 100644 --- a/py_bindings/src/kickmsg_py.cc +++ b/py_bindings/src/kickmsg_py.cc @@ -27,19 +27,21 @@ /// /// Zero-copy contract (lifetime-safe via the Python buffer protocol): /// -/// slot = pub.allocate(N) → AllocatedSlot. memoryview(slot) is a -/// writable view into the SHM slot. The -/// memoryview pins the slot, which pins -/// the Publisher, which pins the mmap — -/// so retained memoryviews stay valid -/// (at the mmap level) as long as Python -/// holds them. -/// slot.publish() → commits. NEW memoryview(slot) after -/// this raises BufferError. Memoryviews -/// obtained BEFORE publish remain pointer- -/// valid but writing through them after -/// publish would corrupt in-flight -/// subscribers — user contract: don't. +/// slot = pub.allocate() → AllocatedSlot sized to max_payload_size. +/// memoryview(slot) is a writable view into +/// the SHM slot. The memoryview pins the +/// slot, which pins the Publisher, which +/// pins the mmap — so retained memoryviews +/// stay valid (at the mmap level) as long +/// as Python holds them. +/// slot.publish(n) → commits, recording `n` bytes as the +/// payload size. NEW memoryview(slot) +/// after this raises BufferError. 
+/// Memoryviews obtained BEFORE publish +/// remain pointer-valid but writing +/// through them after publish would +/// corrupt in-flight subscribers — user +/// contract: don't. /// /// view = sub.try_receive_view() → SampleView. memoryview(view) is a /// read-only view into the SHM slot. The @@ -85,10 +87,10 @@ using namespace nb::literals; namespace kickmsg { // Python-only wrapper around a Publisher reservation. Holds the slot - // pointer + length returned by Publisher::allocate(), exposes the - // writable buffer protocol so `memoryview(slot)` points directly into - // the shared-memory slot (zero-copy), and has a .publish() method - // that commits via the Publisher. + // pointer and max payload size returned by Publisher::allocate(), + // exposes the writable buffer protocol so `memoryview(slot)` points + // directly into the shared-memory slot (zero-copy), and has a + // .publish(n) method that commits `n` bytes via the Publisher. // // Lifetime: the Py_buffer obtained through buffer protocol pins this // AllocatedSlot alive (view->obj = self; Py_INCREF), which in turn @@ -101,11 +103,11 @@ namespace kickmsg { Publisher* publisher; void* ptr; - std::size_t len; + std::size_t max_size; bool published; - PyAllocatedSlot(Publisher& p, void* data, std::size_t n) - : publisher{&p}, ptr{data}, len{n}, published{false} + PyAllocatedSlot(Publisher& p, void* data, std::size_t cap) + : publisher{&p}, ptr{data}, max_size{cap}, published{false} { } }; @@ -176,7 +178,7 @@ namespace view->buf = slot->ptr; view->obj = self; Py_INCREF(self); - view->len = static_cast(slot->len); + view->len = static_cast(slot->max_size); view->itemsize = 1; view->readonly = 0; // writable view->ndim = 1; @@ -633,26 +635,36 @@ namespace kickmsg nb::class_(m, "AllocatedSlot", nb::type_slots(as_slots)) .def("publish", - [](PyAllocatedSlot& s) -> std::size_t + [](PyAllocatedSlot& s, std::size_t len) -> std::size_t { if (s.published) { throw nb::value_error( "AllocatedSlot.publish() called 
more than once"); } + if (len > s.max_size) + { + throw nb::value_error( + "publish(len) exceeds slot max_size"); + } s.published = true; - return s.publisher->publish(); + return s.publisher->publish(len); }, - "Commit the reserved slot. Returns the number of rings " - "the sample was delivered to. After this call, any NEW " - "memoryview(slot) fails with BufferError.") + "len"_a, + "Commit the reserved slot, recording `len` bytes as the " + "payload size. Returns the number of rings the sample was " + "delivered to. After this call, any NEW memoryview(slot) " + "fails with BufferError.") .def("__len__", - [](PyAllocatedSlot const& s) -> std::size_t { return s.len; }) + [](PyAllocatedSlot const& s) -> std::size_t { return s.max_size; }) + .def_prop_ro("max_size", + [](PyAllocatedSlot const& s) -> std::size_t { return s.max_size; }) .def_prop_ro("published", [](PyAllocatedSlot const& s) -> bool { return s.published; }) .def("__repr__", [](PyAllocatedSlot const& s) { - return std::string{"AllocatedSlot(len="} + std::to_string(s.len) + + return std::string{"AllocatedSlot(max_size="} + + std::to_string(s.max_size) + ", published=" + (s.published ? "True" : "False") + ")"; }); @@ -701,26 +713,26 @@ namespace kickmsg "the message exceeds max_payload_size, BlockingIOError if " "the slot pool is exhausted, OSError on other failures.") .def("allocate", - [](Publisher& p, std::size_t len) -> std::optional + [](Publisher& p) -> std::optional { - void* ptr = p.allocate(len); - if (ptr == nullptr) + auto a = p.allocate(); + if (a.data == nullptr) { return std::nullopt; } - return PyAllocatedSlot{p, ptr, len}; + return PyAllocatedSlot{p, a.data, a.max_size}; }, - "len"_a, // keep_alive<0, 1>: the returned AllocatedSlot (arg 0) // must pin the Publisher (arg 1 = self). Memoryviews // obtained from the slot in turn pin the AllocatedSlot // (via Py_buffer::obj), so the full chain is // memoryview → AllocatedSlot → Publisher → SharedRegion. 
nb::keep_alive<0, 1>(), - "Reserve a slot of `len` bytes and return an AllocatedSlot. " - "Use memoryview(slot) or numpy.asarray(slot) to fill it " - "in place (zero-copy), then call slot.publish(). Returns " - "None if the pool is exhausted.") + "Reserve a slot sized to max_payload_size and return an " + "AllocatedSlot. Use memoryview(slot) or numpy.asarray(slot) " + "to write up to slot.max_size bytes in place (zero-copy), " + "then call slot.publish(n) with the actual number of bytes " + "written. Returns None if the pool is exhausted.") .def_prop_ro("dropped", &Publisher::dropped, "Per-ring delivery drops (CAS contention or pool exhaustion).") .def("__repr__", [](Publisher const& p) diff --git a/src/Publisher.cc b/src/Publisher.cc index ef14fe8..9626e97 100644 --- a/src/Publisher.cc +++ b/src/Publisher.cc @@ -17,34 +17,27 @@ namespace kickmsg auto* slot = slot_at(base_, header_, pending_slot_); treiber_push(header_->free_top, slot, pending_slot_); pending_slot_ = INVALID_SLOT; - pending_len_ = 0; } } - void* Publisher::allocate(std::size_t len) + Allocation Publisher::allocate() { - if (len > header_->slot_data_size) - { - return nullptr; - } - // Release any previously allocated but unpublished slot. 
release_pending(); uint32_t slot_idx = treiber_pop(header_->free_top, base_, header_); if (slot_idx == INVALID_SLOT) { - return nullptr; + return Allocation{nullptr, 0}; } pending_slot_ = slot_idx; - pending_len_ = static_cast(len); auto* slot = slot_at(base_, header_, slot_idx); - return slot_data(slot); + return Allocation{slot_data(slot), header_->slot_data_size}; } - std::size_t Publisher::publish() + std::size_t Publisher::publish(std::size_t len) { if (pending_slot_ == INVALID_SLOT) { @@ -52,9 +45,7 @@ namespace kickmsg } uint32_t slot_idx = pending_slot_; - uint32_t len = pending_len_; pending_slot_ = INVALID_SLOT; - pending_len_ = 0; auto* slot = slot_at(base_, header_, slot_idx); uint64_t capacity = header_->sub_ring_capacity; @@ -207,7 +198,7 @@ namespace kickmsg // We exclusively own this entry. No other publisher can CAS from // LOCKED_SEQUENCE since they expect prev_seq. e.slot_idx.store(slot_idx, std::memory_order_relaxed); - e.payload_len.store(len, std::memory_order_relaxed); + e.payload_len.store(static_cast(len), std::memory_order_relaxed); // Release-store commits the entry: subscribers and future publishers // at this position will see all preceding stores. 
@@ -249,14 +240,14 @@ namespace kickmsg return -EMSGSIZE; } - auto* ptr = allocate(len); - if (ptr == nullptr) + auto a = allocate(); + if (a.data == nullptr) { return -EAGAIN; } - std::memcpy(ptr, data, len); - publish(); + std::memcpy(a.data, data, len); + publish(len); return static_cast(len); } diff --git a/tests/crash_test.cc b/tests/crash_test.cc index 888d674..0847ff4 100644 --- a/tests/crash_test.cc +++ b/tests/crash_test.cc @@ -57,8 +57,8 @@ static void child_publisher_main(int /*round*/) for (uint32_t i = 0; ; ++i) { - auto* ptr = pub.allocate(sizeof(CrashPayload)); - if (ptr == nullptr) + auto a = pub.allocate(); + if (a.data == nullptr) { kickmsg::yield(); continue; @@ -68,9 +68,9 @@ static void child_publisher_main(int /*round*/) msg.magic = CrashPayload::MAGIC; msg.seq = i; msg.checksum = compute_checksum(msg); - std::memcpy(ptr, &msg, sizeof(msg)); + std::memcpy(a.data, &msg, sizeof(msg)); - pub.publish(); + pub.publish(sizeof(msg)); } } @@ -369,8 +369,8 @@ static bool test_multi_publisher_crash() kickmsg::Publisher p(r); for (uint32_t seq = 0; ; ++seq) { - auto* ptr = p.allocate(sizeof(CrashPayload)); - if (ptr == nullptr) + auto a = p.allocate(); + if (a.data == nullptr) { kickmsg::yield(); continue; @@ -379,8 +379,8 @@ static bool test_multi_publisher_crash() msg.magic = CrashPayload::MAGIC; msg.seq = seq; msg.checksum = compute_checksum(msg); - std::memcpy(ptr, &msg, sizeof(msg)); - p.publish(); + std::memcpy(a.data, &msg, sizeof(msg)); + p.publish(sizeof(msg)); } } } diff --git a/tests/python/test_zerocopy.py b/tests/python/test_zerocopy.py index 209856a..32df71d 100644 --- a/tests/python/test_zerocopy.py +++ b/tests/python/test_zerocopy.py @@ -20,16 +20,17 @@ def test_publisher_allocate_returns_writable_slot(shm_name, small_cfg): pub = kickmsg.Publisher(region) sub = kickmsg.Subscriber(region) - slot = pub.allocate(16) + slot = pub.allocate() assert slot is not None - assert len(slot) == 16 + assert slot.max_size == small_cfg.max_payload_size + 
assert len(slot) == small_cfg.max_payload_size assert not slot.published mv = memoryview(slot) assert not mv.readonly mv[:5] = b"hello" mv[5:16] = b"-zerocopy!!" - slot.publish() + slot.publish(16) assert slot.published got = sub.try_receive() @@ -61,10 +62,10 @@ def test_slot_publish_rejects_new_memoryview(shm_name, small_cfg): pub = kickmsg.Publisher(region) sub = kickmsg.Subscriber(region) - slot = pub.allocate(8) + slot = pub.allocate() mv1 = memoryview(slot) mv1[:4] = b"xxxx" - slot.publish() + slot.publish(8) with pytest.raises(BufferError): memoryview(slot) @@ -78,11 +79,11 @@ def test_slot_publish_rejects_new_memoryview(shm_name, small_cfg): def test_double_publish_raises(shm_name, small_cfg): region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") pub = kickmsg.Publisher(region) - slot = pub.allocate(4) - memoryview(slot)[:] = b"ping" - slot.publish() + slot = pub.allocate() + memoryview(slot)[:4] = b"ping" + slot.publish(4) with pytest.raises(ValueError, match="more than once"): - slot.publish() + slot.publish(4) def test_view_release_returns_slot_to_pool(shm_name, small_cfg): @@ -134,14 +135,15 @@ def test_zerocopy_camera_frame_pattern(shm_name): sub = kickmsg.Subscriber(region) frame_size = 320 * 240 * 3 # smaller test frame - slot = pub.allocate(frame_size) + slot = pub.allocate() assert slot is not None + assert slot.max_size == cfg.max_payload_size buf = memoryview(slot) # Fill with a pattern — cheap, full write coverage. 
for i in range(0, frame_size, 4096): chunk = min(4096, frame_size - i) buf[i:i + chunk] = bytes([(i + j) & 0xFF for j in range(chunk)]) - slot.publish() + slot.publish(frame_size) view = sub.try_receive_view() try: diff --git a/tests/unit/publisher-t.cc b/tests/unit/publisher-t.cc index 2a92767..6c50693 100644 --- a/tests/unit/publisher-t.cc +++ b/tests/unit/publisher-t.cc @@ -59,12 +59,13 @@ TEST_F(PublisherTest, AllocatePublishSeparately) kickmsg::Subscriber sub(region); kickmsg::Publisher pub(region); - auto* ptr = pub.allocate(sizeof(uint32_t)); - ASSERT_NE(ptr, nullptr); + auto a = pub.allocate(); + ASSERT_NE(a.data, nullptr); + EXPECT_GE(a.max_size, sizeof(uint32_t)); uint32_t val = 42; - std::memcpy(ptr, &val, sizeof(val)); - std::size_t delivered = pub.publish(); + std::memcpy(a.data, &val, sizeof(val)); + std::size_t delivered = pub.publish(sizeof(val)); EXPECT_EQ(delivered, 1u); auto sample = sub.try_receive(); @@ -75,13 +76,15 @@ TEST_F(PublisherTest, AllocatePublishSeparately) EXPECT_EQ(got, 42u); } -TEST_F(PublisherTest, AllocateTooLargeReturnsNull) +TEST_F(PublisherTest, AllocateExposesMaxSize) { auto cfg = default_cfg(); auto region = kickmsg::SharedRegion::create(SHM_NAME, kickmsg::channel::PubSub, cfg); kickmsg::Publisher pub(region); - EXPECT_EQ(pub.allocate(cfg.max_payload_size + 1), nullptr); + auto a = pub.allocate(); + ASSERT_NE(a.data, nullptr); + EXPECT_EQ(a.max_size, cfg.max_payload_size); } TEST_F(PublisherTest, SendReturnsEmsgsize) diff --git a/tests/unit/region-t.cc b/tests/unit/region-t.cc index 06e105a..b065ba0 100644 --- a/tests/unit/region-t.cc +++ b/tests/unit/region-t.cc @@ -1024,8 +1024,8 @@ TEST_F(RegionTest, StatsPoolFreeTracksAllocations) kickmsg::Publisher pub(region); // Hold a slot mid-publish (allocate without publish). - auto* ptr = pub.allocate(8); - ASSERT_NE(ptr, nullptr); + auto a = pub.allocate(); + ASSERT_NE(a.data, nullptr); auto s = region.stats(); // One slot is popped from the free stack and not yet returned.