Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions include/kickmsg/Node.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ namespace kickmsg
Subscriber create_mailbox(char const* tag, channel::Config const& cfg = {});
Publisher open_mailbox(char const* owner_node, char const* tag);

// Relaxed-order mailbox: either side may arrive first. Mirrors
// advertise_or_join / subscribe_or_create for strict pub/sub.
// Both sides must pass `cfg` because either may end up being the
// creator; max_subscribers is forced to 1 to preserve the mailbox
// invariant regardless of cfg.
Subscriber create_or_open_mailbox(char const* tag,
channel::Config const& cfg);
Publisher open_or_create_mailbox(char const* owner_node, char const* tag,
channel::Config const& cfg);

// --- Unlink helpers -----------------------------------------------
//
// Thin wrappers that call SharedMemory::unlink() with the same name
Expand Down Expand Up @@ -142,6 +152,17 @@ namespace kickmsg
SharedRegion* find_region(std::string const& shm_name);
SharedRegion const* find_region(std::string const& shm_name) const;

// Shared body of every *_or_* method: idempotent find, else
// create_or_open, then touch_registry on both branches.
// Instantiated for Publisher and Subscriber inside Node.cc.
template <typename Handle>
Handle create_or_open_handle(std::string const& shm_name,
std::string const& topic_path,
channel::Type channel_type,
registry::Kind kind,
registry::Role role,
channel::Config const& cfg);

Registry& lazy_registry();

/// Register `shm_name` with `role`, or upgrade the existing entry
Expand Down
69 changes: 44 additions & 25 deletions src/Node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,42 +166,39 @@ namespace kickmsg
return Subscriber(it->second);
}

Publisher Node::advertise_or_join(char const* topic, channel::Config const& cfg)
template <typename Handle>
Handle Node::create_or_open_handle(std::string const& shm_name,
std::string const& topic_path,
channel::Type channel_type,
registry::Kind kind,
registry::Role role,
channel::Config const& cfg)
{
auto shm_name = make_topic_name(topic);
auto topic_path = with_leading_slash(topic);
if (auto* r = find_region(shm_name))
{
touch_registry(shm_name, topic_path, channel::PubSub,
registry::Pubsub, registry::Publisher);
return Publisher(*r);
touch_registry(shm_name, topic_path, channel_type, kind, role);
return Handle(*r);
}
auto [it, _] = regions_.emplace(
shm_name,
SharedRegion::create_or_open(
shm_name.c_str(), channel::PubSub, cfg, name_.c_str()));
touch_registry(shm_name, topic_path, channel::PubSub,
registry::Pubsub, registry::Publisher);
return Publisher(it->second);
shm_name.c_str(), channel_type, cfg, name_.c_str()));
touch_registry(shm_name, topic_path, channel_type, kind, role);
return Handle(it->second);
}

Publisher Node::advertise_or_join(char const* topic, channel::Config const& cfg)
{
return create_or_open_handle<Publisher>(
make_topic_name(topic), with_leading_slash(topic),
channel::PubSub, registry::Pubsub, registry::Publisher, cfg);
}

Subscriber Node::subscribe_or_create(char const* topic, channel::Config const& cfg)
{
auto shm_name = make_topic_name(topic);
auto topic_path = with_leading_slash(topic);
if (auto* r = find_region(shm_name))
{
touch_registry(shm_name, topic_path, channel::PubSub,
registry::Pubsub, registry::Subscriber);
return Subscriber(*r);
}
auto [it, _] = regions_.emplace(
shm_name,
SharedRegion::create_or_open(
shm_name.c_str(), channel::PubSub, cfg, name_.c_str()));
touch_registry(shm_name, topic_path, channel::PubSub,
registry::Pubsub, registry::Subscriber);
return Subscriber(it->second);
return create_or_open_handle<Subscriber>(
make_topic_name(topic), with_leading_slash(topic),
channel::PubSub, registry::Pubsub, registry::Subscriber, cfg);
}

BroadcastHandle Node::join_broadcast(char const* channel, channel::Config const& cfg)
Expand Down Expand Up @@ -256,6 +253,28 @@ namespace kickmsg
return Publisher(it->second);
}

Subscriber Node::create_or_open_mailbox(char const* tag,
channel::Config const& cfg)
{
channel::Config mbx_cfg = cfg;
mbx_cfg.max_subscribers = 1;
return create_or_open_handle<Subscriber>(
make_mailbox_name(name_.c_str(), tag),
mailbox_topic(name_.c_str(), tag),
channel::PubSub, registry::Mailbox, registry::Subscriber, mbx_cfg);
}

Publisher Node::open_or_create_mailbox(char const* owner_node, char const* tag,
channel::Config const& cfg)
{
channel::Config mbx_cfg = cfg;
mbx_cfg.max_subscribers = 1;
return create_or_open_handle<Publisher>(
make_mailbox_name(owner_node, tag),
mailbox_topic(owner_node, tag),
channel::PubSub, registry::Mailbox, registry::Publisher, mbx_cfg);
}

void Node::unlink_topic(char const* topic) const
{
SharedMemory::unlink(make_topic_name(topic));
Expand Down
62 changes: 62 additions & 0 deletions tests/unit/node-t.cc
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,65 @@ TEST_F(NodeTest, MailboxMultipleWriters)

EXPECT_TRUE((got1 == m1 && got2 == m2) || (got1 == m2 && got2 == m1));
}

TEST_F(NodeTest, RelaxedMailboxOwnerFirst)
{
track("/test_owner_mbx_inbox");

auto cfg = small_cfg();

kickmsg::Node owner("owner", "test");
auto inbox = owner.create_or_open_mailbox("inbox", cfg);

kickmsg::Node sender("snd", "test");
auto pub = sender.open_or_create_mailbox("owner", "inbox", cfg);

std::string m = "ping";
ASSERT_GE(pub.send(m.data(), m.size()), 0);

auto r = inbox.try_receive();
ASSERT_TRUE(r.has_value());
EXPECT_EQ(std::string(static_cast<char const*>(r->data()), r->len()), m);
}

TEST_F(NodeTest, RelaxedMailboxSenderFirst)
{
track("/test_owner_mbx_inbox");

auto cfg = small_cfg();

// Sender starts before owner: it materializes the region.
kickmsg::Node sender("snd", "test");
auto pub = sender.open_or_create_mailbox("owner", "inbox", cfg);

kickmsg::Node owner("owner", "test");
auto inbox = owner.create_or_open_mailbox("inbox", cfg);

std::string m = "ping";
ASSERT_GE(pub.send(m.data(), m.size()), 0);

auto r = inbox.try_receive();
ASSERT_TRUE(r.has_value());
EXPECT_EQ(std::string(static_cast<char const*>(r->data()), r->len()), m);
}

TEST_F(NodeTest, RelaxedMailboxForcesMaxSubscribersOne)
{
// If a caller passes cfg.max_subscribers != 1, the mailbox APIs must
// override it. Verify by reading back the region info.
track("/test_owner_mbx_inbox");

auto cfg = small_cfg();
cfg.max_subscribers = 4; // mailbox should clamp to 1

kickmsg::Node owner("owner", "test");
auto inbox = owner.create_or_open_mailbox("inbox", cfg);

kickmsg::Node sender("snd", "test");
auto pub = sender.open_or_create_mailbox("owner", "inbox", cfg);

std::string m = "data";
ASSERT_GE(pub.send(m.data(), m.size()), 0);
auto r = inbox.try_receive();
ASSERT_TRUE(r.has_value());
}
Loading