Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ jobs:

- name: Download dependencies
run: |
git clone -b foxy https://github.com/eProsima/Micro-CDR src/Micro-CDR
git clone -b foxy https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client
git clone -b ros2 https://github.com/eProsima/Micro-CDR src/Micro-CDR
git clone -b ros2 https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client
git clone -b main https://github.com/micro-ROS/rosidl_typesupport_microxrcedds src/rosidl_typesupport_microxrcedds
git clone -b master https://github.com/ros2/rmw src/rmw
touch src/rosidl_typesupport_microxrcedds/test/COLCON_IGNORE
Expand Down
2 changes: 2 additions & 0 deletions rmw_microxrcedds_c/include/rmw_microros/ping.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ extern "C"
* \brief Check if micro-ROS Agent is up and running.
* This function can be called even when the micro-ROS context has not yet been
* initialized by the application logics.
* This function cannot be called concurrently with `rmw_init()` or `rmw_shutdown()`.
* \param[in] timeout_ms Timeout in ms (per attempt).
* \param[in] attempts Number of tries before considering the ping as failed.
* \return RMW_RET_OK If micro-ROS Agent is available.
Expand All @@ -56,6 +57,7 @@ rmw_ret_t rmw_uros_ping_agent(
* \brief Check if micro-ROS Agent is up and running using the transport set on the given rmw options.
* This function can be called even when the micro-ROS context has not yet been initialized.
* The transport will be initialized and closed once during the ping process.
* This function cannot be called concurrently with `rmw_init()` or `rmw_shutdown()`.
* \param[in] timeout_ms Timeout in ms (per attempt).
* \param[in] attempts Number of tries before considering the ping as failed.
* \param[in] rmw_options rmw options with populated transport parameters.
Expand Down
48 changes: 29 additions & 19 deletions rmw_microxrcedds_c/src/callbacks.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,13 @@ void on_topic(
if ((custom_subscription->datareader_id.id == object_id.id) &&
(custom_subscription->datareader_id.type == object_id.type))
{
UXR_LOCK(&static_buffer_memory.mutex);

rmw_uxrce_mempool_item_t * memory_node = rmw_uxrce_get_static_input_buffer_for_entity(
custom_subscription, custom_subscription->qos);
if (!memory_node) {
RMW_SET_ERROR_MSG("Not available static buffer memory node");
UXR_UNLOCK(&static_buffer_memory.mutex);
return;
}

Expand All @@ -86,14 +89,14 @@ void on_topic(
length))
{
put_memory(&static_buffer_memory, memory_node);
return;
} else {
static_buffer->owner = (void *) custom_subscription;
static_buffer->length = length;
static_buffer->timestamp = rmw_uros_epoch_nanos();
static_buffer->entity_type = RMW_UXRCE_ENTITY_TYPE_SUBSCRIPTION;
}

static_buffer->owner = (void *) custom_subscription;
static_buffer->length = length;
static_buffer->timestamp = rmw_uros_epoch_nanos();
static_buffer->entity_type = RMW_UXRCE_ENTITY_TYPE_SUBSCRIPTION;

UXR_UNLOCK(&static_buffer_memory.mutex);
return;
}
subscription_item = subscription_item->next;
Expand All @@ -119,10 +122,13 @@ void on_request(
// Check if request is related to the service
rmw_uxrce_service_t * custom_service = (rmw_uxrce_service_t *)service_item->data;
if (custom_service->service_data_resquest == request_id) {
UXR_LOCK(&static_buffer_memory.mutex);

rmw_uxrce_mempool_item_t * memory_node = rmw_uxrce_get_static_input_buffer_for_entity(
custom_service, custom_service->qos);
if (!memory_node) {
RMW_SET_ERROR_MSG("Not available static buffer memory node");
UXR_UNLOCK(&static_buffer_memory.mutex);
return;
}

Expand All @@ -135,14 +141,15 @@ void on_request(
length))
{
put_memory(&static_buffer_memory, memory_node);
return;
} else {
static_buffer->owner = (void *) custom_service;
static_buffer->length = length;
static_buffer->related.sample_id = *sample_id;
static_buffer->timestamp = rmw_uros_epoch_nanos();
static_buffer->entity_type = RMW_UXRCE_ENTITY_TYPE_SERVICE;
}

static_buffer->owner = (void *) custom_service;
static_buffer->length = length;
static_buffer->related.sample_id = *sample_id;
static_buffer->timestamp = rmw_uros_epoch_nanos();
static_buffer->entity_type = RMW_UXRCE_ENTITY_TYPE_SERVICE;
UXR_UNLOCK(&static_buffer_memory.mutex);

return;
}
Expand All @@ -169,10 +176,13 @@ void on_reply(
// Check if reply is related to the client
rmw_uxrce_client_t * custom_client = (rmw_uxrce_client_t *)client_item->data;
if (custom_client->client_data_request == request_id) {
UXR_LOCK(&static_buffer_memory.mutex);

rmw_uxrce_mempool_item_t * memory_node = rmw_uxrce_get_static_input_buffer_for_entity(
custom_client, custom_client->qos);
if (!memory_node) {
RMW_SET_ERROR_MSG("Not available static buffer memory node");
UXR_UNLOCK(&static_buffer_memory.mutex);
return;
}

Expand All @@ -185,14 +195,14 @@ void on_reply(
length))
{
put_memory(&static_buffer_memory, memory_node);
return;
} else {
static_buffer->owner = (void *) custom_client;
static_buffer->length = length;
static_buffer->related.reply_id = reply_id;
static_buffer->timestamp = rmw_uros_epoch_nanos();
static_buffer->entity_type = RMW_UXRCE_ENTITY_TYPE_CLIENT;
}

static_buffer->owner = (void *) custom_client;
static_buffer->length = length;
static_buffer->related.reply_id = reply_id;
static_buffer->timestamp = rmw_uros_epoch_nanos();
static_buffer->entity_type = RMW_UXRCE_ENTITY_TYPE_CLIENT;
UXR_UNLOCK(&static_buffer_memory.mutex);

return;
}
Expand Down
7 changes: 7 additions & 0 deletions rmw_microxrcedds_c/src/rmw_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ rmw_init(
context->implementation_identifier = eprosima_microxrcedds_identifier;
context->actual_domain_id = options->domain_id;

#ifdef UCLIENT_PROFILE_MULTITHREAD
if (!rmw_uxrce_wait_mutex_initialized) {
UXR_INIT_LOCK(&rmw_uxrce_wait_mutex);
rmw_uxrce_wait_mutex_initialized = true;
}
#endif // UCLIENT_PROFILE_MULTITHREAD

rmw_uxrce_init_session_memory(&session_memory, custom_sessions, RMW_UXRCE_MAX_SESSIONS);
rmw_uxrce_init_static_input_buffer_memory(
&static_buffer_memory, custom_static_buffers,
Expand Down
12 changes: 5 additions & 7 deletions rmw_microxrcedds_c/src/rmw_microros/ping.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ rmw_ret_t rmw_uros_ping_agent(
{
bool success = false;

UXR_LOCK(&session_memory.mutex);

if (NULL == session_memory.allocateditems) {
if (!session_memory.is_initialized || NULL == session_memory.allocateditems) {
// There is no session available to ping. Init transport is required.
#ifdef RMW_UXRCE_TRANSPORT_SERIAL
uxrSerialTransport transport;
#elif defined(RMW_UXRCE_TRANSPORT_UDP)
Expand All @@ -54,24 +53,23 @@ rmw_ret_t rmw_uros_ping_agent(
rmw_ret_t ret = rmw_uxrce_transport_init(NULL, NULL, (void *)&transport);

if (RMW_RET_OK != ret) {
UXR_UNLOCK(&session_memory.mutex);
return ret;
}

success = uxr_ping_agent_attempts(&transport.comm, timeout_ms, attempts);
CLOSE_TRANSPORT(&transport);
} else {
// There is a session available to ping. Using session.
rmw_uxrce_mempool_item_t * item = session_memory.allocateditems;
do {
rmw_context_impl_t * context = (rmw_context_impl_t *)item->data;

success = uxr_ping_agent_attempts(&context->transport.comm, timeout_ms, attempts);
success = uxr_ping_agent_session(&context->session, timeout_ms, attempts);

item = item->next;
} while (NULL != item && !success);
}

UXR_UNLOCK(&session_memory.mutex);

return success ? RMW_RET_OK : RMW_RET_ERROR;
}

Expand Down
2 changes: 1 addition & 1 deletion rmw_microxrcedds_c/src/rmw_microros_internal/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ typedef struct rmw_uxrce_mempool_t

#ifdef UCLIENT_PROFILE_MULTITHREAD
uxrMutex mutex;
#endif // RMW_MICROROS_INTERNAL__UCLIENT_PROFILE_MULTITHREAD
#endif // UCLIENT_PROFILE_MULTITHREAD
} rmw_uxrce_mempool_t;

bool has_memory(
Expand Down
8 changes: 8 additions & 0 deletions rmw_microxrcedds_c/src/rmw_microros_internal/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,14 @@ extern rmw_uxrce_wait_set_t custom_wait_set[RMW_UXRCE_MAX_WAIT_SETS];
extern rmw_uxrce_mempool_t guard_condition_memory;
extern rmw_uxrce_guard_condition_t custom_guard_condition[RMW_UXRCE_MAX_GUARD_CONDITION];

// Global mutexs
#ifdef UCLIENT_PROFILE_MULTITHREAD
// This mutex protects `need_to_be_ran` member of `session_memory` elements
// between concurrent calls to `rmw_wait()`
extern uxrMutex rmw_uxrce_wait_mutex;
extern bool rmw_uxrce_wait_mutex_initialized;
#endif // UCLIENT_PROFILE_MULTITHREAD

// Memory init functions

#define RMW_INIT_DEFINE_MEMORY(X) \
Expand Down
5 changes: 5 additions & 0 deletions rmw_microxrcedds_c/src/rmw_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,13 @@ rmw_take_request(

rmw_uxrce_clean_expired_static_input_buffer();

UXR_LOCK(&static_buffer_memory.mutex);

// Find first related item in static buffer memory pool
rmw_uxrce_mempool_item_t * static_buffer_item =
rmw_uxrce_find_static_input_buffer_by_owner((void *) custom_service);
if (static_buffer_item == NULL) {
UXR_UNLOCK(&static_buffer_memory.mutex);
return RMW_RET_ERROR;
}

Expand Down Expand Up @@ -123,6 +126,8 @@ rmw_take_request(

put_memory(&static_buffer_memory, static_buffer_item);

UXR_UNLOCK(&static_buffer_memory.mutex);

if (taken != NULL) {
*taken = deserialize_rv;
}
Expand Down
5 changes: 5 additions & 0 deletions rmw_microxrcedds_c/src/rmw_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,13 @@ rmw_take_response(

rmw_uxrce_clean_expired_static_input_buffer();

UXR_LOCK(&static_buffer_memory.mutex);

// Find first related item in static buffer memory pool
rmw_uxrce_mempool_item_t * static_buffer_item =
rmw_uxrce_find_static_input_buffer_by_owner((void *) custom_client);
if (static_buffer_item == NULL) {
UXR_UNLOCK(&static_buffer_memory.mutex);
return RMW_RET_ERROR;
}

Expand All @@ -126,6 +129,8 @@ rmw_take_response(

put_memory(&static_buffer_memory, static_buffer_item);

UXR_UNLOCK(&static_buffer_memory.mutex);

if (taken != NULL) {
*taken = deserialize_rv;
}
Expand Down
5 changes: 5 additions & 0 deletions rmw_microxrcedds_c/src/rmw_take.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,13 @@ rmw_take_with_info(

rmw_uxrce_clean_expired_static_input_buffer();

UXR_LOCK(&static_buffer_memory.mutex);

// Find first related item in static buffer memory pool
rmw_uxrce_mempool_item_t * static_buffer_item = rmw_uxrce_find_static_input_buffer_by_owner(
(void *) custom_subscription);
if (static_buffer_item == NULL) {
UXR_UNLOCK(&static_buffer_memory.mutex);
return RMW_RET_ERROR;
}

Expand All @@ -73,6 +76,8 @@ rmw_take_with_info(

put_memory(&static_buffer_memory, static_buffer_item);

UXR_UNLOCK(&static_buffer_memory.mutex);

if (taken != NULL) {
*taken = deserialize_rv;
}
Expand Down
14 changes: 10 additions & 4 deletions rmw_microxrcedds_c/src/rmw_wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@ rmw_wait(
(void)events;
(void)wait_set;

// With `rmw_uxrce_wait_mutex` member `need_to_be_ran` is protected.
// `session_memory` itself is not protected because it is not modified between
// rmw_init and rmw_shutdown, and rmw_wait cannot be called concurrently with
// those functions.
UXR_LOCK(&rmw_uxrce_wait_mutex);

if (!services && !clients && !subscriptions && !guard_conditions) {
UXR_UNLOCK(&rmw_uxrce_wait_mutex);
return RMW_RET_OK;
}

Expand All @@ -53,8 +60,6 @@ rmw_wait(

rmw_uxrce_clean_expired_static_input_buffer();

UXR_LOCK(&session_memory.mutex);

// Clear run flag for all sessions
rmw_uxrce_mempool_item_t * item = session_memory.allocateditems;
while (item != NULL) {
Expand All @@ -63,6 +68,7 @@ rmw_wait(
item = item->next;
}

// TODO(pablogs9): What happens if there already data in one entity?
// Enable flag for every XRCE session available in the entities
for (size_t i = 0; services && i < services->service_count; ++i) {
rmw_uxrce_service_t * custom_service = (rmw_uxrce_service_t *)services->services[i];
Expand Down Expand Up @@ -91,7 +97,7 @@ rmw_wait(

// There is no context that contais any of the wait set entities. Nothing to wait here.
if (available_contexts == 0) {
UXR_UNLOCK(&session_memory.mutex);
UXR_UNLOCK(&rmw_uxrce_wait_mutex);
return RMW_RET_OK;
}

Expand All @@ -108,7 +114,7 @@ rmw_wait(
item = item->next;
}

UXR_UNLOCK(&session_memory.mutex);
UXR_UNLOCK(&rmw_uxrce_wait_mutex);

bool buffered_status = false;

Expand Down
6 changes: 6 additions & 0 deletions rmw_microxrcedds_c/src/types.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ rmw_uxrce_wait_set_t custom_wait_set[RMW_UXRCE_MAX_WAIT_SETS];
rmw_uxrce_mempool_t guard_condition_memory;
rmw_uxrce_guard_condition_t custom_guard_condition[RMW_UXRCE_MAX_GUARD_CONDITION];

// Global mutexs
#ifdef UCLIENT_PROFILE_MULTITHREAD
uxrMutex rmw_uxrce_wait_mutex;
bool rmw_uxrce_wait_mutex_initialized = false;
#endif // UCLIENT_PROFILE_MULTITHREAD

// Memory init functions

#define RMW_INIT_MEMORY(X) \
Expand Down