From 0f841c678f1b8f185079b6fe4f079159c38c3fa2 Mon Sep 17 00:00:00 2001 From: disganaitis Date: Wed, 14 Jan 2026 13:03:33 +0200 Subject: [PATCH 01/12] Copied files from C_port onto the updated dds-rtps, modified log_message, updated C Port to match updates to dds-rtps --- generate_xlsx_report.py | 4 + srcC/cyclone-dds-cmake/CMakeLists.txt | 13 + srcC/shape.idl | 9 + srcC/shape_configurator_cyclone_dds.h | 50 + srcC/shape_main.c | 1727 +++++++++++++++++++++++++ 5 files changed, 1803 insertions(+) create mode 100644 srcC/cyclone-dds-cmake/CMakeLists.txt create mode 100644 srcC/shape.idl create mode 100644 srcC/shape_configurator_cyclone_dds.h create mode 100644 srcC/shape_main.c diff --git a/generate_xlsx_report.py b/generate_xlsx_report.py index 9a4351e7..d03cbf7f 100644 --- a/generate_xlsx_report.py +++ b/generate_xlsx_report.py @@ -63,6 +63,8 @@ def get_company_name(product:str) -> str: return 'eProsima' elif 'dust' in product.lower(): return 'S2E Software Systems' + elif 'cyclone' in product.lower(): + return 'Eclipse Foundation' else: raise RuntimeError('Impossible to get company name: ' + product) @@ -82,6 +84,8 @@ def get_product_name(product:str) -> str: return 'FastDDS ' + re.search(r'([\d.]+)', product).group(1) elif 'dust_dds' in product.lower(): return 'Dust DDS ' + re.search(r'([\d.]+)', product).group(1) + elif 'cyclone' in product.lower(): + return 'Cyclone DDS' else: raise RuntimeError('Impossible to get product name: ' + product) diff --git a/srcC/cyclone-dds-cmake/CMakeLists.txt b/srcC/cyclone-dds-cmake/CMakeLists.txt new file mode 100644 index 00000000..f2ddf0f3 --- /dev/null +++ b/srcC/cyclone-dds-cmake/CMakeLists.txt @@ -0,0 +1,13 @@ +cmake_minimum_required(VERSION 3.16) +project(shape LANGUAGES C) + +# Find the CycloneDDS package. +find_package(CycloneDDS REQUIRED) +set(EXECUTABLE_NAME "eclipse_cyclone-${CycloneDDS_VERSION}_shape_main_linux") +idlc_generate(TARGET shape_lib FILES "../shape.idl" WARNINGS no-implicit-extensibility) + +add_executable(${EXECUTABLE_NAME} ../shape_main.c) + +target_compile_definitions(${EXECUTABLE_NAME} PUBLIC -DCYCLONE_DDS) + +target_link_libraries(${EXECUTABLE_NAME} shape_lib CycloneDDS::ddsc m) \ No newline at end of file diff --git a/srcC/shape.idl b/srcC/shape.idl new file mode 100644 index 00000000..54a282f2 --- /dev/null +++ b/srcC/shape.idl @@ -0,0 +1,9 @@ +@appendable +struct ShapeType { + @key + string<128> color; + int32 x; + int32 y; + int32 shapesize; + sequence additional_payload_size; +}; \ No newline at end of file diff --git a/srcC/shape_configurator_cyclone_dds.h b/srcC/shape_configurator_cyclone_dds.h new file mode 100644 index 00000000..138001f2 --- /dev/null +++ b/srcC/shape_configurator_cyclone_dds.h @@ -0,0 +1,50 @@ +#ifndef _SHAPE_CONFIG_CYCLONE_ +#define _SHAPE_CONFIG_CYCLONE_ + +#include "shape.h" +#include "dds/dds.h" +#include +#include +#include +#include +#include +#include +#include + +const char* get_qos_policy_name(uint32_t last_policy_id) { + + switch (last_policy_id) { + case DDS_INVALID_QOS_POLICY_ID: return "INVALID"; + case DDS_USERDATA_QOS_POLICY_ID: return "USERDATA"; + case DDS_DURABILITY_QOS_POLICY_ID: return "DURABILITY"; + case DDS_PRESENTATION_QOS_POLICY_ID: return "PRESENTATION"; + case DDS_DEADLINE_QOS_POLICY_ID: return "DEADLINE"; + case DDS_LATENCYBUDGET_QOS_POLICY_ID: return "LATENCYBUDGET"; + case DDS_OWNERSHIP_QOS_POLICY_ID: return "OWNERSHIP"; + case DDS_OWNERSHIPSTRENGTH_QOS_POLICY_ID: return "OWNERSHIPSTRENGTH"; + case DDS_LIVELINESS_QOS_POLICY_ID: return "LIVELINESS"; + case DDS_TIMEBASEDFILTER_QOS_POLICY_ID: return "TIMEBASEDFILTER"; + case DDS_PARTITION_QOS_POLICY_ID: return "PARTITION"; + case DDS_RELIABILITY_QOS_POLICY_ID: return "RELIABILITY"; + case DDS_DESTINATIONORDER_QOS_POLICY_ID: return "DESTINATIONORDER"; + case DDS_HISTORY_QOS_POLICY_ID: return "HISTORY"; + case DDS_RESOURCELIMITS_QOS_POLICY_ID: return "RESOURCELIMITS"; + case DDS_ENTITYFACTORY_QOS_POLICY_ID: return "ENTITYFACTORY"; + case DDS_WRITERDATALIFECYCLE_QOS_POLICY_ID: return "WRITERDATALIFECYCLE"; + case DDS_READERDATALIFECYCLE_QOS_POLICY_ID: return "READERDATALIFECYCLE"; + case DDS_TOPICDATA_QOS_POLICY_ID: return "TOPICDATA"; + case DDS_GROUPDATA_QOS_POLICY_ID: return "GROUPDATA"; + case DDS_TRANSPORTPRIORITY_QOS_POLICY_ID: return "TRANSPORTPRIORITY"; + case DDS_LIFESPAN_QOS_POLICY_ID: return "LIFESPAN"; + case DDS_DURABILITYSERVICE_QOS_POLICY_ID: return "DURABILITYSERVICE"; + case DDS_PROPERTY_QOS_POLICY_ID: return "PROPERTY"; + case DDS_TYPE_CONSISTENCY_ENFORCEMENT_QOS_POLICY_ID: return "TYPE_CONSISTENCY_ENFORCEMENT"; + case DDS_DATA_REPRESENTATION_QOS_POLICY_ID: return "DATAREPRESENTATION"; + default: + return 0; + } +} + +#endif + + diff --git a/srcC/shape_main.c b/srcC/shape_main.c new file mode 100644 index 00000000..527f9e32 --- /dev/null +++ b/srcC/shape_main.c @@ -0,0 +1,1727 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include "shape_configurator_cyclone_dds.h" +#include +#include +#include +#include +#include +#include + +const size_t default_buffer_size = 256; +#define MAX_SAMPLES 500 +#define ERROR_PARSING_ARGUMENTS 1 +#define ERROR_INITIALIZING 2 +#define ERROR_RUNNING 3 + +/*************************************************************/ +int all_done = 0; +/*************************************************************/ +void +handle_sig(int sig) +{ + if (sig == SIGINT) { + all_done = 1; + } +} + +/*************************************************************/ +int +install_sig_handlers() +{ + struct sigaction int_action; + int_action.sa_handler = handle_sig; + sigemptyset(&int_action.sa_mask); + sigaddset(&int_action.sa_mask, SIGINT); + int_action.sa_flags = 0; + sigaction(SIGINT, &int_action, NULL); + return 0; +} + +typedef enum { + ERROR = 1, + DEBUG = 2, +} Verbosity; + + +const char* to_string_reliability(dds_reliability_kind_t reliability_value) +{ + if (reliability_value == DDS_RELIABILITY_BEST_EFFORT){ + return "BEST_EFFORT"; + } else if (reliability_value == DDS_RELIABILITY_RELIABLE){ + return "RELIABLE"; + } + return "Error stringifying Reliability kind."; +} + +const char* to_string_durability(dds_durability_kind_t durability_value) +{ + if ( durability_value == DDS_DURABILITY_VOLATILE){ + return "VOLATILE"; + } else if (durability_value == DDS_DURABILITY_TRANSIENT_LOCAL){ + return "TRANSIENT_LOCAL"; + } else if (durability_value == DDS_DURABILITY_TRANSIENT){ + return "TRANSIENT"; + } else if (durability_value == DDS_DURABILITY_PERSISTENT){ + return "PERSISTENT"; + } + return "Error stringifying Durability kind."; +} + +const char* to_string_data_representation(dds_data_representation_id_t data_representation_value) +{ + if (data_representation_value == DDS_DATA_REPRESENTATION_XCDR1){ + return "XCDR"; + } else if (data_representation_value == DDS_DATA_REPRESENTATION_XCDR2){ + return "XCDR2"; + } + return "Error stringifying DataRepresentation."; +} + +const char* to_string_verbosity(Verbosity verbosity_value) +{ + switch (verbosity_value) + { + case ERROR: + return "ERROR"; + break; + + case DEBUG: + return "DEBUG"; + break; + + default: + break; + } + return "Error stringifying verbosity."; +} + +const char* to_string_ownership(dds_ownership_kind_t ownership_kind_value) +{ + if (ownership_kind_value == DDS_OWNERSHIP_SHARED){ + return "SHARED"; + } else if (ownership_kind_value == DDS_OWNERSHIP_EXCLUSIVE){ + return "EXCLUSIVE"; + } + return "Error stringifying Ownership kind."; +} + +const char* to_string_history(dds_history_kind_t history_kind_value) +{ + if (history_kind_value == DDS_HISTORY_KEEP_ALL){ + return "KEEP_ALL"; + } else if (history_kind_value == DDS_HISTORY_KEEP_LAST){ + return "KEEP_LAST"; + } + return "Error stringifying History kind."; +} + +const char* to_string_presentation(dds_presentation_access_scope_kind_t presentation_value) +{ + if (presentation_value == DDS_PRESENTATION_INSTANCE){ + return "INSTANCE_PRESENTATION_QOS"; + } else if (presentation_value == DDS_PRESENTATION_TOPIC){ + return "TOPIC_PRESENTATION_QOS"; + } else if (presentation_value == DDS_PRESENTATION_GROUP){ + return "GROUP_PRESENTATION_QOS"; + } + return "error stringifying Access Scope kind"; +} + + +typedef struct { + Verbosity verbosity_; +} Logger; + +Verbosity get_verbosity(Logger* logger) +{ + return logger->verbosity_; +} + +void set_verbosity(Logger* logger, Verbosity v) +{ + logger->verbosity_ = v; + return; +} + +void log_message(Logger* logger, Verbosity level_verbosity, const char* format, ...) +{ + if (level_verbosity <= logger->verbosity_) { + va_list arglist; + va_start(arglist, format); + vprintf(format, arglist); + va_end(arglist); + printf("\n"); + } +} + +typedef struct ShapeOptions { + dds_domainid_t domain_id; + dds_reliability_kind_t reliability_kind; + dds_durability_kind_t durability_kind; + dds_data_representation_id_t data_representation; + int history_depth; + int ownership_strength; + dds_presentation_access_scope_kind_t coherent_set_access_scope; + + char *topic_name; + char *color; + char *partition; + + bool publish; + bool subscribe; + + useconds_t timebasedfilter_interval_us; + useconds_t deadline_interval_us; + useconds_t lifespan_us; + + int da_width; + int da_height; + + int xvel; + int yvel; + int shapesize; + + bool print_writer_samples; + + bool use_read; + + useconds_t write_period_us; + useconds_t read_period_us; + unsigned int num_iterations; + + unsigned int num_instances; + unsigned int num_topics; + + bool unregister; + bool dispose; + + bool coherent_set_access_scope_set; + bool coherent_set_enabled; + bool ordered_access_enabled; + unsigned int coherent_set_sample_count; + + unsigned int additional_payload_size; + + bool take_read_next_instance; + + useconds_t periodic_announcement_period_us; +} ShapeOptions_t; + + +void shape_options_init(ShapeOptions_t* shape_options) +{ + shape_options->domain_id = 0; + shape_options->reliability_kind = DDS_RELIABILITY_RELIABLE; + shape_options->durability_kind = DDS_DURABILITY_VOLATILE; + shape_options->data_representation = DDS_DATA_REPRESENTATION_XCDR1; + shape_options->history_depth = -1; /* means default */ + shape_options->ownership_strength = -1; /* means shared */ + + shape_options->topic_name = NULL; + shape_options->color = NULL; + shape_options->partition = NULL; + + shape_options->publish = false; + shape_options->subscribe = false; + + shape_options->timebasedfilter_interval_us = 0; /* off */ + shape_options->deadline_interval_us = 0; /* off */ + shape_options->lifespan_us = 0; /* off */ + + shape_options->da_width = 240; + shape_options->da_height = 270; + + shape_options->xvel = 3; + shape_options->yvel = 3; + shape_options->shapesize = 20; + + shape_options->print_writer_samples = false; + + shape_options->use_read = false; + + shape_options->write_period_us = 33000; /* 33ms */ + shape_options->read_period_us = 100000; /* 100ms */ + + shape_options->num_iterations = 0; + shape_options->num_instances = 1; + shape_options->num_topics = 1; + + shape_options->unregister = false; + shape_options->dispose = false; + + shape_options->coherent_set_enabled = false; + shape_options->ordered_access_enabled = false; + shape_options->coherent_set_access_scope_set = false; + shape_options->coherent_set_access_scope = DDS_PRESENTATION_INSTANCE; + shape_options->coherent_set_sample_count = 0; + + shape_options->additional_payload_size = 0; + + shape_options->take_read_next_instance = true; + + shape_options->periodic_announcement_period_us = 0; + + return; +} + +void shape_options_free(ShapeOptions_t* shape_options){ + free(shape_options->topic_name); + free(shape_options->color); + free(shape_options->partition); + return; +} + +void print_usage( const char *prog ) +{ + printf("%s: \n", prog); + printf(" --help, -h : print this menu\n"); + printf(" -v [e|d] : set log message verbosity [e: ERROR, d: DEBUG]\n"); + printf(" -P : publish samples\n"); + printf(" -S : subscribe samples\n"); + printf(" -d : domain id (default: 0)\n"); + printf(" -b : BEST_EFFORT reliability\n"); + printf(" -r : RELIABLE reliability\n"); + printf(" -k : keep history depth [0: KEEP_ALL]\n"); + printf(" -f : set a 'deadline' with interval (ms) [0: OFF]\n"); + printf(" -s : set ownership strength [-1: SHARED]\n"); + printf(" -t : set the topic name\n"); + printf(" -c : set color to publish (filter if subscriber)\n"); + printf(" -p : set a 'partition' string\n"); + printf(" -D [v|l|t|p] : set durability [v: VOLATILE, l: TRANSIENT_LOCAL]\n"); + printf(" t: TRANSIENT, p: PERSISTENT]\n"); + printf(" -x [1|2] : set data representation [1: XCDR, 2: XCDR2]\n"); + printf(" -w : print Publisher's samples\n"); + printf(" -z : set shapesize (0: increase the size for every sample)\n"); + printf(" -R : use 'read()' instead of 'take()'\n"); + printf(" --write-period : waiting period between 'write()' operations in ms.\n"); + printf(" Default: 33ms\n"); + printf(" --read-period : waiting period between 'read()' or 'take()' operations\n"); + printf(" in ms. Default: 100ms\n"); + printf(" --time-filter : apply 'time based filter' with interval \n"); + printf(" in ms [0: OFF]\n"); + printf(" --lifespan : indicates the lifespan of a sample in ms\n"); + printf(" --num-iterations : indicates the number of iterations of the main loop\n"); + printf(" After that, the application will exit.\n"); + printf(" Default: infinite\n"); + printf(" --num-instances : indicates the number of iterations of the main loop\n"); + printf(" if the value is > 1, the additional instances are\n"); + printf(" created by appending a number. For example, if the\n"); + printf(" original color is \"BLUE\" the instances used are\n"); + printf(" \"BLUE\", \"BLUE1\", \"BLUE2\"...\n"); + printf(" --num-topics : indicates the number of topics created (using the same\n"); + printf(" type). This also creates a DataReader or DataWriter per\n"); + printf(" topic. If the value is > 1, the additional topic names\n"); + printf(" are created by appending a number: For example, if the\n"); + printf(" original topic name is \"Square\", the topics created are\n"); + printf(" \"Square\", \"Square1\", \"Square2\"...\n"); + printf(" --final-instance-state [u|d]: indicates the action performed after the\n"); + printf(" DataWriter finishes its execution (before\n"); + printf(" deleting it):\n"); + printf(" - u: unregister\n"); + printf(" - d: dispose\n"); + printf(" --access-scope [i|t|g]: sets Presentation.access_scope to INSTANCE, TOPIC\n"); + printf(" or GROUP\n"); + printf(" --coherent : sets Presentation.coherent_access = true\n"); + printf(" --ordered : sets Presentation.ordered_access = true\n"); + printf(" --coherent-sample-count : amount of samples sent for each DataWriter\n"); + printf(" and instance that are grouped in a coherent\n"); + printf(" set\n"); + printf(" --additional-payload-size : indicates the amount of bytes added to\n"); + printf(" the samples written (for example to use\n"); + printf(" large data)\n"); + printf(" --take-read : uses take()/read() instead of take_next_instance()\n"); + printf(" read_next_instance()\n"); + printf(" --periodic-announcement : indicates the periodic participant\n"); + printf(" announcement period in ms. Default 0 (off)\n"); +} + +bool validate(Logger* logger, ShapeOptions_t* shape_options) { + if (shape_options->topic_name == NULL) { + log_message(logger, ERROR, "please specify topic name [-t]"); + return false; + } + if ( (!shape_options->publish) && (!shape_options->subscribe) ) { + log_message(logger, ERROR, "please specify publish [-P] or subscribe [-S]"); + return false; + } + if ( shape_options->publish && shape_options->subscribe ) { + log_message(logger, ERROR, "please specify only one of: publish [-P] or subscribe [-S]"); + return false; + } + if (shape_options->publish && (shape_options->color == NULL) ) { + shape_options->color = strdup("BLUE"); + log_message(logger, ERROR, "warning: color was not specified, defaulting to \"BLUE\""); + } + if (shape_options->publish && (shape_options->timebasedfilter_interval_us > 0)){ + log_message(logger, ERROR, "warning: time base filter [--time-filter] ignored on publisher applications"); + } + if (shape_options->publish && shape_options->use_read) { + log_message(logger, ERROR, "warning: use read [-R] ignored on publisher applications"); + } + if (shape_options->publish && (!shape_options->take_read_next_instance)) { + log_message(logger, ERROR, "warning: --take-read ignored on publisher applications"); + } + if (shape_options->subscribe && (shape_options->shapesize != 20)){ + log_message(logger, ERROR, "warning: shapesize [-z] ignored on subscriber applications"); + } + if (shape_options->subscribe && (shape_options->lifespan_us > 0)) { + log_message(logger, ERROR, "warning: --lifespan ignored on subscriber applications"); + } + if (shape_options->subscribe && (shape_options->num_instances > 1)) { + log_message(logger, ERROR, "warning: --num-instances ignored on subscriber applications"); + } + if (shape_options->subscribe && (shape_options->unregister || shape_options->dispose)) { + log_message(logger, ERROR, "warning: --final-instance-state ignored on subscriber applications"); + } + if (shape_options->subscribe && (shape_options->coherent_set_sample_count > 0)) { + log_message(logger, ERROR, "warning: --coherent-sample-count ignored on subscriber applications"); + } + if (!shape_options->coherent_set_enabled && !shape_options->ordered_access_enabled && shape_options->coherent_set_access_scope_set) { + log_message(logger, ERROR, "warning: --access-scope ignored because not coherent, or ordered access enabled"); + } + + return true; +} + + +bool parse(int argc, char *argv[], Logger* logger, ShapeOptions_t* shape_options) +{ + log_message(logger, DEBUG, "Running parse() function"); + int opt; + bool parse_ok = true; + static struct option long_options[] = { + {"help", no_argument, NULL, 'h'}, + {"write-period", required_argument, NULL, 'W'}, + {"read-period", required_argument, NULL, 'A'}, + {"final-instance-state", required_argument, NULL, 'M'}, + {"access-scope", required_argument, NULL, 'C'}, + {"coherent", required_argument, NULL, 'T'}, + {"ordered", required_argument, NULL, 'O'}, + {"coherent-sample-count", required_argument, NULL, 'H'}, + {"additional-payload-size", required_argument, NULL, 'B'}, + {"num-topics", required_argument, NULL, 'E'}, + {"lifespan", required_argument, NULL, 'l'}, + {"num-instances", required_argument, NULL, 'I'}, + {"num-iterations", required_argument, NULL, 'n'}, + {"take-read", required_argument, NULL, 'K'}, + {"time-filter", required_argument, NULL, 'i'}, + {"periodic-announcement", required_argument, NULL, 'N'}, + {NULL, 0, NULL, 0 } + }; + + // this variable will be used to check input values + // because a lot of things are stored as unsigned ints + // and cannot be checked for negative values + int input_int = 0; + + while ((opt = getopt_long(argc, argv, "hPSbrRwc:d:D:f:k:p:s:x:t:v:z:", + long_options, NULL)) != -1) { + switch (opt) { + case 'v': + if (optarg[0] != '\0') { + switch (optarg[0]) { + case 'd': + set_verbosity(logger, DEBUG); + break; + case 'e': + set_verbosity(logger, ERROR); + break; + default: + log_message(logger, ERROR, "unrecognized value for verbosity %s", &optarg[0]); + parse_ok = false; + } + } + break; + case 'w': + shape_options->print_writer_samples = true; + break; + case 'b': + shape_options->reliability_kind = DDS_RELIABILITY_BEST_EFFORT; + break; + case 'R': + shape_options->use_read = true; + break; + case 'c': + shape_options->color = strdup(optarg); + break; + case 'd': { + int converted_param = sscanf(optarg, "%d", &input_int); + if (converted_param == 0) { + log_message(logger, ERROR, "unrecognized value for domain_id %s", &optarg[0]); + parse_ok = false; + } else if (input_int < 0) { + log_message(logger, ERROR, "incorrect value for domain_id (less than zero)"); + parse_ok = false; + } + shape_options->domain_id = (unsigned int)input_int; + break; + } + case 'D': + if (optarg[0] != '\0') { + switch (optarg[0]) { + case 'v': + shape_options->durability_kind = DDS_DURABILITY_VOLATILE; + break; + case 'l': + shape_options->durability_kind = DDS_DURABILITY_TRANSIENT_LOCAL; + break; + case 't': + shape_options->durability_kind = DDS_DURABILITY_TRANSIENT; + break; + case 'p': + shape_options->durability_kind = DDS_DURABILITY_PERSISTENT; + break; + default: + log_message(logger, ERROR, "unrecognized value for durability %s", &optarg[0]); + parse_ok = false; + } + } + break; + case 'i': { + int64_t time_input = 0; + int converted_param = sscanf(optarg, "%lld", &time_input); + if (converted_param == 0) { + log_message(logger, ERROR, "unrecognized value for timebasedfilter_interval %s", &optarg[0]); + parse_ok = false; + } else if (time_input < 0) { + log_message(logger, ERROR, "incorrect value for timebasedfilter_interval (less than zero)"); + parse_ok = false; + } + shape_options->timebasedfilter_interval_us = time_input * 1000ll; + break; + } + case 'f': { + int64_t time_input = 0; + int converted_param = sscanf(optarg, "%lld", &time_input); + if (converted_param == 0) { + log_message(logger, ERROR, "unrecognized value for deadline_interval %s", &optarg[0]); + parse_ok = false; + } else if (time_input < 0) { + log_message(logger, ERROR, "incorrect value for deadline_interval (less than zero)"); + parse_ok = false; + } + shape_options->deadline_interval_us = time_input * 1000ll; + break; + } + case 'k': { + int converted_param = sscanf(optarg, "%d", &input_int); + if (converted_param == 0){ + log_message(logger, ERROR, "unrecognized value for history_depth %s", &optarg[0]); + parse_ok = false; + } else if (input_int < 0) { + log_message(logger, ERROR, "incorrect value for history_depth (less than zero)"); + parse_ok = false; + } + shape_options->history_depth = (unsigned int)input_int; + break; + } + case 'p': + shape_options->partition = strdup(optarg); + break; + case 'r': + shape_options->reliability_kind = DDS_RELIABILITY_RELIABLE; + break; + case 's': { + int converted_param = sscanf(optarg, "%d", &input_int); + if (converted_param == 0){ + log_message(logger, ERROR, "unrecognized value for ownership_strength %s", &optarg[0]); + parse_ok = false; + } else if (input_int < -1) { + log_message(logger, ERROR, "incorrect value for ownership_strength (less than -1)"); + parse_ok = false; + } + shape_options->ownership_strength = (unsigned int)input_int; + break; + } + case 't': + shape_options->topic_name = strdup(optarg); + break; + case 'P': + shape_options->publish = true; + break; + case 'S': + shape_options->subscribe = true; + break; + case 'h': + print_usage(argv[0]); + exit(0); + break; + case 'x': + if (optarg[0] != '\0') { + switch (optarg[0]) { + case '1': + shape_options->data_representation = DDS_DATA_REPRESENTATION_XCDR1; + break; + case '2': + shape_options->data_representation = DDS_DATA_REPRESENTATION_XCDR2; + break; + default: + log_message(logger, ERROR, "unrecognized value for data representation %s", &optarg[0]); + parse_ok = false; + } + } + break; + case 'z': { + int converted_param = sscanf(optarg, "%d", &input_int); + if (converted_param == 0) { + log_message(logger, ERROR, "unrecognized value for shapesize %s", &optarg[0]); + parse_ok = false; + } else if (input_int < 0) { + log_message(logger, ERROR, "incorrect value for shapesize (less than zero)"); + parse_ok = false; + } + shape_options->shapesize = (unsigned int)input_int; + break; + } + case 'W': { + dds_duration_t converted_param = 0; + if (sscanf(optarg, "%lld", &converted_param) == 0) { + log_message(logger, ERROR, "unrecognized value for write-period %s", &optarg[0]); + parse_ok = false; + } else if (converted_param < 0) { + log_message(logger, ERROR, "incorrect value for write-period (less than zero)"); + parse_ok = false; + } + shape_options->write_period_us = converted_param * 1000ll; + break; + } + case 'A': { + dds_duration_t converted_param = 0; + if (sscanf(optarg, "%lld", &converted_param) == 0) { + log_message(logger, ERROR, "unrecognized value for read-period %s", &optarg[0]); + parse_ok = false; + } else if (converted_param < 0) { + log_message(logger, ERROR, "incorrect value for read-period (less than zero)"); + parse_ok = false; + } + shape_options->read_period_us = converted_param * 1000ll; + break; + } + case 'n': { + if (sscanf(optarg, "%u", &shape_options->num_iterations) == 0) { + log_message(logger, ERROR, "unrecognized value for num-iterations %s", &optarg[0]); + parse_ok = false; + } else if (shape_options->num_iterations < 1) { + log_message(logger, ERROR, "incorrect value for num-iterations, it must be >= 1"); + parse_ok = false; + } + break; + } + case 'l': { + dds_duration_t converted_param = 0; + if (sscanf(optarg, "%lld", &converted_param) == 0) { + log_message(logger, ERROR, "unrecognized value for lifespan %s", &optarg[0]); + parse_ok = false; + } else if (converted_param < 0) { + log_message(logger, ERROR, "incorrect value for lifespan (less than zero)"); + parse_ok = false; + } + shape_options->lifespan_us = converted_param * 1000ll; + break; + } + case 'M': { + if (optarg[0] != '\0') { + switch (optarg[0]) + { + case 'u': + shape_options->unregister = true; + break; + case 'd': + shape_options->dispose = true; + default: + log_message(logger, ERROR, "unrecognized value for final-instance-state %s", &optarg[0]); + parse_ok = false; + } + if (shape_options->unregister && shape_options->dispose){ + log_message(logger, ERROR, "error, cannot confiture unregister and dispose at the same time"); + parse_ok = false; + } + } + break; + } + case 'C': { + shape_options->coherent_set_access_scope_set = true; + if (optarg[0] != '\0') { + switch (optarg[0]) + { + case 'i': + shape_options->coherent_set_access_scope = DDS_PRESENTATION_INSTANCE; + break; + case 't': + shape_options->coherent_set_access_scope = DDS_PRESENTATION_TOPIC; + break; + case 'g': + shape_options->coherent_set_access_scope = DDS_PRESENTATION_GROUP; + break; + default: + log_message(logger, ERROR, "unrecognized value for cogerent-sets %s", &optarg[0]); + parse_ok = false; + shape_options->coherent_set_access_scope_set = false; + } + } + break; + } + case 'T': { + shape_options->coherent_set_enabled = true; + break; + } + case 'O': { + shape_options->ordered_access_enabled = true; + break; + } + case 'I': { + if (sscanf(optarg, "%u", &shape_options->num_instances) == 0) { + log_message(logger, ERROR, "unrecognized value for num-instances %s", &optarg[0]); + parse_ok = false; + } else if (shape_options->num_instances < 1) { + log_message(logger, ERROR, "incorrect value for num-instances, it must be >= 1"); + parse_ok = false; + } + break; + } + case 'E': { + if (sscanf(optarg, "%u", &shape_options->num_topics) == 0) { + log_message(logger, ERROR, "unrecognized value for num-topics %s", &optarg[0]); + parse_ok = false; + } else if (shape_options->num_topics < 1) { + log_message(logger, ERROR, "incorrect value for num-topics, it must be >= 1"); + parse_ok = false; + } + break; + } + case 'B': { + if (sscanf(optarg, "%u", &shape_options->additional_payload_size) == 0) { + log_message(logger, ERROR, "unrecognized value for additional-payload-size %s", &optarg[0]); + parse_ok = false; + } else if (shape_options->additional_payload_size < 1) { + log_message(logger, ERROR, "incorrect value for additional-payload-size, it must be >= 1"); + parse_ok = false; + } + break; + } + case 'H': { + if (sscanf(optarg, "%u", &shape_options->coherent_set_sample_count) == 0) { + log_message(logger, ERROR, "unrecognized value for coherent-sample-count %s", &optarg[0]); + parse_ok = false; + } else if (shape_options->coherent_set_sample_count < 2) { + log_message(logger, ERROR, "incorrecct value for coherent-sample-ount, it must be >= 2"); + parse_ok = false; + } + break; + } + case 'K': { + shape_options->take_read_next_instance = false; + break; + } + case 'N': { + dds_duration_t converted_param = 0; + if (sscanf(optarg, "%lld", &converted_param) == 0){ + log_message(logger, ERROR, "unrecognized value for periodic-announcement %s", &optarg[0]); + parse_ok = false; + } else if (converted_param < 0) { + log_message(logger, ERROR, "incorrect value for periodic-announcement, it must be >= 0"); + parse_ok = false; + } + shape_options->periodic_announcement_period_us = converted_param * 1000ll; + break; + } + case '?': + parse_ok = false; + break; + } + } + + if ( parse_ok ) { + parse_ok = validate(logger, shape_options); + } + if ( !parse_ok ) { + print_usage(argv[0]); + exit(1); + } else if (DEBUG <= logger->verbosity_){ + printf("Shape Options: \n"); + printf(" Verbosity = %d\n", get_verbosity(logger)); + printf(" This application %s a publisher\n", shape_options->publish ? "is" : "is not"); + printf(" This application %s a subscriber\n", shape_options->subscribe ? "is" : "is not"); + printf(" DomainId = %d\n", shape_options->domain_id); + printf(" ReliabilityKind = %d\n", shape_options->reliability_kind); + printf(" DurabilityKind = %d\n", shape_options->durability_kind); + printf(" DataRepresentation = %d\n", shape_options->data_representation); + printf(" HistoryDepth = %d\n", shape_options->history_depth); + printf(" OwnershipStrength = %d\n",shape_options->ownership_strength); + printf(" TimeBasedFilterInterval = %u ms\n",shape_options->timebasedfilter_interval_us / 1000ll); + printf(" DeadlineInterval = %u ms\n", shape_options->deadline_interval_us / 1000ll); + printf(" Shapesize = %d\n", shape_options->shapesize); + printf(" Reading method = %s\n", (shape_options->use_read ? "read_next_instance" : "take_next_instance")); + printf(" Write period = %u ms\n", shape_options->write_period_us / 1000ll); + printf(" Read period = %u ms\n", shape_options->read_period_us / 1000ll); + printf(" Lifespan = %u ms\n", shape_options->lifespan_us / 1000ll); + printf(" Number of iterations = %u\n", shape_options->num_iterations); + printf(" Number of instances = %u\n", shape_options->num_instances); + printf(" Number of entities = %u\n", shape_options->num_topics); + printf(" Coherent sets = %s\n", shape_options->coherent_set_enabled ? "true" : "false"); + printf(" Ordered access = %s\n", shape_options->ordered_access_enabled ? "true" : "false"); + printf(" Access Scope = %s\n", to_string_presentation(shape_options->coherent_set_access_scope)); + printf(" Coherent Sample Count = %u\n", shape_options->coherent_set_sample_count); + printf(" Additional Payload Size = %u\n", shape_options->additional_payload_size); + printf(" Final Instance State = %s\n", + (shape_options->unregister ? "Unregister" : (shape_options->dispose ? "Dispose" : "not specified"))); + printf(" Periodic Announcement Period = %u ms\n", shape_options->periodic_announcement_period_us / 1000ll); + if (shape_options->topic_name != NULL){ + printf(" Topic = %s\n", shape_options->topic_name); + } + if (shape_options->color != NULL) { + printf(" Color = %s\n", shape_options->color); + } + if (shape_options->partition != NULL) { + printf(" Partition = %s\n", shape_options->partition); + } + } + return parse_ok; +} + +typedef struct ShapeApp { + Logger* logger; + + dds_listener_t* dp_listner; + dds_entity_t dp; + dds_entity_t* topics; + dds_entity_t publisher; + dds_entity_t subscriber; + + dds_entity_t* writers; + dds_entity_t* readers; + + char* color; + + int xval; + int yval; + int da_width; + int da_hight; +} ShapeApp_t; + +void on_inconsistent_topic(dds_entity_t topic, const dds_inconsistent_topic_status_t status, void* args) { + Logger* logger = args; + + char topic_name[default_buffer_size]; + char type_name[default_buffer_size]; + + if (dds_get_name(topic, topic_name, default_buffer_size) < 0) { + log_message(logger, ERROR, "Failed to get topic name for topic %d in %s", topic, __FUNCTION__); + + exit(-1); + } + + if (dds_get_type_name(topic, type_name, default_buffer_size) < 0){ + log_message(logger, ERROR, "Failed to get type name for topic: %s in %s", topic_name, __FUNCTION__); + + exit(-1); + } + + printf("%s() topic: '%s' type: '%s'\n", __FUNCTION__, topic_name, type_name); +} + +void on_offered_incompatible_qos(dds_entity_t writer, const dds_offered_incompatible_qos_status_t status, void * args) { + dds_entity_t topic = dds_get_topic(writer); + Logger* logger = args; + + char topic_name[default_buffer_size]; + char type_name[default_buffer_size]; + const char* policy_name = get_qos_policy_name(status.last_policy_id); + + if (policy_name == NULL) { + log_message(logger, ERROR, "Failed to get qos Policy name for policy id: %u in %s", status.last_policy_id, __FUNCTION__); + exit(-1); + } + + if (dds_get_name(topic, topic_name, default_buffer_size) < 0) { + log_message(logger, ERROR, "Failed to get topic name for topic %d in %s",topic,__FUNCTION__); + + exit(-1); + } + + if (dds_get_type_name(topic, type_name, default_buffer_size) < 0){ + log_message(logger, ERROR, "Failed to get type name for topic: %s in %s",topic_name,__FUNCTION__); + + exit(-1); + } + + + printf("%s() topic: '%s' type: '%s' : %d (%s)\n", __FUNCTION__, + topic_name, type_name, + status.last_policy_id, + policy_name ); +} + +void on_publication_matched (dds_entity_t writer, const dds_publication_matched_status_t status, void* args) { + dds_entity_t topic = dds_get_topic(writer); + Logger* logger = args; + + char topic_name[default_buffer_size]; + char type_name[default_buffer_size]; + + if (dds_get_name(topic, topic_name, default_buffer_size) < 0) { + log_message(logger, ERROR, "Failed to get topic name for topic %d in %s",topic,__FUNCTION__); + + exit(-1); + } + + if (dds_get_type_name(topic, type_name, default_buffer_size) < 0){ + log_message(logger, ERROR, "Failed to get type name for topic: %s in %s",topic_name,__FUNCTION__); + + exit(-1); + } + + printf("%s() topic: '%s' type: '%s' : matched readers %d (change = %d)\n", __FUNCTION__, + topic_name, type_name, status.current_count, status.current_count_change); +} + +void on_offered_deadline_missed (dds_entity_t writer, const dds_offered_deadline_missed_status_t status, void* args) { + dds_entity_t topic = dds_get_topic(writer); + Logger* logger = args; + + char topic_name[default_buffer_size]; + char type_name[default_buffer_size]; + + if (dds_get_name(topic, topic_name, default_buffer_size) < 0) { + log_message(logger, ERROR, "Failed to get topic name for topic %d in %s",topic,__FUNCTION__); + + exit(-1); + } + + if (dds_get_type_name(topic, type_name, default_buffer_size) < 0){ + log_message(logger, ERROR, "Failed to get type name for topic: %s in %s",topic_name,__FUNCTION__); + + exit(-1); + } + + printf("%s() topic: '%s' type: '%s' : (total = %d, change = %d)\n", __FUNCTION__, + topic_name, type_name, status.total_count, status.total_count_change); +} + +void on_liveliness_lost (dds_entity_t writer, const dds_liveliness_lost_status_t status, void * args) { + dds_entity_t topic = dds_get_topic(writer); + Logger* logger = args; + + char topic_name[default_buffer_size]; + char type_name[default_buffer_size]; + + if (dds_get_name(topic, topic_name, default_buffer_size) < 0) { + log_message(logger, ERROR, "Failed to get topic name for topic %d in %s",topic,__FUNCTION__); + + exit(-1); + } + + if (dds_get_type_name(topic, type_name, default_buffer_size) < 0){ + log_message(logger, ERROR, "Failed to get type name for topic: %s in %s",topic_name,__FUNCTION__); + + exit(-1); + } + + printf("%s() topic: '%s' type: '%s' : (total = %d, change = %d)\n", __FUNCTION__, + topic_name, type_name, status.total_count, status.total_count_change); +} + +void on_requested_incompatible_qos (dds_entity_t reader, const dds_requested_incompatible_qos_status_t status, void * args) { + dds_entity_t topic = dds_get_topic(reader); + Logger* logger = args; + + char topic_name[default_buffer_size]; + char type_name[default_buffer_size]; + + if (dds_get_name(topic, topic_name, default_buffer_size) < 0) { + log_message(logger, ERROR,"Failed to get topic name for topic %d in %s",topic,__FUNCTION__); + + exit(-1); + } + + if (dds_get_type_name(topic, type_name, default_buffer_size) < 0){ + log_message(logger, ERROR, "Failed to get type name for topic: %s in %s",topic_name,__FUNCTION__); + + exit(-1); + } + + const char *policy_name = get_qos_policy_name(status.last_policy_id); + + if (policy_name == NULL) { + log_message(logger, ERROR, "Failed to get qos Policy name for policy id: % in %s", status.last_policy_id, __FUNCTION__); + + exit(-1); + } + + printf("%s() topic: '%s' type: '%s' : %d (%s)\n", __FUNCTION__, + topic_name, type_name, status.last_policy_id, + policy_name); +} + +void on_subscription_matched (dds_entity_t reader, const dds_subscription_matched_status_t status, void* args) { + dds_entity_t topic = dds_get_topic(reader); + Logger* logger = args; + + char topic_name[default_buffer_size]; + char type_name[default_buffer_size]; + + if (dds_get_name(topic, topic_name, default_buffer_size) < 0) { + log_message(logger, ERROR,"Failed to get topic name for topic %d in %s",topic,__FUNCTION__); + + exit(-1); + } + + if (dds_get_type_name(topic, type_name, default_buffer_size) < 0){ + log_message(logger, ERROR, "Failed to get type name for topic: %s in %s",topic_name,__FUNCTION__); + + exit(-1); + } + + printf("%s() topic: '%s' type: '%s' : matched writers %d (change = %d)\n", __FUNCTION__, + topic_name, type_name, status.current_count, status.current_count_change); +} + +void on_requested_deadline_missed (dds_entity_t reader, const dds_requested_deadline_missed_status_t status, void* args) { + dds_entity_t topic = dds_get_topic(reader); + Logger* logger = args; + + char topic_name[default_buffer_size]; + char type_name[default_buffer_size]; + + if (dds_get_name(topic, topic_name, default_buffer_size) < 0) { + log_message(logger, ERROR,"Failed to get topic name for topic %d in %s",topic,__FUNCTION__); + + exit(-1); + } + + if (dds_get_type_name(topic, type_name, default_buffer_size) < 0){ + log_message(logger, ERROR, "Failed to get type name for topic: %s in %s",topic_name,__FUNCTION__); + + exit(-1); + } + + printf("%s() topic: '%s' type: '%s' : (total = %d, change = %d)\n", __FUNCTION__, + topic_name, type_name, status.total_count, status.total_count_change); +} + +void on_liveliness_changed (dds_entity_t reader, const dds_liveliness_changed_status_t status, void* args) { + dds_entity_t topic = dds_get_topic(reader); + Logger* logger = args; + + char topic_name[default_buffer_size]; + char type_name[default_buffer_size]; + + if (dds_get_name(topic, topic_name, default_buffer_size) < 0) { + log_message(logger, ERROR,"Failed to get topic name for topic %d in %s",topic,__FUNCTION__); + + exit(-1); + } + + if (dds_get_type_name(topic, type_name, default_buffer_size) < 0){ + log_message(logger, ERROR, "Failed to get type name for topic: %s in %s",topic_name,__FUNCTION__); + + exit(-1); + } + + printf("%s() topic: '%s' type: '%s' : (alive = %d, not_alive = %d)\n", __FUNCTION__, + topic_name, type_name, status.alive_count, status.not_alive_count); +} + +void on_sample_rejected (dds_entity_t e, const dds_sample_rejected_status_t status, void* data) {} +void on_data_available (dds_entity_t e, void* data) {} +void on_sample_lost (dds_entity_t e, const dds_sample_lost_status_t status, void* data) {} +void on_data_on_readers (dds_entity_t e, void* data) {} + +void set_reliability(dds_qos_t* qos, dds_reliability_kind_t reliability_kind, Logger* logger) { + dds_time_t duration; + + dds_qset_reliability(qos, reliability_kind, DDS_INFINITY); + if (!dds_qget_reliability(qos, &reliability_kind, &duration)) { + log_message(logger, ERROR, "Failed to get reliability kind"); + } + + log_message(logger, DEBUG, " Reliability = %s", to_string_reliability(reliability_kind)); +} + +void set_durability(dds_qos_t* qos, dds_durability_kind_t durability_kind, Logger* logger){ + dds_qset_durability(qos, durability_kind); + dds_qget_durability(qos, &durability_kind); + + log_message(logger, DEBUG," Durability = %s", to_string_durability(durability_kind)); +} + +void set_data_representation(dds_qos_t* qos, dds_data_representation_id_t data_representation, Logger* logger){ + uint32_t count = 1; + dds_data_representation_id_t* data_representation_list; + dds_qset_data_representation(qos, count, &data_representation); + if (!dds_qget_data_representation(qos,&count, &data_representation_list)) { + printf("Failed to get data representation"); + } + + log_message(logger, DEBUG, " Data_Representation = %s", to_string_data_representation(*data_representation_list)); +} + +void set_ownership(dds_qos_t* qos, int ownership_strength, Logger* logger) { + if (ownership_strength != -1) { + dds_qset_ownership(qos, DDS_OWNERSHIP_EXCLUSIVE); + dds_qset_ownership_strength(qos, ownership_strength); + } else { + dds_qset_ownership(qos, DDS_OWNERSHIP_SHARED); + } + + dds_ownership_kind_t ownership_kind; + dds_qget_ownership(qos, &ownership_kind); + + log_message(logger,DEBUG," Ownership = %s", to_string_ownership(ownership_kind)); + + if(ownership_kind == DDS_OWNERSHIP_EXCLUSIVE) { + dds_qget_ownership_strength(qos, &ownership_strength); + + log_message(logger, DEBUG, " OwnershptStrength = %d", ownership_strength); + } +} + +void set_deadline_interval(dds_qos_t* qos, dds_time_t deadline_interval, Logger* logger) { + dds_time_t duration; + + if (deadline_interval > 0) { + dds_qset_deadline(qos, deadline_interval); + } + + dds_qget_deadline(qos, &duration); + + log_message(logger, DEBUG," DeadlinePeriod = %lldns", duration); +} + +void set_history_depth(dds_qos_t* qos, int history_depth, Logger* logger){ + if (history_depth < 0) { + dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, 1); + } else if (history_depth > 0) { + dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, history_depth); + } else { + dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, 0); + } + + dds_history_kind_t history; + dds_qget_history(qos,&history,&history_depth); + + log_message(logger, DEBUG, " History = %s", to_string_history(history)); + + if (history == DDS_HISTORY_KEEP_LAST) { + log_message(logger, DEBUG," HistoryDepth = %d", history_depth); + } +} + +void set_time_based_filter(dds_qos_t* qos, dds_duration_t timebasedfilter_interval, Logger* logger) { + dds_duration_t duration; + + if(timebasedfilter_interval >= 0 ) { + dds_qset_time_based_filter(qos, timebasedfilter_interval); + } + + dds_qget_time_based_filter(qos, &duration ); + log_message(logger,DEBUG, " TimeBasedFilter = %lldns", duration); +} + +void set_presentation(dds_qos_t* qos, dds_presentation_access_scope_kind_t coherent_set_access_scope, bool coherent_set_enabled, bool ordered_access_enabled, Logger* logger) { + dds_qset_presentation(qos, coherent_set_access_scope, coherent_set_enabled, ordered_access_enabled); + + dds_presentation_access_scope_kind_t access_scope; + bool coherent_access; + bool ordered_access; + dds_qget_presentation(qos, &access_scope, &coherent_access, &ordered_access); + log_message(logger, DEBUG, " Presentation Coherent Access = %s", coherent_access ? "true" : "false"); + log_message(logger, DEBUG, " Presentation Ordered Access = %s", ordered_access ? "true" : "false"); + log_message(logger, DEBUG, " Presentation Access Scope = %s", to_string_presentation(access_scope)); +} + +void set_lifespan(dds_qos_t* qos, dds_duration_t lifespan, Logger* logger) { + if (lifespan > 0) { + dds_qset_lifespan(qos, lifespan); + } + dds_duration_t lfspn; + dds_qget_lifespan(qos, &lfspn); + + log_message(logger, DEBUG, " Lifespan = %lldns", lfspn); +} + +void set_writer_data_lifecycle(dds_qos_t* qos, bool autodispose, Logger* logger){ + dds_qset_writer_data_lifecycle(qos, !autodispose); + + bool atdsp; + dds_qget_writer_data_lifecycle(qos, &atdsp); + log_message(logger, DEBUG, " Autodispose_unregistered_instances = %s", atdsp ? "true" : "false"); +} + +bool init_publisher(const ShapeOptions_t* opts, ShapeApp_t* app) { + log_message(app->logger, DEBUG, "Running init_publisher() function"); + + dds_qos_t* dw_qos = dds_create_qos(); + dds_qos_t* pub_qos = dds_create_qos(); + + if (opts->partition != NULL) { + dds_qset_partition(pub_qos, 1, &opts->partition); + } + + log_message(app->logger, DEBUG, "Publisher QoS:"); + + set_presentation(pub_qos, opts->coherent_set_access_scope, opts->coherent_set_enabled, opts->ordered_access_enabled, app->logger); + + app->publisher = dds_create_publisher(app->dp, pub_qos, NULL); + + if (app->publisher < 0) { + log_message(app->logger, ERROR, "Failed to create publisher"); + + return false; + } + + log_message(app->logger, DEBUG, "Publisher created"); + log_message(app->logger, DEBUG, "Data Writer QoS:"); + + set_reliability(dw_qos, opts->reliability_kind, app->logger); + set_durability(dw_qos, opts->durability_kind, app->logger); + set_data_representation(dw_qos,opts->data_representation, app->logger); + set_ownership(dw_qos, opts->ownership_strength,app->logger); + set_deadline_interval(dw_qos, opts->deadline_interval_us * 1000ll, app->logger); + set_history_depth(dw_qos, opts->history_depth, app->logger); + set_lifespan(dw_qos, opts->lifespan_us * 1000ll, app->logger); + set_writer_data_lifecycle(dw_qos, opts->unregister, app->logger); + + for (unsigned int i = 0; i < opts->num_topics; ++i) { + char temp; + dds_return_t name_len = dds_get_name(app->topics[i], &temp, 1); + if (name_len < 0) { + log_message(app->logger, ERROR, "Failed to get name of topic"); + return false; + } + char* name = calloc(name_len + 1, sizeof(char)); + dds_get_name(app->topics[i], name, name_len + 1); + printf("Create writer for topic: %s color: %s\n", name, opts->color); + app->writers[i] = dds_create_writer(app->publisher, app->topics[i], dw_qos, NULL); + if (app->writers[i] < 0) { + log_message(app->logger, ERROR, "failed to create datawriter[%u] topic: %s", i, name); + free(name); + return false; + } + free(name); + } + + log_message(app->logger, DEBUG, "DataWriters created:"); + for (unsigned int i = 0; i < opts->num_topics; ++i) { + log_message(app->logger, DEBUG, " dws[%u]=%d", i, app->writers[i]); + } + + app->color = strdup(opts->color); + app->xval = opts->xvel; + app->yval = opts->yvel; + app->da_width = opts->da_width; + app->da_hight = opts->da_height; + + return true; +} + +bool color_filter(const void* sample, void* arg) { + const char* color = arg; + ShapeType const * const shape = sample; + + return strcmp(color,shape->color) == 0; +} + +bool init_subscriber(const ShapeOptions_t* opts, ShapeApp_t* app) { + + log_message(app->logger, DEBUG,"Running init_subscriber() function"); + + dds_qos_t* sub_qos = dds_create_qos(); + if (sub_qos == NULL) log_message(app->logger,ERROR,"Failed to create sub_qos"); + + log_message(app->logger, DEBUG, "Subscriber QoS:"); + + set_presentation(sub_qos, opts->coherent_set_access_scope, opts->coherent_set_enabled, opts->ordered_access_enabled, app->logger); + + dds_qos_t* dr_qos = dds_create_qos(); + if(sub_qos == NULL) log_message(app->logger, ERROR, "failed to create data reader qos."); + + if (opts->partition != NULL) { + dds_qset_partition(sub_qos, 1, &opts->partition); + } + + app->subscriber = dds_create_subscriber(app->dp, sub_qos, NULL); + if (app->subscriber < 0 ) { + log_message(app->logger, ERROR, "Failed to create subscriber"); + return false; + } + + log_message(app->logger, DEBUG,"subscriber created"); + log_message(app->logger, DEBUG, "Data Reader QoS: "); + + set_reliability(dr_qos, opts->reliability_kind, app->logger); + set_durability(dr_qos, opts->durability_kind, app->logger); + set_data_representation(dr_qos, opts->data_representation, app->logger); + set_ownership(dr_qos, opts->ownership_strength, app->logger); + set_time_based_filter(dr_qos, opts->timebasedfilter_interval_us * 1000ll, app->logger); + set_deadline_interval(dr_qos, opts->deadline_interval_us * 1000ll, app->logger); + set_history_depth(dr_qos, opts->history_depth, app->logger); + + if (opts->color != NULL) { + app->color = opts->color; + + for (unsigned int i = 0; i < opts->num_topics; ++i) { + if (dds_set_topic_filter_and_arg(app->topics[i],color_filter, app->color) >= 0 ) { + log_message(app->logger, DEBUG, " ContentFilterTopic = \"color = %s\"", opts->color); + } else { + log_message(app->logger, ERROR, "failed to create content filtered topic"); + return false; + } + char temp; + size_t name_len = dds_get_name(app->topics[i], &temp, 1); + char* name = malloc(sizeof(char) * (name_len + 1)); + dds_get_name(app->topics[i], name, name_len + 1); + printf("Create reader for topic: %s%s color: %s\n", name, "_filtered", opts->color); + app->readers[i] = dds_create_reader(app->subscriber, app->topics[i], dr_qos, NULL); + if (app->readers[i] == 0) { + log_message(app->logger, ERROR, "Failed to create datareader[%u] topic: %s", i, name); + free(name); + return false; + } + free(name); + } + } else { + for (unsigned int i = 0; i < opts->num_topics; ++i) { + char temp; + size_t name_len = dds_get_name(app->topics[i], &temp, 1); + char* name = malloc(sizeof(char) * (name_len + 1)); + dds_get_name(app->topics[i], name, name_len + 1); + printf("Create reader for topic: %s\n", name); + app->readers[i] = dds_create_reader(app->subscriber, app->topics[i], dr_qos, NULL); + if (app->readers[i] == 0) { + log_message(app->logger, ERROR, "failed to create datareader[%u] topic: %s", i, name); + free(name); + return false; + } + free(name); + } + } + log_message(app->logger, DEBUG,"DataReaders created:"); + for (unsigned int i = 0; i < opts->num_topics; ++i) { + log_message(app->logger, DEBUG, " drs[%u]=%ld", i, app->readers[i]); + } + + return true; +} + + +bool shape_init (ShapeApp_t* app, const ShapeOptions_t* opts, Logger* logger) { + + app->logger = logger; + app->publisher = 0; + app->subscriber = 0; + app->dp = 0; + app->dp_listner = dds_create_listener(logger); + + dds_lset_inconsistent_topic(app->dp_listner, on_inconsistent_topic); + dds_lset_offered_incompatible_qos(app->dp_listner, on_offered_incompatible_qos); + dds_lset_publication_matched(app->dp_listner, on_publication_matched); + dds_lset_offered_deadline_missed(app->dp_listner, on_offered_deadline_missed); + dds_lset_liveliness_lost(app->dp_listner, on_liveliness_lost); + dds_lset_requested_incompatible_qos(app->dp_listner, on_requested_incompatible_qos); + dds_lset_subscription_matched(app->dp_listner, on_subscription_matched); + dds_lset_requested_deadline_missed(app->dp_listner, on_requested_deadline_missed); + dds_lset_liveliness_changed(app->dp_listner, on_liveliness_changed); + dds_lset_sample_rejected(app->dp_listner, on_sample_rejected); + dds_lset_data_available(app->dp_listner, on_data_available); + dds_lset_sample_lost(app->dp_listner,on_sample_lost); + dds_lset_data_on_readers(app->dp_listner, on_data_on_readers); + + dds_qos_t* dp_qos = dds_create_qos(); + + app->dp = dds_create_participant(opts->domain_id, dp_qos, app->dp_listner); + + if (app->dp < 0 ) { + log_message(logger, ERROR, "failed to create participant (missing license?)." ); + + return false; + } + + dds_qos_t* topic_qos = dds_create_qos(); + + set_history_depth(topic_qos,opts->history_depth, app->logger); + dds_qset_resource_limits (topic_qos, MAX_SAMPLES, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); + + app->topics = (dds_entity_t*) malloc(sizeof(dds_entity_t) * opts->num_topics); + if (app->topics == NULL) { + log_message(logger, ERROR, "Error allocating memory for topics"); + return false; + } + for (unsigned int i = 0; i < opts->num_topics; ++i) { + app->topics[i] = 0; + } + + app->readers = (dds_entity_t*) malloc(sizeof(dds_entity_t) * opts->num_topics); + if (app->readers == NULL) { + log_message(logger, ERROR, "Error allocating memory for DataReaders"); + return false; + } + for (unsigned int i = 0; i < opts->num_topics; ++i) { + app->readers[i] = 0; + } + + app->writers = (dds_entity_t*) malloc(sizeof(dds_entity_t) * opts->num_topics); + if (app->writers == NULL) { + log_message(logger, ERROR, "Error allocating memory for DataWriters"); + return false; + } + for (unsigned int i = 0; i < opts->num_topics; ++i) { + app->writers[i] = 0; + } + + for (unsigned int i = 0; i < opts->num_topics; ++i) { + char* topic_name = calloc(((i > 0) ? (int)log10f(i) : 0) + strlen(opts->topic_name) + 2, sizeof(char)); + if (topic_name == NULL) { + log_message(logger, ERROR, "Error allocating memory for Topic names"); + return false; + } + if (i > 0) { + sprintf(topic_name, "%s%u", opts->topic_name, i); + } else { + sprintf(topic_name, "%s", opts->topic_name); + } + printf("Create topic: %s\n", topic_name); + app->topics[i] = dds_create_topic(app->dp, &ShapeType_desc, topic_name, topic_qos, NULL); + if (app->topics[i] < 0) { + log_message(logger, ERROR, "failed to create topic <%s>", topic_name); + free(topic_name); + return false; + } + free(topic_name); + } + log_message(logger, DEBUG, "Topics created:"); + for (unsigned int i = 0; i < opts->num_topics; ++i) { + log_message(logger, DEBUG, " topic[%d]=%p", i, (void*)app->topics[i]); + } + + if (opts->publish) { + return init_publisher(opts, app); + } else { + return init_subscriber(opts,app); + } +} + +bool run_subscriber(ShapeApp_t app, ShapeOptions_t opts) { + // This is the number of iterations performed + unsigned int n = 0; + + log_message(app.logger, DEBUG, "Running run_subscriber() function"); + bool printed_message = false; + + while(!all_done) { + dds_return_t retval; + dds_instance_handle_t previous_handle; + dds_sample_info_t sample_infos[MAX_SAMPLES]; + void* samples[MAX_SAMPLES]; + + for( size_t i = 0; i < MAX_SAMPLES; i ++) samples[i] = ShapeType__alloc(); + if(opts.coherent_set_enabled) { + printf("Reading coherent sets, iteration %u\n", n); + } + if(opts.ordered_access_enabled) { + printf("Reading with ordered access, iteration %u\n", n); + } + if (opts.coherent_set_enabled || opts.ordered_access_enabled) { + dds_begin_coherent(app.subscriber); + } + + for (unsigned int i = 0; i < opts.num_topics; ++i) { + do { + if(!opts.use_read) { + if(opts.take_read_next_instance) { + if (!printed_message) { + printed_message = true; + log_message(app.logger, DEBUG, "Calling take_next_instance() function"); + } + retval = dds_take_next(app.readers[i], samples,sample_infos); + } else { + if (!printed_message) { + printed_message = true; + log_message(app.logger, DEBUG, "Calling take() function"); + } + retval = dds_take(app.readers[i], samples, sample_infos, MAX_SAMPLES, MAX_SAMPLES); + } + } else { + if(opts.take_read_next_instance) { + if (!printed_message) { + printed_message = true; + log_message(app.logger, DEBUG, "Calling read_next_instance() function"); + } + retval = dds_read_next(app.readers[i], samples,sample_infos); + } else { + if (!printed_message) { + printed_message = true; + log_message(app.logger, DEBUG, "Calling read() function"); + } + retval = dds_read(app.readers[i], samples, sample_infos, MAX_SAMPLES, MAX_SAMPLES); + } + } + + if (retval > DDS_RETCODE_OK) { + log_message(app.logger, DEBUG, "Read %d sample(s), printing them...",retval); + + for (size_t n_sample = 0; n_sample < retval; n_sample++) { + log_message(app.logger, DEBUG, "Processing sample %lu",n_sample); + + ShapeType* sample = samples[n_sample]; + dds_sample_info_t* sample_info = &sample_infos[n_sample]; + + if (sample_info->valid_data) { + char name[default_buffer_size]; + + dds_entity_t topic = dds_get_topic(app.readers[i]); + dds_get_name(topic, name, default_buffer_size); + + printf("%-10s %-10s %03d %03d [%d]",name,sample->color,sample->x,sample->y,sample->shapesize); + if(sample->additional_payload_size._length > 0){ + int additional_payload_index = sample->additional_payload_size._length - 1; + printf(" {%u}", sample->additional_payload_size._buffer[additional_payload_index]); + } + printf("\n"); + } else { + ShapeType shape_key; + dds_instance_get_key(app.readers[i], sample_info->instance_handle, &shape_key); + if (sample_info->instance_state == DDS_IST_NOT_ALIVE_NO_WRITERS) { + dds_entity_t reader_topic = dds_get_topic(app.readers[i]); + char temp; + dds_return_t name_len = dds_get_name(reader_topic, &temp, 1); + char* name = calloc(name_len + 1, sizeof(char)); + dds_get_name(reader_topic, name, name_len + 1); + printf("%-10s %-10s NOT_ALIVE_NO_WRITERS_INSTANCE_STATE\n", name, shape_key.color); + free(name); + } else if (sample_info->instance_state == DDS_IST_NOT_ALIVE_DISPOSED) { + dds_entity_t reader_topic = dds_get_topic(app.readers[i]); + char temp; + dds_return_t name_len = dds_get_name(reader_topic, &temp, 1); + char* name = calloc(name_len + 1, sizeof(char)); + dds_get_name(reader_topic, name, name_len + 1); + printf("%-10s %-10s NOT_ALIVE_DISPOSED_INSTANCE_STATE\n", name, shape_key.color); + free(name); + } + } + + previous_handle = sample_info[0].instance_handle; + dds_return_loan(app.readers[i], (void**)samples, MAX_SAMPLES); + } + } + } while (retval >= DDS_RETCODE_OK); + } + if (opts.coherent_set_enabled || opts.ordered_access_enabled) { + dds_end_coherent(app.subscriber); + } + + n++; + log_message(app.logger, DEBUG, "Subscriber iteration: <%u>", n); + log_message(app.logger, DEBUG, "Max number of iterations <%u>", opts.num_iterations); + if (opts.num_iterations != 0 && opts.num_iterations <= n) { + all_done = 1; + } + + for( size_t i = 0; i < MAX_SAMPLES; i ++) ShapeType_free(samples[i],DDS_FREE_ALL); + usleep(opts.read_period_us); + } + + return true; +} + +void moveShape(ShapeType *shape, ShapeApp_t* app) { + shape->x = shape->x + app->xval; + shape->y = shape->x + app->yval; + + if (shape->x < 0) { + shape->x = 0; + app->xval = -app->xval; + } + if (shape->x > app->da_width) { + shape->x = app->da_width; + app->xval = -app->xval; + } + if (shape->y < 0) { + shape->y = 0; + app->yval = -app->yval; + } + if (shape->y > app->da_hight) { + shape->y = app->da_hight; + app->yval = -app->yval; + } +} + +bool run_publisher(ShapeApp_t app, ShapeOptions_t opts) { + log_message(app.logger, DEBUG, "Running run_publisher() function"); + ShapeType shape; + // number of iterations performed + unsigned int n = 0; + + srand((uint32_t)time(NULL)); + size_t index = 0; + + memcpy(shape.color, app.color, strlen(app.color)); + shape.color[strlen(app.color)] = '\0'; + + shape.shapesize = opts.shapesize; + shape.x = random() % app.da_width; + shape.y = random() % app.da_hight; + + app.xval = ((random() % 5) + 1) * ((random() % 2)?-1:1); + app.yval = ((random() % 5) + 1) * ((random() % 2)?-1:1); + + if (opts.additional_payload_size > 0) { + int size = opts.additional_payload_size; + shape.additional_payload_size._buffer = dds_sequence_uint8_allocbuf(size); + shape.additional_payload_size._maximum = size; + shape.additional_payload_size._length = size; + shape.additional_payload_size._buffer[size - 1] = 255; + } else { + shape.additional_payload_size._buffer = dds_sequence_uint8_allocbuf(0); + shape.additional_payload_size._maximum = 0; + shape.additional_payload_size._length = 0; + } + + while(! all_done) { + moveShape(&shape, &app); + + if (opts.shapesize == 0) shape.shapesize += 1; + + if (opts.coherent_set_enabled || opts.ordered_access_enabled) { + // n also represents the number of samples written per publisher per instance + if (opts.coherent_set_sample_count != 0 && n % opts.coherent_set_sample_count == 0) { + printf("Started Coherent Set\n"); + dds_begin_coherent(app.publisher); + } + } + + for (unsigned int i = 0; i < opts.num_topics; ++i) { + for (unsigned int j = 0; j < opts.num_instances; ++j) { + //Publish different instances with the same content (except for the color) + if (opts.num_instances > 1) { + if (strlen(opts.color) > 0) { + sscanf(shape.color, "%s%u", opts.color, j); + } else { + sscanf(shape.color, "%s", opts.color); + } + if (opts.unregister) { + dds_unregister_instance(app.writers[i], &shape); + } + if (opts.dispose) { + dds_dispose(app.writers[i], &shape); + } + } + dds_return_t rc = dds_write(app.writers[i], &shape); + if (opts.print_writer_samples) { + dds_entity_t writer_topic = dds_get_topic(app.writers[i]); + char temp; + dds_return_t name_len = dds_get_name(writer_topic, &temp, 1); + char* name = calloc(name_len + 1, sizeof(char)); + printf("%-10s %-10s %03d %03d [%d]", name, + shape.color, + shape.x, + shape.y, + shape.shapesize); + if (opts.additional_payload_size > 0) { + int additional_payload_index = opts.additional_payload_size - 1; + printf(" {%u}", shape.additional_payload_size._buffer[additional_payload_index]); + } + printf("\n"); + free(name); + } + } + } + + if (opts.coherent_set_enabled || opts.ordered_access_enabled) { + if (opts.coherent_set_sample_count != 0 + && n % opts.coherent_set_sample_count == opts.coherent_set_sample_count - 1) { + printf("Finished Coherent Set\n"); + dds_end_coherent(app.publisher); + } + } + + usleep(opts.write_period_us); + + n++; + + log_message(app.logger, DEBUG, "Publisher iteration: <%u>", n); + log_message(app.logger, DEBUG, "Max number of iterations <%u>", opts.num_iterations); + + if (opts.num_iterations != 0 && opts.num_iterations <= n){ + all_done = 1; + } + } + + if (opts.dispose || opts.unregister) { + for (unsigned int i = 0; i < opts.num_topics; ++i) { + for (unsigned int j = 0; j < opts.num_instances; ++j) { + if (opts.num_instances > 1) { + if (j > 0) { + sprintf(shape.color, "%s%u", opts.color, j); + } else { + sprintf(shape.color, "%s", opts.color); + } + } + if (opts.unregister) { + dds_unregister_instance(app.writers[i], &shape); + } + if (opts.dispose) { + dds_dispose(app.writers[i], &shape); + } + } + } + } + + for (unsigned int i = 0; i < opts.num_topics; ++i) { + dds_wait_for_acks(app.writers[i], DDS_SECS(1)); + } + + return true; +} + +bool run(ShapeApp_t app, ShapeOptions_t opts) { + log_message(app.logger, DEBUG,"Running run() function"); + + if (app.publisher != 0) { + return run_publisher(app, opts); + } else if (app.subscriber != 0) { + return run_subscriber(app, opts); + } + + return false; +} + +void shape_free(ShapeApp_t * app,const ShapeOptions_t *opts) { + dds_delete(app->dp); + dds_delete_listener(app->dp_listner); +} + +int main(int argc,char **argv) +{ + + ShapeOptions_t opts; + Logger logger; + logger.verbosity_ = ERROR; + + shape_options_init(&opts); + if (!parse(argc, argv, &logger, &opts)) { + return ERROR_PARSING_ARGUMENTS; + } + //set_verbosity(&logger, DEBUG); + + ShapeApp_t shape_app; + + if (!shape_init(&shape_app,&opts, &logger)) { + return ERROR_INITIALIZING; + } + + if (!run(shape_app, opts)) { + return ERROR_RUNNING; + } + + shape_free(&shape_app,&opts); + + return 0; +} From b91fee2201a259e7fe675d72bcd004287913931e Mon Sep 17 00:00:00 2001 From: disganaitis Date: Tue, 10 Feb 2026 10:58:57 +0200 Subject: [PATCH 02/12] Test reliability fixes by adding the apropriate python string literal prefix to regex, fixed code not finding the name for cyclonedds topics --- interoperability_report.py | 11 +++++++---- srcC/shape_main.c | 3 ++- test_suite_functions.py | 32 ++++++++++++++++---------------- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/interoperability_report.py b/interoperability_report.py index 51c2eaaa..e5c9d540 100644 --- a/interoperability_report.py +++ b/interoperability_report.py @@ -168,7 +168,7 @@ def run_subscriber_shape_main( log_message(f'Subscriber {subscriber_index}: Waiting for data', verbosity) index = child_sub.expect( [ - '\[[0-9]+\]', # index = 0 + r'\[[0-9]+\]', # index = 0 'on_requested_incompatible_qos()', # index = 1 'on_requested_deadline_missed()', # index = 2 pexpect.TIMEOUT, # index = 3 @@ -324,7 +324,7 @@ def run_publisher_shape_main( if '-w ' in parameters or parameters.endswith('-w'): # Step 5: Check whether the writer sends the samples index = child_pub.expect([ - '\[[0-9]+\]', # index = 0 + r'\[[0-9]+\]', # index = 0 'on_offered_deadline_missed()', # index = 1 pexpect.TIMEOUT # index = 2 ], @@ -341,12 +341,15 @@ def run_publisher_shape_main( for x in range(0, MAX_SAMPLES_SAVED, 1): # At this point, at least one sample has been printed # Therefore, that sample is added to samples_sent. - pub_string = re.search('[0-9]+ [0-9]+ \[[0-9]+\]', + pub_string = re.search(r'[0-9]+ [0-9]+ \[[0-9]+\]', child_pub.before + child_pub.after) + if not pub_string: + produced_code[produced_code_index] = ReturnCode.DATA_NOT_CORRECT + break last_sample = pub_string.group(0) samples_sent.put(last_sample) index = child_pub.expect([ - '\[[0-9]+\]', # index = 0 + r'\[[0-9]+\]', # index = 0 'on_offered_deadline_missed()', # index = 1 pexpect.TIMEOUT # index = 2 ], diff --git a/srcC/shape_main.c b/srcC/shape_main.c index 527f9e32..decdd9ce 100644 --- a/srcC/shape_main.c +++ b/srcC/shape_main.c @@ -878,7 +878,7 @@ void on_publication_matched (dds_entity_t writer, const dds_publication_matched_ exit(-1); } - printf("%s() topic: '%s' type: '%s' : matched readers %d (change = %d)\n", __FUNCTION__, + printf("%s() topic: '%s' type: '%s' : matched readers %d (change = %d)\n", "on_publication_matched", topic_name, type_name, status.current_count, status.current_count_change); } @@ -1619,6 +1619,7 @@ bool run_publisher(ShapeApp_t app, ShapeOptions_t opts) { char temp; dds_return_t name_len = dds_get_name(writer_topic, &temp, 1); char* name = calloc(name_len + 1, sizeof(char)); + dds_get_name(writer_topic, name, name_len + 1); printf("%-10s %-10s %03d %03d [%d]", name, shape.color, shape.x, diff --git a/test_suite_functions.py b/test_suite_functions.py index 6baf9e3d..5965169e 100644 --- a/test_suite_functions.py +++ b/test_suite_functions.py @@ -51,7 +51,7 @@ def test_ownership_receivers(child_sub, samples_sent, last_sample_saved, timeout # [shapesize] # Example: child_sub.before contains 'Square BLUE 191 152' # child_sub.after contains '[30]' - sub_string = re.search('[0-9]+ [0-9]+ \[([0-9]+)\]', + sub_string = re.search(r'[0-9]+ [0-9]+ \[([0-9]+)\]', child_sub.before + child_sub.after) # sub_string contains 'x y [shapesize]', example: '191 152 [30]' @@ -81,7 +81,7 @@ def test_ownership_receivers(child_sub, samples_sent, last_sample_saved, timeout # Get the next samples the subscriber is receiving index = child_sub.expect( [ - '\[[0-9]+\]', # index = 0 + r'\[[0-9]+\]', # index = 0 pexpect.TIMEOUT, # index = 1 ], timeout @@ -141,7 +141,7 @@ def test_ownership_receivers_by_samples_sent(child_sub, samples_sent, last_sampl # [shapesize] # Example: child_sub.before contains 'Square BLUE 191 152' # child_sub.after contains '[30]' - sub_string = re.search('[0-9]+ [0-9]+ \[[0-9]+\]', + sub_string = re.search(r'[0-9]+ [0-9]+ \[[0-9]+\]', child_sub.before + child_sub.after) # sub_string contains 'x y [shapesize]', example: '191 152 [30]' @@ -213,7 +213,7 @@ def test_ownership_receivers_by_samples_sent(child_sub, samples_sent, last_sampl # Get the next samples the subscriber is receiving index = child_sub.expect( [ - '\[[0-9]+\]', # index = 0 + r'\[[0-9]+\]', # index = 0 pexpect.TIMEOUT, # index = 1 ], timeout @@ -272,7 +272,7 @@ def test_color_receivers(child_sub, samples_sent, last_sample_saved, timeout): last_sample_saved: not used timeout: time pexpect waits until it matches a pattern. """ - sub_string = re.search('\w\s+(\w+)\s+[0-9]+ [0-9]+ \[[0-9]+\]', + sub_string = re.search(r'\w\s+(\w+)\s+[0-9]+ [0-9]+ \[[0-9]+\]', child_sub.before + child_sub.after) first_sample_color = sub_string.group(1) @@ -288,7 +288,7 @@ def test_color_receivers(child_sub, samples_sent, last_sample_saved, timeout): index = child_sub.expect( [ - '\[[0-9]+\]', # index = 0 + r'\[[0-9]+\]', # index = 0 pexpect.TIMEOUT # index = 1 ], timeout @@ -299,7 +299,7 @@ def test_color_receivers(child_sub, samples_sent, last_sample_saved, timeout): samples_read += 1 - sub_string = re.search('\w\s+(\w+)\s+[0-9]+ [0-9]+ \[[0-9]+\]', + sub_string = re.search(r'\w\s+(\w+)\s+[0-9]+ [0-9]+ \[[0-9]+\]', child_sub.before + child_sub.after) print(f'Samples read: {samples_read}') @@ -319,7 +319,7 @@ def test_reliability_order(child_sub, samples_sent, last_sample_saved, timeout): produced_code = ReturnCode.OK # Read the first sample printed by the subscriber - sub_string = re.search('[0-9]+ [0-9]+ \[([0-9]+)\]', + sub_string = re.search(r'[0-9]+ [0-9]+ \[([0-9]+)\]', child_sub.before + child_sub.after) last_size = 0 @@ -337,7 +337,7 @@ def test_reliability_order(child_sub, samples_sent, last_sample_saved, timeout): # Get the next sample the subscriber is receiving index = child_sub.expect( [ - '\[[0-9]+\]', # index = 0 + r'\[[0-9]+\]', # index = 0 pexpect.TIMEOUT # index = 1 ], timeout @@ -349,7 +349,7 @@ def test_reliability_order(child_sub, samples_sent, last_sample_saved, timeout): samples_read += 1 # search the next received sample by the subscriber app - sub_string = re.search('[0-9]+ [0-9]+ \[([0-9]+)\]', + sub_string = re.search(r'[0-9]+ [0-9]+ \[([0-9]+)\]', child_sub.before + child_sub.after) print(f'Samples read: {samples_read}') @@ -373,7 +373,7 @@ def test_reliability_no_losses(child_sub, samples_sent, last_sample_saved, timeo processed_samples = 0 # take the first sample received by the subscriber - sub_string = re.search('[0-9]+ [0-9]+ \[[0-9]+\]', + sub_string = re.search(r'[0-9]+ [0-9]+ \[[0-9]+\]', child_sub.before + child_sub.after) # This makes sure that at least one sample has been received @@ -425,7 +425,7 @@ def test_reliability_no_losses(child_sub, samples_sent, last_sample_saved, timeo # Get the next sample the subscriber is receiving index = child_sub.expect( [ - '\[[0-9]+\]', # index = 0 + r'\[[0-9]+\]', # index = 0 pexpect.TIMEOUT # index = 1 ], timeout @@ -435,7 +435,7 @@ def test_reliability_no_losses(child_sub, samples_sent, last_sample_saved, timeo break samples_read += 1 # search the next received sample by the subscriber app - sub_string = re.search('[0-9]+ [0-9]+ \[[0-9]+\]', + sub_string = re.search(r'[0-9]+ [0-9]+ \[[0-9]+\]', child_sub.before + child_sub.after) print(f'Samples read: {samples_read}') @@ -460,7 +460,7 @@ def test_durability_volatile(child_sub, samples_sent, last_sample_saved, timeout # Read the first sample, if it has the size > 5, it is using volatile # durability correctly - sub_string = re.search('[0-9]+ [0-9]+ \[([0-9]+)\]', + sub_string = re.search(r'[0-9]+ [0-9]+ \[([0-9]+)\]', child_sub.before + child_sub.after) # Check if the element received is not the first 5 samples (aka size >= 5) @@ -492,7 +492,7 @@ def test_durability_transient_local(child_sub, samples_sent, last_sample_saved, # Read the first sample, if it has the size == 1, it is using transient # local durability correctly - sub_string = re.search('[0-9]+ [0-9]+ \[([0-9]+)\]', + sub_string = re.search(r'[0-9]+ [0-9]+ \[([0-9]+)\]', child_sub.before + child_sub.after) # Check if the element is the first one sent (aka size == 1), which should @@ -530,7 +530,7 @@ def test_deadline_missed(child_sub, samples_sent, last_sample_saved, timeout): return ReturnCode.DEADLINE_MISSED else: index = child_sub.expect([ - '\[[0-9]+\]', # index = 0 + r'\[[0-9]+\]', # index = 0 pexpect.TIMEOUT # index = 1 ], timeout) From c2011710333bd3fa94f54dd7de64505a54047cd4 Mon Sep 17 00:00:00 2001 From: Deividas Isganaitis Date: Tue, 10 Feb 2026 09:11:40 +0000 Subject: [PATCH 03/12] __FUNCTION__ was working fine, forgot to put it back --- srcC/shape_main.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srcC/shape_main.c b/srcC/shape_main.c index decdd9ce..23621bbb 100644 --- a/srcC/shape_main.c +++ b/srcC/shape_main.c @@ -878,7 +878,7 @@ void on_publication_matched (dds_entity_t writer, const dds_publication_matched_ exit(-1); } - printf("%s() topic: '%s' type: '%s' : matched readers %d (change = %d)\n", "on_publication_matched", + printf("%s() topic: '%s' type: '%s' : matched readers %d (change = %d)\n", __FUNCTION__, topic_name, type_name, status.current_count, status.current_count_change); } From 0116ff8c23a35857f0b8b805b39ee1140a08f141 Mon Sep 17 00:00:00 2001 From: disganaitis Date: Wed, 18 Feb 2026 16:11:33 +0000 Subject: [PATCH 04/12] Added -z 0 to Reliability_4 to make sure all samples are unique Adde timeout to the samples_sent queue since a sufficiently fast dds's subscriber would return a sample to pexpect before the queue receives a sample from dds's publisher leading to an empty queue issue where it isn't supposed to happen --- test_suite.py | 2 +- test_suite_functions.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test_suite.py b/test_suite.py index ee0198f6..aca9c766 100644 --- a/test_suite.py +++ b/test_suite.py @@ -173,7 +173,7 @@ # This test checks that data is received in the right order 'Test_Reliability_4' : { - 'apps' : ['-P -t Square -r -k 0 -w', '-S -t Square -r -k 0'], + 'apps' : ['-P -t Square -r -k 0 -z 0 -w', '-S -t Square -r -k 0'], 'expected_codes' : [ReturnCode.OK, ReturnCode.OK], 'check_function' : tsf.test_reliability_no_losses, 'title' : 'Behavior of RELIABLE reliability', diff --git a/test_suite_functions.py b/test_suite_functions.py index 5965169e..34e6129a 100644 --- a/test_suite_functions.py +++ b/test_suite_functions.py @@ -409,7 +409,7 @@ def test_reliability_no_losses(child_sub, samples_sent, last_sample_saved, timeo # a pub_sample so we don't need to get it from the queue first_execution = False else: - pub_sample = samples_sent[0].get(block=False) + pub_sample = samples_sent[0].get(block=True, timeout=0.1) if pub_sample != sub_string.group(0): produced_code = ReturnCode.DATA_NOT_CORRECT From 7fed56f76d52e446597d7e11a210f506760709bd Mon Sep 17 00:00:00 2001 From: disganaitis Date: Fri, 20 Feb 2026 07:47:28 +0000 Subject: [PATCH 05/12] Setting durability_service for cycloneDDS allows for TRANSIENT_LOCAL QoS to function properly fixing Test_Durability_17 --- srcC/shape_main.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/srcC/shape_main.c b/srcC/shape_main.c index 23621bbb..9a20e880 100644 --- a/srcC/shape_main.c +++ b/srcC/shape_main.c @@ -1098,10 +1098,13 @@ void set_deadline_interval(dds_qos_t* qos, dds_time_t deadline_interval, Logger* void set_history_depth(dds_qos_t* qos, int history_depth, Logger* logger){ if (history_depth < 0) { dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, 1); + dds_qset_durability_service(qos, 0, DDS_HISTORY_KEEP_LAST, 1, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); } else if (history_depth > 0) { dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, history_depth); + dds_qset_durability_service(qos, 0, DDS_HISTORY_KEEP_LAST, history_depth, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); } else { dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, 0); + dds_qset_durability_service(qos, 0, DDS_HISTORY_KEEP_ALL, 0, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); } dds_history_kind_t history; From 9fd0145beebbace14fe01baa99eb7f2f35ed51a3 Mon Sep 17 00:00:00 2001 From: disganaitis Date: Fri, 20 Mar 2026 16:59:24 +0000 Subject: [PATCH 06/12] Added Rocket Software CLA --- CLA/CLA_Rocket_Software.md | 61 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 CLA/CLA_Rocket_Software.md diff --git a/CLA/CLA_Rocket_Software.md b/CLA/CLA_Rocket_Software.md new file mode 100644 index 00000000..30a28925 --- /dev/null +++ b/CLA/CLA_Rocket_Software.md @@ -0,0 +1,61 @@ +# OMG DDS INTEROPERABILITY REPOSITORY - CONTRIBUTOR LICENSE AGREEMENT + +**This Contributor License Agreement ("Agreement") specifies the terms under which the individual or corporate entity specified in the signature block below (“You”) agree to make intellectual property contributions to the OMG DDS Interoperability Repository. BY SIGNING BELOW YOU ARE AGREEING TO BE BOUND BY THE TERMS OF THIS AGREEMENT. If You are signing this Agreement in Your capacity as an employee, THEN YOUR EMPLOYER AND YOU ARE BOTH BOUND BY THIS AGREEMENT.** + +1. Definitions + + 1. "OMG DDS Interoperability Repository" (or “Repository”) means the Git repository [https://github.com/omg-dds/dds-rtps](https://github.com/omg-dds/dds-rtps). + + 2. "Moderator" means an entity or individual responsible for authorizing changes to the Repository. + + 3. "Submit" (or “Submitted”) means any submission, including source code, binaries, code, pull requests, issue reports, comments, etc., made to the Moderators for inclusion in the Repository either through the Git repository interface or through electronic file transfer. + + 4. A "Contribution" is any original work of authorship, including any modifications or additions to an existing work, that You Submit to the DDS Interoperability Repository. + + 5. A "User" is anyone who accesses the Repository. + +2. Allowable Contribution Representations + + 1. You represent that You have the necessary rights to the Contribution(s) to meet the obligations of this Agreement. If You are employed, Your employer has authorized Contribution(s) under this Agreement. + + 2. You represent that you have no knowledge of third-party intellectual property rights that are likely to be infringed by the Contribution(s). You represent that you have no knowledge that such infringement or any allegation of misappropriation of intellectual property rights is likely to be claimed or has already been claimed. + +3. License + + You grant Moderators a perpetual, worldwide, non-exclusive, assignable, paid-up license to publish, display, and redistribute the Contribution as part of the Repository. You also license to Moderators under the same terms any other intellectual property rights required to publish, display, and redistribute the Contributions as part of the Repository. You further grant all Users of the Repository a license to the Contribution under the terms of the [OMG DDS Interoperability Testing License](../LICENSE.md) included in the Repository. Moderators are under no obligation to publish Contributions. + +4. No Warranty, Consequential Damages. Limited Liability + + Other than explicitly stated herein, You provide the Contribution(s) "as is" with no warranty nor claims of fitness to any purpose. Neither party shall be liable for consequential or special damages of any kind. Other than for breach of warranty or representations herein, the liability of either party to the other shall be limited to $1000. + +5. General + + 1. If You are an agency of the United States Government, then this Agreement will be governed by the United States federal common law. Otherwise, this Agreement will be governed by the laws of the State of California except with regard to its choice of law rules. + + 2. A party may assign this Agreement to an entity acquiring essentially all of the party’s relevant business. + +6. Electronic Signatures + + "Electronic Signature" means any electronic sound, symbol, or process attached to or logically associated with a record and executed and adopted by a party with the intent to sign such record. + + Each party agrees that the Electronic Signatures, whether digital or encrypted, of the parties included in this Agreement are intended to authenticate this writing and to have the same force and effect as manual signatures. + + +IN WITNESS WHEREOF, You, intending to be legally bound, have executed this Agreement or caused Your employer’s proper and duly authorized officer to execute and deliver this Agreement, for good and valuable consideration, the sufficiency of which is hereby acknowledged, as of the day and year first written below. + +**For:** + +Entity Name: Rocket Software + +Address: 77 4th Avenue, Waltham, MA 02451, USA + + ("**You**") + +**By:** + +Name: Puneet Kohli + +Title: President, Application Modernization + +Date: March 20, 2026 + From c0e5f87e0d075799e3026eddde4c1d7035737416 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Deividas=20I=C5=A1ganaitis?= Date: Mon, 30 Mar 2026 09:15:41 +0000 Subject: [PATCH 07/12] Implemented missing features from master --- rtps_test_utilities.py | 2 +- srcC/shape_bounded.idl | 9 +++ srcC/shape_main.c | 154 +++++++++++++++++++++++++++++++--------- test_suite_functions.py | 6 +- 4 files changed, 135 insertions(+), 36 deletions(-) create mode 100644 srcC/shape_bounded.idl diff --git a/rtps_test_utilities.py b/rtps_test_utilities.py index 4b2e746b..483d0bf5 100644 --- a/rtps_test_utilities.py +++ b/rtps_test_utilities.py @@ -63,7 +63,7 @@ def no_check(child_sub, samples_sent, last_sample_saved, timeout): def basic_check(child_sub, samples_sent, last_sample_saved, timeout): """ Only checks that the data is well formed and size is not zero.""" - sub_string = re.search('\w\s+\w+\s+[0-9]+ [0-9]+ \[([0-9]+)\]', + sub_string = re.search(r'\w\s+\w+\s+[0-9]+ [0-9]+ \[([0-9]+)\]', child_sub.before + child_sub.after) if sub_string is None: diff --git a/srcC/shape_bounded.idl b/srcC/shape_bounded.idl new file mode 100644 index 00000000..3ffde7e5 --- /dev/null +++ b/srcC/shape_bounded.idl @@ -0,0 +1,9 @@ +@appendable +struct ShapeType { + @key + string<128> color; + long x; + long y; + long shapesize; + sequence additional_payload_size; +}; diff --git a/srcC/shape_main.c b/srcC/shape_main.c index 9a20e880..e7dcf10d 100644 --- a/srcC/shape_main.c +++ b/srcC/shape_main.c @@ -212,6 +212,10 @@ typedef struct ShapeOptions { bool take_read_next_instance; useconds_t periodic_announcement_period_us; + + unsigned int datafrag_size; + char* cft_expression; + int size_modulo; } ShapeOptions_t; @@ -268,6 +272,11 @@ void shape_options_init(ShapeOptions_t* shape_options) shape_options->periodic_announcement_period_us = 0; + shape_options->datafrag_size = 0; // Default: 0 (means not set) + shape_options->cft_expression = NULL; + + shape_options->size_modulo = 0; // 0 means disabled + return; } @@ -275,6 +284,7 @@ void shape_options_free(ShapeOptions_t* shape_options){ free(shape_options->topic_name); free(shape_options->color); free(shape_options->partition); + free(shape_options->cft_expression); return; } @@ -340,6 +350,15 @@ void print_usage( const char *prog ) printf(" read_next_instance()\n"); printf(" --periodic-announcement : indicates the periodic participant\n"); printf(" announcement period in ms. Default 0 (off)\n"); + printf(" --datafrag-size : set the data fragment size (default: 0, means\n"); + printf(" not set)\n"); + printf(" --cft : ContentFilteredTopic filter expression (quotes\n"); + printf(" required around the expression). Cannot be used with\n"); + printf(" -c on subscriber applications\n"); + printf(" --size-modulo : If set, the modulo operation is applied to the\n"); + printf(" shapesize. This will make that shapesize is in the\n"); + printf(" range [1,N]. This only applies if shapesize is\n"); + printf(" increased (-z 0)\n"); } bool validate(Logger* logger, ShapeOptions_t* shape_options) { @@ -368,6 +387,9 @@ bool validate(Logger* logger, ShapeOptions_t* shape_options) { if (shape_options->publish && (!shape_options->take_read_next_instance)) { log_message(logger, ERROR, "warning: --take-read ignored on publisher applications"); } + if (shape_options->publish && shape_options->cft_expression != NULL) { + log_message(logger, ERROR, "warning: --cft ignored on publisher applications"); + } if (shape_options->subscribe && (shape_options->shapesize != 20)){ log_message(logger, ERROR, "warning: shapesize [-z] ignored on subscriber applications"); } @@ -386,6 +408,13 @@ bool validate(Logger* logger, ShapeOptions_t* shape_options) { if (!shape_options->coherent_set_enabled && !shape_options->ordered_access_enabled && shape_options->coherent_set_access_scope_set) { log_message(logger, ERROR, "warning: --access-scope ignored because not coherent, or ordered access enabled"); } + if (shape_options->size_modulo > 0 && shape_options->shapesize != 0) { + log_message(logger, ERROR, "warning: --size-modulo has no effect unless shapesize (-z) is set to 0"); + } + if (shape_options->subscribe && shape_options->color != NULL && shape_options->cft_expression != NULL) { + log_message(logger, ERROR, "error: cannot specify both --cft and -c for subscriber applications"); + return false; + } return true; } @@ -413,6 +442,9 @@ bool parse(int argc, char *argv[], Logger* logger, ShapeOptions_t* shape_options {"take-read", required_argument, NULL, 'K'}, {"time-filter", required_argument, NULL, 'i'}, {"periodic-announcement", required_argument, NULL, 'N'}, + {"datafrag-size", required_argument, NULL, 'Z'}, + {"cft", required_argument, NULL, 'F'}, + {"size-modulo", required_argument, NULL, 'Q'}, {NULL, 0, NULL, 0 } }; @@ -732,6 +764,34 @@ bool parse(int argc, char *argv[], Logger* logger, ShapeOptions_t* shape_options shape_options->periodic_announcement_period_us = converted_param * 1000ll; break; } + case 'Z': { + unsigned int converted_param = 0; + if (sscanf(optarg, "%u", &converted_param) == 0) { + log_message(logger, ERROR, "unrecognized value for datafrag-size %c", optarg[0]); + parse_ok = false; + } + // the spec mentions that the fragment size must satisfy: + // fragment size <= 65535 bytes. + if (converted_param > 65535) { + log_message(logger, ERROR, "incorrect value for datafrag-size, " + "it must be <= 65535 bytes%u", converted_param); + parse_ok = false; + } + shape_options->datafrag_size = converted_param; + } + case 'F': + shape_options->cft_expression = strdup(optarg); + break; + case 'Q': { + int converted_param = 0; + if (sscanf(optarg, "%d", &converted_param) == 0 || converted_param < 1) { + log_message(logger, ERROR, "incorrect value for size-modulo, must be >=1"); + parse_ok = false; + } else { + shape_options->size_modulo = converted_param; + } + break; + } case '?': parse_ok = false; break; @@ -758,7 +818,9 @@ bool parse(int argc, char *argv[], Logger* logger, ShapeOptions_t* shape_options printf(" TimeBasedFilterInterval = %u ms\n",shape_options->timebasedfilter_interval_us / 1000ll); printf(" DeadlineInterval = %u ms\n", shape_options->deadline_interval_us / 1000ll); printf(" Shapesize = %d\n", shape_options->shapesize); - printf(" Reading method = %s\n", (shape_options->use_read ? "read_next_instance" : "take_next_instance")); + printf(" Reading method = %s\n", (shape_options->use_read + ? (shape_options->take_read_next_instance ? "read_next_instance" : "read") + : (shape_options->take_read_next_instance ? "take_next_instance" : "take"))); printf(" Write period = %u ms\n", shape_options->write_period_us / 1000ll); printf(" Read period = %u ms\n", shape_options->read_period_us / 1000ll); printf(" Lifespan = %u ms\n", shape_options->lifespan_us / 1000ll); @@ -773,6 +835,7 @@ bool parse(int argc, char *argv[], Logger* logger, ShapeOptions_t* shape_options printf(" Final Instance State = %s\n", (shape_options->unregister ? "Unregister" : (shape_options->dispose ? "Dispose" : "not specified"))); printf(" Periodic Announcement Period = %u ms\n", shape_options->periodic_announcement_period_us / 1000ll); + printf(" Data Fragmentation Size = %u bytes\n", shape_options->datafrag_size); if (shape_options->topic_name != NULL){ printf(" Topic = %s\n", shape_options->topic_name); } @@ -1092,7 +1155,7 @@ void set_deadline_interval(dds_qos_t* qos, dds_time_t deadline_interval, Logger* dds_qget_deadline(qos, &duration); - log_message(logger, DEBUG," DeadlinePeriod = %lldns", duration); + log_message(logger, DEBUG," DeadlinePeriod = %lld nanosecs", duration); } void set_history_depth(dds_qos_t* qos, int history_depth, Logger* logger){ @@ -1125,7 +1188,7 @@ void set_time_based_filter(dds_qos_t* qos, dds_duration_t timebasedfilter_interv } dds_qget_time_based_filter(qos, &duration ); - log_message(logger,DEBUG, " TimeBasedFilter = %lldns", duration); + log_message(logger,DEBUG, " TimeBasedFilter = %lld nanosecs", duration); } void set_presentation(dds_qos_t* qos, dds_presentation_access_scope_kind_t coherent_set_access_scope, bool coherent_set_enabled, bool ordered_access_enabled, Logger* logger) { @@ -1147,7 +1210,7 @@ void set_lifespan(dds_qos_t* qos, dds_duration_t lifespan, Logger* logger) { dds_duration_t lfspn; dds_qget_lifespan(qos, &lfspn); - log_message(logger, DEBUG, " Lifespan = %lldns", lfspn); + log_message(logger, DEBUG, " Lifespan = %lld nanosecs", lfspn); } void set_writer_data_lifecycle(dds_qos_t* qos, bool autodispose, Logger* logger){ @@ -1213,7 +1276,7 @@ bool init_publisher(const ShapeOptions_t* opts, ShapeApp_t* app) { log_message(app->logger, DEBUG, "DataWriters created:"); for (unsigned int i = 0; i < opts->num_topics; ++i) { - log_message(app->logger, DEBUG, " dws[%u]=%d", i, app->writers[i]); + log_message(app->logger, DEBUG, " dws(%u)=%d", i, app->writers[i]); } app->color = strdup(opts->color); @@ -1267,21 +1330,26 @@ bool init_subscriber(const ShapeOptions_t* opts, ShapeApp_t* app) { set_deadline_interval(dr_qos, opts->deadline_interval_us * 1000ll, app->logger); set_history_depth(dr_qos, opts->history_depth, app->logger); - if (opts->color != NULL) { - app->color = opts->color; + if (opts->cft_expression != NULL) { + log_message(app->logger, ERROR, "ContectFilterTopic Not Supported"); + return false; + } + if (opts->cft_expression != NULL || opts->color != NULL) { for (unsigned int i = 0; i < opts->num_topics; ++i) { + char temp; + size_t name_len = dds_get_name(app->topics[i], &temp, 1); + char* name = malloc(sizeof(char) * (name_len + 1)); + dds_get_name(app->topics[i], name, name_len + 1); + if (dds_set_topic_filter_and_arg(app->topics[i],color_filter, app->color) >= 0 ) { log_message(app->logger, DEBUG, " ContentFilterTopic = \"color = %s\"", opts->color); } else { log_message(app->logger, ERROR, "failed to create content filtered topic"); return false; } - char temp; - size_t name_len = dds_get_name(app->topics[i], &temp, 1); - char* name = malloc(sizeof(char) * (name_len + 1)); - dds_get_name(app->topics[i], name, name_len + 1); - printf("Create reader for topic: %s%s color: %s\n", name, "_filtered", opts->color); + + printf("Create reader for topic: %s%s\n", name, "_filtered"); app->readers[i] = dds_create_reader(app->subscriber, app->topics[i], dr_qos, NULL); if (app->readers[i] == 0) { log_message(app->logger, ERROR, "Failed to create datareader[%u] topic: %s", i, name); @@ -1308,7 +1376,7 @@ bool init_subscriber(const ShapeOptions_t* opts, ShapeApp_t* app) { } log_message(app->logger, DEBUG,"DataReaders created:"); for (unsigned int i = 0; i < opts->num_topics; ++i) { - log_message(app->logger, DEBUG, " drs[%u]=%ld", i, app->readers[i]); + log_message(app->logger, DEBUG, " drs(%u)=%ld", i, app->readers[i]); } return true; @@ -1321,6 +1389,9 @@ bool shape_init (ShapeApp_t* app, const ShapeOptions_t* opts, Logger* logger) { app->publisher = 0; app->subscriber = 0; app->dp = 0; + app->topics = NULL; + app->readers = NULL; + app->writers = NULL; app->dp_listner = dds_create_listener(logger); dds_lset_inconsistent_topic(app->dp_listner, on_inconsistent_topic); @@ -1339,6 +1410,16 @@ bool shape_init (ShapeApp_t* app, const ShapeOptions_t* opts, Logger* logger) { dds_qos_t* dp_qos = dds_create_qos(); + if (opts->datafrag_size > 0) { + bool result = false; + if (!result) { + log_message(logger, ERROR, "Error configuring Data Fragmentation Size = %u", opts->datafrag_size); + return false; + } else { + log_message(logger, DEBUG, "Data Fragmentation Size = %u", opts->datafrag_size); + } + } + app->dp = dds_create_participant(opts->domain_id, dp_qos, app->dp_listner); if (app->dp < 0 ) { @@ -1361,22 +1442,24 @@ bool shape_init (ShapeApp_t* app, const ShapeOptions_t* opts, Logger* logger) { app->topics[i] = 0; } - app->readers = (dds_entity_t*) malloc(sizeof(dds_entity_t) * opts->num_topics); - if (app->readers == NULL) { - log_message(logger, ERROR, "Error allocating memory for DataReaders"); - return false; - } - for (unsigned int i = 0; i < opts->num_topics; ++i) { - app->readers[i] = 0; - } - - app->writers = (dds_entity_t*) malloc(sizeof(dds_entity_t) * opts->num_topics); - if (app->writers == NULL) { - log_message(logger, ERROR, "Error allocating memory for DataWriters"); - return false; - } - for (unsigned int i = 0; i < opts->num_topics; ++i) { - app->writers[i] = 0; + if (opts->publish) { + app->writers = (dds_entity_t*) malloc(sizeof(dds_entity_t) * opts->num_topics); + if (app->writers == NULL) { + log_message(logger, ERROR, "Error allocating memory for DataWriters"); + return false; + } + for (unsigned int i = 0; i < opts->num_topics; ++i) { + app->writers[i] = 0; + } + } else { + app->readers = (dds_entity_t*) malloc(sizeof(dds_entity_t) * opts->num_topics); + if (app->readers == NULL) { + log_message(logger, ERROR, "Error allocating memory for DataReaders"); + return false; + } + for (unsigned int i = 0; i < opts->num_topics; ++i) { + app->readers[i] = 0; + } } for (unsigned int i = 0; i < opts->num_topics; ++i) { @@ -1401,7 +1484,7 @@ bool shape_init (ShapeApp_t* app, const ShapeOptions_t* opts, Logger* logger) { } log_message(logger, DEBUG, "Topics created:"); for (unsigned int i = 0; i < opts->num_topics; ++i) { - log_message(logger, DEBUG, " topic[%d]=%p", i, (void*)app->topics[i]); + log_message(logger, DEBUG, " topic(%d)=%p", i, (void*)app->topics[i]); } if (opts->publish) { @@ -1488,7 +1571,8 @@ bool run_subscriber(ShapeApp_t app, ShapeOptions_t opts) { printf(" {%u}", sample->additional_payload_size._buffer[additional_payload_index]); } printf("\n"); - } else { + } + if (sample_info->instance_state != DDS_IST_ALIVE) { ShapeType shape_key; dds_instance_get_key(app.readers[i], sample_info->instance_handle, &shape_key); if (sample_info->instance_state == DDS_IST_NOT_ALIVE_NO_WRITERS) { @@ -1590,7 +1674,13 @@ bool run_publisher(ShapeApp_t app, ShapeOptions_t opts) { while(! all_done) { moveShape(&shape, &app); - if (opts.shapesize == 0) shape.shapesize += 1; + if (opts.shapesize == 0) { + if (opts.size_modulo > 0) { + shape.shapesize = (shape.shapesize % opts.size_modulo) + 1; + } else { + shape.shapesize += 1; + } + } if (opts.coherent_set_enabled || opts.ordered_access_enabled) { // n also represents the number of samples written per publisher per instance diff --git a/test_suite_functions.py b/test_suite_functions.py index 693540d4..40a1779a 100644 --- a/test_suite_functions.py +++ b/test_suite_functions.py @@ -317,7 +317,7 @@ def test_size_less_than_20(child_sub, samples_sent, last_sample_saved, timeout): samples_read = 0 return_code = ReturnCode.OK - sub_string = re.search('[0-9]+ [0-9]+ \[([0-9]+)\]', child_sub.before + child_sub.after) + sub_string = re.search(r'[0-9]+ [0-9]+ \[([0-9]+)\]', child_sub.before + child_sub.after) while sub_string is not None and samples_read < max_samples_received: size = int(sub_string.group(1)) @@ -327,7 +327,7 @@ def test_size_less_than_20(child_sub, samples_sent, last_sample_saved, timeout): index = child_sub.expect( [ - '\[[0-9]+\]', # index = 0 + r'\[[0-9]+\]', # index = 0 pexpect.TIMEOUT, # index = 1 pexpect.EOF # index = 2 ], @@ -338,7 +338,7 @@ def test_size_less_than_20(child_sub, samples_sent, last_sample_saved, timeout): break samples_read += 1 - sub_string = re.search('[0-9]+ [0-9]+ \[([0-9]+)\]', child_sub.before + child_sub.after) + sub_string = re.search(r'[0-9]+ [0-9]+ \[([0-9]+)\]', child_sub.before + child_sub.after) print(f'Samples read: {samples_read}') return return_code From d060f1aeae9ec077a11ce67da30ac671d0752f5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Deividas=20I=C5=A1ganaitis?= Date: Mon, 30 Mar 2026 09:29:32 +0000 Subject: [PATCH 08/12] Catch up with master --- srcC/shape_main.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/srcC/shape_main.c b/srcC/shape_main.c index e7dcf10d..5265a1c0 100644 --- a/srcC/shape_main.c +++ b/srcC/shape_main.c @@ -778,6 +778,7 @@ bool parse(int argc, char *argv[], Logger* logger, ShapeOptions_t* shape_options parse_ok = false; } shape_options->datafrag_size = converted_param; + break; } case 'F': shape_options->cft_expression = strdup(optarg); @@ -1664,7 +1665,9 @@ bool run_publisher(ShapeApp_t app, ShapeOptions_t opts) { shape.additional_payload_size._buffer = dds_sequence_uint8_allocbuf(size); shape.additional_payload_size._maximum = size; shape.additional_payload_size._length = size; - shape.additional_payload_size._buffer[size - 1] = 255; + for (int i = 0; i < size; ++i) { + shape.additional_payload_size._buffer[i] = 255; + } } else { shape.additional_payload_size._buffer = dds_sequence_uint8_allocbuf(0); shape.additional_payload_size._maximum = 0; From 700d271bd60de32c3a18c8a7faa8afc5641465f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Deividas=20I=C5=A1ganaitis?= Date: Mon, 30 Mar 2026 11:52:18 +0000 Subject: [PATCH 09/12] Use take_next_instance instead of take_next Implemented the array of previous_instances Swapped the take_next function to the take_next_instance function Moved Return Loan to a more reasonable location dds_take(_next_instance) returns 0(DDS_RETCODE_OK) instead of DDS_RETCODE_NO_DATA which made the loop never exit, it should be fixed now --- srcC/shape_main.c | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/srcC/shape_main.c b/srcC/shape_main.c index 5265a1c0..10a097a0 100644 --- a/srcC/shape_main.c +++ b/srcC/shape_main.c @@ -1498,13 +1498,22 @@ bool shape_init (ShapeApp_t* app, const ShapeOptions_t* opts, Logger* logger) { bool run_subscriber(ShapeApp_t app, ShapeOptions_t opts) { // This is the number of iterations performed unsigned int n = 0; + dds_instance_handle_t *previous_handles = NULL; log_message(app.logger, DEBUG, "Running run_subscriber() function"); bool printed_message = false; + + previous_handles = (dds_instance_handle_t*) malloc(sizeof(dds_instance_handle_t) * opts.num_topics); + if (previous_handles == NULL) { + log_message(app.logger, ERROR, "Error allocating memory for previous_handles"); + return false; + } + for (int i = 0; i < opts.num_topics; ++i) { + previous_handles[i] = 0; + } while(!all_done) { dds_return_t retval; - dds_instance_handle_t previous_handle; dds_sample_info_t sample_infos[MAX_SAMPLES]; void* samples[MAX_SAMPLES]; @@ -1520,6 +1529,7 @@ bool run_subscriber(ShapeApp_t app, ShapeOptions_t opts) { } for (unsigned int i = 0; i < opts.num_topics; ++i) { + previous_handles[i] = 0; do { if(!opts.use_read) { if(opts.take_read_next_instance) { @@ -1527,7 +1537,7 @@ bool run_subscriber(ShapeApp_t app, ShapeOptions_t opts) { printed_message = true; log_message(app.logger, DEBUG, "Calling take_next_instance() function"); } - retval = dds_take_next(app.readers[i], samples,sample_infos); + retval = dds_take_next_instance(app.readers[i], samples,sample_infos, MAX_SAMPLES, MAX_SAMPLES, previous_handles[i]); } else { if (!printed_message) { printed_message = true; @@ -1541,7 +1551,7 @@ bool run_subscriber(ShapeApp_t app, ShapeOptions_t opts) { printed_message = true; log_message(app.logger, DEBUG, "Calling read_next_instance() function"); } - retval = dds_read_next(app.readers[i], samples,sample_infos); + retval = dds_read_next_instance(app.readers[i], samples,sample_infos, MAX_SAMPLES, MAX_SAMPLES, previous_handles[i]); } else { if (!printed_message) { printed_message = true; @@ -1594,12 +1604,12 @@ bool run_subscriber(ShapeApp_t app, ShapeOptions_t opts) { free(name); } } - - previous_handle = sample_info[0].instance_handle; - dds_return_loan(app.readers[i], (void**)samples, MAX_SAMPLES); } + dds_return_loan(app.readers[i], (void**)samples, MAX_SAMPLES); + previous_handles[i] = sample_infos[0].instance_handle; } - } while (retval >= DDS_RETCODE_OK); + log_message(app.logger, DEBUG, "retval: %d", retval); + } while (retval > DDS_RETCODE_OK); } if (opts.coherent_set_enabled || opts.ordered_access_enabled) { dds_end_coherent(app.subscriber); @@ -1615,6 +1625,8 @@ bool run_subscriber(ShapeApp_t app, ShapeOptions_t opts) { for( size_t i = 0; i < MAX_SAMPLES; i ++) ShapeType_free(samples[i],DDS_FREE_ALL); usleep(opts.read_period_us); } + + free(previous_handles); return true; } From 6da0326edea5c285056d93cb15afe2d24576d7ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Deividas=20I=C5=A1ganaitis?= Date: Tue, 31 Mar 2026 12:12:28 +0000 Subject: [PATCH 10/12] typo --- srcC/shape_main.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srcC/shape_main.c b/srcC/shape_main.c index 10a097a0..a9c08c62 100644 --- a/srcC/shape_main.c +++ b/srcC/shape_main.c @@ -1143,7 +1143,7 @@ void set_ownership(dds_qos_t* qos, int ownership_strength, Logger* logger) { if(ownership_kind == DDS_OWNERSHIP_EXCLUSIVE) { dds_qget_ownership_strength(qos, &ownership_strength); - log_message(logger, DEBUG, " OwnershptStrength = %d", ownership_strength); + log_message(logger, DEBUG, " OwnershiptStrength = %d", ownership_strength); } } From 4b0bb09eab48de4fb368a73ea26a470a3ce958d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Deividas=20I=C5=A1ganaitis?= Date: Tue, 31 Mar 2026 12:23:34 +0000 Subject: [PATCH 11/12] Removed unnecesary dispose Also added j > 0 check --- srcC/shape_main.c | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/srcC/shape_main.c b/srcC/shape_main.c index a9c08c62..3d1b5e38 100644 --- a/srcC/shape_main.c +++ b/srcC/shape_main.c @@ -1709,17 +1709,11 @@ bool run_publisher(ShapeApp_t app, ShapeOptions_t opts) { for (unsigned int j = 0; j < opts.num_instances; ++j) { //Publish different instances with the same content (except for the color) if (opts.num_instances > 1) { - if (strlen(opts.color) > 0) { + if (strlen(opts.color) > 0 && j > 0) { sscanf(shape.color, "%s%u", opts.color, j); } else { sscanf(shape.color, "%s", opts.color); } - if (opts.unregister) { - dds_unregister_instance(app.writers[i], &shape); - } - if (opts.dispose) { - dds_dispose(app.writers[i], &shape); - } } dds_return_t rc = dds_write(app.writers[i], &shape); if (opts.print_writer_samples) { @@ -1727,7 +1721,7 @@ bool run_publisher(ShapeApp_t app, ShapeOptions_t opts) { char temp; dds_return_t name_len = dds_get_name(writer_topic, &temp, 1); char* name = calloc(name_len + 1, sizeof(char)); - dds_get_name(writer_topic, name, name_len + 1); + dds_get_name(writer_topic, name, name_len + 1); printf("%-10s %-10s %03d %03d [%d]", name, shape.color, shape.x, From cbe8e9042194b687285516a28635ced2c07714c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Deividas=20I=C5=A1ganaitis?= Date: Tue, 31 Mar 2026 12:31:01 +0000 Subject: [PATCH 12/12] Added consideration for additional QoS setting required by CycloneDDS --- doc/test_description.template.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/doc/test_description.template.rst b/doc/test_description.template.rst index b110c1b8..7f13f0a8 100644 --- a/doc/test_description.template.rst +++ b/doc/test_description.template.rst @@ -102,4 +102,8 @@ product versions. * Content Filtered Topic disabled +* **CycloneDDS**: + + * Durability Service QoS requires explicit setting when History Depth is set + |TEST_DESCRIPTION|