From 2217316509348537540260cd935d3b0466269993 Mon Sep 17 00:00:00 2001 From: Philippe Leduc Date: Tue, 26 May 2026 17:39:34 +0200 Subject: [PATCH] Add relaxed mailbox creation order API --- include/kickmsg/Node.h | 21 +++++++++++++ src/Node.cc | 69 +++++++++++++++++++++++++++--------------- tests/unit/node-t.cc | 62 +++++++++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 25 deletions(-) diff --git a/include/kickmsg/Node.h b/include/kickmsg/Node.h index 6139837..1c4820e 100644 --- a/include/kickmsg/Node.h +++ b/include/kickmsg/Node.h @@ -99,6 +99,16 @@ namespace kickmsg Subscriber create_mailbox(char const* tag, channel::Config const& cfg = {}); Publisher open_mailbox(char const* owner_node, char const* tag); + // Relaxed-order mailbox: either side may arrive first. Mirrors + // advertise_or_join / subscribe_or_create for strict pub/sub. + // Both sides must pass `cfg` because either may end up being the + // creator; max_subscribers is forced to 1 to preserve the mailbox + // invariant regardless of cfg. + Subscriber create_or_open_mailbox(char const* tag, + channel::Config const& cfg); + Publisher open_or_create_mailbox(char const* owner_node, char const* tag, + channel::Config const& cfg); + // --- Unlink helpers ----------------------------------------------- // // Thin wrappers that call SharedMemory::unlink() with the same name @@ -142,6 +152,17 @@ namespace kickmsg SharedRegion* find_region(std::string const& shm_name); SharedRegion const* find_region(std::string const& shm_name) const; + // Shared body of every *_or_* method: idempotent find, else + // create_or_open, then touch_registry on both branches. + // Instantiated for Publisher and Subscriber inside Node.cc. + template + Handle create_or_open_handle(std::string const& shm_name, + std::string const& topic_path, + channel::Type channel_type, + registry::Kind kind, + registry::Role role, + channel::Config const& cfg); + Registry& lazy_registry(); /// Register `shm_name` with `role`, or upgrade the existing entry diff --git a/src/Node.cc b/src/Node.cc index df7d6e9..91be54f 100644 --- a/src/Node.cc +++ b/src/Node.cc @@ -166,42 +166,39 @@ namespace kickmsg return Subscriber(it->second); } - Publisher Node::advertise_or_join(char const* topic, channel::Config const& cfg) + template + Handle Node::create_or_open_handle(std::string const& shm_name, + std::string const& topic_path, + channel::Type channel_type, + registry::Kind kind, + registry::Role role, + channel::Config const& cfg) { - auto shm_name = make_topic_name(topic); - auto topic_path = with_leading_slash(topic); if (auto* r = find_region(shm_name)) { - touch_registry(shm_name, topic_path, channel::PubSub, - registry::Pubsub, registry::Publisher); - return Publisher(*r); + touch_registry(shm_name, topic_path, channel_type, kind, role); + return Handle(*r); } auto [it, _] = regions_.emplace( shm_name, SharedRegion::create_or_open( - shm_name.c_str(), channel::PubSub, cfg, name_.c_str())); - touch_registry(shm_name, topic_path, channel::PubSub, - registry::Pubsub, registry::Publisher); - return Publisher(it->second); + shm_name.c_str(), channel_type, cfg, name_.c_str())); + touch_registry(shm_name, topic_path, channel_type, kind, role); + return Handle(it->second); + } + + Publisher Node::advertise_or_join(char const* topic, channel::Config const& cfg) + { + return create_or_open_handle( + make_topic_name(topic), with_leading_slash(topic), + channel::PubSub, registry::Pubsub, registry::Publisher, cfg); } Subscriber Node::subscribe_or_create(char const* topic, channel::Config const& cfg) { - auto shm_name = make_topic_name(topic); - auto topic_path = with_leading_slash(topic); - if (auto* r = find_region(shm_name)) - { - touch_registry(shm_name, topic_path, channel::PubSub, - registry::Pubsub, registry::Subscriber); - return Subscriber(*r); - } - auto [it, _] = regions_.emplace( - shm_name, - SharedRegion::create_or_open( - shm_name.c_str(), channel::PubSub, cfg, name_.c_str())); - touch_registry(shm_name, topic_path, channel::PubSub, - registry::Pubsub, registry::Subscriber); - return Subscriber(it->second); + return create_or_open_handle( + make_topic_name(topic), with_leading_slash(topic), + channel::PubSub, registry::Pubsub, registry::Subscriber, cfg); } BroadcastHandle Node::join_broadcast(char const* channel, channel::Config const& cfg) @@ -256,6 +253,28 @@ namespace kickmsg return Publisher(it->second); } + Subscriber Node::create_or_open_mailbox(char const* tag, + channel::Config const& cfg) + { + channel::Config mbx_cfg = cfg; + mbx_cfg.max_subscribers = 1; + return create_or_open_handle( + make_mailbox_name(name_.c_str(), tag), + mailbox_topic(name_.c_str(), tag), + channel::PubSub, registry::Mailbox, registry::Subscriber, mbx_cfg); + } + + Publisher Node::open_or_create_mailbox(char const* owner_node, char const* tag, + channel::Config const& cfg) + { + channel::Config mbx_cfg = cfg; + mbx_cfg.max_subscribers = 1; + return create_or_open_handle( + make_mailbox_name(owner_node, tag), + mailbox_topic(owner_node, tag), + channel::PubSub, registry::Mailbox, registry::Publisher, mbx_cfg); + } + void Node::unlink_topic(char const* topic) const { SharedMemory::unlink(make_topic_name(topic)); diff --git a/tests/unit/node-t.cc b/tests/unit/node-t.cc index d0ba578..97cf9cf 100644 --- a/tests/unit/node-t.cc +++ b/tests/unit/node-t.cc @@ -357,3 +357,65 @@ TEST_F(NodeTest, MailboxMultipleWriters) EXPECT_TRUE((got1 == m1 && got2 == m2) || (got1 == m2 && got2 == m1)); } + +TEST_F(NodeTest, RelaxedMailboxOwnerFirst) +{ + track("/test_owner_mbx_inbox"); + + auto cfg = small_cfg(); + + kickmsg::Node owner("owner", "test"); + auto inbox = owner.create_or_open_mailbox("inbox", cfg); + + kickmsg::Node sender("snd", "test"); + auto pub = sender.open_or_create_mailbox("owner", "inbox", cfg); + + std::string m = "ping"; + ASSERT_GE(pub.send(m.data(), m.size()), 0); + + auto r = inbox.try_receive(); + ASSERT_TRUE(r.has_value()); + EXPECT_EQ(std::string(static_cast(r->data()), r->len()), m); +} + +TEST_F(NodeTest, RelaxedMailboxSenderFirst) +{ + track("/test_owner_mbx_inbox"); + + auto cfg = small_cfg(); + + // Sender starts before owner: it materializes the region. + kickmsg::Node sender("snd", "test"); + auto pub = sender.open_or_create_mailbox("owner", "inbox", cfg); + + kickmsg::Node owner("owner", "test"); + auto inbox = owner.create_or_open_mailbox("inbox", cfg); + + std::string m = "ping"; + ASSERT_GE(pub.send(m.data(), m.size()), 0); + + auto r = inbox.try_receive(); + ASSERT_TRUE(r.has_value()); + EXPECT_EQ(std::string(static_cast(r->data()), r->len()), m); +} + +TEST_F(NodeTest, RelaxedMailboxForcesMaxSubscribersOne) +{ + // If a caller passes cfg.max_subscribers != 1, the mailbox APIs must + // override it. Verify by reading back the region info. + track("/test_owner_mbx_inbox"); + + auto cfg = small_cfg(); + cfg.max_subscribers = 4; // mailbox should clamp to 1 + + kickmsg::Node owner("owner", "test"); + auto inbox = owner.create_or_open_mailbox("inbox", cfg); + + kickmsg::Node sender("snd", "test"); + auto pub = sender.open_or_create_mailbox("owner", "inbox", cfg); + + std::string m = "data"; + ASSERT_GE(pub.send(m.data(), m.size()), 0); + auto r = inbox.try_receive(); + ASSERT_TRUE(r.has_value()); +}