From a9bfc77973abdc169c90a0782cc13fb8cbc9ddab Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Thu, 4 Feb 2021 17:51:59 -0300 Subject: [PATCH 1/9] Add events queue abstract class and implem --- .../rclcpp/executors/events_executor.hpp | 10 +- .../experimental/buffers/events_queue.hpp | 98 +++++++++++++++ .../buffers/performance_events_queue.hpp | 116 ++++++++++++++++++ .../src/rclcpp/executors/events_executor.cpp | 42 ++++--- 4 files changed, 242 insertions(+), 24 deletions(-) create mode 100644 rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp create mode 100644 rclcpp/include/rclcpp/experimental/buffers/performance_events_queue.hpp diff --git a/rclcpp/include/rclcpp/executors/events_executor.hpp b/rclcpp/include/rclcpp/executors/events_executor.hpp index df63a96535..32317897b1 100644 --- a/rclcpp/include/rclcpp/executors/events_executor.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor.hpp @@ -24,6 +24,7 @@ #include "rclcpp/executors/events_executor_entities_collector.hpp" #include "rclcpp/executors/events_executor_notify_waitable.hpp" #include "rclcpp/executors/timers_manager.hpp" +#include "rclcpp/experimental/buffers/events_queue.hpp" #include "rclcpp/node.hpp" #include "rmw/listener_event_types.h" @@ -59,6 +60,7 @@ class EventsExecutor : public rclcpp::Executor */ RCLCPP_PUBLIC explicit EventsExecutor( + rclcpp::experimental::buffers::EventsQueue::UniquePtr events_queue, const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()); /// Default destrcutor. @@ -190,10 +192,10 @@ class EventsExecutor : public rclcpp::Executor { std::unique_lock lock(this_executor->push_mutex_); - this_executor->event_queue_.push(event); + this_executor->events_queue_->push(event); } // Notify that the event queue has some events in it. - this_executor->event_queue_cv_.notify_one(); + this_executor->events_queue_cv_.notify_one(); } /// Extract and execute events from the queue until it is empty @@ -207,7 +209,7 @@ class EventsExecutor : public rclcpp::Executor execute_event(const rmw_listener_event_t & event); // Queue where entities can push events - EventQueue event_queue_; + rclcpp::experimental::buffers::EventsQueue::SharedPtr events_queue_; EventsExecutorEntitiesCollector::SharedPtr entities_collector_; EventsExecutorNotifyWaitable::SharedPtr executor_notifier_; @@ -215,7 +217,7 @@ class EventsExecutor : public rclcpp::Executor // Mutex to protect the insertion of events in the queue std::mutex push_mutex_; // Variable used to notify when an event is added to the queue - std::condition_variable event_queue_cv_; + std::condition_variable events_queue_cv_; // Timers manager std::shared_ptr timers_manager_; }; diff --git a/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp new file mode 100644 index 0000000000..ef9b61a5f2 --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp @@ -0,0 +1,98 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__EVENTS_QUEUE_HPP_ +#define RCLCPP__EXPERIMENTAL__BUFFERS__EVENTS_QUEUE_HPP_ + +#include + +#include "rclcpp/macros.hpp" + +#include "rmw/listener_event_types.h" + +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ + +/** + * @brief This abstract class is intended to be used as + * a wrapper around a queue. The derived classes should chose + * which container to use and the strategies for push and prune + * events from the queue. + */ +class EventsQueue +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventsQueue) + + /** + * @brief Destruct the object. + */ + RCLCPP_PUBLIC + virtual ~EventsQueue() = default; + + /** + * @brief push event into the queue + * @param event The event to push into the queue + */ + RCLCPP_PUBLIC + virtual + void + push(rmw_listener_event_t event) = 0; + + /** + * @brief removes front element from the queue + * @return iterator + */ + RCLCPP_PUBLIC + virtual + void + pop() = 0; + + /** + * @brief gets the front event from the queue + * @return the front event + */ + RCLCPP_PUBLIC + virtual + rmw_listener_event_t + front() = 0; + + /** + * @brief Test whether queue is empty + * @return true if the queue's size is 0, false otherwise. + */ + RCLCPP_PUBLIC + virtual + bool + empty() = 0; + + /** + * @brief gets a queue with all events accumulated on it + * @return queue with events + */ + RCLCPP_PUBLIC + virtual + std::queue + get_all_events() = 0; +}; + +} // namespace buffers +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__BUFFERS__EVENTS_QUEUE_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/buffers/performance_events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/performance_events_queue.hpp new file mode 100644 index 0000000000..feeaff059d --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/buffers/performance_events_queue.hpp @@ -0,0 +1,116 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__PERFORMANCE_EVENTS_QUEUE_HPP_ +#define RCLCPP__EXPERIMENTAL__BUFFERS__PERFORMANCE_EVENTS_QUEUE_HPP_ + +#include + +#include "rclcpp/executors/events_queue.hpp" + +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ + +/** + * @brief This class provides a simple queue implementation + * based on a std::queue. As the objective is having a CPU peformant + * queue, it does not performs any checks about the size of + * the queue, so the queue size could grow unbounded. + * It does not implement any pruning mechanisms. + */ +class PerformanceEventsQueue : public EventsQueue +{ +public: + + using EventQueue = std::queue; + + RCLCPP_PUBLIC + ~PerformanceEventsQueue() = default; + + /** + * @brief push event into the queue + * @param event The event to push into the queue + */ + RCLCPP_PUBLIC + virtual + void + push(rmw_listener_event_t event) + { + event_queue_.push(event); + } + + /** + * @brief removes front element from the queue + * @return iterator + */ + RCLCPP_PUBLIC + virtual + void + pop() + { + event_queue_.pop(); + } + + /** + * @brief gets the front event from the queue + * @return the front event + */ + RCLCPP_PUBLIC + virtual + rmw_listener_event_t + front() + { + return event_queue_.front(); + } + + /** + * @brief Test whether queue is empty + * @return true if the queue's size is 0, false otherwise. + */ + RCLCPP_PUBLIC + virtual + bool + empty() + { + return event_queue_.empty(); + } + + /** + * @brief gets a queue with all events on it + * @return queue with events + */ + RCLCPP_PUBLIC + virtual + EventQueue + get_all_events() + { + EventQueue local_queue; + std::swap(event_queue_, local_queue); + return local_queue; + } + +private: + EventQueue event_queue_; +}; + +} // namespace buffers +} // namespace experimental +} // namespace rclcpp + + +#endif // RCLCPP__EXPERIMENTAL__BUFFERS__PERFORMANCE_EVENTS_QUEUE_HPP_ diff --git a/rclcpp/src/rclcpp/executors/events_executor.cpp b/rclcpp/src/rclcpp/executors/events_executor.cpp index 97a673c5cf..be688134bf 100644 --- a/rclcpp/src/rclcpp/executors/events_executor.cpp +++ b/rclcpp/src/rclcpp/executors/events_executor.cpp @@ -24,8 +24,10 @@ using namespace std::chrono_literals; using rclcpp::executors::EventsExecutor; +using rclcpp::experimental::buffers::EventsQueue; EventsExecutor::EventsExecutor( + EventsQueue::UniquePtr events_queue, const rclcpp::ExecutorOptions & options) : rclcpp::Executor(options) { @@ -33,6 +35,8 @@ EventsExecutor::EventsExecutor( entities_collector_ = std::make_shared(this); entities_collector_->init(); + events_queue_ = std::move(events_queue); + // Setup the executor notifier to wake up the executor when some guard conditions are tiggered. // The added guard conditions are guaranteed to not go out of scope before the executor itself. executor_notifier_ = std::make_shared(); @@ -51,7 +55,7 @@ EventsExecutor::spin() RCLCPP_SCOPE_EXIT(this->spinning.store(false);); // When condition variable is notified, check this predicate to proceed - auto has_event_predicate = [this]() {return !event_queue_.empty();}; + auto has_event_predicate = [this]() {return !events_queue_->empty();}; // Local event queue to allow entities to push events while we execute them EventQueue execution_event_queue; @@ -61,10 +65,10 @@ EventsExecutor::spin() while (rclcpp::ok(context_) && spinning.load()) { std::unique_lock push_lock(push_mutex_); // We wait here until something has been pushed to the event queue - event_queue_cv_.wait(push_lock, has_event_predicate); - // We got an event! Swap queues - std::swap(execution_event_queue, event_queue_); - // After swapping the queues, we don't need the lock anymore + events_queue_cv_.wait(push_lock, has_event_predicate); + // Local event queue to allow entities to push events while we execute them + execution_event_queue = events_queue_->get_all_events(); + // Unlock the mutex push_lock.unlock(); // Consume all available events, this queue will be empty at the end of the function this->consume_all_events(execution_event_queue); @@ -91,10 +95,8 @@ EventsExecutor::spin_some(std::chrono::nanoseconds max_duration) // - An executor event is received and processed // When condition variable is notified, check this predicate to proceed - auto has_event_predicate = [this]() {return !event_queue_.empty();}; + auto has_event_predicate = [this]() {return !events_queue_->empty();}; - // Local event queue to allow entities to push events while we execute them - EventQueue execution_event_queue; // Select the smallest between input max_duration and timer timeout auto next_timer_timeout = timers_manager_->get_head_timeout(); @@ -104,10 +106,10 @@ EventsExecutor::spin_some(std::chrono::nanoseconds max_duration) std::unique_lock push_lock(push_mutex_); // Wait until timeout or event - event_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate); - // Time to swap queues as the wait is over - std::swap(execution_event_queue, event_queue_); - // After swapping the queues, we don't need the lock anymore + events_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate); + // Local event queue to allow entities to push events while we execute them + EventQueue execution_event_queue = events_queue_->get_all_events(); + // We don't need the lock anymore push_lock.unlock(); // Execute all ready timers @@ -129,7 +131,7 @@ EventsExecutor::spin_all(std::chrono::nanoseconds max_duration) RCLCPP_SCOPE_EXIT(this->spinning.store(false);); // When condition variable is notified, check this predicate to proceed - auto has_event_predicate = [this]() {return !event_queue_.empty();}; + auto has_event_predicate = [this]() {return !events_queue_->empty();}; // Local event queue to allow entities to push events while we execute them EventQueue execution_event_queue; @@ -149,7 +151,7 @@ EventsExecutor::spin_all(std::chrono::nanoseconds max_duration) { // Wait once until timeout or event std::unique_lock push_lock(push_mutex_); - event_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate); + events_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate); } auto timeout = timers_manager_->get_head_timeout(); @@ -157,7 +159,7 @@ EventsExecutor::spin_all(std::chrono::nanoseconds max_duration) // Keep executing until no more work to do or timeout expired while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) { std::unique_lock push_lock(push_mutex_); - std::swap(execution_event_queue, event_queue_); + execution_event_queue = events_queue_->get_all_events(); push_lock.unlock(); // Exit if there is no more work to do @@ -188,7 +190,7 @@ EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout) } // When condition variable is notified, check this predicate to proceed - auto has_event_predicate = [this]() {return !event_queue_.empty();}; + auto has_event_predicate = [this]() {return !events_queue_->empty();}; rmw_listener_event_t event; bool has_event = false; @@ -196,13 +198,13 @@ EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout) { // Wait until timeout or event arrives std::unique_lock lock(push_mutex_); - event_queue_cv_.wait_for(lock, timeout, has_event_predicate); + events_queue_cv_.wait_for(lock, timeout, has_event_predicate); // Grab first event from queue if it exists - has_event = !event_queue_.empty(); + has_event = !events_queue_->empty(); if (has_event) { - event = event_queue_.front(); - event_queue_.pop(); + event = events_queue_->front(); + events_queue_->pop(); } } From 5b568fb8124f86b939790bc16934ffd3e43ca44e Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 5 Feb 2021 12:47:22 -0300 Subject: [PATCH 2/9] Apply PR suggestions --- .../rclcpp/executors/events_executor.hpp | 5 +- .../experimental/buffers/events_queue.hpp | 22 +++++++-- ...ents_queue.hpp => simple_events_queue.hpp} | 49 +++++++++++++------ .../src/rclcpp/executors/events_executor.cpp | 9 ++-- 4 files changed, 60 insertions(+), 25 deletions(-) rename rclcpp/include/rclcpp/experimental/buffers/{performance_events_queue.hpp => simple_events_queue.hpp} (60%) diff --git a/rclcpp/include/rclcpp/executors/events_executor.hpp b/rclcpp/include/rclcpp/executors/events_executor.hpp index 32317897b1..a584908b88 100644 --- a/rclcpp/include/rclcpp/executors/events_executor.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor.hpp @@ -25,6 +25,7 @@ #include "rclcpp/executors/events_executor_notify_waitable.hpp" #include "rclcpp/executors/timers_manager.hpp" #include "rclcpp/experimental/buffers/events_queue.hpp" +#include "rclcpp/experimental/buffers/simple_events_queue.hpp" #include "rclcpp/node.hpp" #include "rmw/listener_event_types.h" @@ -56,11 +57,13 @@ class EventsExecutor : public rclcpp::Executor /// Default constructor. See the default constructor for Executor. /** + * \param[in] events_queue The queue used to store events. * \param[in] options Options used to configure the executor. */ RCLCPP_PUBLIC explicit EventsExecutor( - rclcpp::experimental::buffers::EventsQueue::UniquePtr events_queue, + rclcpp::experimental::buffers::EventsQueue::UniquePtr events_queue = + std::make_unique(), const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()); /// Default destrcutor. diff --git a/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp index ef9b61a5f2..492183ccff 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp @@ -17,6 +17,7 @@ #include +#include "rclcpp/executors/events_executor_entities_collector.hpp" #include "rclcpp/macros.hpp" #include "rmw/listener_event_types.h" @@ -52,11 +53,12 @@ class EventsQueue RCLCPP_PUBLIC virtual void - push(rmw_listener_event_t event) = 0; + push(const rmw_listener_event_t & event) = 0; /** * @brief removes front element from the queue - * @return iterator + * The element removed is the "oldest" element in the queue whose + * value can be retrieved by calling member front(). */ RCLCPP_PUBLIC virtual @@ -70,7 +72,7 @@ class EventsQueue RCLCPP_PUBLIC virtual rmw_listener_event_t - front() = 0; + front() const = 0; /** * @brief Test whether queue is empty @@ -79,10 +81,20 @@ class EventsQueue RCLCPP_PUBLIC virtual bool - empty() = 0; + empty() const = 0; /** - * @brief gets a queue with all events accumulated on it + * @brief Initializes the queue + * @param entities_collector The entities collector associated with the executor + */ + RCLCPP_PUBLIC + virtual + void + init(rclcpp::executors::EventsExecutorEntitiesCollector::SharedPtr entities_collector) = 0; + + /** + * @brief gets a queue with all events accumulated on it since + * the last call. The member queue is empty when the call returns. * @return queue with events */ RCLCPP_PUBLIC diff --git a/rclcpp/include/rclcpp/experimental/buffers/performance_events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp similarity index 60% rename from rclcpp/include/rclcpp/experimental/buffers/performance_events_queue.hpp rename to rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp index feeaff059d..1854ab0810 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/performance_events_queue.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__PERFORMANCE_EVENTS_QUEUE_HPP_ -#define RCLCPP__EXPERIMENTAL__BUFFERS__PERFORMANCE_EVENTS_QUEUE_HPP_ +#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__SIMPLE_EVENTS_QUEUE_HPP_ +#define RCLCPP__EXPERIMENTAL__BUFFERS__SIMPLE_EVENTS_QUEUE_HPP_ #include @@ -33,14 +33,12 @@ namespace buffers * the queue, so the queue size could grow unbounded. * It does not implement any pruning mechanisms. */ -class PerformanceEventsQueue : public EventsQueue +class SimpleEventsQueue : public EventsQueue { public: - using EventQueue = std::queue; - RCLCPP_PUBLIC - ~PerformanceEventsQueue() = default; + ~SimpleEventsQueue() = default; /** * @brief push event into the queue @@ -49,14 +47,15 @@ class PerformanceEventsQueue : public EventsQueue RCLCPP_PUBLIC virtual void - push(rmw_listener_event_t event) + push(const rmw_listener_event_t & event) { event_queue_.push(event); } /** * @brief removes front element from the queue - * @return iterator + * The element removed is the "oldest" element in the queue whose + * value can be retrieved by calling member front(). */ RCLCPP_PUBLIC virtual @@ -73,7 +72,7 @@ class PerformanceEventsQueue : public EventsQueue RCLCPP_PUBLIC virtual rmw_listener_event_t - front() + front() const { return event_queue_.front(); } @@ -85,27 +84,45 @@ class PerformanceEventsQueue : public EventsQueue RCLCPP_PUBLIC virtual bool - empty() + empty() const { return event_queue_.empty(); } /** - * @brief gets a queue with all events on it - * @return queue with events + * @brief Initializes the queue + * @param entities_collector The entities collector associated with the executor + */ + RCLCPP_PUBLIC + virtual + void + init(rclcpp::executors::EventsExecutorEntitiesCollector::SharedPtr entities_collector) + { + // Entities collector not used in this queue implementation + (void)entities_collector; + // Make sure the queue is empty when we start + std::queue local_queue; + std::swap(event_queue_, local_queue); + } + + + /** + * @brief gets a queue with all events accumulated on it since + * the last call. The member queue is empty when the call returns. + * @return std::queue with events */ RCLCPP_PUBLIC virtual - EventQueue + std::queue get_all_events() { - EventQueue local_queue; + std::queue local_queue; std::swap(event_queue_, local_queue); return local_queue; } private: - EventQueue event_queue_; + std::queue event_queue_; }; } // namespace buffers @@ -113,4 +130,4 @@ class PerformanceEventsQueue : public EventsQueue } // namespace rclcpp -#endif // RCLCPP__EXPERIMENTAL__BUFFERS__PERFORMANCE_EVENTS_QUEUE_HPP_ +#endif // RCLCPP__EXPERIMENTAL__BUFFERS__SIMPLE_EVENTS_QUEUE_HPP_ diff --git a/rclcpp/src/rclcpp/executors/events_executor.cpp b/rclcpp/src/rclcpp/executors/events_executor.cpp index be688134bf..d0857d949e 100644 --- a/rclcpp/src/rclcpp/executors/events_executor.cpp +++ b/rclcpp/src/rclcpp/executors/events_executor.cpp @@ -24,10 +24,9 @@ using namespace std::chrono_literals; using rclcpp::executors::EventsExecutor; -using rclcpp::experimental::buffers::EventsQueue; EventsExecutor::EventsExecutor( - EventsQueue::UniquePtr events_queue, + rclcpp::experimental::buffers::EventsQueue::UniquePtr events_queue, const rclcpp::ExecutorOptions & options) : rclcpp::Executor(options) { @@ -35,7 +34,6 @@ EventsExecutor::EventsExecutor( entities_collector_ = std::make_shared(this); entities_collector_->init(); - events_queue_ = std::move(events_queue); // Setup the executor notifier to wake up the executor when some guard conditions are tiggered. // The added guard conditions are guaranteed to not go out of scope before the executor itself. @@ -44,6 +42,11 @@ EventsExecutor::EventsExecutor( executor_notifier_->add_guard_condition(&interrupt_guard_condition_); executor_notifier_->set_events_executor_callback(this, &EventsExecutor::push_event); entities_collector_->add_waitable(executor_notifier_); + + // Get ownership of the queue used to store events. + events_queue_ = std::move(events_queue); + // Init the events queue + events_queue_->init(entities_collector_); } void From f73cbddd3730bce88c58a536cfe94fa5a053c02b Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 5 Feb 2021 14:13:30 -0300 Subject: [PATCH 3/9] Fix ctests --- rclcpp/include/rclcpp/executors/events_executor.hpp | 4 ++-- .../rclcpp/experimental/buffers/simple_events_queue.hpp | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rclcpp/include/rclcpp/executors/events_executor.hpp b/rclcpp/include/rclcpp/executors/events_executor.hpp index a584908b88..6b5ef5ec11 100644 --- a/rclcpp/include/rclcpp/executors/events_executor.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor.hpp @@ -62,8 +62,8 @@ class EventsExecutor : public rclcpp::Executor */ RCLCPP_PUBLIC explicit EventsExecutor( - rclcpp::experimental::buffers::EventsQueue::UniquePtr events_queue = - std::make_unique(), + rclcpp::experimental::buffers::EventsQueue::UniquePtr events_queue = std::make_unique< + rclcpp::experimental::buffers::SimpleEventsQueue>(), const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()); /// Default destrcutor. diff --git a/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp index 1854ab0810..cb656a67e9 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp @@ -16,6 +16,7 @@ #define RCLCPP__EXPERIMENTAL__BUFFERS__SIMPLE_EVENTS_QUEUE_HPP_ #include +#include #include "rclcpp/executors/events_queue.hpp" @@ -36,7 +37,6 @@ namespace buffers class SimpleEventsQueue : public EventsQueue { public: - RCLCPP_PUBLIC ~SimpleEventsQueue() = default; @@ -74,7 +74,7 @@ class SimpleEventsQueue : public EventsQueue rmw_listener_event_t front() const { - return event_queue_.front(); + return event_queue_.front(); } /** From 9c5f037ad98a78613c278ba11a9b414ae84ded9b Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 5 Feb 2021 16:04:42 -0300 Subject: [PATCH 4/9] EntitiesCollector not needed on events_queue --- rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp | 3 +-- .../rclcpp/experimental/buffers/simple_events_queue.hpp | 5 +---- rclcpp/src/rclcpp/executors/events_executor.cpp | 2 +- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp index 492183ccff..d00a00705d 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp @@ -85,12 +85,11 @@ class EventsQueue /** * @brief Initializes the queue - * @param entities_collector The entities collector associated with the executor */ RCLCPP_PUBLIC virtual void - init(rclcpp::executors::EventsExecutorEntitiesCollector::SharedPtr entities_collector) = 0; + init() = 0; /** * @brief gets a queue with all events accumulated on it since diff --git a/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp index cb656a67e9..b0685e7ad1 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp @@ -91,15 +91,12 @@ class SimpleEventsQueue : public EventsQueue /** * @brief Initializes the queue - * @param entities_collector The entities collector associated with the executor */ RCLCPP_PUBLIC virtual void - init(rclcpp::executors::EventsExecutorEntitiesCollector::SharedPtr entities_collector) + init() { - // Entities collector not used in this queue implementation - (void)entities_collector; // Make sure the queue is empty when we start std::queue local_queue; std::swap(event_queue_, local_queue); diff --git a/rclcpp/src/rclcpp/executors/events_executor.cpp b/rclcpp/src/rclcpp/executors/events_executor.cpp index d0857d949e..f70cd2772f 100644 --- a/rclcpp/src/rclcpp/executors/events_executor.cpp +++ b/rclcpp/src/rclcpp/executors/events_executor.cpp @@ -46,7 +46,7 @@ EventsExecutor::EventsExecutor( // Get ownership of the queue used to store events. events_queue_ = std::move(events_queue); // Init the events queue - events_queue_->init(entities_collector_); + events_queue_->init(); } void From 780af848f1aca6080a4c0333b42c5c665d998976 Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 5 Feb 2021 16:36:13 -0300 Subject: [PATCH 5/9] Add BoundedEventsQueue --- .../buffers/bounded_events_queue.hpp | 146 ++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp diff --git a/rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp new file mode 100644 index 0000000000..2fb2970ba5 --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp @@ -0,0 +1,146 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__BOUNDED_EVENTS_QUEUE_HPP_ +#define RCLCPP__EXPERIMENTAL__BUFFERS__BOUNDED_EVENTS_QUEUE_HPP_ + +#include +#include + +#include "rclcpp/executors/events_queue.hpp" + +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ + +/** + * @brief This class provides a bounded queue implementation + * based on a std::queue. Before pushing events into the queue + * checks the queue size. In case of exceeding the size it performs + * a prune of the queue. + */ +class BoundedEventsQueue : public EventsQueue +{ +public: + RCLCPP_PUBLIC + ~BoundedEventsQueue() = default; + + /** + * @brief push event into the queue + * @param event The event to push into the queue + */ + RCLCPP_PUBLIC + virtual + void + push(const rmw_listener_event_t & event) + { + if (event_queue_.size() >= queue_size_limit_) { + // Simple prune strategy: Remove all elements in the queue. + this->init(); + } + event_queue_.push(event); + } + + /** + * @brief removes front element from the queue + * The element removed is the "oldest" element in the queue whose + * value can be retrieved by calling member front(). + */ + RCLCPP_PUBLIC + virtual + void + pop() + { + event_queue_.pop(); + } + + /** + * @brief gets the front event from the queue + * @return the front event + */ + RCLCPP_PUBLIC + virtual + rmw_listener_event_t + front() const + { + return event_queue_.front(); + } + + /** + * @brief Test whether queue is empty + * @return true if the queue's size is 0, false otherwise. + */ + RCLCPP_PUBLIC + virtual + bool + empty() const + { + return event_queue_.empty(); + } + + /** + * @brief Initializes the queue + */ + RCLCPP_PUBLIC + virtual + void + init() + { + // Make sure the queue is empty when we start + std::queue local_queue; + std::swap(event_queue_, local_queue); + } + + + /** + * @brief gets a queue with all events accumulated on it since + * the last call. The member queue is empty when the call returns. + * @return std::queue with events + */ + RCLCPP_PUBLIC + virtual + std::queue + get_all_events() + { + std::queue local_queue; + std::swap(event_queue_, local_queue); + return local_queue; + } + + /** + * @brief sets the queue size limit + * @param limit The queue size limit + */ + RCLCPP_PUBLIC + void + set_queue_size_limit(size_t queue_size_limit) + { + queue_size_limit_ = queue_size_limit; + } + +private: + std::queue event_queue_; + + size_t queue_size_limit_ = 1000; +}; + +} // namespace buffers +} // namespace experimental +} // namespace rclcpp + + +#endif // RCLCPP__EXPERIMENTAL__BUFFERS__BOUNDED_EVENTS_QUEUE_HPP_ From 0317e7774a8f0ea40f074936c9bfc6801194eb46 Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 5 Feb 2021 18:31:58 -0300 Subject: [PATCH 6/9] Add unit tests for Simple and Bounded queues --- rclcpp/test/rclcpp/CMakeLists.txt | 9 ++ .../rclcpp/executors/test_events_queue.cpp | 147 ++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 rclcpp/test/rclcpp/executors/test_events_queue.cpp diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index b950c9b354..13d41a7fca 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -568,6 +568,15 @@ if(TARGET test_events_executor) target_link_libraries(test_events_executor ${PROJECT_NAME} mimick) endif() +ament_add_gtest(test_events_queue executors/test_events_queue.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}") +if(TARGET test_events_queue) + ament_target_dependencies(test_events_queue + "rcl" + "test_msgs") + target_link_libraries(test_events_queue ${PROJECT_NAME} mimick) +endif() + ament_add_gtest(test_events_executor_entities_collector executors/test_events_executor_entities_collector.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}") if(TARGET test_events_executor_entities_collector) diff --git a/rclcpp/test/rclcpp/executors/test_events_queue.cpp b/rclcpp/test/rclcpp/executors/test_events_queue.cpp new file mode 100644 index 0000000000..5cf5db7b8e --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_events_queue.cpp @@ -0,0 +1,147 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include +#include +#include + +#include "rclcpp/executors/events_executor.hpp" +#include "rclcpp/experimental/buffers/bounded_events_queue.hpp" + +#include "test_msgs/msg/empty.hpp" + +using namespace std::chrono_literals; + +using rclcpp::executors::EventsExecutor; + +class TestEventsQueue : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + } + + void TearDown() + { + rclcpp::shutdown(); + } +}; + +TEST_F(TestEventsQueue, BoundedQueue) +{ + // Create BoundedEventsQueue and set limit to 10 events. + auto bounded_queue = std::make_unique(); + bounded_queue->set_queue_size_limit(10); + + // Create an events executor using the bounded queue + EventsExecutor executor_sub(std::move(bounded_queue)); + + // Create a subscription node + auto node_sub = std::make_shared("node_sub"); + + size_t callback_count = 0; + + auto subscription = + node_sub->create_subscription( + "topic", + rclcpp::QoS(10), + [&](test_msgs::msg::Empty::SharedPtr) { + callback_count++; + }); + + // Add susbscription node to the executor, so the queue can start getting events + executor_sub.add_node(node_sub); + + // Create a publisher node + auto node_pub = std::make_shared("node_pub"); + auto publisher = node_pub->create_publisher("topic", rclcpp::QoS(10)); + + // Let subscriptions executor spin to execute any previous events + // not related to the subscription, so we start with an empty queue + executor_sub.spin_some(10ms); + + // Publish 11 messages, the eleventh msg should prune the queue + // and we should end up with only one event on it + for (int i = 0; i < 11; i++) { + publisher->publish(std::make_unique()); + std::this_thread::sleep_for(1ms); + } + + // Let subscriptions executor spin + executor_sub.spin_some(10ms); + + // The callback count should be 1 + EXPECT_EQ(1u, callback_count); + + // Reset callback count + callback_count = 0; + + // Publish 5 messages + for (int i = 0; i < 5; i++) { + publisher->publish(std::make_unique()); + std::this_thread::sleep_for(1ms); + } + + // Let subscriptions executor spin + executor_sub.spin_some(10ms); + + // The callback count should be 5, the queue shouldn't have been pruned + EXPECT_EQ(5u, callback_count); +} + +TEST_F(TestEventsQueue, SimpleQueue) +{ + // Create SimpleEventsQueue + auto simple_queue = std::make_unique(); + + // Create an events executor using the simple queue + EventsExecutor executor_sub(std::move(simple_queue)); + + // Create a subscription node with QoS->depth = 10 + auto node_sub = std::make_shared("node_sub"); + + size_t callback_count = 0; + + auto subscription = + node_sub->create_subscription( + "topic", + rclcpp::QoS(10), + [&](test_msgs::msg::Empty::SharedPtr) { + callback_count++; + }); + + // Add susbscription node to the executor, so the queue can start getting events + executor_sub.add_node(node_sub); + + // Create a publisher node + auto node_pub = std::make_shared("node_pub"); + auto publisher = node_pub->create_publisher("topic", rclcpp::QoS(10)); + + // Let subscriptions executor spin to execute any previous events + // not related to the subscription, so we start with an empty queue + executor_sub.spin_some(10ms); + + // Publish 11 messages, but as the subscription has only 10 as queue depth, + // we should lost a message (the events queue finish with 11 messages but one is not valid) + for (int i = 0; i < 11; i++) { + publisher->publish(std::make_unique()); + std::this_thread::sleep_for(1ms); + } + + // Let subscriptions executor spin + executor_sub.spin_some(10ms); + + // The callback count should be 10 + EXPECT_EQ(10u, callback_count); +} From 5780ccf0256d72c19be1b37006a95845a1c47389 Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 5 Feb 2021 18:38:51 -0300 Subject: [PATCH 7/9] Set size limit in constructor --- .../buffers/bounded_events_queue.hpp | 20 +++++++------------ .../rclcpp/executors/test_events_queue.cpp | 3 +-- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp index 2fb2970ba5..147389da5f 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp @@ -36,6 +36,12 @@ namespace buffers class BoundedEventsQueue : public EventsQueue { public: + RCLCPP_PUBLIC + explicit BoundedEventsQueue(size_t queue_size_limit) + { + queue_size_limit_ = queue_size_limit; + } + RCLCPP_PUBLIC ~BoundedEventsQueue() = default; @@ -121,21 +127,9 @@ class BoundedEventsQueue : public EventsQueue return local_queue; } - /** - * @brief sets the queue size limit - * @param limit The queue size limit - */ - RCLCPP_PUBLIC - void - set_queue_size_limit(size_t queue_size_limit) - { - queue_size_limit_ = queue_size_limit; - } - private: std::queue event_queue_; - - size_t queue_size_limit_ = 1000; + size_t queue_size_limit_; }; } // namespace buffers diff --git a/rclcpp/test/rclcpp/executors/test_events_queue.cpp b/rclcpp/test/rclcpp/executors/test_events_queue.cpp index 5cf5db7b8e..bcdb2c6144 100644 --- a/rclcpp/test/rclcpp/executors/test_events_queue.cpp +++ b/rclcpp/test/rclcpp/executors/test_events_queue.cpp @@ -41,8 +41,7 @@ class TestEventsQueue : public ::testing::Test TEST_F(TestEventsQueue, BoundedQueue) { // Create BoundedEventsQueue and set limit to 10 events. - auto bounded_queue = std::make_unique(); - bounded_queue->set_queue_size_limit(10); + auto bounded_queue = std::make_unique(10); // Create an events executor using the bounded queue EventsExecutor executor_sub(std::move(bounded_queue)); From 6fb2463ee4e436e319209f1c789f78a85e606c4f Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Mon, 8 Feb 2021 12:53:56 -0300 Subject: [PATCH 8/9] Test all events queue APIs --- .../buffers/bounded_events_queue.hpp | 2 +- .../buffers/simple_events_queue.hpp | 2 +- .../rclcpp/executors/test_events_queue.cpp | 159 +++++++++--------- 3 files changed, 77 insertions(+), 86 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp index 147389da5f..f153dfbdfb 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp @@ -18,7 +18,7 @@ #include #include -#include "rclcpp/executors/events_queue.hpp" +#include "rclcpp/experimental/buffers/events_queue.hpp" namespace rclcpp { diff --git a/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp index b0685e7ad1..53c99dba7d 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp @@ -18,7 +18,7 @@ #include #include -#include "rclcpp/executors/events_queue.hpp" +#include "rclcpp/experimental/buffers/events_queue.hpp" namespace rclcpp { diff --git a/rclcpp/test/rclcpp/executors/test_events_queue.cpp b/rclcpp/test/rclcpp/executors/test_events_queue.cpp index bcdb2c6144..4888d62c32 100644 --- a/rclcpp/test/rclcpp/executors/test_events_queue.cpp +++ b/rclcpp/test/rclcpp/executors/test_events_queue.cpp @@ -12,18 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. #include -#include +#include + #include -#include "rclcpp/executors/events_executor.hpp" +#include "rclcpp/experimental/buffers/simple_events_queue.hpp" #include "rclcpp/experimental/buffers/bounded_events_queue.hpp" -#include "test_msgs/msg/empty.hpp" - using namespace std::chrono_literals; -using rclcpp::executors::EventsExecutor; - class TestEventsQueue : public ::testing::Test { public: @@ -38,109 +35,103 @@ class TestEventsQueue : public ::testing::Test } }; -TEST_F(TestEventsQueue, BoundedQueue) +TEST_F(TestEventsQueue, SimpleQueueTest) { - // Create BoundedEventsQueue and set limit to 10 events. - auto bounded_queue = std::make_unique(10); + // Create a SimpleEventsQueue and a local queue + auto simple_queue = std::make_unique(); + std::queue local_events_queue; + + // Make sure the queue is empty after init + simple_queue->init(); + EXPECT_TRUE(simple_queue->empty()); - // Create an events executor using the bounded queue - EventsExecutor executor_sub(std::move(bounded_queue)); + // Push 11 messages + for (int i = 0; i < 11; i++) { + rmw_listener_event_t stub_event; + simple_queue->push(stub_event); + } - // Create a subscription node - auto node_sub = std::make_shared("node_sub"); + // Pop one message + simple_queue->pop(); - size_t callback_count = 0; + local_events_queue = simple_queue->get_all_events(); - auto subscription = - node_sub->create_subscription( - "topic", - rclcpp::QoS(10), - [&](test_msgs::msg::Empty::SharedPtr) { - callback_count++; - }); + // We should have (11 - 1) events in the local queue + size_t local_queue_size = local_events_queue.size(); - // Add susbscription node to the executor, so the queue can start getting events - executor_sub.add_node(node_sub); + // The local queue size should be 10 + EXPECT_EQ(10u, local_queue_size); - // Create a publisher node - auto node_pub = std::make_shared("node_pub"); - auto publisher = node_pub->create_publisher("topic", rclcpp::QoS(10)); + // The simple queue should be empty after taking all events + EXPECT_TRUE(simple_queue->empty()); - // Let subscriptions executor spin to execute any previous events - // not related to the subscription, so we start with an empty queue - executor_sub.spin_some(10ms); + // Lets push an event into the queue and get it back + rmw_listener_event_t push_event = {simple_queue.get(), SUBSCRIPTION_EVENT}; - // Publish 11 messages, the eleventh msg should prune the queue - // and we should end up with only one event on it - for (int i = 0; i < 11; i++) { - publisher->publish(std::make_unique()); - std::this_thread::sleep_for(1ms); - } + simple_queue->push(push_event); - // Let subscriptions executor spin - executor_sub.spin_some(10ms); + rmw_listener_event_t front_event = simple_queue->front(); - // The callback count should be 1 - EXPECT_EQ(1u, callback_count); + // The events should be equal + EXPECT_EQ(push_event.entity, front_event.entity); + EXPECT_EQ(push_event.type, front_event.type); +} - // Reset callback count - callback_count = 0; - // Publish 5 messages - for (int i = 0; i < 5; i++) { - publisher->publish(std::make_unique()); - std::this_thread::sleep_for(1ms); +TEST_F(TestEventsQueue, BoundedQueueTest) +{ + // Create a BoundedEventsQueue with limit of 10 events and a local events queue + auto bounded_queue = std::make_unique(10); + std::queue local_events_queue; + + // Make sure the queue is empty after init + bounded_queue->init(); + EXPECT_TRUE(bounded_queue->empty()); + + // Push 11 messages, the eleventh msg should prune the queue + // and we should end up with only one event on it + for (int i = 0; i < 11; i++) { + rmw_listener_event_t stub_event; + bounded_queue->push(stub_event); } - // Let subscriptions executor spin - executor_sub.spin_some(10ms); + local_events_queue = bounded_queue->get_all_events(); - // The callback count should be 5, the queue shouldn't have been pruned - EXPECT_EQ(5u, callback_count); -} + size_t local_queue_size = local_events_queue.size(); -TEST_F(TestEventsQueue, SimpleQueue) -{ - // Create SimpleEventsQueue - auto simple_queue = std::make_unique(); + // The queue size should be 1 + EXPECT_EQ(1u, local_queue_size); + + // The bounded queue should be empty after taking all events + EXPECT_TRUE(bounded_queue->empty()); - // Create an events executor using the simple queue - EventsExecutor executor_sub(std::move(simple_queue)); + // Push 5 messages + for (int i = 0; i < 5; i++) { + rmw_listener_event_t stub_event; + bounded_queue->push(stub_event); + } - // Create a subscription node with QoS->depth = 10 - auto node_sub = std::make_shared("node_sub"); + // Pop one message + bounded_queue->pop(); - size_t callback_count = 0; + local_events_queue = bounded_queue->get_all_events(); - auto subscription = - node_sub->create_subscription( - "topic", - rclcpp::QoS(10), - [&](test_msgs::msg::Empty::SharedPtr) { - callback_count++; - }); + local_queue_size = local_events_queue.size(); - // Add susbscription node to the executor, so the queue can start getting events - executor_sub.add_node(node_sub); + // The local queue size should be 4 as the bounded queue shouldn't have been pruned + EXPECT_EQ(4u, local_queue_size); - // Create a publisher node - auto node_pub = std::make_shared("node_pub"); - auto publisher = node_pub->create_publisher("topic", rclcpp::QoS(10)); + // The bounded queue should be empty after taking all events + EXPECT_TRUE(bounded_queue->empty()); - // Let subscriptions executor spin to execute any previous events - // not related to the subscription, so we start with an empty queue - executor_sub.spin_some(10ms); + // Lets push an event into the queue and get it back + rmw_listener_event_t push_event = {bounded_queue.get(), WAITABLE_EVENT}; - // Publish 11 messages, but as the subscription has only 10 as queue depth, - // we should lost a message (the events queue finish with 11 messages but one is not valid) - for (int i = 0; i < 11; i++) { - publisher->publish(std::make_unique()); - std::this_thread::sleep_for(1ms); - } + bounded_queue->push(push_event); - // Let subscriptions executor spin - executor_sub.spin_some(10ms); + rmw_listener_event_t front_event = bounded_queue->front(); - // The callback count should be 10 - EXPECT_EQ(10u, callback_count); + // The events should be the equal + EXPECT_EQ(push_event.entity, front_event.entity); + EXPECT_EQ(push_event.type, front_event.type); } From c4ac6f9ee7f4836cc8aa0b7ff7a7d1e4093a0005 Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Mon, 8 Feb 2021 13:55:51 -0300 Subject: [PATCH 9/9] Remove BoundedQueue, not ready yet for this --- .../buffers/bounded_events_queue.hpp | 140 ------------------ .../rclcpp/executors/test_events_queue.cpp | 60 -------- 2 files changed, 200 deletions(-) delete mode 100644 rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp diff --git a/rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp deleted file mode 100644 index f153dfbdfb..0000000000 --- a/rclcpp/include/rclcpp/experimental/buffers/bounded_events_queue.hpp +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright 2021 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__BOUNDED_EVENTS_QUEUE_HPP_ -#define RCLCPP__EXPERIMENTAL__BUFFERS__BOUNDED_EVENTS_QUEUE_HPP_ - -#include -#include - -#include "rclcpp/experimental/buffers/events_queue.hpp" - -namespace rclcpp -{ -namespace experimental -{ -namespace buffers -{ - -/** - * @brief This class provides a bounded queue implementation - * based on a std::queue. Before pushing events into the queue - * checks the queue size. In case of exceeding the size it performs - * a prune of the queue. - */ -class BoundedEventsQueue : public EventsQueue -{ -public: - RCLCPP_PUBLIC - explicit BoundedEventsQueue(size_t queue_size_limit) - { - queue_size_limit_ = queue_size_limit; - } - - RCLCPP_PUBLIC - ~BoundedEventsQueue() = default; - - /** - * @brief push event into the queue - * @param event The event to push into the queue - */ - RCLCPP_PUBLIC - virtual - void - push(const rmw_listener_event_t & event) - { - if (event_queue_.size() >= queue_size_limit_) { - // Simple prune strategy: Remove all elements in the queue. - this->init(); - } - event_queue_.push(event); - } - - /** - * @brief removes front element from the queue - * The element removed is the "oldest" element in the queue whose - * value can be retrieved by calling member front(). - */ - RCLCPP_PUBLIC - virtual - void - pop() - { - event_queue_.pop(); - } - - /** - * @brief gets the front event from the queue - * @return the front event - */ - RCLCPP_PUBLIC - virtual - rmw_listener_event_t - front() const - { - return event_queue_.front(); - } - - /** - * @brief Test whether queue is empty - * @return true if the queue's size is 0, false otherwise. - */ - RCLCPP_PUBLIC - virtual - bool - empty() const - { - return event_queue_.empty(); - } - - /** - * @brief Initializes the queue - */ - RCLCPP_PUBLIC - virtual - void - init() - { - // Make sure the queue is empty when we start - std::queue local_queue; - std::swap(event_queue_, local_queue); - } - - - /** - * @brief gets a queue with all events accumulated on it since - * the last call. The member queue is empty when the call returns. - * @return std::queue with events - */ - RCLCPP_PUBLIC - virtual - std::queue - get_all_events() - { - std::queue local_queue; - std::swap(event_queue_, local_queue); - return local_queue; - } - -private: - std::queue event_queue_; - size_t queue_size_limit_; -}; - -} // namespace buffers -} // namespace experimental -} // namespace rclcpp - - -#endif // RCLCPP__EXPERIMENTAL__BUFFERS__BOUNDED_EVENTS_QUEUE_HPP_ diff --git a/rclcpp/test/rclcpp/executors/test_events_queue.cpp b/rclcpp/test/rclcpp/executors/test_events_queue.cpp index 4888d62c32..7996731ea2 100644 --- a/rclcpp/test/rclcpp/executors/test_events_queue.cpp +++ b/rclcpp/test/rclcpp/executors/test_events_queue.cpp @@ -17,7 +17,6 @@ #include #include "rclcpp/experimental/buffers/simple_events_queue.hpp" -#include "rclcpp/experimental/buffers/bounded_events_queue.hpp" using namespace std::chrono_literals; @@ -76,62 +75,3 @@ TEST_F(TestEventsQueue, SimpleQueueTest) EXPECT_EQ(push_event.entity, front_event.entity); EXPECT_EQ(push_event.type, front_event.type); } - - -TEST_F(TestEventsQueue, BoundedQueueTest) -{ - // Create a BoundedEventsQueue with limit of 10 events and a local events queue - auto bounded_queue = std::make_unique(10); - std::queue local_events_queue; - - // Make sure the queue is empty after init - bounded_queue->init(); - EXPECT_TRUE(bounded_queue->empty()); - - // Push 11 messages, the eleventh msg should prune the queue - // and we should end up with only one event on it - for (int i = 0; i < 11; i++) { - rmw_listener_event_t stub_event; - bounded_queue->push(stub_event); - } - - local_events_queue = bounded_queue->get_all_events(); - - size_t local_queue_size = local_events_queue.size(); - - // The queue size should be 1 - EXPECT_EQ(1u, local_queue_size); - - // The bounded queue should be empty after taking all events - EXPECT_TRUE(bounded_queue->empty()); - - // Push 5 messages - for (int i = 0; i < 5; i++) { - rmw_listener_event_t stub_event; - bounded_queue->push(stub_event); - } - - // Pop one message - bounded_queue->pop(); - - local_events_queue = bounded_queue->get_all_events(); - - local_queue_size = local_events_queue.size(); - - // The local queue size should be 4 as the bounded queue shouldn't have been pruned - EXPECT_EQ(4u, local_queue_size); - - // The bounded queue should be empty after taking all events - EXPECT_TRUE(bounded_queue->empty()); - - // Lets push an event into the queue and get it back - rmw_listener_event_t push_event = {bounded_queue.get(), WAITABLE_EVENT}; - - bounded_queue->push(push_event); - - rmw_listener_event_t front_event = bounded_queue->front(); - - // The events should be the equal - EXPECT_EQ(push_event.entity, front_event.entity); - EXPECT_EQ(push_event.type, front_event.type); -}