diff --git a/nexus_capabilities/CMakeLists.txt b/nexus_capabilities/CMakeLists.txt index 1a1d0a07..187dd943 100644 --- a/nexus_capabilities/CMakeLists.txt +++ b/nexus_capabilities/CMakeLists.txt @@ -54,6 +54,14 @@ if(BUILD_TESTING) # MAX_LINE_LENGTH 80 # LANGUAGE CPP #) + + ament_add_catch2(context_test src/context_test.cpp) + target_link_libraries(context_test PRIVATE + ${PROJECT_NAME} + nexus_common::nexus_common + nexus_common::nexus_common_test + rmf_utils::rmf_utils + ) endif() include(GNUInstallDirs) diff --git a/nexus_capabilities/include/nexus_capabilities/context.hpp b/nexus_capabilities/include/nexus_capabilities/context.hpp index e85a5f44..10f75168 100644 --- a/nexus_capabilities/include/nexus_capabilities/context.hpp +++ b/nexus_capabilities/include/nexus_capabilities/context.hpp @@ -39,8 +39,9 @@ //============================================================================== namespace nexus { -class Context +class Context : std::enable_shared_from_this { +private: struct Private{ explicit Private() = default; }; public: using GoalHandlePtr = std::shared_ptr>; public: using TaskState = nexus_orchestrator_msgs::msg::TaskState; @@ -53,11 +54,183 @@ public: Task task; public: std::vector errors; public: std::unique_ptr bt_logging; public: std::unordered_set signals; -public: TaskState task_state; +private: uint8_t task_status = TaskState::STATUS_NONE; public: uint64_t tick_count = 0; +// Used to keep track of the last time the state was changed +public: rclcpp::Time state_transition_time; + +public: Context(Private, rclcpp_lifecycle::LifecycleNode& node) + : node(node), state_transition_time(node.now()) {} +public: static std::shared_ptr make(rclcpp_lifecycle::LifecycleNode& node) { + Private pvt; + return std::make_shared(pvt, node); +} + +public: std::shared_ptr get_ptr() { + return shared_from_this(); +} + +public: void set_task_status(uint8_t status) +{ + this->task_status = status; + this->state_transition_time = node.now(); +} + +public: TaskState get_task_state() const +{ + TaskState task_state; + task_state.task_id = this->task.task_id; + task_state.workcell_id = this->node.get_name(); + task_state.status = this->task_status; + return task_state; +} + +// bool is C1 < C2, smallest gets on top +public: static bool compare(const std::shared_ptr c1, const std::shared_ptr c2) { + using TaskState = nexus_orchestrator_msgs::msg::TaskState; + const auto s1 = c1->task_status; + const auto s2 = c2->task_status; + if (s1 == s2) + { + // For equal task state, prioritize earlier transition time to make sure + // tasks that started earlier are first in the queue + return c1->state_transition_time < c2->state_transition_time; + } + // An executing task will always have priority over a non executing task + if (s1 == TaskState::STATUS_RUNNING) + { + return true; + } + if (s1 == TaskState::STATUS_QUEUED) + { + if (s2 == TaskState::STATUS_RUNNING) + { + // Prioritize the running task + return false; + } + // Prioritize queued task + return true; + } + if (s1 == TaskState::STATUS_FINISHED || s1 == TaskState::STATUS_FAILED) + { + if (s2 == TaskState::STATUS_FINISHED || s2 == TaskState::STATUS_FAILED) + { + // Sort by time + return c1->state_transition_time < c2->state_transition_time; + } + // Push to end of queue + return false; + } + if (s1 == TaskState::STATUS_ASSIGNED) + { + if (s2 == TaskState::STATUS_RUNNING || s2 == TaskState::STATUS_QUEUED) + { + // Prioritize tasks that have been requested or are running already + return false; + } + return true; + } + // Undefined, just sort in order of transition time + return c1->state_transition_time < c2->state_transition_time; +} +}; + +class ContextSet { +private: + std::vector> set; + // TODO(luca) consider map of task_id to iterator + change the above to a list + + void reorder() { + std::sort(set.begin(), set.end(), [](const std::shared_ptr& ctx1, const std::shared_ptr& ctx2) { + return Context::compare(ctx1, ctx2); + }); + } + +public: + ContextSet() = default; + + std::optional> get_at(const std::size_t n) const { + if (n >= this->size()) { + return std::nullopt; + } + return set[n]; + } + + std::optional> get_task_id(const std::string& task_id) const { + for (auto it = set.begin(); it != set.end(); ++it) { + if ((*it)->task.task_id == task_id) { + return *it; + } + } + return std::nullopt; + } + + bool modify_at(const std::size_t n, std::function modifier) { + if (n >= this->size()) { + return false; + } + modifier(*set[n]); + this->reorder(); + return true; + } + + bool modify_task_id(const std::string& task_id, std::function modifier) { + for (auto it = set.begin(); it != set.end(); ++it) { + if ((*it)->task.task_id == task_id) { + modifier(**it); + this->reorder(); + return true; + } + } + return false; + } + + std::size_t size() const { + return set.size(); + } + + bool remove_at(const std::size_t n) { + if (n >= this->size()) { + return false; + } + set.erase(set.begin() + n); + return true; + } + + bool remove(const std::shared_ptr& ptr) { + for (auto it = set.begin(); it != set.end(); ++it) { + if ((*it) == ptr) { + set.erase(it); + return true; + } + } + return false; + } + + bool remove_task_id(const std::string& task_id) { + for (auto it = set.begin(); it != set.end(); ++it) { + if ((*it)->task.task_id == task_id) { + set.erase(it); + return true; + } + } + return false; + } + + bool insert(const std::shared_ptr ptr) { + const auto it = get_task_id(ptr->task.task_id); + if (it != std::nullopt) { + return false; + } + set.emplace_back(std::move(ptr)); + this->reorder(); + return true; + } + + void clear() { + set.clear(); + } -public: Context(rclcpp_lifecycle::LifecycleNode& node) - : node(node) {} }; } // namespace nexus diff --git a/nexus_capabilities/src/context_test.cpp b/nexus_capabilities/src/context_test.cpp new file mode 100644 index 00000000..531555b4 --- /dev/null +++ b/nexus_capabilities/src/context_test.cpp @@ -0,0 +1,85 @@ +#define CATCH_CONFIG_MAIN +#include + +#include +#include +#include "nexus_capabilities/context.hpp" + +#include "nexus_common_test/test_utils.hpp" + +namespace nexus::capabilities::test { + +using TaskState = nexus_orchestrator_msgs::msg::TaskState; + +bool check_order(const ContextSet& set, const std::vector expected_tasks) { + REQUIRE(set.size() == expected_tasks.size()); + std::cout << "BEGIN CHECK" << std::endl; + for (std::size_t i = 0; i < set.size(); ++i) { + const auto it_opt = set.get_at(i); + REQUIRE(it_opt != std::nullopt); + std::cout << (*it_opt)->task.task_id << " " << expected_tasks[i] << std::endl; + if ((*it_opt)->task.task_id != expected_tasks[i]) { + return false; + } + } + return true; +} + +TEST_CASE("test_context_ordering") { + ContextSet contexts; + nexus::common::test::RosFixture uut_fixture; + auto& uut_node = *uut_fixture.node; + + // We don't really need the node so we can reuse it + auto ctx1 = Context::make(uut_node); + auto ctx2 = Context::make(uut_node); + + ctx1->task.task_id = "task_1"; + ctx2->task.task_id = "task_2"; + + // Both tasks are added in a sequence, expect the first to be on top + contexts.insert(ctx1); + contexts.insert(ctx2); + + REQUIRE(contexts.size() == 2); + CHECK(check_order(contexts, {"task_1", "task_2"})); + + // Queue task_2, it should now be on top + contexts.modify_task_id("task_2", [](Context& ctx) { + ctx.set_task_status(TaskState::STATUS_QUEUED); + }); + CHECK(check_order(contexts, {"task_2", "task_1"})); + + // Now execute task_1, then add a new task afterwards, we should get task_1, task_2, task_3 + contexts.modify_task_id("task_1", [](Context& ctx) { + ctx.set_task_status(TaskState::STATUS_RUNNING); + }); + auto ctx3 = Context::make(uut_node); + ctx3->task.task_id = "task_3"; + contexts.insert(ctx3); + CHECK(check_order(contexts, {"task_1", "task_2", "task_3"})); + // If the new task gets queued, nothing changes because one is running already and another + // one was queued before + contexts.modify_task_id("task_3", [](Context& ctx) { + ctx.set_task_status(TaskState::STATUS_QUEUED); + }); + CHECK(check_order(contexts, {"task_1", "task_2", "task_3"})); + // If the task that was queued is set to running, it will still stay in its place because we + // can't pre-empt currently running tasks + contexts.modify_task_id("task_2", [](Context& ctx) { + ctx.set_task_status(TaskState::STATUS_RUNNING); + }); + CHECK(check_order(contexts, {"task_1", "task_2", "task_3"})); + // Once the task finishes or fails, it gets pushed to the end of the queue + contexts.modify_task_id("task_1", [](Context& ctx) { + ctx.set_task_status(TaskState::STATUS_FINISHED); + }); + CHECK(check_order(contexts, {"task_2", "task_3", "task_1"})); + // Unnecessary but for multiple finished / failed the earliest updated is more in front of the queue + contexts.modify_at(0, [](Context& ctx) { + ctx.set_task_status(TaskState::STATUS_FAILED); + }); + CHECK(check_order(contexts, {"task_3", "task_1", "task_2"})); +} + +} diff --git a/nexus_workcell_orchestrator/src/task_results_test.cpp b/nexus_workcell_orchestrator/src/task_results_test.cpp index cd6196b5..7baca4bb 100644 --- a/nexus_workcell_orchestrator/src/task_results_test.cpp +++ b/nexus_workcell_orchestrator/src/task_results_test.cpp @@ -34,7 +34,7 @@ namespace nexus::workcell_orchestrator::test { TEST_CASE("get and set results") { auto fixture = common::test::RosFixture{}; auto ctx_mgr = std::make_shared(); - auto ctx = std::make_shared(Context{*fixture.node}); + auto ctx = Context::make(*fixture.node); ctx_mgr->set_active_context(ctx); BT::BehaviorTreeFactory bt_factory; diff --git a/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp b/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp index fc6b14b9..54e036fb 100644 --- a/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp +++ b/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp @@ -52,7 +52,6 @@ namespace nexus::workcell_orchestrator { -using TaskState = nexus_orchestrator_msgs::msg::TaskState; using WorkcellRequest = endpoints::WorkcellRequestAction::ActionType; using WorkcellStation = nexus_orchestrator_msgs::msg::WorkcellStation; @@ -363,13 +362,8 @@ auto WorkcellOrchestrator::_configure( endpoints::WorkcellRequestAction::ActionType::Goal::ConstSharedPtr goal) { RCLCPP_DEBUG(this->get_logger(), "Got workcell task request"); - const auto it = - std::find_if(this->_ctxs.cbegin(), this->_ctxs.cend(), - [&goal](const std::shared_ptr& ctx) - { - return ctx->task.task_id == goal->task.task_id; - }); - if (it != this->_ctxs.cend() && (*it)->goal_handle) + const auto it = this->_ctxs.get_task_id(goal->task.task_id); + if (it.has_value() && (*it)->goal_handle) { RCLCPP_ERROR(this->get_logger(), "A task with the same id is already executing"); @@ -382,14 +376,9 @@ auto WorkcellOrchestrator::_configure( { RCLCPP_DEBUG(this->get_logger(), "Got cancel request"); const auto& goal = goal_handle->get_goal(); - auto it = - std::find_if(this->_ctxs.begin(), this->_ctxs.end(), - [&goal](const std::shared_ptr& ctx) - { - return goal->task.task_id == ctx->task.task_id; - }); + const auto it = this->_ctxs.get_task_id(goal->task.task_id); - if (it == this->_ctxs.end()) + if (!it.has_value()) { RCLCPP_WARN(this->get_logger(), "Fail to cancel task [%s]: task does not exist", goal->task.task_id.c_str()); @@ -397,9 +386,9 @@ auto WorkcellOrchestrator::_configure( else { // we can just remove a task that is not running - if ((*it)->task_state.status != TaskState::STATUS_RUNNING) + if ((*it)->get_task_state().status != TaskState::STATUS_RUNNING) { - this->_ctxs.erase(it); + this->_ctxs.remove(*it); } } return rclcpp_action::CancelResponse::ACCEPT; @@ -417,19 +406,14 @@ auto WorkcellOrchestrator::_configure( } std::shared_ptr ctx = nullptr; - const auto it = - std::find_if(this->_ctxs.begin(), this->_ctxs.end(), - [&goal_handle](const std::shared_ptr& ctx) - { - return ctx->task.task_id == goal_handle->get_goal()->task.task_id; - }); - if (it != this->_ctxs.end()) + const auto it = this->_ctxs.get_task_id(goal_handle->get_goal()->task.task_id); + if (it.has_value()) { ctx = *it; } else { - ctx = std::make_shared(*this); + ctx = Context::make(*this); } ctx->goal_handle = goal_handle; const auto goal = goal_handle->get_goal(); @@ -447,8 +431,6 @@ auto WorkcellOrchestrator::_configure( result); return; } - ctx->task_state.workcell_id = this->get_name(); - ctx->task_state.task_id = ctx->task.task_id; ctx->bt = this->_create_bt(ctx); ctx->bt_logging = std::make_unique( ctx->bt, @@ -465,21 +447,18 @@ auto WorkcellOrchestrator::_configure( RCLCPP_ERROR_STREAM(this->get_logger(), result->message); goal_handle->abort(result); // make sure to clear previously queued task - this->_ctxs.erase(it); + this->_ctxs.remove(*it); return; } RCLCPP_INFO(this->get_logger(), "Queuing task [%s]", goal->task.task_id.c_str()); - ctx->task_state.status = TaskState::STATUS_QUEUED; + ctx->set_task_status(TaskState::STATUS_QUEUED); auto fb = std::make_shared(); - fb->state = ctx->task_state; + fb->state = ctx->get_task_state(); goal_handle->publish_feedback(fb); - if (it == this->_ctxs.end()) - { - this->_ctxs.emplace_back(ctx); - } + this->_ctxs.insert(ctx); }); this->_wc_state_pub = @@ -533,13 +512,8 @@ auto WorkcellOrchestrator::_configure( ConstSharedPtr req, endpoints::QueueWorkcellTaskService::ServiceType::Response::SharedPtr resp) { - const auto it = - std::find_if(this->_ctxs.begin(), this->_ctxs.end(), - [&req](const std::shared_ptr& ctx) - { - return ctx->task.task_id == req->task_id; - }); - if (it != this->_ctxs.end()) + const auto it = this->_ctxs.get_task_id(req->task_id); + if (it.has_value()) { resp->success = false; resp->message = "A task with the same id already exists"; @@ -547,12 +521,10 @@ auto WorkcellOrchestrator::_configure( req->task_id.c_str(), resp->message.c_str()); return; } - const auto& ctx = - this->_ctxs.emplace_back(std::make_shared(*this)); + auto ctx = Context::make(*this); ctx->task.task_id = req->task_id; - ctx->task_state.task_id = req->task_id; - ctx->task_state.workcell_id = this->get_name(); - ctx->task_state.status = TaskState::STATUS_ASSIGNED; + ctx->set_task_status(TaskState::STATUS_ASSIGNED); + this->_ctxs.insert(ctx); resp->success = true; RCLCPP_INFO(this->get_logger(), "queued task %s", ctx->task.task_id.c_str()); }); @@ -566,21 +538,16 @@ auto WorkcellOrchestrator::_configure( { RCLCPP_DEBUG(this->get_logger(), "received request to remove pending task [%s]", req->task_id.c_str()); - const auto it = - std::find_if(this->_ctxs.begin(), this->_ctxs.end(), - [&req](const std::shared_ptr& ctx) - { - return ctx->task.task_id == req->task_id; - }); - if (it == this->_ctxs.end() || - (*it)->task_state.status != TaskState::STATUS_ASSIGNED) + const auto it = this->_ctxs.get_task_id(req->task_id); + if (!it.has_value() || + (*it)->get_task_state().status != TaskState::STATUS_ASSIGNED) { resp->success = false; resp->message = "task does not exist or is not pending"; RCLCPP_DEBUG_STREAM(this->get_logger(), resp->message); return; } - this->_ctxs.erase(it); + this->_ctxs.remove(*it); RCLCPP_INFO(this->get_logger(), "removed task [%s]", req->task_id.c_str()); resp->success = true; @@ -768,48 +735,50 @@ auto WorkcellOrchestrator::_configure( return CallbackReturn::SUCCESS; } -void WorkcellOrchestrator::_tick_bt(const std::shared_ptr& ctx) +void WorkcellOrchestrator::_tick_bt(Context& ctx) { - if (ctx->task_state.status == TaskState::STATUS_FINISHED || - ctx->task_state.status == TaskState::STATUS_FAILED) + auto task_status = ctx.get_task_state().status; + if (task_status == TaskState::STATUS_FINISHED || + task_status == TaskState::STATUS_FAILED) { RCLCPP_WARN( this->get_logger(), "Not executing task [%s] that is already finished", - ctx->task.task_id.c_str()); + ctx.task.task_id.c_str()); return; } - this->_ctx_mgr->set_active_context(ctx); - const auto& goal_handle = ctx->goal_handle; - ++ctx->tick_count; + this->_ctx_mgr->set_active_context(ctx.get_ptr()); + const auto& goal_handle = ctx.goal_handle; + ++ctx.tick_count; - if (ctx->task_state.status == TaskState::STATUS_ASSIGNED) + if (task_status == TaskState::STATUS_ASSIGNED) { // only print on first tick - if (ctx->tick_count == 1) + if (ctx.tick_count == 1) { RCLCPP_INFO( this->get_logger(), "Task [%s] is pending, call \"%s\" to start this task", - ctx->task.task_id.c_str(), + ctx.task.task_id.c_str(), endpoints::WorkcellRequestAction::action_name(this->get_name()).c_str()); } return; } - if (ctx->task_state.status == TaskState::STATUS_QUEUED) + if (task_status == TaskState::STATUS_QUEUED) { - RCLCPP_INFO(this->get_logger(), "Starting task [%s]", ctx->task.task_id.c_str()); - ctx->task_state.status = TaskState::STATUS_RUNNING; + RCLCPP_INFO(this->get_logger(), "Starting task [%s]", ctx.task.task_id.c_str()); + task_status = TaskState::STATUS_RUNNING; + ctx.set_task_status(task_status); auto fb = std::make_shared(); - fb->state = ctx->task_state; + fb->state = ctx.get_task_state(); goal_handle->publish_feedback(fb); } - if (ctx->task_state.status == TaskState::STATUS_RUNNING) + if (task_status == TaskState::STATUS_RUNNING) { try { - switch (ctx->bt.tickRoot()) + switch (ctx.bt.tickRoot()) { case BT::NodeStatus::RUNNING: return; @@ -825,7 +794,7 @@ void WorkcellOrchestrator::_tick_bt(const std::shared_ptr& ctx) RCLCPP_ERROR( this->get_logger(), "Failed executing task [%s]: unexpected BT status", - ctx->task.task_id.c_str()); + ctx.task.task_id.c_str()); this->_handle_command_failed(ctx); return; } @@ -835,7 +804,7 @@ void WorkcellOrchestrator::_tick_bt(const std::shared_ptr& ctx) { RCLCPP_ERROR(this->get_logger(), "BT failed with exception: %s", e.what()); - ctx->bt.haltTree(); + ctx.bt.haltTree(); this->_handle_command_failed(ctx); return; } @@ -843,8 +812,8 @@ void WorkcellOrchestrator::_tick_bt(const std::shared_ptr& ctx) RCLCPP_WARN( this->get_logger(), "Failed executing task [%s]: unexpected status [%u]", - ctx->task.task_id.c_str(), ctx->task_state.status); - ctx->bt.haltTree(); + ctx.task.task_id.c_str(), task_status); + ctx.bt.haltTree(); this->_handle_command_failed(ctx); return; } @@ -853,14 +822,15 @@ void WorkcellOrchestrator::_tick_all_bts() { // cancelling an ongoing task may leave the workcell in a undetermined state // so we must cancel all tasks when an ongoing task is cancelled. - for (const auto& ctx : this->_ctxs) + for (std::size_t i = 0; i < this->_ctxs.size(); ++i) { - if (ctx->task_state.status == TaskState::STATUS_RUNNING && - ctx->goal_handle->is_canceling()) + const auto& ctx = this->_ctxs.get_at(i); + if ((*ctx)->get_task_state().status == TaskState::STATUS_RUNNING && + (*ctx)->goal_handle->is_canceling()) { RCLCPP_WARN( this->get_logger(), "Cancelling all tasks because an ongoing task [%s] is being cancelled", - ctx->task.task_id.c_str()); + (*ctx)->task.task_id.c_str()); // NOTE: iterators are invalidated, it is important to // not access them after this line. this->_cancel_all_tasks(); @@ -868,36 +838,31 @@ void WorkcellOrchestrator::_tick_all_bts() } } - auto it = this->_ctxs.begin(); for (int i = 0; i < this->_max_parallel_jobs; ++i) { - if (it == this->_ctxs.end()) + uint8_t task_status; + const auto modified = this->_ctxs.modify_at(i, [this, &task_status](Context& ctx) { + this->_tick_bt(ctx); + task_status = ctx.get_task_state().status; + }); + if (!modified) { break; } - this->_tick_bt(*it); - const auto task_status = (*it)->task_state.status; - // cancel all other tasks when any task fails. note that the failing task is // removed from the list first because we don't want to cancel an already // failed task. if (task_status == TaskState::STATUS_FAILED) { - this->_ctxs.erase(it); + this->_ctxs.remove_at(i); this->_cancel_all_tasks(); break; } if (task_status == TaskState::STATUS_FINISHED) { - // NOTE: iterator is invalidated, it is important to - // not access it after this line. - it = this->_ctxs.erase(it); - } - else - { - ++it; + this->_ctxs.remove_at(i); } } @@ -906,8 +871,8 @@ void WorkcellOrchestrator::_tick_all_bts() if (this->_ctxs.size() > 0) { new_state.status = WorkcellState::STATUS_BUSY; - new_state.work_order_id = this->_ctxs.front()->task.work_order_id; - new_state.task_id = this->_ctxs.front()->task.task_id; + new_state.work_order_id = (*this->_ctxs.get_at(0))->task.work_order_id; + new_state.task_id = (*this->_ctxs.get_at(0))->task.task_id; } else if (this->_ctxs.size() == 0) { @@ -926,16 +891,17 @@ void WorkcellOrchestrator::_tick_all_bts() void WorkcellOrchestrator::_cancel_all_tasks() { - for (const auto& ctx: this->_ctxs) + for (std::size_t i = 0; i < this->_ctxs.size(); ++i) { - RCLCPP_INFO(this->get_logger(), "Canceling task [%s]", - ctx->task.task_id.c_str()); - this->_ctx_mgr->set_active_context(ctx); - ctx->bt.haltTree(); - this->_handle_command_failed(ctx); + this->_ctxs.modify_at(i, [this](Context& ctx) { + RCLCPP_INFO(this->get_logger(), "Canceling task [%s]", + ctx.task.task_id.c_str()); + this->_ctx_mgr->set_active_context(ctx.get_ptr()); + ctx.bt.haltTree(); + this->_handle_command_failed(ctx); + }); } this->_ctxs.clear(); - return; } void WorkcellOrchestrator::_register() @@ -1057,26 +1023,23 @@ void WorkcellOrchestrator::_process_signal( endpoints::SignalWorkcellService::ServiceType::Request::ConstSharedPtr req, endpoints::SignalWorkcellService::ServiceType::Response::SharedPtr resp) { - for (const auto& ctx : this->_ctxs) - { - if (ctx->task.task_id == req->task_id) - { - RCLCPP_INFO( - this->get_logger(), - "Received signal [%s] for task [%s] from system orchestrator", - req->signal.c_str(), req->task_id.c_str()); - ctx->signals.emplace(req->signal); - resp->success = true; - return; - } + const auto modified = this->_ctxs.modify_task_id(req->task_id, [this, req, resp](Context& ctx) { + RCLCPP_INFO( + this->get_logger(), + "Received signal [%s] for task [%s] from system orchestrator", + req->signal.c_str(), req->task_id.c_str()); + ctx.signals.emplace(req->signal); + resp->success = true; + }); + + if (!modified) { + std::ostringstream oss; + oss << "Received signal [" << req->signal << "] for task [" << req->task_id << + "] which is not currently running."; + resp->message = oss.str(); + RCLCPP_WARN_STREAM(this->get_logger(), resp->message); + resp->success = false; } - - std::ostringstream oss; - oss << "Received signal [" << req->signal << "] for task [" << req->task_id << - "] which is not currently running."; - resp->message = oss.str(); - RCLCPP_WARN_STREAM(this->get_logger(), resp->message); - resp->success = false; } BT::Tree WorkcellOrchestrator::_create_bt(const std::shared_ptr& ctx) @@ -1099,34 +1062,32 @@ BT::Tree WorkcellOrchestrator::_create_bt(const std::shared_ptr& ctx) bt_name)); } -void WorkcellOrchestrator::_handle_command_success( - const std::shared_ptr& ctx) +void WorkcellOrchestrator::_handle_command_success(Context& ctx) { RCLCPP_INFO(this->get_logger(), "Task finished successfully"); - ctx->task_state.status = TaskState::STATUS_FINISHED; + ctx.set_task_status(TaskState::STATUS_FINISHED); auto result = std::make_shared(); result->success = true; - result->result = YAML::Dump(ctx->task.previous_results); - auto report = ctx->bt_logging->generate_report(); + result->result = YAML::Dump(ctx.task.previous_results); + auto report = ctx.bt_logging->generate_report(); result->message = common::ReportConverter::to_string(report); RCLCPP_INFO(this->get_logger(), "%s", result->message.c_str()); - ctx->goal_handle->succeed(result); + ctx.goal_handle->succeed(result); } -void WorkcellOrchestrator::_handle_command_failed( - const std::shared_ptr& ctx) +void WorkcellOrchestrator::_handle_command_failed(Context& ctx) { RCLCPP_ERROR(this->get_logger(), "Task failed"); - ctx->task_state.status = TaskState::STATUS_FAILED; + ctx.set_task_status(TaskState::STATUS_FAILED); // Publish feedback. auto fb = std::make_shared(); - fb->state = ctx->task_state; - ctx->goal_handle->publish_feedback(std::move(fb)); + fb->state = ctx.get_task_state(); + ctx.goal_handle->publish_feedback(std::move(fb)); // Abort the action request. auto result = std::make_shared(); - ctx->goal_handle->abort(result); + ctx.goal_handle->abort(result); } void WorkcellOrchestrator::_handle_task_doable( diff --git a/nexus_workcell_orchestrator/src/workcell_orchestrator.hpp b/nexus_workcell_orchestrator/src/workcell_orchestrator.hpp index 96cd814a..2a88d67a 100644 --- a/nexus_workcell_orchestrator/src/workcell_orchestrator.hpp +++ b/nexus_workcell_orchestrator/src/workcell_orchestrator.hpp @@ -36,6 +36,8 @@ #include +#include + #include #include @@ -47,6 +49,7 @@ #include #include +#include #include namespace nexus::workcell_orchestrator { @@ -54,6 +57,8 @@ namespace nexus::workcell_orchestrator { class WorkcellOrchestrator : public rclcpp_lifecycle::LifecycleNode { +private: using TaskState = nexus_orchestrator_msgs::msg::TaskState; + private: using CallbackReturn = rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn; @@ -120,7 +125,7 @@ private: rclcpp::Service:: SharedPtr _remove_pending_task_srv; private: std::atomic _paused; -private: std::list> _ctxs; +private: ContextSet _ctxs; private: std::shared_ptr _ctx_mgr; private: std::unique_ptr> _lifecycle_mgr{ nullptr}; @@ -136,7 +141,7 @@ private: CallbackReturn _configure( /** * Tick a behavior tree. */ -private: void _tick_bt(const std::shared_ptr& ctx); +private: void _tick_bt(Context& ctx); private: void _tick_all_bts(); @@ -159,9 +164,9 @@ private: void _process_signal( */ private: BT::Tree _create_bt(const std::shared_ptr& ctx); -private: void _handle_command_success(const std::shared_ptr& ctx); +private: void _handle_command_success(Context& ctx); -private: void _handle_command_failed(const std::shared_ptr& ctx); +private: void _handle_command_failed(Context& ctx); private: void _handle_task_doable( endpoints::IsTaskDoableService::ServiceType::Request::ConstSharedPtr req,