diff --git a/rclcpp/include/rclcpp/executors/events_executor.hpp b/rclcpp/include/rclcpp/executors/events_executor.hpp index df63a96535..6b5ef5ec11 100644 --- a/rclcpp/include/rclcpp/executors/events_executor.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor.hpp @@ -24,6 +24,8 @@ #include "rclcpp/executors/events_executor_entities_collector.hpp" #include "rclcpp/executors/events_executor_notify_waitable.hpp" #include "rclcpp/executors/timers_manager.hpp" +#include "rclcpp/experimental/buffers/events_queue.hpp" +#include "rclcpp/experimental/buffers/simple_events_queue.hpp" #include "rclcpp/node.hpp" #include "rmw/listener_event_types.h" @@ -55,10 +57,13 @@ class EventsExecutor : public rclcpp::Executor /// Default constructor. See the default constructor for Executor. /** + * \param[in] events_queue The queue used to store events. * \param[in] options Options used to configure the executor. */ RCLCPP_PUBLIC explicit EventsExecutor( + rclcpp::experimental::buffers::EventsQueue::UniquePtr events_queue = std::make_unique< + rclcpp::experimental::buffers::SimpleEventsQueue>(), const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()); /// Default destrcutor. @@ -190,10 +195,10 @@ class EventsExecutor : public rclcpp::Executor { std::unique_lock lock(this_executor->push_mutex_); - this_executor->event_queue_.push(event); + this_executor->events_queue_->push(event); } // Notify that the event queue has some events in it. - this_executor->event_queue_cv_.notify_one(); + this_executor->events_queue_cv_.notify_one(); } /// Extract and execute events from the queue until it is empty @@ -207,7 +212,7 @@ class EventsExecutor : public rclcpp::Executor execute_event(const rmw_listener_event_t & event); // Queue where entities can push events - EventQueue event_queue_; + rclcpp::experimental::buffers::EventsQueue::SharedPtr events_queue_; EventsExecutorEntitiesCollector::SharedPtr entities_collector_; EventsExecutorNotifyWaitable::SharedPtr executor_notifier_; @@ -215,7 +220,7 @@ class EventsExecutor : public rclcpp::Executor // Mutex to protect the insertion of events in the queue std::mutex push_mutex_; // Variable used to notify when an event is added to the queue - std::condition_variable event_queue_cv_; + std::condition_variable events_queue_cv_; // Timers manager std::shared_ptr timers_manager_; }; diff --git a/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp new file mode 100644 index 0000000000..d00a00705d --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp @@ -0,0 +1,109 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__EVENTS_QUEUE_HPP_ +#define RCLCPP__EXPERIMENTAL__BUFFERS__EVENTS_QUEUE_HPP_ + +#include + +#include "rclcpp/executors/events_executor_entities_collector.hpp" +#include "rclcpp/macros.hpp" + +#include "rmw/listener_event_types.h" + +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ + +/** + * @brief This abstract class is intended to be used as + * a wrapper around a queue. The derived classes should chose + * which container to use and the strategies for push and prune + * events from the queue. + */ +class EventsQueue +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventsQueue) + + /** + * @brief Destruct the object. + */ + RCLCPP_PUBLIC + virtual ~EventsQueue() = default; + + /** + * @brief push event into the queue + * @param event The event to push into the queue + */ + RCLCPP_PUBLIC + virtual + void + push(const rmw_listener_event_t & event) = 0; + + /** + * @brief removes front element from the queue + * The element removed is the "oldest" element in the queue whose + * value can be retrieved by calling member front(). + */ + RCLCPP_PUBLIC + virtual + void + pop() = 0; + + /** + * @brief gets the front event from the queue + * @return the front event + */ + RCLCPP_PUBLIC + virtual + rmw_listener_event_t + front() const = 0; + + /** + * @brief Test whether queue is empty + * @return true if the queue's size is 0, false otherwise. + */ + RCLCPP_PUBLIC + virtual + bool + empty() const = 0; + + /** + * @brief Initializes the queue + */ + RCLCPP_PUBLIC + virtual + void + init() = 0; + + /** + * @brief gets a queue with all events accumulated on it since + * the last call. The member queue is empty when the call returns. + * @return queue with events + */ + RCLCPP_PUBLIC + virtual + std::queue + get_all_events() = 0; +}; + +} // namespace buffers +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__BUFFERS__EVENTS_QUEUE_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp new file mode 100644 index 0000000000..53c99dba7d --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp @@ -0,0 +1,130 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__SIMPLE_EVENTS_QUEUE_HPP_ +#define RCLCPP__EXPERIMENTAL__BUFFERS__SIMPLE_EVENTS_QUEUE_HPP_ + +#include +#include + +#include "rclcpp/experimental/buffers/events_queue.hpp" + +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ + +/** + * @brief This class provides a simple queue implementation + * based on a std::queue. As the objective is having a CPU peformant + * queue, it does not performs any checks about the size of + * the queue, so the queue size could grow unbounded. + * It does not implement any pruning mechanisms. + */ +class SimpleEventsQueue : public EventsQueue +{ +public: + RCLCPP_PUBLIC + ~SimpleEventsQueue() = default; + + /** + * @brief push event into the queue + * @param event The event to push into the queue + */ + RCLCPP_PUBLIC + virtual + void + push(const rmw_listener_event_t & event) + { + event_queue_.push(event); + } + + /** + * @brief removes front element from the queue + * The element removed is the "oldest" element in the queue whose + * value can be retrieved by calling member front(). + */ + RCLCPP_PUBLIC + virtual + void + pop() + { + event_queue_.pop(); + } + + /** + * @brief gets the front event from the queue + * @return the front event + */ + RCLCPP_PUBLIC + virtual + rmw_listener_event_t + front() const + { + return event_queue_.front(); + } + + /** + * @brief Test whether queue is empty + * @return true if the queue's size is 0, false otherwise. + */ + RCLCPP_PUBLIC + virtual + bool + empty() const + { + return event_queue_.empty(); + } + + /** + * @brief Initializes the queue + */ + RCLCPP_PUBLIC + virtual + void + init() + { + // Make sure the queue is empty when we start + std::queue local_queue; + std::swap(event_queue_, local_queue); + } + + + /** + * @brief gets a queue with all events accumulated on it since + * the last call. The member queue is empty when the call returns. + * @return std::queue with events + */ + RCLCPP_PUBLIC + virtual + std::queue + get_all_events() + { + std::queue local_queue; + std::swap(event_queue_, local_queue); + return local_queue; + } + +private: + std::queue event_queue_; +}; + +} // namespace buffers +} // namespace experimental +} // namespace rclcpp + + +#endif // RCLCPP__EXPERIMENTAL__BUFFERS__SIMPLE_EVENTS_QUEUE_HPP_ diff --git a/rclcpp/src/rclcpp/executors/events_executor.cpp b/rclcpp/src/rclcpp/executors/events_executor.cpp index 97a673c5cf..f70cd2772f 100644 --- a/rclcpp/src/rclcpp/executors/events_executor.cpp +++ b/rclcpp/src/rclcpp/executors/events_executor.cpp @@ -26,6 +26,7 @@ using namespace std::chrono_literals; using rclcpp::executors::EventsExecutor; EventsExecutor::EventsExecutor( + rclcpp::experimental::buffers::EventsQueue::UniquePtr events_queue, const rclcpp::ExecutorOptions & options) : rclcpp::Executor(options) { @@ -33,6 +34,7 @@ EventsExecutor::EventsExecutor( entities_collector_ = std::make_shared(this); entities_collector_->init(); + // Setup the executor notifier to wake up the executor when some guard conditions are tiggered. // The added guard conditions are guaranteed to not go out of scope before the executor itself. executor_notifier_ = std::make_shared(); @@ -40,6 +42,11 @@ EventsExecutor::EventsExecutor( executor_notifier_->add_guard_condition(&interrupt_guard_condition_); executor_notifier_->set_events_executor_callback(this, &EventsExecutor::push_event); entities_collector_->add_waitable(executor_notifier_); + + // Get ownership of the queue used to store events. + events_queue_ = std::move(events_queue); + // Init the events queue + events_queue_->init(); } void @@ -51,7 +58,7 @@ EventsExecutor::spin() RCLCPP_SCOPE_EXIT(this->spinning.store(false);); // When condition variable is notified, check this predicate to proceed - auto has_event_predicate = [this]() {return !event_queue_.empty();}; + auto has_event_predicate = [this]() {return !events_queue_->empty();}; // Local event queue to allow entities to push events while we execute them EventQueue execution_event_queue; @@ -61,10 +68,10 @@ EventsExecutor::spin() while (rclcpp::ok(context_) && spinning.load()) { std::unique_lock push_lock(push_mutex_); // We wait here until something has been pushed to the event queue - event_queue_cv_.wait(push_lock, has_event_predicate); - // We got an event! Swap queues - std::swap(execution_event_queue, event_queue_); - // After swapping the queues, we don't need the lock anymore + events_queue_cv_.wait(push_lock, has_event_predicate); + // Local event queue to allow entities to push events while we execute them + execution_event_queue = events_queue_->get_all_events(); + // Unlock the mutex push_lock.unlock(); // Consume all available events, this queue will be empty at the end of the function this->consume_all_events(execution_event_queue); @@ -91,10 +98,8 @@ EventsExecutor::spin_some(std::chrono::nanoseconds max_duration) // - An executor event is received and processed // When condition variable is notified, check this predicate to proceed - auto has_event_predicate = [this]() {return !event_queue_.empty();}; + auto has_event_predicate = [this]() {return !events_queue_->empty();}; - // Local event queue to allow entities to push events while we execute them - EventQueue execution_event_queue; // Select the smallest between input max_duration and timer timeout auto next_timer_timeout = timers_manager_->get_head_timeout(); @@ -104,10 +109,10 @@ EventsExecutor::spin_some(std::chrono::nanoseconds max_duration) std::unique_lock push_lock(push_mutex_); // Wait until timeout or event - event_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate); - // Time to swap queues as the wait is over - std::swap(execution_event_queue, event_queue_); - // After swapping the queues, we don't need the lock anymore + events_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate); + // Local event queue to allow entities to push events while we execute them + EventQueue execution_event_queue = events_queue_->get_all_events(); + // We don't need the lock anymore push_lock.unlock(); // Execute all ready timers @@ -129,7 +134,7 @@ EventsExecutor::spin_all(std::chrono::nanoseconds max_duration) RCLCPP_SCOPE_EXIT(this->spinning.store(false);); // When condition variable is notified, check this predicate to proceed - auto has_event_predicate = [this]() {return !event_queue_.empty();}; + auto has_event_predicate = [this]() {return !events_queue_->empty();}; // Local event queue to allow entities to push events while we execute them EventQueue execution_event_queue; @@ -149,7 +154,7 @@ EventsExecutor::spin_all(std::chrono::nanoseconds max_duration) { // Wait once until timeout or event std::unique_lock push_lock(push_mutex_); - event_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate); + events_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate); } auto timeout = timers_manager_->get_head_timeout(); @@ -157,7 +162,7 @@ EventsExecutor::spin_all(std::chrono::nanoseconds max_duration) // Keep executing until no more work to do or timeout expired while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) { std::unique_lock push_lock(push_mutex_); - std::swap(execution_event_queue, event_queue_); + execution_event_queue = events_queue_->get_all_events(); push_lock.unlock(); // Exit if there is no more work to do @@ -188,7 +193,7 @@ EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout) } // When condition variable is notified, check this predicate to proceed - auto has_event_predicate = [this]() {return !event_queue_.empty();}; + auto has_event_predicate = [this]() {return !events_queue_->empty();}; rmw_listener_event_t event; bool has_event = false; @@ -196,13 +201,13 @@ EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout) { // Wait until timeout or event arrives std::unique_lock lock(push_mutex_); - event_queue_cv_.wait_for(lock, timeout, has_event_predicate); + events_queue_cv_.wait_for(lock, timeout, has_event_predicate); // Grab first event from queue if it exists - has_event = !event_queue_.empty(); + has_event = !events_queue_->empty(); if (has_event) { - event = event_queue_.front(); - event_queue_.pop(); + event = events_queue_->front(); + events_queue_->pop(); } } diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index b950c9b354..13d41a7fca 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -568,6 +568,15 @@ if(TARGET test_events_executor) target_link_libraries(test_events_executor ${PROJECT_NAME} mimick) endif() +ament_add_gtest(test_events_queue executors/test_events_queue.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}") +if(TARGET test_events_queue) + ament_target_dependencies(test_events_queue + "rcl" + "test_msgs") + target_link_libraries(test_events_queue ${PROJECT_NAME} mimick) +endif() + ament_add_gtest(test_events_executor_entities_collector executors/test_events_executor_entities_collector.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}") if(TARGET test_events_executor_entities_collector) diff --git a/rclcpp/test/rclcpp/executors/test_events_queue.cpp b/rclcpp/test/rclcpp/executors/test_events_queue.cpp new file mode 100644 index 0000000000..7996731ea2 --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_events_queue.cpp @@ -0,0 +1,77 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include +#include + +#include + +#include "rclcpp/experimental/buffers/simple_events_queue.hpp" + +using namespace std::chrono_literals; + +class TestEventsQueue : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + } + + void TearDown() + { + rclcpp::shutdown(); + } +}; + +TEST_F(TestEventsQueue, SimpleQueueTest) +{ + // Create a SimpleEventsQueue and a local queue + auto simple_queue = std::make_unique(); + std::queue local_events_queue; + + // Make sure the queue is empty after init + simple_queue->init(); + EXPECT_TRUE(simple_queue->empty()); + + // Push 11 messages + for (int i = 0; i < 11; i++) { + rmw_listener_event_t stub_event; + simple_queue->push(stub_event); + } + + // Pop one message + simple_queue->pop(); + + local_events_queue = simple_queue->get_all_events(); + + // We should have (11 - 1) events in the local queue + size_t local_queue_size = local_events_queue.size(); + + // The local queue size should be 10 + EXPECT_EQ(10u, local_queue_size); + + // The simple queue should be empty after taking all events + EXPECT_TRUE(simple_queue->empty()); + + // Lets push an event into the queue and get it back + rmw_listener_event_t push_event = {simple_queue.get(), SUBSCRIPTION_EVENT}; + + simple_queue->push(push_event); + + rmw_listener_event_t front_event = simple_queue->front(); + + // The events should be equal + EXPECT_EQ(push_event.entity, front_event.entity); + EXPECT_EQ(push_event.type, front_event.type); +}