From f482897b0ee0f7ef5e775ee3b2a16b8f9899fc48 Mon Sep 17 00:00:00 2001 From: Pablo Garrido Date: Thu, 25 Nov 2021 16:41:37 +0100 Subject: [PATCH 1/2] Multithread improvements (#205) * Minor fixes * Add rmw_wait mutex * Protect static_buffer_memory access * Use new ping API * Update ci.yml * Fix * Fix linter * Apply suggestions from code review Co-authored-by: Antonio Cuadros <49162117+Acuadros95@users.noreply.github.com> * Fix * Delete old unlock mutex * Update .github/workflows/ci.yml Co-authored-by: Antonio Cuadros <49162117+Acuadros95@users.noreply.github.com> Co-authored-by: Antonio Cuadros (cherry picked from commit 5de3d7687fff033dffa62594c3ead55ced76c724) # Conflicts: # .github/workflows/ci.yml --- .github/workflows/ci.yml | 5 ++ .../include/rmw_microros/ping.h | 2 + rmw_microxrcedds_c/src/callbacks.c | 48 +++++++++++-------- rmw_microxrcedds_c/src/rmw_init.c | 7 +++ rmw_microxrcedds_c/src/rmw_microros/ping.c | 12 ++--- .../src/rmw_microros_internal/memory.h | 2 +- .../src/rmw_microros_internal/types.h | 8 ++++ rmw_microxrcedds_c/src/rmw_request.c | 5 ++ rmw_microxrcedds_c/src/rmw_response.c | 5 ++ rmw_microxrcedds_c/src/rmw_take.c | 5 ++ rmw_microxrcedds_c/src/rmw_wait.c | 14 ++++-- rmw_microxrcedds_c/src/types.c | 6 +++ 12 files changed, 88 insertions(+), 31 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a8999a7b..8971f77a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,9 +22,14 @@ jobs: - name: Download dependencies run: | git clone -b foxy https://github.com/eProsima/Micro-CDR src/Micro-CDR +<<<<<<< HEAD git clone -b foxy https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client git clone -b main https://github.com/micro-ROS/rosidl_typesupport_microxrcedds src/rosidl_typesupport_microxrcedds git clone -b master https://github.com/ros2/rmw src/rmw +======= + git clone -b ros2 https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client + git clone -b galactic https://github.com/micro-ROS/rosidl_typesupport_microxrcedds src/rosidl_typesupport_microxrcedds +>>>>>>> 5de3d76 (Multithread improvements (#205)) touch src/rosidl_typesupport_microxrcedds/test/COLCON_IGNORE # Install coverage tools apt update && apt install -y python3-pip diff --git a/rmw_microxrcedds_c/include/rmw_microros/ping.h b/rmw_microxrcedds_c/include/rmw_microros/ping.h index aac8c4be..1375475a 100644 --- a/rmw_microxrcedds_c/include/rmw_microros/ping.h +++ b/rmw_microxrcedds_c/include/rmw_microros/ping.h @@ -43,6 +43,7 @@ extern "C" * \brief Check if micro-ROS Agent is up and running. * This function can be called even when the micro-ROS context has not yet been * initialized by the application logics. + * This function cannot be called concurrently with `rmw_init()` or `rmw_shutdown()`. * \param[in] timeout_ms Timeout in ms (per attempt). * \param[in] attempts Number of tries before considering the ping as failed. * \return RMW_RET_OK If micro-ROS Agent is available. @@ -56,6 +57,7 @@ rmw_ret_t rmw_uros_ping_agent( * \brief Check if micro-ROS Agent is up and running using the transport set on the given rmw options. * This function can be called even when the micro-ROS context has not yet been initialized. * The transport will be initialized and closed once during the ping process. + * This function cannot be called concurrently with `rmw_init()` or `rmw_shutdown()`. * \param[in] timeout_ms Timeout in ms (per attempt). * \param[in] attempts Number of tries before considering the ping as failed. * \param[in] rmw_options rmw options with populated transport parameters. diff --git a/rmw_microxrcedds_c/src/callbacks.c b/rmw_microxrcedds_c/src/callbacks.c index f0b504a3..8a827336 100644 --- a/rmw_microxrcedds_c/src/callbacks.c +++ b/rmw_microxrcedds_c/src/callbacks.c @@ -70,10 +70,13 @@ void on_topic( if ((custom_subscription->datareader_id.id == object_id.id) && (custom_subscription->datareader_id.type == object_id.type)) { + UXR_LOCK(&static_buffer_memory.mutex); + rmw_uxrce_mempool_item_t * memory_node = rmw_uxrce_get_static_input_buffer_for_entity( custom_subscription, custom_subscription->qos); if (!memory_node) { RMW_SET_ERROR_MSG("Not available static buffer memory node"); + UXR_UNLOCK(&static_buffer_memory.mutex); return; } @@ -86,14 +89,14 @@ void on_topic( length)) { put_memory(&static_buffer_memory, memory_node); - return; + } else { + static_buffer->owner = (void *) custom_subscription; + static_buffer->length = length; + static_buffer->timestamp = rmw_uros_epoch_nanos(); + static_buffer->entity_type = RMW_UXRCE_ENTITY_TYPE_SUBSCRIPTION; } - static_buffer->owner = (void *) custom_subscription; - static_buffer->length = length; - static_buffer->timestamp = rmw_uros_epoch_nanos(); - static_buffer->entity_type = RMW_UXRCE_ENTITY_TYPE_SUBSCRIPTION; - + UXR_UNLOCK(&static_buffer_memory.mutex); return; } subscription_item = subscription_item->next; @@ -119,10 +122,13 @@ void on_request( // Check if request is related to the service rmw_uxrce_service_t * custom_service = (rmw_uxrce_service_t *)service_item->data; if (custom_service->service_data_resquest == request_id) { + UXR_LOCK(&static_buffer_memory.mutex); + rmw_uxrce_mempool_item_t * memory_node = rmw_uxrce_get_static_input_buffer_for_entity( custom_service, custom_service->qos); if (!memory_node) { RMW_SET_ERROR_MSG("Not available static buffer memory node"); + UXR_UNLOCK(&static_buffer_memory.mutex); return; } @@ -135,14 +141,15 @@ void on_request( length)) { put_memory(&static_buffer_memory, memory_node); - return; + } else { + static_buffer->owner = (void *) custom_service; + static_buffer->length = length; + static_buffer->related.sample_id = *sample_id; + static_buffer->timestamp = rmw_uros_epoch_nanos(); + static_buffer->entity_type = RMW_UXRCE_ENTITY_TYPE_SERVICE; } - static_buffer->owner = (void *) custom_service; - static_buffer->length = length; - static_buffer->related.sample_id = *sample_id; - static_buffer->timestamp = rmw_uros_epoch_nanos(); - static_buffer->entity_type = RMW_UXRCE_ENTITY_TYPE_SERVICE; + UXR_UNLOCK(&static_buffer_memory.mutex); return; } @@ -169,10 +176,13 @@ void on_reply( // Check if reply is related to the client rmw_uxrce_client_t * custom_client = (rmw_uxrce_client_t *)client_item->data; if (custom_client->client_data_request == request_id) { + UXR_LOCK(&static_buffer_memory.mutex); + rmw_uxrce_mempool_item_t * memory_node = rmw_uxrce_get_static_input_buffer_for_entity( custom_client, custom_client->qos); if (!memory_node) { RMW_SET_ERROR_MSG("Not available static buffer memory node"); + UXR_UNLOCK(&static_buffer_memory.mutex); return; } @@ -185,14 +195,14 @@ void on_reply( length)) { put_memory(&static_buffer_memory, memory_node); - return; + } else { + static_buffer->owner = (void *) custom_client; + static_buffer->length = length; + static_buffer->related.reply_id = reply_id; + static_buffer->timestamp = rmw_uros_epoch_nanos(); + static_buffer->entity_type = RMW_UXRCE_ENTITY_TYPE_CLIENT; } - - static_buffer->owner = (void *) custom_client; - static_buffer->length = length; - static_buffer->related.reply_id = reply_id; - static_buffer->timestamp = rmw_uros_epoch_nanos(); - static_buffer->entity_type = RMW_UXRCE_ENTITY_TYPE_CLIENT; + UXR_UNLOCK(&static_buffer_memory.mutex); return; } diff --git a/rmw_microxrcedds_c/src/rmw_init.c b/rmw_microxrcedds_c/src/rmw_init.c index 3f635661..73b2b6e1 100644 --- a/rmw_microxrcedds_c/src/rmw_init.c +++ b/rmw_microxrcedds_c/src/rmw_init.c @@ -204,6 +204,13 @@ rmw_init( context->implementation_identifier = eprosima_microxrcedds_identifier; context->actual_domain_id = options->domain_id; +#ifdef UCLIENT_PROFILE_MULTITHREAD + if (!rmw_uxrce_wait_mutex_initialized) { + UXR_INIT_LOCK(&rmw_uxrce_wait_mutex); + rmw_uxrce_wait_mutex_initialized = true; + } +#endif // UCLIENT_PROFILE_MULTITHREAD + rmw_uxrce_init_session_memory(&session_memory, custom_sessions, RMW_UXRCE_MAX_SESSIONS); rmw_uxrce_init_static_input_buffer_memory( &static_buffer_memory, custom_static_buffers, diff --git a/rmw_microxrcedds_c/src/rmw_microros/ping.c b/rmw_microxrcedds_c/src/rmw_microros/ping.c index b84b1727..17e68cc5 100644 --- a/rmw_microxrcedds_c/src/rmw_microros/ping.c +++ b/rmw_microxrcedds_c/src/rmw_microros/ping.c @@ -33,9 +33,8 @@ rmw_ret_t rmw_uros_ping_agent( { bool success = false; - UXR_LOCK(&session_memory.mutex); - - if (NULL == session_memory.allocateditems) { + if (!session_memory.is_initialized || NULL == session_memory.allocateditems) { + // There is no session available to ping. Init transport is required. #ifdef RMW_UXRCE_TRANSPORT_SERIAL uxrSerialTransport transport; #elif defined(RMW_UXRCE_TRANSPORT_UDP) @@ -54,24 +53,23 @@ rmw_ret_t rmw_uros_ping_agent( rmw_ret_t ret = rmw_uxrce_transport_init(NULL, NULL, (void *)&transport); if (RMW_RET_OK != ret) { - UXR_UNLOCK(&session_memory.mutex); return ret; } success = uxr_ping_agent_attempts(&transport.comm, timeout_ms, attempts); CLOSE_TRANSPORT(&transport); } else { + // There is a session available to ping. Using session. rmw_uxrce_mempool_item_t * item = session_memory.allocateditems; do { rmw_context_impl_t * context = (rmw_context_impl_t *)item->data; - success = uxr_ping_agent_attempts(&context->transport.comm, timeout_ms, attempts); + success = uxr_ping_agent_session(&context->session, timeout_ms, attempts); + item = item->next; } while (NULL != item && !success); } - UXR_UNLOCK(&session_memory.mutex); - return success ? RMW_RET_OK : RMW_RET_ERROR; } diff --git a/rmw_microxrcedds_c/src/rmw_microros_internal/memory.h b/rmw_microxrcedds_c/src/rmw_microros_internal/memory.h index 0c55a8a4..40813664 100644 --- a/rmw_microxrcedds_c/src/rmw_microros_internal/memory.h +++ b/rmw_microxrcedds_c/src/rmw_microros_internal/memory.h @@ -39,7 +39,7 @@ typedef struct rmw_uxrce_mempool_t #ifdef UCLIENT_PROFILE_MULTITHREAD uxrMutex mutex; -#endif // RMW_MICROROS_INTERNAL__UCLIENT_PROFILE_MULTITHREAD +#endif // UCLIENT_PROFILE_MULTITHREAD } rmw_uxrce_mempool_t; bool has_memory( diff --git a/rmw_microxrcedds_c/src/rmw_microros_internal/types.h b/rmw_microxrcedds_c/src/rmw_microros_internal/types.h index a7ea76ac..45cadb25 100644 --- a/rmw_microxrcedds_c/src/rmw_microros_internal/types.h +++ b/rmw_microxrcedds_c/src/rmw_microros_internal/types.h @@ -294,6 +294,14 @@ extern rmw_uxrce_wait_set_t custom_wait_set[RMW_UXRCE_MAX_WAIT_SETS]; extern rmw_uxrce_mempool_t guard_condition_memory; extern rmw_uxrce_guard_condition_t custom_guard_condition[RMW_UXRCE_MAX_GUARD_CONDITION]; +// Global mutexs +#ifdef UCLIENT_PROFILE_MULTITHREAD +// This mutex protects `need_to_be_ran` member of `session_memory` elements +// between concurrent calls to `rmw_wait()` +extern uxrMutex rmw_uxrce_wait_mutex; +extern bool rmw_uxrce_wait_mutex_initialized; +#endif // UCLIENT_PROFILE_MULTITHREAD + // Memory init functions #define RMW_INIT_DEFINE_MEMORY(X) \ diff --git a/rmw_microxrcedds_c/src/rmw_request.c b/rmw_microxrcedds_c/src/rmw_request.c index 74fe6795..1ed7a1c1 100644 --- a/rmw_microxrcedds_c/src/rmw_request.c +++ b/rmw_microxrcedds_c/src/rmw_request.c @@ -83,10 +83,13 @@ rmw_take_request( rmw_uxrce_clean_expired_static_input_buffer(); + UXR_LOCK(&static_buffer_memory.mutex); + // Find first related item in static buffer memory pool rmw_uxrce_mempool_item_t * static_buffer_item = rmw_uxrce_find_static_input_buffer_by_owner((void *) custom_service); if (static_buffer_item == NULL) { + UXR_UNLOCK(&static_buffer_memory.mutex); return RMW_RET_ERROR; } @@ -123,6 +126,8 @@ rmw_take_request( put_memory(&static_buffer_memory, static_buffer_item); + UXR_UNLOCK(&static_buffer_memory.mutex); + if (taken != NULL) { *taken = deserialize_rv; } diff --git a/rmw_microxrcedds_c/src/rmw_response.c b/rmw_microxrcedds_c/src/rmw_response.c index 7fd118ba..e01332ff 100644 --- a/rmw_microxrcedds_c/src/rmw_response.c +++ b/rmw_microxrcedds_c/src/rmw_response.c @@ -97,10 +97,13 @@ rmw_take_response( rmw_uxrce_clean_expired_static_input_buffer(); + UXR_LOCK(&static_buffer_memory.mutex); + // Find first related item in static buffer memory pool rmw_uxrce_mempool_item_t * static_buffer_item = rmw_uxrce_find_static_input_buffer_by_owner((void *) custom_client); if (static_buffer_item == NULL) { + UXR_UNLOCK(&static_buffer_memory.mutex); return RMW_RET_ERROR; } @@ -126,6 +129,8 @@ rmw_take_response( put_memory(&static_buffer_memory, static_buffer_item); + UXR_UNLOCK(&static_buffer_memory.mutex); + if (taken != NULL) { *taken = deserialize_rv; } diff --git a/rmw_microxrcedds_c/src/rmw_take.c b/rmw_microxrcedds_c/src/rmw_take.c index bc3905fa..8efd832a 100644 --- a/rmw_microxrcedds_c/src/rmw_take.c +++ b/rmw_microxrcedds_c/src/rmw_take.c @@ -51,10 +51,13 @@ rmw_take_with_info( rmw_uxrce_clean_expired_static_input_buffer(); + UXR_LOCK(&static_buffer_memory.mutex); + // Find first related item in static buffer memory pool rmw_uxrce_mempool_item_t * static_buffer_item = rmw_uxrce_find_static_input_buffer_by_owner( (void *) custom_subscription); if (static_buffer_item == NULL) { + UXR_UNLOCK(&static_buffer_memory.mutex); return RMW_RET_ERROR; } @@ -73,6 +76,8 @@ rmw_take_with_info( put_memory(&static_buffer_memory, static_buffer_item); + UXR_UNLOCK(&static_buffer_memory.mutex); + if (taken != NULL) { *taken = deserialize_rv; } diff --git a/rmw_microxrcedds_c/src/rmw_wait.c b/rmw_microxrcedds_c/src/rmw_wait.c index ff67af07..32d1964a 100644 --- a/rmw_microxrcedds_c/src/rmw_wait.c +++ b/rmw_microxrcedds_c/src/rmw_wait.c @@ -34,7 +34,14 @@ rmw_wait( (void)events; (void)wait_set; + // With `rmw_uxrce_wait_mutex` member `need_to_be_ran` is protected. + // `session_memory` itself is not protected because it is not modified between + // rmw_init and rmw_shutdown, and rmw_wait cannot be called concurrently with + // those functions. + UXR_LOCK(&rmw_uxrce_wait_mutex); + if (!services && !clients && !subscriptions && !guard_conditions) { + UXR_UNLOCK(&rmw_uxrce_wait_mutex); return RMW_RET_OK; } @@ -53,8 +60,6 @@ rmw_wait( rmw_uxrce_clean_expired_static_input_buffer(); - UXR_LOCK(&session_memory.mutex); - // Clear run flag for all sessions rmw_uxrce_mempool_item_t * item = session_memory.allocateditems; while (item != NULL) { @@ -63,6 +68,7 @@ rmw_wait( item = item->next; } + // TODO(pablogs9): What happens if there already data in one entity? // Enable flag for every XRCE session available in the entities for (size_t i = 0; services && i < services->service_count; ++i) { rmw_uxrce_service_t * custom_service = (rmw_uxrce_service_t *)services->services[i]; @@ -91,7 +97,7 @@ rmw_wait( // There is no context that contais any of the wait set entities. Nothing to wait here. if (available_contexts == 0) { - UXR_UNLOCK(&session_memory.mutex); + UXR_UNLOCK(&rmw_uxrce_wait_mutex); return RMW_RET_OK; } @@ -108,7 +114,7 @@ rmw_wait( item = item->next; } - UXR_UNLOCK(&session_memory.mutex); + UXR_UNLOCK(&rmw_uxrce_wait_mutex); bool buffered_status = false; diff --git a/rmw_microxrcedds_c/src/types.c b/rmw_microxrcedds_c/src/types.c index 0e0d2064..32c55321 100644 --- a/rmw_microxrcedds_c/src/types.c +++ b/rmw_microxrcedds_c/src/types.c @@ -67,6 +67,12 @@ rmw_uxrce_wait_set_t custom_wait_set[RMW_UXRCE_MAX_WAIT_SETS]; rmw_uxrce_mempool_t guard_condition_memory; rmw_uxrce_guard_condition_t custom_guard_condition[RMW_UXRCE_MAX_GUARD_CONDITION]; +// Global mutexs +#ifdef UCLIENT_PROFILE_MULTITHREAD +uxrMutex rmw_uxrce_wait_mutex; +bool rmw_uxrce_wait_mutex_initialized = false; +#endif // UCLIENT_PROFILE_MULTITHREAD + // Memory init functions #define RMW_INIT_MEMORY(X) \ From 24b6fd793cefe7aedbe2f116d5d562381bf5e457 Mon Sep 17 00:00:00 2001 From: Pablo Garrido Date: Thu, 25 Nov 2021 16:44:45 +0100 Subject: [PATCH 2/2] Resolve conflicts Signed-off-by: Pablo Garrido --- .github/workflows/ci.yml | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8971f77a..2217e8fb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,15 +21,10 @@ jobs: - name: Download dependencies run: | - git clone -b foxy https://github.com/eProsima/Micro-CDR src/Micro-CDR -<<<<<<< HEAD - git clone -b foxy https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client + git clone -b ros2 https://github.com/eProsima/Micro-CDR src/Micro-CDR + git clone -b ros2 https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client git clone -b main https://github.com/micro-ROS/rosidl_typesupport_microxrcedds src/rosidl_typesupport_microxrcedds git clone -b master https://github.com/ros2/rmw src/rmw -======= - git clone -b ros2 https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client - git clone -b galactic https://github.com/micro-ROS/rosidl_typesupport_microxrcedds src/rosidl_typesupport_microxrcedds ->>>>>>> 5de3d76 (Multithread improvements (#205)) touch src/rosidl_typesupport_microxrcedds/test/COLCON_IGNORE # Install coverage tools apt update && apt install -y python3-pip