From ed46261ab52717e32e350fcda3cf9c1750c8865d Mon Sep 17 00:00:00 2001 From: Adam Mitchell Date: Wed, 20 May 2026 10:35:36 +0000 Subject: [PATCH] Add publish(Suset) overload to ThingSetServer --- include/thingset++/ThingSetServer.hpp | 55 ++++++++++++++++++++++++ tests/TestAsioIpPublishSubscribe.cpp | 61 +++++++++++++++++++++++++++ 2 files changed, 116 insertions(+) diff --git a/include/thingset++/ThingSetServer.hpp b/include/thingset++/ThingSetServer.hpp index db37375..098664c 100644 --- a/include/thingset++/ThingSetServer.hpp +++ b/include/thingset++/ThingSetServer.hpp @@ -8,6 +8,7 @@ #include "thingset++/Eui.hpp" #include "thingset++/StringLiteral.hpp" #include "thingset++/ThingSetProperty.hpp" +#include "thingset++/ThingSetRegistry.hpp" #include "thingset++/ThingSetRequestContext.hpp" #include "thingset++/ThingSetServerTransport.hpp" #include "thingset++/ThingSetStatus.hpp" @@ -130,6 +131,60 @@ class ThingSetServer : public _ThingSetServer return encoder.flush(); } + /// @brief Broadcasts every property tagged with the given subset as a single report. + /// @tparam SubsetType Type of the subset enum. + /// @param subset The subset to publish. All registered nodes whose subset mask + /// contains every bit of subset are included. + /// @return True if publishing succeeded. + template + requires std::is_enum_v + bool publish(SubsetType subset) + { +#ifdef ENABLE_ENHANCED_REPORTING + bool enhanced = true; +#else + bool enhanced = false; +#endif + Encoder encoder = _transport.getPublishingEncoder(enhanced); + + if (enhanced) { + if (!encoder.encode(ThingSet::Eui::getValue())) { + return false; + } + } + + if (!encoder.encode(0)) { + return false; + } + + size_t count = 0; + for (auto *n : ThingSetRegistry::nodesInSubset(subset)) { + (void)n; + ++count; + } + + if (!encoder.encodeMapStart(count)) { + return false; + } + + for (ThingSetNode *node : ThingSetRegistry::nodesInSubset(subset)) { + void *target = nullptr; + if (!node->tryCastTo(ThingSetNodeType::encodable, &target)) { + return false; + } + auto *encodable = reinterpret_cast(target); + if (!encoder.encode(node->getId()) || !encodable->encode(encoder)) { + return false; + } + } + + if (!encoder.encodeMapEnd()) { + return false; + } + + return encoder.flush(); + } + private: int requestCallback(Identifier &, uint8_t *request, size_t requestLen, uint8_t *response, size_t responseLen) { diff --git a/tests/TestAsioIpPublishSubscribe.cpp b/tests/TestAsioIpPublishSubscribe.cpp index a6ca4b1..c753823 100644 --- a/tests/TestAsioIpPublishSubscribe.cpp +++ b/tests/TestAsioIpPublishSubscribe.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include using namespace ThingSet; @@ -64,3 +65,63 @@ TEST(AsioIpPublishSubscribe, Name) ASSERT_EQ(5, receiveCount); } + +TEST(AsioIpPublishSubscribe, Subset) +{ + // Four flavours of property: tagged live, tagged live|persisted (union), + // tagged persisted-only, and untagged. publish(Subset::live) must include + // the first two and exclude the rest. + ThingSetReadWriteProperty bothSubsets { 0x900, 0, "bothSubsets", 1.5f }; + ThingSetReadWriteProperty liveOnly { 0x901, 0, "liveOnly", 7u }; + ThingSetReadWriteProperty persistedOnly { 0x902, 0, "persistedOnly", 99u }; + ThingSetReadWriteProperty untagged { 0x903, 0, "untagged", 42u }; + + io_context serverContext(1); + ThingSetAsyncSocketServerTransport serverTransport(serverContext); + auto server = ThingSetServerBuilder::build(serverTransport); + server.listen(); + + io_context clientContext(1); + ThingSetAsyncSocketSubscriptionTransport subscriptionTransport(clientContext); + auto listener = ThingSetListenerBuilder::build(subscriptionTransport); + + std::map hits; + listener.subscribe([&](auto, auto id) { + hits[id]++; + // Stop once the last id from the third publish has arrived + if (hits[0x901] == 3) { + clientContext.stop(); + } + }); + + std::thread serverThread([&]() { + for (int i = 0; i < 3; i++) { + server.publish(Subset::live); + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + } + }); + std::thread clientThread([&]() { + std::this_thread::sleep_for(std::chrono::milliseconds(125)); + clientContext.run_for(std::chrono::seconds(5)); + serverContext.stop(); + }); + clientThread.join(); + serverThread.join(); + + EXPECT_EQ(3u, hits[0x900]); // live | persisted matches live filter (union) + EXPECT_EQ(3u, hits[0x901]); // live matches + EXPECT_EQ(0u, hits[0x902]); // persisted-only excluded + EXPECT_EQ(0u, hits[0x903]); // untagged excluded +} + +TEST(AsioIpPublishSubscribe, SubsetEmpty) +{ + // No properties tagged Subset::live in this scope, so publish must still + // succeed and produce an empty map. + io_context serverContext(1); + ThingSetAsyncSocketServerTransport serverTransport(serverContext); + auto server = ThingSetServerBuilder::build(serverTransport); + server.listen(); + + EXPECT_TRUE(server.publish(Subset::live)); +}