From ecb4901e12a6be29896ab492d3de68f9b1963d6d Mon Sep 17 00:00:00 2001 From: Stefan Kimmer Date: Wed, 18 Mar 2026 12:44:42 +0100 Subject: [PATCH 1/5] make Cft0 work --- srcRs/DustDDS/src/main.rs | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/srcRs/DustDDS/src/main.rs b/srcRs/DustDDS/src/main.rs index 2b69116f..e9e0ead3 100644 --- a/srcRs/DustDDS/src/main.rs +++ b/srcRs/DustDDS/src/main.rs @@ -227,6 +227,16 @@ struct Options { /// uses take()/read() instead of take_next_instance() read_next_instance() #[clap(short = 'K', long = "take-read")] take_read: bool, + + /// ContentFilteredTopic filter expression (quotes required around the expression). Cannot be used with -c on + /// subscriber applications + #[clap(short = 'F', long = "cft")] + cft_expression: Option, + + /// If set, the modulo operation is applied to the shapesize. This will make that shapesize is in the range [1,N]. + /// This only applies if shapesize is increased (-z 0) + #[clap(short = 'Q', long = "size-modulo")] + size_modulo: Option, } impl Options { @@ -240,7 +250,7 @@ impl Options { Ok(()) } - fn color_for_publisher(&self) -> String { + fn interpret_color(&self) -> String { match self.color.clone() { Some(color) => color, None => { @@ -510,7 +520,7 @@ fn init_publisher( println!( "Create writer for topic: {} color: {}", options.topic_name, - options.color_for_publisher() + options.interpret_color() ); let mut data_writer_qos = DataWriterQos { @@ -550,7 +560,7 @@ fn run_publisher( let da_width = 240; let da_height = 270; let mut shape = ShapeType { - color: options.color_for_publisher(), + color: options.interpret_color(), x: random::() % da_width, y: random::() % da_height, shapesize: options.shapesize, @@ -621,18 +631,16 @@ fn init_subscriber( ); } - let data_reader = match options.color { - // filter on specified color - Some(color) => { + let data_reader = match options.cft_expression { + Some(cft_expression) => { let filtered_topic_name = options.topic_name + "_filtered"; - println!( - "Create reader for topic: {} color: {}", - filtered_topic_name, &color - ); + println!("ContentFilterTopic = \"{cft_expression}\""); + println!("Create reader for topic: {} ", filtered_topic_name); + let color = cft_expression.split("=").nth(1).unwrap().trim_matches(&[' ', '\'']).to_string(); let content_filtered_topic = participant.create_contentfilteredtopic( &filtered_topic_name, &topic, - String::from("color = %0"), + cft_expression, vec![color], )?; @@ -643,7 +651,6 @@ fn init_subscriber( NO_STATUS, )? } - // No filter on specified color None => { println!("Create reader for topic: {} ", options.topic_name); subscriber.create_datareader::( From 51bb8962faac79fefe51bc5d02984eea537d403f Mon Sep 17 00:00:00 2001 From: Stefan Kimmer Date: Wed, 18 Mar 2026 17:54:13 +0100 Subject: [PATCH 2/5] update DustDDS --- srcRs/DustDDS/Cargo.toml | 4 +- srcRs/DustDDS/build.rs | 44 +- srcRs/DustDDS/src/main.rs | 1672 +++++++++++++++++++------------------ 3 files changed, 866 insertions(+), 854 deletions(-) diff --git a/srcRs/DustDDS/Cargo.toml b/srcRs/DustDDS/Cargo.toml index 7b824806..c7b28466 100644 --- a/srcRs/DustDDS/Cargo.toml +++ b/srcRs/DustDDS/Cargo.toml @@ -17,7 +17,7 @@ publish = false clap = { version = "4.5.47", features = ["derive", "string"] } rand = "0.8.5" ctrlc = "3.4" -dust_dds = "0.14.0" +dust_dds = version="0.15.0" [build-dependencies] -dust_dds_gen = "0.14.0" +dust_dds_gen = "0.15.0" diff --git a/srcRs/DustDDS/build.rs b/srcRs/DustDDS/build.rs index f30a196e..7b0ee5d8 100644 --- a/srcRs/DustDDS/build.rs +++ b/srcRs/DustDDS/build.rs @@ -1,20 +1,24 @@ -use std::{ - fs::{self, File}, - io::Write, - path::Path, -}; - -fn main() { - let cargo_target_dir = std::env::var("OUT_DIR").unwrap(); - let cargo_manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap(); - let cargo_target_path = Path::new(&cargo_target_dir); - let cargo_manifest_path = Path::new(&cargo_manifest_dir); - let build_path = cargo_target_path.join("idl"); - let idl_path = cargo_manifest_path.join("..").join("..").join("srcCxx").join("shape.idl"); - let compiled_idl = dust_dds_gen::compile_idl(&idl_path).expect("Couldn't parse IDL file"); - let compiled_idl_path = build_path.as_path().join("shape.rs"); - fs::create_dir_all(build_path).expect("Creating build path failed"); - let mut file = File::create(compiled_idl_path).expect("Failed to create file"); - file.write_all(compiled_idl.as_bytes()) - .expect("Failed to write to file"); -} +use std::{ + fs::{self, File}, + io::Write, + path::Path, +}; + +fn main() { + let cargo_target_dir = std::env::var("OUT_DIR").unwrap(); + let cargo_manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap(); + let cargo_target_path = Path::new(&cargo_target_dir); + let cargo_manifest_path = Path::new(&cargo_manifest_dir); + let build_path = cargo_target_path.join("idl"); + let idl_path = cargo_manifest_path + .join("..") + .join("..") + .join("srcCxx") + .join("shape.idl"); + let compiled_idl = dust_dds_gen::compile_idl(&idl_path).expect("Couldn't parse IDL file"); + let compiled_idl_path = build_path.as_path().join("shape.rs"); + fs::create_dir_all(build_path).expect("Creating build path failed"); + let mut file = File::create(compiled_idl_path).expect("Failed to create file"); + file.write_all(compiled_idl.as_bytes()) + .expect("Failed to write to file"); +} diff --git a/srcRs/DustDDS/src/main.rs b/srcRs/DustDDS/src/main.rs index e9e0ead3..40954828 100644 --- a/srcRs/DustDDS/src/main.rs +++ b/srcRs/DustDDS/src/main.rs @@ -1,832 +1,840 @@ -use clap::{Parser, ValueEnum}; -use ctrlc; -use dust_dds::{ - dds_async::topic::TopicAsync, - domain::{ - domain_participant::DomainParticipant, - domain_participant_factory::DomainParticipantFactory, - domain_participant_listener::DomainParticipantListener, - }, - infrastructure::{ - error::DdsError, - qos::{DataReaderQos, DataWriterQos, PublisherQos, QosKind, SubscriberQos}, - qos_policy::{ - self, DataRepresentationQosPolicy, DurabilityQosPolicy, HistoryQosPolicy, - HistoryQosPolicyKind, OwnershipQosPolicy, OwnershipQosPolicyKind, - OwnershipStrengthQosPolicy, PartitionQosPolicy, ReliabilityQosPolicy, - XCDR_DATA_REPRESENTATION, XCDR2_DATA_REPRESENTATION, - }, - sample_info::{ANY_INSTANCE_STATE, ANY_SAMPLE_STATE, ANY_VIEW_STATE}, - status::{InconsistentTopicStatus, NO_STATUS, StatusKind}, - time::DurationKind, - }, - listener::NO_LISTENER, - publication::data_writer::DataWriter, - runtime::DdsRuntime, - std_runtime::StdRuntime, - subscription::data_reader::DataReader, -}; -use rand::{Rng, random, thread_rng}; -use std::{ - fmt::Debug, - io::Write, - process::{ExitCode, Termination}, - sync::mpsc::Receiver, -}; - -include!(concat!(env!("OUT_DIR"), "/idl/shape.rs")); -impl Clone for ShapeType { - fn clone(&self) -> Self { - Self { - color: self.color.clone(), - x: self.x, - y: self.y, - shapesize: self.shapesize, - additional_payload_size: self.additional_payload_size.clone(), - } - } -} - -fn qos_policy_name(id: i32) -> String { - match id { - qos_policy::DATA_REPRESENTATION_QOS_POLICY_ID => "DATAREPRESENTATION", - qos_policy::DEADLINE_QOS_POLICY_ID => "DEADLINE", - qos_policy::DESTINATIONORDER_QOS_POLICY_ID => "DESTINATIONORDER", - qos_policy::DURABILITY_QOS_POLICY_ID => "DURABILITY", - qos_policy::DURABILITYSERVICE_QOS_POLICY_ID => "DURABILITYSERVICE", - qos_policy::ENTITYFACTORY_QOS_POLICY_ID => "ENTITYFACTORY", - qos_policy::GROUPDATA_QOS_POLICY_ID => "GROUPDATA", - qos_policy::HISTORY_QOS_POLICY_ID => "HISTORY", - qos_policy::LATENCYBUDGET_QOS_POLICY_ID => "LATENCYBUDGET", - qos_policy::LIFESPAN_QOS_POLICY_ID => "LIFESPAN", - qos_policy::LIVELINESS_QOS_POLICY_ID => "LIVELINESS", - qos_policy::OWNERSHIP_QOS_POLICY_ID => "OWNERSHIP", - qos_policy::PARTITION_QOS_POLICY_ID => "PARTITION", - qos_policy::PRESENTATION_QOS_POLICY_ID => "PRESENTATION", - qos_policy::READERDATALIFECYCLE_QOS_POLICY_ID => "READERDATALIFECYCLE", - qos_policy::RELIABILITY_QOS_POLICY_ID => "RELIABILITY", - qos_policy::RESOURCELIMITS_QOS_POLICY_ID => "RESOURCELIMITS", - qos_policy::TIMEBASEDFILTER_QOS_POLICY_ID => "TIMEBASEDFILTER", - qos_policy::TOPICDATA_QOS_POLICY_ID => "TOPICDATA", - qos_policy::TRANSPORTPRIORITY_QOS_POLICY_ID => "TRANSPORTPRIORITY", - qos_policy::USERDATA_QOS_POLICY_ID => "USERDATA", - qos_policy::WRITERDATALIFECYCLE_QOS_POLICY_ID => "WRITERDATALIFECYCLE", - _ => "UNKNOWN", - } - .to_string() -} - -#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] -#[clap(rename_all = "kebab_case")] -enum FinalInstanceState { - /// unregister - U, - /// dispose - D, -} - -#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] -#[clap(rename_all = "kebab_case")] -enum AccessScope { - /// INSTANCE - I, - /// TOPIC - T, - /// GROUP - G, -} - -#[derive(Parser, Clone)] -#[command(author, version, about, long_about = None)] -struct Options { - /// publish samples - #[clap(short = 'P', default_value_t = false)] - publish: bool, - - /// subscribe samples - #[clap(short = 'S', default_value_t = false)] - subscribe: bool, - - /// domain id - #[clap(short = 'd', default_value_t = 0)] - domain_id: i32, - - /// BEST_EFFORT reliability - #[clap(short = 'b', default_value_t = false)] - best_effort_reliability: bool, - - /// RELIABLE reliability - #[clap(short = 'r', default_value_t = false)] - reliable_reliability: bool, - - /// keep history depth (-1: use default, 0: KEEP_ALL) - #[clap(short = 'k', default_value_t = -1, allow_negative_numbers = true)] - history_depth: i32, - - /// set a 'deadline' with interval (ms) (0: OFF) - #[clap(short = 'f', default_value_t = 0)] - deadline_interval: u64, - - /// set ownership strength (-1: SHARED) - #[clap(short = 's', default_value_t = -1, allow_negative_numbers = true)] - ownership_strength: i32, - - /// set the topic name - #[clap(short = 't', default_value = "Square")] - topic_name: String, - - /// set color to publish (filter if subscriber) - #[clap(short = 'c', default_value = None)] - color: Option, - - /// set a 'partition' string - #[clap(short = 'p')] - partition: Option, - - /// set durability (v: VOLATILE, l: TRANSIENT_LOCAL, t: TRANSIENT, p: PERSISTENT) - #[clap(short = 'D', default_value_t = 'v')] - durability_kind: char, - - /// set data representation (1: XCDR, 2: XCDR2) - #[clap(short = 'x', default_value_t = 1)] - data_representation: u16, - - /// print Publisher's samples - #[clap(short = 'w', default_value_t = false)] - print_writer_samples: bool, - - /// set shapesize (0: increase the size for every sample) - #[clap(short = 'z', default_value_t = 20)] - shapesize: i32, - - /// use 'read()' instead of 'take()' - #[clap(short = 'R', default_value_t = false)] - use_read: bool, - - /// waiting period between 'write()' operations in ms. - #[clap(long = "write-period", default_value_t = 33)] - write_period_ms: i32, - - /// waiting period between 'read()' or 'take()' operations in ms. - #[clap(long = "read-period", default_value_t = 100)] - read_period_ms: i32, - - /// set log message verbosity (e: ERROR, d: DEBUG) - #[clap(short = 'v', default_value_t = 'e')] - log_message_verbosity: char, - - /// apply 'time based filter' with interval in ms [0: OFF] - #[clap(short = 'i', long = "time-filter")] - time_filter: Option, - - /// indicates the lifespan of a sample in ms - #[clap(short = 'l', long = "lifespan")] - lifespan: Option, - - /// indicates the number of iterations of the main loop - /// After that, the application will exit. Default (0): infinite - #[clap(short = 'n', long = "num-iterations")] - num_iterations: Option, - - /// indicates the number of instances a DataWriter writes If the value is > 1, the additional instances are - /// created by appending a number. For example, if the original color is "BLUE" the instances used are - /// "BLUE", "BLUE1", "BLUE2"... - #[clap(short = 'I', long = "num-instances")] - num_instances: Option, - - /// indicates the number of topics created (using the same type). This also creates a DataReader or DataWriter per - /// topic. If the value is > 1, the additional topic names are created by appending a number: For example, if the - /// original topic name is "Square", the topics created are "Square", "Square1", "Square2"... - #[clap(short = 'E', long = "num-topics")] - num_topics: Option, - - /// indicates the action performed after the DataWriter finishes its execution (before deleting it): - #[clap(short = 'M', long = "final-instance-state")] - final_instance_state: Option, - - /// sets Presentation.access_scope - #[clap(short = 'C', long = "access-scope")] - access_scope: Option, - - /// sets Presentation.coherent_access = true - #[clap(short = 'T', long = "coherent")] - coherent: bool, - - /// sets Presentation.ordered_access = true - #[clap(short = 'O', long = "ordered")] - ordered: bool, - - /// amount of samples sent for each DataWriter and instance that are grouped in a coherent set - #[clap(short = 'H', long = "coherent-sample-count")] - coherent_sample_count: Option, - - /// indicates the amount of bytes added to the samples written (for example to use large data) - #[clap(short = 'B', long = "additional-payload-size")] - additional_payload_size: Option, - - /// uses take()/read() instead of take_next_instance() read_next_instance() - #[clap(short = 'K', long = "take-read")] - take_read: bool, - - /// ContentFilteredTopic filter expression (quotes required around the expression). Cannot be used with -c on - /// subscriber applications - #[clap(short = 'F', long = "cft")] - cft_expression: Option, - - /// If set, the modulo operation is applied to the shapesize. This will make that shapesize is in the range [1,N]. - /// This only applies if shapesize is increased (-z 0) - #[clap(short = 'Q', long = "size-modulo")] - size_modulo: Option, -} - -impl Options { - fn validate(&self) -> Result<(), ParsingError> { - if self.subscribe && self.publish || (!self.subscribe && !self.publish) { - return Err(ParsingError( - "must be either subscribe or publish".to_string(), - )); - } - - Ok(()) - } - - fn interpret_color(&self) -> String { - match self.color.clone() { - Some(color) => color, - None => { - let default_color = "BLUE".to_string(); - println!( - "warning: color was not specified, defaulting to \"{}\"", - default_color - ); - default_color - } - } - } - - fn reliability_qos_policy(&self) -> ReliabilityQosPolicy { - let mut reliability = DataWriterQos::default().reliability; - if self.best_effort_reliability { - reliability.kind = qos_policy::ReliabilityQosPolicyKind::BestEffort; - } - if self.reliable_reliability { - reliability.kind = qos_policy::ReliabilityQosPolicyKind::Reliable; - } - reliability - } - - fn partition_qos_policy(&self) -> PartitionQosPolicy { - if let Some(partition) = &self.partition { - PartitionQosPolicy { - name: vec![partition.to_owned()], - } - } else { - PartitionQosPolicy::default() - } - } - - fn durability_qos_policy(&self) -> DurabilityQosPolicy { - DurabilityQosPolicy { - kind: match self.durability_kind { - 'v' => qos_policy::DurabilityQosPolicyKind::Volatile, - 'l' => qos_policy::DurabilityQosPolicyKind::TransientLocal, - 't' => qos_policy::DurabilityQosPolicyKind::Transient, - 'p' => qos_policy::DurabilityQosPolicyKind::Persistent, - _ => panic!("durability not valid"), - }, - } - } - - fn data_representation_qos_policy(&self) -> DataRepresentationQosPolicy { - let data_representation = match self.data_representation { - 1 => XCDR_DATA_REPRESENTATION, - 2 => XCDR2_DATA_REPRESENTATION, - _ => panic!("Wrong data representation"), - }; - qos_policy::DataRepresentationQosPolicy { - value: vec![data_representation], - } - } - - fn ownership_qos_policy(&self) -> OwnershipQosPolicy { - OwnershipQosPolicy { - kind: match self.ownership_strength { - -1 => qos_policy::OwnershipQosPolicyKind::Shared, - _ => qos_policy::OwnershipQosPolicyKind::Exclusive, - }, - } - } - - fn history_depth_qos_policy(&self) -> HistoryQosPolicy { - match self.history_depth { - -1 => HistoryQosPolicy::default(), - 0 => HistoryQosPolicy { - kind: HistoryQosPolicyKind::KeepAll, - }, - x if x >= 1 => HistoryQosPolicy { - kind: HistoryQosPolicyKind::KeepLast(x as u32), - }, - _ => panic!("history_depth not valid"), - } - } - - fn ownership_strength_qos_policy(&self) -> OwnershipStrengthQosPolicy { - if self.ownership_strength < -1 { - panic!("Ownership strength must be positive or zero") - } - OwnershipStrengthQosPolicy { - value: self.ownership_strength, - } - } -} - -struct Listener; -impl DomainParticipantListener for Listener { - async fn on_inconsistent_topic( - &mut self, - the_topic: TopicAsync, - _status: InconsistentTopicStatus, - ) { - println!( - "on_inconsistent_topic() topic: '{}' type: '{}'", - the_topic.get_name(), - the_topic.get_type_name(), - ); - } - - async fn on_offered_incompatible_qos( - &mut self, - the_writer: dust_dds::dds_async::data_writer::DataWriterAsync, - status: dust_dds::infrastructure::status::OfferedIncompatibleQosStatus, - ) { - let policy_name = qos_policy_name(status.last_policy_id); - println!( - "on_offered_incompatible_qos() topic: '{}' type: '{}' : {:?} ({})", - the_writer.get_topic().get_name(), - the_writer.get_topic().get_type_name(), - status.last_policy_id, - policy_name - ); - } - - async fn on_publication_matched( - &mut self, - the_writer: dust_dds::dds_async::data_writer::DataWriterAsync, - status: dust_dds::infrastructure::status::PublicationMatchedStatus, - ) { - if !the_writer.get_topic().get_name().starts_with("DCPS") { - println!( - "on_publication_matched() topic: '{}' type: '{}' : matched readers {} (change = {})", - the_writer.get_topic().get_name(), - the_writer.get_topic().get_type_name(), - status.current_count, - status.current_count_change - ); - } - } - - async fn on_offered_deadline_missed( - &mut self, - the_writer: dust_dds::dds_async::data_writer::DataWriterAsync, - status: dust_dds::infrastructure::status::OfferedDeadlineMissedStatus, - ) { - println!( - "on_offered_deadline_missed() topic: '{}' type: '{}' : (total = {}, change = {})", - the_writer.get_topic().get_name(), - the_writer.get_topic().get_type_name(), - status.total_count, - status.total_count_change - ); - } - - async fn on_liveliness_lost( - &mut self, - the_writer: dust_dds::dds_async::data_writer::DataWriterAsync, - status: dust_dds::infrastructure::status::LivelinessLostStatus, - ) { - println!( - "on_liveliness_lost() topic: '{}' type: '{}' : (total = {}, change = {})", - the_writer.get_topic().get_name(), - the_writer.get_topic().get_type_name(), - status.total_count, - status.total_count_change - ); - } - - async fn on_requested_incompatible_qos( - &mut self, - the_reader: dust_dds::dds_async::data_reader::DataReaderAsync, - status: dust_dds::infrastructure::status::RequestedIncompatibleQosStatus, - ) { - let policy_name = qos_policy_name(status.last_policy_id); - println!( - "on_requested_incompatible_qos() topic: '{}' type: '{}' : {} ({})\n", - the_reader.get_topicdescription().get_name(), - the_reader.get_topicdescription().get_type_name(), - status.last_policy_id, - policy_name - ); - } - - async fn on_subscription_matched( - &mut self, - the_reader: dust_dds::dds_async::data_reader::DataReaderAsync, - status: dust_dds::infrastructure::status::SubscriptionMatchedStatus, - ) { - if !the_reader - .get_topicdescription() - .get_name() - .starts_with("DCPS") - { - println!( - "on_subscription_matched() topic: '{}' type: '{}' : matched writers {} (change = {})", - the_reader.get_topicdescription().get_name(), - the_reader.get_topicdescription().get_type_name(), - status.current_count, - status.current_count_change - ); - } - } - - async fn on_requested_deadline_missed( - &mut self, - the_reader: dust_dds::dds_async::data_reader::DataReaderAsync, - status: dust_dds::infrastructure::status::RequestedDeadlineMissedStatus, - ) { - println!( - "on_requested_deadline_missed() topic: '{}' type: '{}' : (total = {}, change = {})\n", - the_reader.get_topicdescription().get_name(), - the_reader.get_topicdescription().get_type_name(), - status.total_count, - status.total_count_change - ); - } - - async fn on_liveliness_changed( - &mut self, - the_reader: dust_dds::dds_async::data_reader::DataReaderAsync, - status: dust_dds::infrastructure::status::LivelinessChangedStatus, - ) { - println!( - "on_liveliness_changed() topic: '{}' type: '{}' : (alive = {}, not_alive = {})", - the_reader.get_topicdescription().get_name(), - the_reader.get_topicdescription().get_type_name(), - status.alive_count, - status.not_alive_count, - ); - } -} - -fn move_shape( - shape: &mut ShapeType, - x_vel: &mut i32, - y_vel: &mut i32, - da_width: i32, - da_height: i32, -) { - shape.x = shape.x + *x_vel; - shape.y = shape.y + *y_vel; - if shape.x < 0 { - shape.x = 0; - *x_vel = -*x_vel; - } - if shape.x > da_width { - shape.x = da_width; - *x_vel = -*x_vel; - } - if shape.y < 0 { - shape.y = 0; - *y_vel = -*y_vel; - } - if shape.y > da_height { - shape.y = da_height; - *y_vel = -*y_vel; - } -} - -fn init_publisher( - participant: &DomainParticipant, - options: Options, -) -> Result, InitializeError> { - let topic = participant - .lookup_topicdescription(&options.topic_name) - .expect("lookup_topicdescription succeeds") - .expect("topic existes"); - let publisher_qos = QosKind::Specific(PublisherQos { - partition: options.partition_qos_policy(), - ..Default::default() - }); - let publisher = participant.create_publisher(publisher_qos, NO_LISTENER, NO_STATUS)?; - println!( - "Create writer for topic: {} color: {}", - options.topic_name, - options.interpret_color() - ); - - let mut data_writer_qos = DataWriterQos { - durability: options.durability_qos_policy(), - reliability: options.reliability_qos_policy(), - representation: options.data_representation_qos_policy(), - ownership: options.ownership_qos_policy(), - history: options.history_depth_qos_policy(), - ..Default::default() - }; - if options.deadline_interval > 0 { - data_writer_qos.deadline.period = DurationKind::Finite( - core::time::Duration::from_millis(options.deadline_interval).into(), - ); - } - if options.ownership_qos_policy().kind == OwnershipQosPolicyKind::Exclusive { - data_writer_qos.ownership_strength = options.ownership_strength_qos_policy(); - } - - let data_writer = publisher.create_datawriter::( - &topic, - QosKind::Specific(data_writer_qos), - NO_LISTENER, - NO_STATUS, - )?; - - Ok(data_writer) -} - -fn run_publisher( - data_writer: &DataWriter, - options: Options, - all_done: Receiver<()>, -) -> Result<(), RunningError> { - let mut random_gen = thread_rng(); - - let da_width = 240; - let da_height = 270; - let mut shape = ShapeType { - color: options.interpret_color(), - x: random::() % da_width, - y: random::() % da_height, - shapesize: options.shapesize, - additional_payload_size: vec![], - }; - - // get random non-zero velocity. - let mut x_vel = if random() { - random_gen.gen_range(1..5) - } else { - random_gen.gen_range(-5..-1) - }; - let mut y_vel = if random() { - random_gen.gen_range(1..5) - } else { - random_gen.gen_range(-5..-1) - }; - - while all_done.try_recv().is_err() { - if options.shapesize == 0 { - shape.shapesize += 1; - } - - move_shape(&mut shape, &mut x_vel, &mut y_vel, da_width, da_height); - if options.print_writer_samples { - println!( - "{:10} {:10} {:03} {:03} [{:}]", - options.topic_name.as_str(), - shape.color, - shape.x, - shape.y, - shape.shapesize - ); - } - data_writer.write(shape.clone(), None).ok(); - std::thread::sleep(std::time::Duration::from_millis( - options.write_period_ms as u64, - )); - } - Ok(()) -} - -fn init_subscriber( - participant: &DomainParticipant, - options: Options, -) -> Result, InitializeError> { - let topic = participant - .lookup_topicdescription(&options.topic_name) - .expect("lookup_topicdescription succeeds") - .expect("topic existes"); - let subscriber_qos = QosKind::Specific(SubscriberQos { - partition: options.partition_qos_policy(), - ..Default::default() - }); - let subscriber = participant.create_subscriber(subscriber_qos, NO_LISTENER, NO_STATUS)?; - - let mut data_reader_qos = DataReaderQos { - durability: options.durability_qos_policy(), - reliability: options.reliability_qos_policy(), - representation: options.data_representation_qos_policy(), - ownership: options.ownership_qos_policy(), - history: options.history_depth_qos_policy(), - ..Default::default() - }; - if options.deadline_interval > 0 { - data_reader_qos.deadline.period = DurationKind::Finite( - core::time::Duration::from_millis(options.deadline_interval).into(), - ); - } - - let data_reader = match options.cft_expression { - Some(cft_expression) => { - let filtered_topic_name = options.topic_name + "_filtered"; - println!("ContentFilterTopic = \"{cft_expression}\""); - println!("Create reader for topic: {} ", filtered_topic_name); - let color = cft_expression.split("=").nth(1).unwrap().trim_matches(&[' ', '\'']).to_string(); - let content_filtered_topic = participant.create_contentfilteredtopic( - &filtered_topic_name, - &topic, - cft_expression, - vec![color], - )?; - - subscriber.create_datareader::( - &content_filtered_topic, - QosKind::Specific(data_reader_qos), - NO_LISTENER, - NO_STATUS, - )? - } - None => { - println!("Create reader for topic: {} ", options.topic_name); - subscriber.create_datareader::( - &topic, - QosKind::Specific(data_reader_qos), - NO_LISTENER, - NO_STATUS, - )? - } - }; - - Ok(data_reader) -} - -fn run_subscriber( - data_reader: &DataReader, - options: Options, - all_done: Receiver<()>, -) -> Result<(), RunningError> { - while all_done.try_recv().is_err() { - let mut previous_handle = None; - loop { - let max_samples = i32::MAX; - let read_result = if options.use_read { - data_reader.read_next_instance( - max_samples, - previous_handle, - ANY_SAMPLE_STATE, - ANY_VIEW_STATE, - ANY_INSTANCE_STATE, - ) - } else { - data_reader.take_next_instance( - max_samples, - previous_handle, - ANY_SAMPLE_STATE, - ANY_VIEW_STATE, - ANY_INSTANCE_STATE, - ) - }; - match read_result { - Ok(samples) => { - for sample in samples { - if sample.sample_info.valid_data { - let smaple_data = sample.data.as_ref().expect("data present"); - println!( - "{:10} {:10} {:03} {:03} [{}]", - data_reader.get_topicdescription().get_name(), - smaple_data.color, - smaple_data.x, - smaple_data.y, - smaple_data.shapesize - ); - std::io::stdout().flush().expect("flush stdout succeeds"); - } - previous_handle = Some(sample.sample_info.instance_handle); - } - - std::thread::sleep(std::time::Duration::from_millis( - options.read_period_ms as u64, - )); - } - Err(_) => break, - } - } - } - Ok(()) -} - -fn initialize(options: &Options) -> Result, InitializeError> { - let participant_factory = DomainParticipantFactory::get_instance(); - let participant = participant_factory.create_participant( - options.domain_id, - QosKind::Default, - Some(Listener), - &[ - StatusKind::InconsistentTopic, - StatusKind::OfferedIncompatibleQos, - StatusKind::PublicationMatched, - StatusKind::OfferedDeadlineMissed, - StatusKind::LivelinessLost, - StatusKind::RequestedIncompatibleQos, - StatusKind::SubscriptionMatched, - StatusKind::RequestedDeadlineMissed, - StatusKind::LivelinessChanged, - ], - )?; - println!("Create topic: {}", options.topic_name); - let _topic = participant.create_topic::( - &options.topic_name, - "ShapeType", - QosKind::Default, - NO_LISTENER, - NO_STATUS, - )?; - - Ok(participant) -} - -struct ParsingError(String); -struct InitializeError(String); -struct RunningError(String); - -impl From for InitializeError { - fn from(value: DdsError) -> Self { - Self(format!("DdsError: {:?}", value)) - } -} -impl From for RunningError { - fn from(value: DdsError) -> Self { - Self(format!("DdsError: {:?}", value)) - } -} - -struct Return { - code: u8, - describtion: String, -} -impl Debug for Return { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!("code {}: {}", self.code, self.describtion)) - } -} - -impl Termination for Return { - fn report(self) -> ExitCode { - self.code.into() - } -} - -impl From for Return { - fn from(value: ParsingError) -> Self { - Self { - code: 1, - describtion: value.0, - } - } -} - -impl From for Return { - fn from(value: InitializeError) -> Self { - Self { - code: 2, - describtion: value.0, - } - } -} - -impl From for Return { - fn from(value: RunningError) -> Self { - Self { - code: 3, - describtion: value.0, - } - } -} - -fn main() -> Result<(), Return> { - let (tx, rx) = std::sync::mpsc::channel(); - - ctrlc::set_handler(move || tx.send(()).expect("Could not send signal on channel.")) - .expect("Error setting Ctrl-C handler"); - - let options = Options::parse(); - options.validate()?; - let participant = initialize(&options)?; - if options.publish { - let data_writer = init_publisher(&participant, options.clone())?; - run_publisher(&data_writer, options.clone(), rx)?; - } else { - let data_reader = init_subscriber(&participant, options.clone())?; - run_subscriber(&data_reader, options.clone(), rx)?; - } - participant - .delete_contained_entities() - .expect("Entites beeing deleted"); - println!("Done."); - Ok(()) -} +use clap::{Parser, ValueEnum}; +use ctrlc; +use dust_dds::{ + dds_async::topic::TopicAsync, + domain::{ + domain_participant::DomainParticipant, + domain_participant_factory::DomainParticipantFactory, + domain_participant_listener::DomainParticipantListener, + }, + infrastructure::{ + error::DdsError, + qos::{DataReaderQos, DataWriterQos, PublisherQos, QosKind, SubscriberQos}, + qos_policy::{ + self, DataRepresentationQosPolicy, DurabilityQosPolicy, HistoryQosPolicy, + HistoryQosPolicyKind, OwnershipQosPolicy, OwnershipQosPolicyKind, + OwnershipStrengthQosPolicy, PartitionQosPolicy, ReliabilityQosPolicy, + XCDR_DATA_REPRESENTATION, XCDR2_DATA_REPRESENTATION, + }, + sample_info::{ANY_INSTANCE_STATE, ANY_SAMPLE_STATE, ANY_VIEW_STATE}, + status::{InconsistentTopicStatus, NO_STATUS, StatusKind}, + time::DurationKind, + }, + listener::NO_LISTENER, + publication::data_writer::DataWriter, + subscription::data_reader::DataReader, +}; +use rand::{Rng, random, thread_rng}; +use std::{ + fmt::Debug, + io::Write, + process::{ExitCode, Termination}, + sync::mpsc::Receiver, +}; + +include!(concat!(env!("OUT_DIR"), "/idl/shape.rs")); +impl Clone for ShapeType { + fn clone(&self) -> Self { + Self { + color: self.color.clone(), + x: self.x, + y: self.y, + shapesize: self.shapesize, + additional_payload_size: self.additional_payload_size.clone(), + } + } +} + +fn qos_policy_name(id: i32) -> String { + match id { + qos_policy::DATA_REPRESENTATION_QOS_POLICY_ID => "DATAREPRESENTATION", + qos_policy::DEADLINE_QOS_POLICY_ID => "DEADLINE", + qos_policy::DESTINATIONORDER_QOS_POLICY_ID => "DESTINATIONORDER", + qos_policy::DURABILITY_QOS_POLICY_ID => "DURABILITY", + qos_policy::DURABILITYSERVICE_QOS_POLICY_ID => "DURABILITYSERVICE", + qos_policy::ENTITYFACTORY_QOS_POLICY_ID => "ENTITYFACTORY", + qos_policy::GROUPDATA_QOS_POLICY_ID => "GROUPDATA", + qos_policy::HISTORY_QOS_POLICY_ID => "HISTORY", + qos_policy::LATENCYBUDGET_QOS_POLICY_ID => "LATENCYBUDGET", + qos_policy::LIFESPAN_QOS_POLICY_ID => "LIFESPAN", + qos_policy::LIVELINESS_QOS_POLICY_ID => "LIVELINESS", + qos_policy::OWNERSHIP_QOS_POLICY_ID => "OWNERSHIP", + qos_policy::PARTITION_QOS_POLICY_ID => "PARTITION", + qos_policy::PRESENTATION_QOS_POLICY_ID => "PRESENTATION", + qos_policy::READERDATALIFECYCLE_QOS_POLICY_ID => "READERDATALIFECYCLE", + qos_policy::RELIABILITY_QOS_POLICY_ID => "RELIABILITY", + qos_policy::RESOURCELIMITS_QOS_POLICY_ID => "RESOURCELIMITS", + qos_policy::TIMEBASEDFILTER_QOS_POLICY_ID => "TIMEBASEDFILTER", + qos_policy::TOPICDATA_QOS_POLICY_ID => "TOPICDATA", + qos_policy::TRANSPORTPRIORITY_QOS_POLICY_ID => "TRANSPORTPRIORITY", + qos_policy::USERDATA_QOS_POLICY_ID => "USERDATA", + qos_policy::WRITERDATALIFECYCLE_QOS_POLICY_ID => "WRITERDATALIFECYCLE", + _ => "UNKNOWN", + } + .to_string() +} + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] +#[clap(rename_all = "kebab_case")] +enum FinalInstanceState { + /// unregister + U, + /// dispose + D, +} + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] +#[clap(rename_all = "kebab_case")] +enum AccessScope { + /// INSTANCE + I, + /// TOPIC + T, + /// GROUP + G, +} + +#[derive(Parser, Clone)] +#[command(author, version, about, long_about = None)] +struct Options { + /// publish samples + #[clap(short = 'P', default_value_t = false)] + publish: bool, + + /// subscribe samples + #[clap(short = 'S', default_value_t = false)] + subscribe: bool, + + /// domain id + #[clap(short = 'd', default_value_t = 0)] + domain_id: i32, + + /// BEST_EFFORT reliability + #[clap(short = 'b', default_value_t = false)] + best_effort_reliability: bool, + + /// RELIABLE reliability + #[clap(short = 'r', default_value_t = false)] + reliable_reliability: bool, + + /// keep history depth (-1: use default, 0: KEEP_ALL) + #[clap(short = 'k', default_value_t = -1, allow_negative_numbers = true)] + history_depth: i32, + + /// set a 'deadline' with interval (ms) (0: OFF) + #[clap(short = 'f', default_value_t = 0)] + deadline_interval: u64, + + /// set ownership strength (-1: SHARED) + #[clap(short = 's', default_value_t = -1, allow_negative_numbers = true)] + ownership_strength: i32, + + /// set the topic name + #[clap(short = 't', default_value = "Square")] + topic_name: String, + + /// set color to publish (filter if subscriber) + #[clap(short = 'c', default_value = None)] + color: Option, + + /// set a 'partition' string + #[clap(short = 'p')] + partition: Option, + + /// set durability (v: VOLATILE, l: TRANSIENT_LOCAL, t: TRANSIENT, p: PERSISTENT) + #[clap(short = 'D', default_value_t = 'v')] + durability_kind: char, + + /// set data representation (1: XCDR, 2: XCDR2) + #[clap(short = 'x', default_value_t = 1)] + data_representation: u16, + + /// print Publisher's samples + #[clap(short = 'w', default_value_t = false)] + print_writer_samples: bool, + + /// set shapesize (0: increase the size for every sample) + #[clap(short = 'z', default_value_t = 20)] + shapesize: i32, + + /// use 'read()' instead of 'take()' + #[clap(short = 'R', default_value_t = false)] + use_read: bool, + + /// waiting period between 'write()' operations in ms. + #[clap(long = "write-period", default_value_t = 33)] + write_period_ms: i32, + + /// waiting period between 'read()' or 'take()' operations in ms. + #[clap(long = "read-period", default_value_t = 100)] + read_period_ms: i32, + + /// set log message verbosity (e: ERROR, d: DEBUG) + #[clap(short = 'v', default_value_t = 'e')] + log_message_verbosity: char, + + /// apply 'time based filter' with interval in ms [0: OFF] + #[clap(short = 'i', long = "time-filter")] + time_filter: Option, + + /// indicates the lifespan of a sample in ms + #[clap(short = 'l', long = "lifespan")] + lifespan: Option, + + /// indicates the number of iterations of the main loop + /// After that, the application will exit. Default (0): infinite + #[clap(short = 'n', long = "num-iterations")] + num_iterations: Option, + + /// indicates the number of instances a DataWriter writes If the value is > 1, the additional instances are + /// created by appending a number. For example, if the original color is "BLUE" the instances used are + /// "BLUE", "BLUE1", "BLUE2"... + #[clap(short = 'I', long = "num-instances")] + num_instances: Option, + + /// indicates the number of topics created (using the same type). This also creates a DataReader or DataWriter per + /// topic. If the value is > 1, the additional topic names are created by appending a number: For example, if the + /// original topic name is "Square", the topics created are "Square", "Square1", "Square2"... + #[clap(short = 'E', long = "num-topics")] + num_topics: Option, + + /// indicates the action performed after the DataWriter finishes its execution (before deleting it): + #[clap(short = 'M', long = "final-instance-state")] + final_instance_state: Option, + + /// sets Presentation.access_scope + #[clap(short = 'C', long = "access-scope")] + access_scope: Option, + + /// sets Presentation.coherent_access = true + #[clap(short = 'T', long = "coherent")] + coherent: bool, + + /// sets Presentation.ordered_access = true + #[clap(short = 'O', long = "ordered")] + ordered: bool, + + /// amount of samples sent for each DataWriter and instance that are grouped in a coherent set + #[clap(short = 'H', long = "coherent-sample-count")] + coherent_sample_count: Option, + + /// indicates the amount of bytes added to the samples written (for example to use large data) + #[clap(short = 'B', long = "additional-payload-size")] + additional_payload_size: Option, + + /// uses take()/read() instead of take_next_instance() read_next_instance() + #[clap(short = 'K', long = "take-read")] + take_read: bool, + + /// ContentFilteredTopic filter expression (quotes required around the expression). Cannot be used with -c on + /// subscriber applications + #[clap(short = 'F', long = "cft")] + cft_expression: Option, + + /// If set, the modulo operation is applied to the shapesize. This will make that shapesize is in the range [1,N]. + /// This only applies if shapesize is increased (-z 0) + #[clap(short = 'Q', long = "size-modulo")] + size_modulo: Option, +} + +impl Options { + fn validate(&self) -> Result<(), ParsingError> { + if self.subscribe && self.publish || (!self.subscribe && !self.publish) { + return Err(ParsingError( + "must be either subscribe or publish".to_string(), + )); + } + + Ok(()) + } + + fn interpret_color(&self) -> String { + match self.color.clone() { + Some(color) => color, + None => { + let default_color = "BLUE".to_string(); + println!( + "warning: color was not specified, defaulting to \"{}\"", + default_color + ); + default_color + } + } + } + + fn reliability_qos_policy(&self) -> ReliabilityQosPolicy { + let mut reliability = DataWriterQos::default().reliability; + if self.best_effort_reliability { + reliability.kind = qos_policy::ReliabilityQosPolicyKind::BestEffort; + } + if self.reliable_reliability { + reliability.kind = qos_policy::ReliabilityQosPolicyKind::Reliable; + } + reliability + } + + fn partition_qos_policy(&self) -> PartitionQosPolicy { + if let Some(partition) = &self.partition { + PartitionQosPolicy { + name: vec![partition.to_owned()], + } + } else { + PartitionQosPolicy::default() + } + } + + fn durability_qos_policy(&self) -> DurabilityQosPolicy { + DurabilityQosPolicy { + kind: match self.durability_kind { + 'v' => qos_policy::DurabilityQosPolicyKind::Volatile, + 'l' => qos_policy::DurabilityQosPolicyKind::TransientLocal, + 't' => qos_policy::DurabilityQosPolicyKind::Transient, + 'p' => qos_policy::DurabilityQosPolicyKind::Persistent, + _ => panic!("durability not valid"), + }, + } + } + + fn data_representation_qos_policy(&self) -> DataRepresentationQosPolicy { + let data_representation = match self.data_representation { + 1 => XCDR_DATA_REPRESENTATION, + 2 => XCDR2_DATA_REPRESENTATION, + _ => panic!("Wrong data representation"), + }; + qos_policy::DataRepresentationQosPolicy { + value: vec![data_representation], + } + } + + fn ownership_qos_policy(&self) -> OwnershipQosPolicy { + OwnershipQosPolicy { + kind: match self.ownership_strength { + -1 => qos_policy::OwnershipQosPolicyKind::Shared, + _ => qos_policy::OwnershipQosPolicyKind::Exclusive, + }, + } + } + + fn history_depth_qos_policy(&self) -> HistoryQosPolicy { + match self.history_depth { + -1 => HistoryQosPolicy::default(), + 0 => HistoryQosPolicy { + kind: HistoryQosPolicyKind::KeepAll, + }, + x if x >= 1 => HistoryQosPolicy { + kind: HistoryQosPolicyKind::KeepLast(x as u32), + }, + _ => panic!("history_depth not valid"), + } + } + + fn ownership_strength_qos_policy(&self) -> OwnershipStrengthQosPolicy { + if self.ownership_strength < -1 { + panic!("Ownership strength must be positive or zero") + } + OwnershipStrengthQosPolicy { + value: self.ownership_strength, + } + } +} + +struct Listener; +impl DomainParticipantListener for Listener { + async fn on_inconsistent_topic( + &mut self, + the_topic: TopicAsync, + _status: InconsistentTopicStatus, + ) { + println!( + "on_inconsistent_topic() topic: '{}' type: '{}'", + the_topic.get_name(), + the_topic.get_type_name(), + ); + } + + async fn on_offered_incompatible_qos( + &mut self, + the_writer: dust_dds::dds_async::data_writer::DataWriterAsync<()>, + status: dust_dds::infrastructure::status::OfferedIncompatibleQosStatus, + ) { + let policy_name = qos_policy_name(status.last_policy_id); + println!( + "on_offered_incompatible_qos() topic: '{}' type: '{}' : {:?} ({})", + the_writer.get_topic().get_name(), + the_writer.get_topic().get_type_name(), + status.last_policy_id, + policy_name + ); + } + + async fn on_publication_matched( + &mut self, + the_writer: dust_dds::dds_async::data_writer::DataWriterAsync<()>, + status: dust_dds::infrastructure::status::PublicationMatchedStatus, + ) { + if !the_writer.get_topic().get_name().starts_with("DCPS") { + println!( + "on_publication_matched() topic: '{}' type: '{}' : matched readers {} (change = {})", + the_writer.get_topic().get_name(), + the_writer.get_topic().get_type_name(), + status.current_count, + status.current_count_change + ); + } + } + + async fn on_offered_deadline_missed( + &mut self, + the_writer: dust_dds::dds_async::data_writer::DataWriterAsync<()>, + status: dust_dds::infrastructure::status::OfferedDeadlineMissedStatus, + ) { + println!( + "on_offered_deadline_missed() topic: '{}' type: '{}' : (total = {}, change = {})", + the_writer.get_topic().get_name(), + the_writer.get_topic().get_type_name(), + status.total_count, + status.total_count_change + ); + } + + async fn on_liveliness_lost( + &mut self, + the_writer: dust_dds::dds_async::data_writer::DataWriterAsync<()>, + status: dust_dds::infrastructure::status::LivelinessLostStatus, + ) { + println!( + "on_liveliness_lost() topic: '{}' type: '{}' : (total = {}, change = {})", + the_writer.get_topic().get_name(), + the_writer.get_topic().get_type_name(), + status.total_count, + status.total_count_change + ); + } + + async fn on_requested_incompatible_qos( + &mut self, + the_reader: dust_dds::dds_async::data_reader::DataReaderAsync<()>, + status: dust_dds::infrastructure::status::RequestedIncompatibleQosStatus, + ) { + let policy_name = qos_policy_name(status.last_policy_id); + println!( + "on_requested_incompatible_qos() topic: '{}' type: '{}' : {} ({})\n", + the_reader.get_topicdescription().get_name(), + the_reader.get_topicdescription().get_type_name(), + status.last_policy_id, + policy_name + ); + } + + async fn on_subscription_matched( + &mut self, + the_reader: dust_dds::dds_async::data_reader::DataReaderAsync<()>, + status: dust_dds::infrastructure::status::SubscriptionMatchedStatus, + ) { + if !the_reader + .get_topicdescription() + .get_name() + .starts_with("DCPS") + { + println!( + "on_subscription_matched() topic: '{}' type: '{}' : matched writers {} (change = {})", + the_reader.get_topicdescription().get_name(), + the_reader.get_topicdescription().get_type_name(), + status.current_count, + status.current_count_change + ); + } + } + + async fn on_requested_deadline_missed( + &mut self, + the_reader: dust_dds::dds_async::data_reader::DataReaderAsync<()>, + status: dust_dds::infrastructure::status::RequestedDeadlineMissedStatus, + ) { + println!( + "on_requested_deadline_missed() topic: '{}' type: '{}' : (total = {}, change = {})\n", + the_reader.get_topicdescription().get_name(), + the_reader.get_topicdescription().get_type_name(), + status.total_count, + status.total_count_change + ); + } + + async fn on_liveliness_changed( + &mut self, + the_reader: dust_dds::dds_async::data_reader::DataReaderAsync<()>, + status: dust_dds::infrastructure::status::LivelinessChangedStatus, + ) { + println!( + "on_liveliness_changed() topic: '{}' type: '{}' : (alive = {}, not_alive = {})", + the_reader.get_topicdescription().get_name(), + the_reader.get_topicdescription().get_type_name(), + status.alive_count, + status.not_alive_count, + ); + } +} + +fn move_shape( + shape: &mut ShapeType, + x_vel: &mut i32, + y_vel: &mut i32, + da_width: i32, + da_height: i32, +) { + shape.x = shape.x + *x_vel; + shape.y = shape.y + *y_vel; + if shape.x < 0 { + shape.x = 0; + *x_vel = -*x_vel; + } + if shape.x > da_width { + shape.x = da_width; + *x_vel = -*x_vel; + } + if shape.y < 0 { + shape.y = 0; + *y_vel = -*y_vel; + } + if shape.y > da_height { + shape.y = da_height; + *y_vel = -*y_vel; + } +} + +fn init_publisher( + participant: &DomainParticipant, + options: Options, +) -> Result, InitializeError> { + let topic = participant + .lookup_topicdescription(&options.topic_name) + .expect("lookup_topicdescription succeeds") + .expect("topic existes"); + let publisher_qos = QosKind::Specific(PublisherQos { + partition: options.partition_qos_policy(), + ..Default::default() + }); + let publisher = participant.create_publisher(publisher_qos, NO_LISTENER, NO_STATUS)?; + println!( + "Create writer for topic: {} color: {}", + options.topic_name, + options.interpret_color() + ); + + let mut data_writer_qos = DataWriterQos { + durability: options.durability_qos_policy(), + reliability: options.reliability_qos_policy(), + representation: options.data_representation_qos_policy(), + ownership: options.ownership_qos_policy(), + history: options.history_depth_qos_policy(), + ..Default::default() + }; + if options.deadline_interval > 0 { + data_writer_qos.deadline.period = DurationKind::Finite( + core::time::Duration::from_millis(options.deadline_interval).into(), + ); + } + if options.ownership_qos_policy().kind == OwnershipQosPolicyKind::Exclusive { + data_writer_qos.ownership_strength = options.ownership_strength_qos_policy(); + } + + let data_writer = publisher.create_datawriter::( + &topic, + QosKind::Specific(data_writer_qos), + NO_LISTENER, + NO_STATUS, + )?; + + Ok(data_writer) +} + +fn run_publisher( + data_writer: &DataWriter, + options: Options, + all_done: Receiver<()>, +) -> Result<(), RunningError> { + let mut random_gen = thread_rng(); + + let da_width = 240; + let da_height = 270; + let mut shape = ShapeType { + color: options.interpret_color(), + x: random::() % da_width, + y: random::() % da_height, + shapesize: options.shapesize, + additional_payload_size: vec![], + }; + + // get random non-zero velocity. + let mut x_vel = if random() { + random_gen.gen_range(1..5) + } else { + random_gen.gen_range(-5..-1) + }; + let mut y_vel = if random() { + random_gen.gen_range(1..5) + } else { + random_gen.gen_range(-5..-1) + }; + + while all_done.try_recv().is_err() { + if options.shapesize == 0 { + if let Some(size_modulo) = options.size_modulo { + // Size cannot be 0, so increase it after modulo operation + shape.shapesize = (shape.shapesize % size_modulo) + 1; + } else { + shape.shapesize += 1; + } + } + + move_shape(&mut shape, &mut x_vel, &mut y_vel, da_width, da_height); + if options.print_writer_samples { + println!( + "{:10} {:10} {:03} {:03} [{:}]", + options.topic_name.as_str(), + shape.color, + shape.x, + shape.y, + shape.shapesize + ); + } + data_writer.write(shape.clone(), None).ok(); + std::thread::sleep(std::time::Duration::from_millis( + options.write_period_ms as u64, + )); + } + Ok(()) +} + +fn init_subscriber( + participant: &DomainParticipant, + options: Options, +) -> Result, InitializeError> { + let topic = participant + .lookup_topicdescription(&options.topic_name) + .expect("lookup_topicdescription succeeds") + .expect("topic existes"); + let subscriber_qos = QosKind::Specific(SubscriberQos { + partition: options.partition_qos_policy(), + ..Default::default() + }); + let subscriber = participant.create_subscriber(subscriber_qos, NO_LISTENER, NO_STATUS)?; + + let mut data_reader_qos = DataReaderQos { + durability: options.durability_qos_policy(), + reliability: options.reliability_qos_policy(), + representation: options.data_representation_qos_policy(), + ownership: options.ownership_qos_policy(), + history: options.history_depth_qos_policy(), + ..Default::default() + }; + if options.deadline_interval > 0 { + data_reader_qos.deadline.period = DurationKind::Finite( + core::time::Duration::from_millis(options.deadline_interval).into(), + ); + } + + let data_reader = match options.cft_expression { + Some(cft_expression) => { + let filtered_topic_name = options.topic_name + "_filtered"; + println!("ContentFilterTopic = \"{cft_expression}\""); + println!("Create reader for topic: {} ", filtered_topic_name); + let color = cft_expression + .split("=") + .nth(1) + .unwrap() + .trim_matches(&[' ', '\'']) + .to_string(); + let content_filtered_topic = participant.create_contentfilteredtopic( + &filtered_topic_name, + &topic, + cft_expression, + vec![color], + )?; + + subscriber.create_datareader::( + &content_filtered_topic, + QosKind::Specific(data_reader_qos), + NO_LISTENER, + NO_STATUS, + )? + } + None => { + println!("Create reader for topic: {} ", options.topic_name); + subscriber.create_datareader::( + &topic, + QosKind::Specific(data_reader_qos), + NO_LISTENER, + NO_STATUS, + )? + } + }; + + Ok(data_reader) +} + +fn run_subscriber( + data_reader: &DataReader, + options: Options, + all_done: Receiver<()>, +) -> Result<(), RunningError> { + while all_done.try_recv().is_err() { + let mut previous_handle = None; + loop { + let max_samples = i32::MAX; + let read_result = if options.use_read { + data_reader.read_next_instance( + max_samples, + previous_handle, + ANY_SAMPLE_STATE, + ANY_VIEW_STATE, + ANY_INSTANCE_STATE, + ) + } else { + data_reader.take_next_instance( + max_samples, + previous_handle, + ANY_SAMPLE_STATE, + ANY_VIEW_STATE, + ANY_INSTANCE_STATE, + ) + }; + match read_result { + Ok(samples) => { + for sample in samples { + if sample.sample_info.valid_data { + let smaple_data = sample.data.as_ref().expect("data present"); + println!( + "{:10} {:10} {:03} {:03} [{}]", + data_reader.get_topicdescription().get_name(), + smaple_data.color, + smaple_data.x, + smaple_data.y, + smaple_data.shapesize + ); + std::io::stdout().flush().expect("flush stdout succeeds"); + } + previous_handle = Some(sample.sample_info.instance_handle); + } + + std::thread::sleep(std::time::Duration::from_millis( + options.read_period_ms as u64, + )); + } + Err(_) => break, + } + } + } + Ok(()) +} + +fn initialize(options: &Options) -> Result { + let participant_factory = DomainParticipantFactory::get_instance(); + let participant = participant_factory.create_participant( + options.domain_id, + QosKind::Default, + Some(Listener), + &[ + StatusKind::InconsistentTopic, + StatusKind::OfferedIncompatibleQos, + StatusKind::PublicationMatched, + StatusKind::OfferedDeadlineMissed, + StatusKind::LivelinessLost, + StatusKind::RequestedIncompatibleQos, + StatusKind::SubscriptionMatched, + StatusKind::RequestedDeadlineMissed, + StatusKind::LivelinessChanged, + ], + )?; + println!("Create topic: {}", options.topic_name); + let _topic = participant.create_topic::( + &options.topic_name, + "ShapeType", + QosKind::Default, + NO_LISTENER, + NO_STATUS, + )?; + + Ok(participant) +} + +struct ParsingError(String); +struct InitializeError(String); +struct RunningError(String); + +impl From for InitializeError { + fn from(value: DdsError) -> Self { + Self(format!("DdsError: {:?}", value)) + } +} +impl From for RunningError { + fn from(value: DdsError) -> Self { + Self(format!("DdsError: {:?}", value)) + } +} + +struct Return { + code: u8, + describtion: String, +} +impl Debug for Return { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("code {}: {}", self.code, self.describtion)) + } +} + +impl Termination for Return { + fn report(self) -> ExitCode { + self.code.into() + } +} + +impl From for Return { + fn from(value: ParsingError) -> Self { + Self { + code: 1, + describtion: value.0, + } + } +} + +impl From for Return { + fn from(value: InitializeError) -> Self { + Self { + code: 2, + describtion: value.0, + } + } +} + +impl From for Return { + fn from(value: RunningError) -> Self { + Self { + code: 3, + describtion: value.0, + } + } +} + +fn main() -> Result<(), Return> { + let (tx, rx) = std::sync::mpsc::channel(); + + ctrlc::set_handler(move || tx.send(()).expect("Could not send signal on channel.")) + .expect("Error setting Ctrl-C handler"); + + let options = Options::parse(); + options.validate()?; + let participant = initialize(&options)?; + if options.publish { + let data_writer = init_publisher(&participant, options.clone())?; + run_publisher(&data_writer, options.clone(), rx)?; + } else { + let data_reader = init_subscriber(&participant, options.clone())?; + run_subscriber(&data_reader, options.clone(), rx)?; + } + participant + .delete_contained_entities() + .expect("Entites beeing deleted"); + println!("Done."); + Ok(()) +} From 16496bec73cd343bab10dbd763ecb1a7cca14322 Mon Sep 17 00:00:00 2001 From: Stefan Kimmer Date: Wed, 18 Mar 2026 18:10:46 +0100 Subject: [PATCH 3/5] fix LR --- srcRs/DustDDS/src/main.rs | 1680 ++++++++++++++++++------------------- 1 file changed, 840 insertions(+), 840 deletions(-) diff --git a/srcRs/DustDDS/src/main.rs b/srcRs/DustDDS/src/main.rs index 40954828..60ae5610 100644 --- a/srcRs/DustDDS/src/main.rs +++ b/srcRs/DustDDS/src/main.rs @@ -1,840 +1,840 @@ -use clap::{Parser, ValueEnum}; -use ctrlc; -use dust_dds::{ - dds_async::topic::TopicAsync, - domain::{ - domain_participant::DomainParticipant, - domain_participant_factory::DomainParticipantFactory, - domain_participant_listener::DomainParticipantListener, - }, - infrastructure::{ - error::DdsError, - qos::{DataReaderQos, DataWriterQos, PublisherQos, QosKind, SubscriberQos}, - qos_policy::{ - self, DataRepresentationQosPolicy, DurabilityQosPolicy, HistoryQosPolicy, - HistoryQosPolicyKind, OwnershipQosPolicy, OwnershipQosPolicyKind, - OwnershipStrengthQosPolicy, PartitionQosPolicy, ReliabilityQosPolicy, - XCDR_DATA_REPRESENTATION, XCDR2_DATA_REPRESENTATION, - }, - sample_info::{ANY_INSTANCE_STATE, ANY_SAMPLE_STATE, ANY_VIEW_STATE}, - status::{InconsistentTopicStatus, NO_STATUS, StatusKind}, - time::DurationKind, - }, - listener::NO_LISTENER, - publication::data_writer::DataWriter, - subscription::data_reader::DataReader, -}; -use rand::{Rng, random, thread_rng}; -use std::{ - fmt::Debug, - io::Write, - process::{ExitCode, Termination}, - sync::mpsc::Receiver, -}; - -include!(concat!(env!("OUT_DIR"), "/idl/shape.rs")); -impl Clone for ShapeType { - fn clone(&self) -> Self { - Self { - color: self.color.clone(), - x: self.x, - y: self.y, - shapesize: self.shapesize, - additional_payload_size: self.additional_payload_size.clone(), - } - } -} - -fn qos_policy_name(id: i32) -> String { - match id { - qos_policy::DATA_REPRESENTATION_QOS_POLICY_ID => "DATAREPRESENTATION", - qos_policy::DEADLINE_QOS_POLICY_ID => "DEADLINE", - qos_policy::DESTINATIONORDER_QOS_POLICY_ID => "DESTINATIONORDER", - qos_policy::DURABILITY_QOS_POLICY_ID => "DURABILITY", - qos_policy::DURABILITYSERVICE_QOS_POLICY_ID => "DURABILITYSERVICE", - qos_policy::ENTITYFACTORY_QOS_POLICY_ID => "ENTITYFACTORY", - qos_policy::GROUPDATA_QOS_POLICY_ID => "GROUPDATA", - qos_policy::HISTORY_QOS_POLICY_ID => "HISTORY", - qos_policy::LATENCYBUDGET_QOS_POLICY_ID => "LATENCYBUDGET", - qos_policy::LIFESPAN_QOS_POLICY_ID => "LIFESPAN", - qos_policy::LIVELINESS_QOS_POLICY_ID => "LIVELINESS", - qos_policy::OWNERSHIP_QOS_POLICY_ID => "OWNERSHIP", - qos_policy::PARTITION_QOS_POLICY_ID => "PARTITION", - qos_policy::PRESENTATION_QOS_POLICY_ID => "PRESENTATION", - qos_policy::READERDATALIFECYCLE_QOS_POLICY_ID => "READERDATALIFECYCLE", - qos_policy::RELIABILITY_QOS_POLICY_ID => "RELIABILITY", - qos_policy::RESOURCELIMITS_QOS_POLICY_ID => "RESOURCELIMITS", - qos_policy::TIMEBASEDFILTER_QOS_POLICY_ID => "TIMEBASEDFILTER", - qos_policy::TOPICDATA_QOS_POLICY_ID => "TOPICDATA", - qos_policy::TRANSPORTPRIORITY_QOS_POLICY_ID => "TRANSPORTPRIORITY", - qos_policy::USERDATA_QOS_POLICY_ID => "USERDATA", - qos_policy::WRITERDATALIFECYCLE_QOS_POLICY_ID => "WRITERDATALIFECYCLE", - _ => "UNKNOWN", - } - .to_string() -} - -#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] -#[clap(rename_all = "kebab_case")] -enum FinalInstanceState { - /// unregister - U, - /// dispose - D, -} - -#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] -#[clap(rename_all = "kebab_case")] -enum AccessScope { - /// INSTANCE - I, - /// TOPIC - T, - /// GROUP - G, -} - -#[derive(Parser, Clone)] -#[command(author, version, about, long_about = None)] -struct Options { - /// publish samples - #[clap(short = 'P', default_value_t = false)] - publish: bool, - - /// subscribe samples - #[clap(short = 'S', default_value_t = false)] - subscribe: bool, - - /// domain id - #[clap(short = 'd', default_value_t = 0)] - domain_id: i32, - - /// BEST_EFFORT reliability - #[clap(short = 'b', default_value_t = false)] - best_effort_reliability: bool, - - /// RELIABLE reliability - #[clap(short = 'r', default_value_t = false)] - reliable_reliability: bool, - - /// keep history depth (-1: use default, 0: KEEP_ALL) - #[clap(short = 'k', default_value_t = -1, allow_negative_numbers = true)] - history_depth: i32, - - /// set a 'deadline' with interval (ms) (0: OFF) - #[clap(short = 'f', default_value_t = 0)] - deadline_interval: u64, - - /// set ownership strength (-1: SHARED) - #[clap(short = 's', default_value_t = -1, allow_negative_numbers = true)] - ownership_strength: i32, - - /// set the topic name - #[clap(short = 't', default_value = "Square")] - topic_name: String, - - /// set color to publish (filter if subscriber) - #[clap(short = 'c', default_value = None)] - color: Option, - - /// set a 'partition' string - #[clap(short = 'p')] - partition: Option, - - /// set durability (v: VOLATILE, l: TRANSIENT_LOCAL, t: TRANSIENT, p: PERSISTENT) - #[clap(short = 'D', default_value_t = 'v')] - durability_kind: char, - - /// set data representation (1: XCDR, 2: XCDR2) - #[clap(short = 'x', default_value_t = 1)] - data_representation: u16, - - /// print Publisher's samples - #[clap(short = 'w', default_value_t = false)] - print_writer_samples: bool, - - /// set shapesize (0: increase the size for every sample) - #[clap(short = 'z', default_value_t = 20)] - shapesize: i32, - - /// use 'read()' instead of 'take()' - #[clap(short = 'R', default_value_t = false)] - use_read: bool, - - /// waiting period between 'write()' operations in ms. - #[clap(long = "write-period", default_value_t = 33)] - write_period_ms: i32, - - /// waiting period between 'read()' or 'take()' operations in ms. - #[clap(long = "read-period", default_value_t = 100)] - read_period_ms: i32, - - /// set log message verbosity (e: ERROR, d: DEBUG) - #[clap(short = 'v', default_value_t = 'e')] - log_message_verbosity: char, - - /// apply 'time based filter' with interval in ms [0: OFF] - #[clap(short = 'i', long = "time-filter")] - time_filter: Option, - - /// indicates the lifespan of a sample in ms - #[clap(short = 'l', long = "lifespan")] - lifespan: Option, - - /// indicates the number of iterations of the main loop - /// After that, the application will exit. Default (0): infinite - #[clap(short = 'n', long = "num-iterations")] - num_iterations: Option, - - /// indicates the number of instances a DataWriter writes If the value is > 1, the additional instances are - /// created by appending a number. For example, if the original color is "BLUE" the instances used are - /// "BLUE", "BLUE1", "BLUE2"... - #[clap(short = 'I', long = "num-instances")] - num_instances: Option, - - /// indicates the number of topics created (using the same type). This also creates a DataReader or DataWriter per - /// topic. If the value is > 1, the additional topic names are created by appending a number: For example, if the - /// original topic name is "Square", the topics created are "Square", "Square1", "Square2"... - #[clap(short = 'E', long = "num-topics")] - num_topics: Option, - - /// indicates the action performed after the DataWriter finishes its execution (before deleting it): - #[clap(short = 'M', long = "final-instance-state")] - final_instance_state: Option, - - /// sets Presentation.access_scope - #[clap(short = 'C', long = "access-scope")] - access_scope: Option, - - /// sets Presentation.coherent_access = true - #[clap(short = 'T', long = "coherent")] - coherent: bool, - - /// sets Presentation.ordered_access = true - #[clap(short = 'O', long = "ordered")] - ordered: bool, - - /// amount of samples sent for each DataWriter and instance that are grouped in a coherent set - #[clap(short = 'H', long = "coherent-sample-count")] - coherent_sample_count: Option, - - /// indicates the amount of bytes added to the samples written (for example to use large data) - #[clap(short = 'B', long = "additional-payload-size")] - additional_payload_size: Option, - - /// uses take()/read() instead of take_next_instance() read_next_instance() - #[clap(short = 'K', long = "take-read")] - take_read: bool, - - /// ContentFilteredTopic filter expression (quotes required around the expression). Cannot be used with -c on - /// subscriber applications - #[clap(short = 'F', long = "cft")] - cft_expression: Option, - - /// If set, the modulo operation is applied to the shapesize. This will make that shapesize is in the range [1,N]. - /// This only applies if shapesize is increased (-z 0) - #[clap(short = 'Q', long = "size-modulo")] - size_modulo: Option, -} - -impl Options { - fn validate(&self) -> Result<(), ParsingError> { - if self.subscribe && self.publish || (!self.subscribe && !self.publish) { - return Err(ParsingError( - "must be either subscribe or publish".to_string(), - )); - } - - Ok(()) - } - - fn interpret_color(&self) -> String { - match self.color.clone() { - Some(color) => color, - None => { - let default_color = "BLUE".to_string(); - println!( - "warning: color was not specified, defaulting to \"{}\"", - default_color - ); - default_color - } - } - } - - fn reliability_qos_policy(&self) -> ReliabilityQosPolicy { - let mut reliability = DataWriterQos::default().reliability; - if self.best_effort_reliability { - reliability.kind = qos_policy::ReliabilityQosPolicyKind::BestEffort; - } - if self.reliable_reliability { - reliability.kind = qos_policy::ReliabilityQosPolicyKind::Reliable; - } - reliability - } - - fn partition_qos_policy(&self) -> PartitionQosPolicy { - if let Some(partition) = &self.partition { - PartitionQosPolicy { - name: vec![partition.to_owned()], - } - } else { - PartitionQosPolicy::default() - } - } - - fn durability_qos_policy(&self) -> DurabilityQosPolicy { - DurabilityQosPolicy { - kind: match self.durability_kind { - 'v' => qos_policy::DurabilityQosPolicyKind::Volatile, - 'l' => qos_policy::DurabilityQosPolicyKind::TransientLocal, - 't' => qos_policy::DurabilityQosPolicyKind::Transient, - 'p' => qos_policy::DurabilityQosPolicyKind::Persistent, - _ => panic!("durability not valid"), - }, - } - } - - fn data_representation_qos_policy(&self) -> DataRepresentationQosPolicy { - let data_representation = match self.data_representation { - 1 => XCDR_DATA_REPRESENTATION, - 2 => XCDR2_DATA_REPRESENTATION, - _ => panic!("Wrong data representation"), - }; - qos_policy::DataRepresentationQosPolicy { - value: vec![data_representation], - } - } - - fn ownership_qos_policy(&self) -> OwnershipQosPolicy { - OwnershipQosPolicy { - kind: match self.ownership_strength { - -1 => qos_policy::OwnershipQosPolicyKind::Shared, - _ => qos_policy::OwnershipQosPolicyKind::Exclusive, - }, - } - } - - fn history_depth_qos_policy(&self) -> HistoryQosPolicy { - match self.history_depth { - -1 => HistoryQosPolicy::default(), - 0 => HistoryQosPolicy { - kind: HistoryQosPolicyKind::KeepAll, - }, - x if x >= 1 => HistoryQosPolicy { - kind: HistoryQosPolicyKind::KeepLast(x as u32), - }, - _ => panic!("history_depth not valid"), - } - } - - fn ownership_strength_qos_policy(&self) -> OwnershipStrengthQosPolicy { - if self.ownership_strength < -1 { - panic!("Ownership strength must be positive or zero") - } - OwnershipStrengthQosPolicy { - value: self.ownership_strength, - } - } -} - -struct Listener; -impl DomainParticipantListener for Listener { - async fn on_inconsistent_topic( - &mut self, - the_topic: TopicAsync, - _status: InconsistentTopicStatus, - ) { - println!( - "on_inconsistent_topic() topic: '{}' type: '{}'", - the_topic.get_name(), - the_topic.get_type_name(), - ); - } - - async fn on_offered_incompatible_qos( - &mut self, - the_writer: dust_dds::dds_async::data_writer::DataWriterAsync<()>, - status: dust_dds::infrastructure::status::OfferedIncompatibleQosStatus, - ) { - let policy_name = qos_policy_name(status.last_policy_id); - println!( - "on_offered_incompatible_qos() topic: '{}' type: '{}' : {:?} ({})", - the_writer.get_topic().get_name(), - the_writer.get_topic().get_type_name(), - status.last_policy_id, - policy_name - ); - } - - async fn on_publication_matched( - &mut self, - the_writer: dust_dds::dds_async::data_writer::DataWriterAsync<()>, - status: dust_dds::infrastructure::status::PublicationMatchedStatus, - ) { - if !the_writer.get_topic().get_name().starts_with("DCPS") { - println!( - "on_publication_matched() topic: '{}' type: '{}' : matched readers {} (change = {})", - the_writer.get_topic().get_name(), - the_writer.get_topic().get_type_name(), - status.current_count, - status.current_count_change - ); - } - } - - async fn on_offered_deadline_missed( - &mut self, - the_writer: dust_dds::dds_async::data_writer::DataWriterAsync<()>, - status: dust_dds::infrastructure::status::OfferedDeadlineMissedStatus, - ) { - println!( - "on_offered_deadline_missed() topic: '{}' type: '{}' : (total = {}, change = {})", - the_writer.get_topic().get_name(), - the_writer.get_topic().get_type_name(), - status.total_count, - status.total_count_change - ); - } - - async fn on_liveliness_lost( - &mut self, - the_writer: dust_dds::dds_async::data_writer::DataWriterAsync<()>, - status: dust_dds::infrastructure::status::LivelinessLostStatus, - ) { - println!( - "on_liveliness_lost() topic: '{}' type: '{}' : (total = {}, change = {})", - the_writer.get_topic().get_name(), - the_writer.get_topic().get_type_name(), - status.total_count, - status.total_count_change - ); - } - - async fn on_requested_incompatible_qos( - &mut self, - the_reader: dust_dds::dds_async::data_reader::DataReaderAsync<()>, - status: dust_dds::infrastructure::status::RequestedIncompatibleQosStatus, - ) { - let policy_name = qos_policy_name(status.last_policy_id); - println!( - "on_requested_incompatible_qos() topic: '{}' type: '{}' : {} ({})\n", - the_reader.get_topicdescription().get_name(), - the_reader.get_topicdescription().get_type_name(), - status.last_policy_id, - policy_name - ); - } - - async fn on_subscription_matched( - &mut self, - the_reader: dust_dds::dds_async::data_reader::DataReaderAsync<()>, - status: dust_dds::infrastructure::status::SubscriptionMatchedStatus, - ) { - if !the_reader - .get_topicdescription() - .get_name() - .starts_with("DCPS") - { - println!( - "on_subscription_matched() topic: '{}' type: '{}' : matched writers {} (change = {})", - the_reader.get_topicdescription().get_name(), - the_reader.get_topicdescription().get_type_name(), - status.current_count, - status.current_count_change - ); - } - } - - async fn on_requested_deadline_missed( - &mut self, - the_reader: dust_dds::dds_async::data_reader::DataReaderAsync<()>, - status: dust_dds::infrastructure::status::RequestedDeadlineMissedStatus, - ) { - println!( - "on_requested_deadline_missed() topic: '{}' type: '{}' : (total = {}, change = {})\n", - the_reader.get_topicdescription().get_name(), - the_reader.get_topicdescription().get_type_name(), - status.total_count, - status.total_count_change - ); - } - - async fn on_liveliness_changed( - &mut self, - the_reader: dust_dds::dds_async::data_reader::DataReaderAsync<()>, - status: dust_dds::infrastructure::status::LivelinessChangedStatus, - ) { - println!( - "on_liveliness_changed() topic: '{}' type: '{}' : (alive = {}, not_alive = {})", - the_reader.get_topicdescription().get_name(), - the_reader.get_topicdescription().get_type_name(), - status.alive_count, - status.not_alive_count, - ); - } -} - -fn move_shape( - shape: &mut ShapeType, - x_vel: &mut i32, - y_vel: &mut i32, - da_width: i32, - da_height: i32, -) { - shape.x = shape.x + *x_vel; - shape.y = shape.y + *y_vel; - if shape.x < 0 { - shape.x = 0; - *x_vel = -*x_vel; - } - if shape.x > da_width { - shape.x = da_width; - *x_vel = -*x_vel; - } - if shape.y < 0 { - shape.y = 0; - *y_vel = -*y_vel; - } - if shape.y > da_height { - shape.y = da_height; - *y_vel = -*y_vel; - } -} - -fn init_publisher( - participant: &DomainParticipant, - options: Options, -) -> Result, InitializeError> { - let topic = participant - .lookup_topicdescription(&options.topic_name) - .expect("lookup_topicdescription succeeds") - .expect("topic existes"); - let publisher_qos = QosKind::Specific(PublisherQos { - partition: options.partition_qos_policy(), - ..Default::default() - }); - let publisher = participant.create_publisher(publisher_qos, NO_LISTENER, NO_STATUS)?; - println!( - "Create writer for topic: {} color: {}", - options.topic_name, - options.interpret_color() - ); - - let mut data_writer_qos = DataWriterQos { - durability: options.durability_qos_policy(), - reliability: options.reliability_qos_policy(), - representation: options.data_representation_qos_policy(), - ownership: options.ownership_qos_policy(), - history: options.history_depth_qos_policy(), - ..Default::default() - }; - if options.deadline_interval > 0 { - data_writer_qos.deadline.period = DurationKind::Finite( - core::time::Duration::from_millis(options.deadline_interval).into(), - ); - } - if options.ownership_qos_policy().kind == OwnershipQosPolicyKind::Exclusive { - data_writer_qos.ownership_strength = options.ownership_strength_qos_policy(); - } - - let data_writer = publisher.create_datawriter::( - &topic, - QosKind::Specific(data_writer_qos), - NO_LISTENER, - NO_STATUS, - )?; - - Ok(data_writer) -} - -fn run_publisher( - data_writer: &DataWriter, - options: Options, - all_done: Receiver<()>, -) -> Result<(), RunningError> { - let mut random_gen = thread_rng(); - - let da_width = 240; - let da_height = 270; - let mut shape = ShapeType { - color: options.interpret_color(), - x: random::() % da_width, - y: random::() % da_height, - shapesize: options.shapesize, - additional_payload_size: vec![], - }; - - // get random non-zero velocity. - let mut x_vel = if random() { - random_gen.gen_range(1..5) - } else { - random_gen.gen_range(-5..-1) - }; - let mut y_vel = if random() { - random_gen.gen_range(1..5) - } else { - random_gen.gen_range(-5..-1) - }; - - while all_done.try_recv().is_err() { - if options.shapesize == 0 { - if let Some(size_modulo) = options.size_modulo { - // Size cannot be 0, so increase it after modulo operation - shape.shapesize = (shape.shapesize % size_modulo) + 1; - } else { - shape.shapesize += 1; - } - } - - move_shape(&mut shape, &mut x_vel, &mut y_vel, da_width, da_height); - if options.print_writer_samples { - println!( - "{:10} {:10} {:03} {:03} [{:}]", - options.topic_name.as_str(), - shape.color, - shape.x, - shape.y, - shape.shapesize - ); - } - data_writer.write(shape.clone(), None).ok(); - std::thread::sleep(std::time::Duration::from_millis( - options.write_period_ms as u64, - )); - } - Ok(()) -} - -fn init_subscriber( - participant: &DomainParticipant, - options: Options, -) -> Result, InitializeError> { - let topic = participant - .lookup_topicdescription(&options.topic_name) - .expect("lookup_topicdescription succeeds") - .expect("topic existes"); - let subscriber_qos = QosKind::Specific(SubscriberQos { - partition: options.partition_qos_policy(), - ..Default::default() - }); - let subscriber = participant.create_subscriber(subscriber_qos, NO_LISTENER, NO_STATUS)?; - - let mut data_reader_qos = DataReaderQos { - durability: options.durability_qos_policy(), - reliability: options.reliability_qos_policy(), - representation: options.data_representation_qos_policy(), - ownership: options.ownership_qos_policy(), - history: options.history_depth_qos_policy(), - ..Default::default() - }; - if options.deadline_interval > 0 { - data_reader_qos.deadline.period = DurationKind::Finite( - core::time::Duration::from_millis(options.deadline_interval).into(), - ); - } - - let data_reader = match options.cft_expression { - Some(cft_expression) => { - let filtered_topic_name = options.topic_name + "_filtered"; - println!("ContentFilterTopic = \"{cft_expression}\""); - println!("Create reader for topic: {} ", filtered_topic_name); - let color = cft_expression - .split("=") - .nth(1) - .unwrap() - .trim_matches(&[' ', '\'']) - .to_string(); - let content_filtered_topic = participant.create_contentfilteredtopic( - &filtered_topic_name, - &topic, - cft_expression, - vec![color], - )?; - - subscriber.create_datareader::( - &content_filtered_topic, - QosKind::Specific(data_reader_qos), - NO_LISTENER, - NO_STATUS, - )? - } - None => { - println!("Create reader for topic: {} ", options.topic_name); - subscriber.create_datareader::( - &topic, - QosKind::Specific(data_reader_qos), - NO_LISTENER, - NO_STATUS, - )? - } - }; - - Ok(data_reader) -} - -fn run_subscriber( - data_reader: &DataReader, - options: Options, - all_done: Receiver<()>, -) -> Result<(), RunningError> { - while all_done.try_recv().is_err() { - let mut previous_handle = None; - loop { - let max_samples = i32::MAX; - let read_result = if options.use_read { - data_reader.read_next_instance( - max_samples, - previous_handle, - ANY_SAMPLE_STATE, - ANY_VIEW_STATE, - ANY_INSTANCE_STATE, - ) - } else { - data_reader.take_next_instance( - max_samples, - previous_handle, - ANY_SAMPLE_STATE, - ANY_VIEW_STATE, - ANY_INSTANCE_STATE, - ) - }; - match read_result { - Ok(samples) => { - for sample in samples { - if sample.sample_info.valid_data { - let smaple_data = sample.data.as_ref().expect("data present"); - println!( - "{:10} {:10} {:03} {:03} [{}]", - data_reader.get_topicdescription().get_name(), - smaple_data.color, - smaple_data.x, - smaple_data.y, - smaple_data.shapesize - ); - std::io::stdout().flush().expect("flush stdout succeeds"); - } - previous_handle = Some(sample.sample_info.instance_handle); - } - - std::thread::sleep(std::time::Duration::from_millis( - options.read_period_ms as u64, - )); - } - Err(_) => break, - } - } - } - Ok(()) -} - -fn initialize(options: &Options) -> Result { - let participant_factory = DomainParticipantFactory::get_instance(); - let participant = participant_factory.create_participant( - options.domain_id, - QosKind::Default, - Some(Listener), - &[ - StatusKind::InconsistentTopic, - StatusKind::OfferedIncompatibleQos, - StatusKind::PublicationMatched, - StatusKind::OfferedDeadlineMissed, - StatusKind::LivelinessLost, - StatusKind::RequestedIncompatibleQos, - StatusKind::SubscriptionMatched, - StatusKind::RequestedDeadlineMissed, - StatusKind::LivelinessChanged, - ], - )?; - println!("Create topic: {}", options.topic_name); - let _topic = participant.create_topic::( - &options.topic_name, - "ShapeType", - QosKind::Default, - NO_LISTENER, - NO_STATUS, - )?; - - Ok(participant) -} - -struct ParsingError(String); -struct InitializeError(String); -struct RunningError(String); - -impl From for InitializeError { - fn from(value: DdsError) -> Self { - Self(format!("DdsError: {:?}", value)) - } -} -impl From for RunningError { - fn from(value: DdsError) -> Self { - Self(format!("DdsError: {:?}", value)) - } -} - -struct Return { - code: u8, - describtion: String, -} -impl Debug for Return { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!("code {}: {}", self.code, self.describtion)) - } -} - -impl Termination for Return { - fn report(self) -> ExitCode { - self.code.into() - } -} - -impl From for Return { - fn from(value: ParsingError) -> Self { - Self { - code: 1, - describtion: value.0, - } - } -} - -impl From for Return { - fn from(value: InitializeError) -> Self { - Self { - code: 2, - describtion: value.0, - } - } -} - -impl From for Return { - fn from(value: RunningError) -> Self { - Self { - code: 3, - describtion: value.0, - } - } -} - -fn main() -> Result<(), Return> { - let (tx, rx) = std::sync::mpsc::channel(); - - ctrlc::set_handler(move || tx.send(()).expect("Could not send signal on channel.")) - .expect("Error setting Ctrl-C handler"); - - let options = Options::parse(); - options.validate()?; - let participant = initialize(&options)?; - if options.publish { - let data_writer = init_publisher(&participant, options.clone())?; - run_publisher(&data_writer, options.clone(), rx)?; - } else { - let data_reader = init_subscriber(&participant, options.clone())?; - run_subscriber(&data_reader, options.clone(), rx)?; - } - participant - .delete_contained_entities() - .expect("Entites beeing deleted"); - println!("Done."); - Ok(()) -} +use clap::{Parser, ValueEnum}; +use ctrlc; +use dust_dds::{ + dds_async::topic::TopicAsync, + domain::{ + domain_participant::DomainParticipant, + domain_participant_factory::DomainParticipantFactory, + domain_participant_listener::DomainParticipantListener, + }, + infrastructure::{ + error::DdsError, + qos::{DataReaderQos, DataWriterQos, PublisherQos, QosKind, SubscriberQos}, + qos_policy::{ + self, DataRepresentationQosPolicy, DurabilityQosPolicy, HistoryQosPolicy, + HistoryQosPolicyKind, OwnershipQosPolicy, OwnershipQosPolicyKind, + OwnershipStrengthQosPolicy, PartitionQosPolicy, ReliabilityQosPolicy, + XCDR_DATA_REPRESENTATION, XCDR2_DATA_REPRESENTATION, + }, + sample_info::{ANY_INSTANCE_STATE, ANY_SAMPLE_STATE, ANY_VIEW_STATE}, + status::{InconsistentTopicStatus, NO_STATUS, StatusKind}, + time::DurationKind, + }, + listener::NO_LISTENER, + publication::data_writer::DataWriter, + subscription::data_reader::DataReader, +}; +use rand::{Rng, random, thread_rng}; +use std::{ + fmt::Debug, + io::Write, + process::{ExitCode, Termination}, + sync::mpsc::Receiver, +}; + +include!(concat!(env!("OUT_DIR"), "/idl/shape.rs")); +impl Clone for ShapeType { + fn clone(&self) -> Self { + Self { + color: self.color.clone(), + x: self.x, + y: self.y, + shapesize: self.shapesize, + additional_payload_size: self.additional_payload_size.clone(), + } + } +} + +fn qos_policy_name(id: i32) -> String { + match id { + qos_policy::DATA_REPRESENTATION_QOS_POLICY_ID => "DATAREPRESENTATION", + qos_policy::DEADLINE_QOS_POLICY_ID => "DEADLINE", + qos_policy::DESTINATIONORDER_QOS_POLICY_ID => "DESTINATIONORDER", + qos_policy::DURABILITY_QOS_POLICY_ID => "DURABILITY", + qos_policy::DURABILITYSERVICE_QOS_POLICY_ID => "DURABILITYSERVICE", + qos_policy::ENTITYFACTORY_QOS_POLICY_ID => "ENTITYFACTORY", + qos_policy::GROUPDATA_QOS_POLICY_ID => "GROUPDATA", + qos_policy::HISTORY_QOS_POLICY_ID => "HISTORY", + qos_policy::LATENCYBUDGET_QOS_POLICY_ID => "LATENCYBUDGET", + qos_policy::LIFESPAN_QOS_POLICY_ID => "LIFESPAN", + qos_policy::LIVELINESS_QOS_POLICY_ID => "LIVELINESS", + qos_policy::OWNERSHIP_QOS_POLICY_ID => "OWNERSHIP", + qos_policy::PARTITION_QOS_POLICY_ID => "PARTITION", + qos_policy::PRESENTATION_QOS_POLICY_ID => "PRESENTATION", + qos_policy::READERDATALIFECYCLE_QOS_POLICY_ID => "READERDATALIFECYCLE", + qos_policy::RELIABILITY_QOS_POLICY_ID => "RELIABILITY", + qos_policy::RESOURCELIMITS_QOS_POLICY_ID => "RESOURCELIMITS", + qos_policy::TIMEBASEDFILTER_QOS_POLICY_ID => "TIMEBASEDFILTER", + qos_policy::TOPICDATA_QOS_POLICY_ID => "TOPICDATA", + qos_policy::TRANSPORTPRIORITY_QOS_POLICY_ID => "TRANSPORTPRIORITY", + qos_policy::USERDATA_QOS_POLICY_ID => "USERDATA", + qos_policy::WRITERDATALIFECYCLE_QOS_POLICY_ID => "WRITERDATALIFECYCLE", + _ => "UNKNOWN", + } + .to_string() +} + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] +#[clap(rename_all = "kebab_case")] +enum FinalInstanceState { + /// unregister + U, + /// dispose + D, +} + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] +#[clap(rename_all = "kebab_case")] +enum AccessScope { + /// INSTANCE + I, + /// TOPIC + T, + /// GROUP + G, +} + +#[derive(Parser, Clone)] +#[command(author, version, about, long_about = None)] +struct Options { + /// publish samples + #[clap(short = 'P', default_value_t = false)] + publish: bool, + + /// subscribe samples + #[clap(short = 'S', default_value_t = false)] + subscribe: bool, + + /// domain id + #[clap(short = 'd', default_value_t = 0)] + domain_id: i32, + + /// BEST_EFFORT reliability + #[clap(short = 'b', default_value_t = false)] + best_effort_reliability: bool, + + /// RELIABLE reliability + #[clap(short = 'r', default_value_t = false)] + reliable_reliability: bool, + + /// keep history depth (-1: use default, 0: KEEP_ALL) + #[clap(short = 'k', default_value_t = -1, allow_negative_numbers = true)] + history_depth: i32, + + /// set a 'deadline' with interval (ms) (0: OFF) + #[clap(short = 'f', default_value_t = 0)] + deadline_interval: u64, + + /// set ownership strength (-1: SHARED) + #[clap(short = 's', default_value_t = -1, allow_negative_numbers = true)] + ownership_strength: i32, + + /// set the topic name + #[clap(short = 't', default_value = "Square")] + topic_name: String, + + /// set color to publish (filter if subscriber) + #[clap(short = 'c', default_value = None)] + color: Option, + + /// set a 'partition' string + #[clap(short = 'p')] + partition: Option, + + /// set durability (v: VOLATILE, l: TRANSIENT_LOCAL, t: TRANSIENT, p: PERSISTENT) + #[clap(short = 'D', default_value_t = 'v')] + durability_kind: char, + + /// set data representation (1: XCDR, 2: XCDR2) + #[clap(short = 'x', default_value_t = 1)] + data_representation: u16, + + /// print Publisher's samples + #[clap(short = 'w', default_value_t = false)] + print_writer_samples: bool, + + /// set shapesize (0: increase the size for every sample) + #[clap(short = 'z', default_value_t = 20)] + shapesize: i32, + + /// use 'read()' instead of 'take()' + #[clap(short = 'R', default_value_t = false)] + use_read: bool, + + /// waiting period between 'write()' operations in ms. + #[clap(long = "write-period", default_value_t = 33)] + write_period_ms: i32, + + /// waiting period between 'read()' or 'take()' operations in ms. + #[clap(long = "read-period", default_value_t = 100)] + read_period_ms: i32, + + /// set log message verbosity (e: ERROR, d: DEBUG) + #[clap(short = 'v', default_value_t = 'e')] + log_message_verbosity: char, + + /// apply 'time based filter' with interval in ms [0: OFF] + #[clap(short = 'i', long = "time-filter")] + time_filter: Option, + + /// indicates the lifespan of a sample in ms + #[clap(short = 'l', long = "lifespan")] + lifespan: Option, + + /// indicates the number of iterations of the main loop + /// After that, the application will exit. Default (0): infinite + #[clap(short = 'n', long = "num-iterations")] + num_iterations: Option, + + /// indicates the number of instances a DataWriter writes If the value is > 1, the additional instances are + /// created by appending a number. For example, if the original color is "BLUE" the instances used are + /// "BLUE", "BLUE1", "BLUE2"... + #[clap(short = 'I', long = "num-instances")] + num_instances: Option, + + /// indicates the number of topics created (using the same type). This also creates a DataReader or DataWriter per + /// topic. If the value is > 1, the additional topic names are created by appending a number: For example, if the + /// original topic name is "Square", the topics created are "Square", "Square1", "Square2"... + #[clap(short = 'E', long = "num-topics")] + num_topics: Option, + + /// indicates the action performed after the DataWriter finishes its execution (before deleting it): + #[clap(short = 'M', long = "final-instance-state")] + final_instance_state: Option, + + /// sets Presentation.access_scope + #[clap(short = 'C', long = "access-scope")] + access_scope: Option, + + /// sets Presentation.coherent_access = true + #[clap(short = 'T', long = "coherent")] + coherent: bool, + + /// sets Presentation.ordered_access = true + #[clap(short = 'O', long = "ordered")] + ordered: bool, + + /// amount of samples sent for each DataWriter and instance that are grouped in a coherent set + #[clap(short = 'H', long = "coherent-sample-count")] + coherent_sample_count: Option, + + /// indicates the amount of bytes added to the samples written (for example to use large data) + #[clap(short = 'B', long = "additional-payload-size")] + additional_payload_size: Option, + + /// uses take()/read() instead of take_next_instance() read_next_instance() + #[clap(short = 'K', long = "take-read")] + take_read: bool, + + /// ContentFilteredTopic filter expression (quotes required around the expression). Cannot be used with -c on + /// subscriber applications + #[clap(short = 'F', long = "cft")] + cft_expression: Option, + + /// If set, the modulo operation is applied to the shapesize. This will make that shapesize is in the range [1,N]. + /// This only applies if shapesize is increased (-z 0) + #[clap(short = 'Q', long = "size-modulo")] + size_modulo: Option, +} + +impl Options { + fn validate(&self) -> Result<(), ParsingError> { + if self.subscribe && self.publish || (!self.subscribe && !self.publish) { + return Err(ParsingError( + "must be either subscribe or publish".to_string(), + )); + } + + Ok(()) + } + + fn interpret_color(&self) -> String { + match self.color.clone() { + Some(color) => color, + None => { + let default_color = "BLUE".to_string(); + println!( + "warning: color was not specified, defaulting to \"{}\"", + default_color + ); + default_color + } + } + } + + fn reliability_qos_policy(&self) -> ReliabilityQosPolicy { + let mut reliability = DataWriterQos::default().reliability; + if self.best_effort_reliability { + reliability.kind = qos_policy::ReliabilityQosPolicyKind::BestEffort; + } + if self.reliable_reliability { + reliability.kind = qos_policy::ReliabilityQosPolicyKind::Reliable; + } + reliability + } + + fn partition_qos_policy(&self) -> PartitionQosPolicy { + if let Some(partition) = &self.partition { + PartitionQosPolicy { + name: vec![partition.to_owned()], + } + } else { + PartitionQosPolicy::default() + } + } + + fn durability_qos_policy(&self) -> DurabilityQosPolicy { + DurabilityQosPolicy { + kind: match self.durability_kind { + 'v' => qos_policy::DurabilityQosPolicyKind::Volatile, + 'l' => qos_policy::DurabilityQosPolicyKind::TransientLocal, + 't' => qos_policy::DurabilityQosPolicyKind::Transient, + 'p' => qos_policy::DurabilityQosPolicyKind::Persistent, + _ => panic!("durability not valid"), + }, + } + } + + fn data_representation_qos_policy(&self) -> DataRepresentationQosPolicy { + let data_representation = match self.data_representation { + 1 => XCDR_DATA_REPRESENTATION, + 2 => XCDR2_DATA_REPRESENTATION, + _ => panic!("Wrong data representation"), + }; + qos_policy::DataRepresentationQosPolicy { + value: vec![data_representation], + } + } + + fn ownership_qos_policy(&self) -> OwnershipQosPolicy { + OwnershipQosPolicy { + kind: match self.ownership_strength { + -1 => qos_policy::OwnershipQosPolicyKind::Shared, + _ => qos_policy::OwnershipQosPolicyKind::Exclusive, + }, + } + } + + fn history_depth_qos_policy(&self) -> HistoryQosPolicy { + match self.history_depth { + -1 => HistoryQosPolicy::default(), + 0 => HistoryQosPolicy { + kind: HistoryQosPolicyKind::KeepAll, + }, + x if x >= 1 => HistoryQosPolicy { + kind: HistoryQosPolicyKind::KeepLast(x as u32), + }, + _ => panic!("history_depth not valid"), + } + } + + fn ownership_strength_qos_policy(&self) -> OwnershipStrengthQosPolicy { + if self.ownership_strength < -1 { + panic!("Ownership strength must be positive or zero") + } + OwnershipStrengthQosPolicy { + value: self.ownership_strength, + } + } +} + +struct Listener; +impl DomainParticipantListener for Listener { + async fn on_inconsistent_topic( + &mut self, + the_topic: TopicAsync, + _status: InconsistentTopicStatus, + ) { + println!( + "on_inconsistent_topic() topic: '{}' type: '{}'", + the_topic.get_name(), + the_topic.get_type_name(), + ); + } + + async fn on_offered_incompatible_qos( + &mut self, + the_writer: dust_dds::dds_async::data_writer::DataWriterAsync<()>, + status: dust_dds::infrastructure::status::OfferedIncompatibleQosStatus, + ) { + let policy_name = qos_policy_name(status.last_policy_id); + println!( + "on_offered_incompatible_qos() topic: '{}' type: '{}' : {:?} ({})", + the_writer.get_topic().get_name(), + the_writer.get_topic().get_type_name(), + status.last_policy_id, + policy_name + ); + } + + async fn on_publication_matched( + &mut self, + the_writer: dust_dds::dds_async::data_writer::DataWriterAsync<()>, + status: dust_dds::infrastructure::status::PublicationMatchedStatus, + ) { + if !the_writer.get_topic().get_name().starts_with("DCPS") { + println!( + "on_publication_matched() topic: '{}' type: '{}' : matched readers {} (change = {})", + the_writer.get_topic().get_name(), + the_writer.get_topic().get_type_name(), + status.current_count, + status.current_count_change + ); + } + } + + async fn on_offered_deadline_missed( + &mut self, + the_writer: dust_dds::dds_async::data_writer::DataWriterAsync<()>, + status: dust_dds::infrastructure::status::OfferedDeadlineMissedStatus, + ) { + println!( + "on_offered_deadline_missed() topic: '{}' type: '{}' : (total = {}, change = {})", + the_writer.get_topic().get_name(), + the_writer.get_topic().get_type_name(), + status.total_count, + status.total_count_change + ); + } + + async fn on_liveliness_lost( + &mut self, + the_writer: dust_dds::dds_async::data_writer::DataWriterAsync<()>, + status: dust_dds::infrastructure::status::LivelinessLostStatus, + ) { + println!( + "on_liveliness_lost() topic: '{}' type: '{}' : (total = {}, change = {})", + the_writer.get_topic().get_name(), + the_writer.get_topic().get_type_name(), + status.total_count, + status.total_count_change + ); + } + + async fn on_requested_incompatible_qos( + &mut self, + the_reader: dust_dds::dds_async::data_reader::DataReaderAsync<()>, + status: dust_dds::infrastructure::status::RequestedIncompatibleQosStatus, + ) { + let policy_name = qos_policy_name(status.last_policy_id); + println!( + "on_requested_incompatible_qos() topic: '{}' type: '{}' : {} ({})\n", + the_reader.get_topicdescription().get_name(), + the_reader.get_topicdescription().get_type_name(), + status.last_policy_id, + policy_name + ); + } + + async fn on_subscription_matched( + &mut self, + the_reader: dust_dds::dds_async::data_reader::DataReaderAsync<()>, + status: dust_dds::infrastructure::status::SubscriptionMatchedStatus, + ) { + if !the_reader + .get_topicdescription() + .get_name() + .starts_with("DCPS") + { + println!( + "on_subscription_matched() topic: '{}' type: '{}' : matched writers {} (change = {})", + the_reader.get_topicdescription().get_name(), + the_reader.get_topicdescription().get_type_name(), + status.current_count, + status.current_count_change + ); + } + } + + async fn on_requested_deadline_missed( + &mut self, + the_reader: dust_dds::dds_async::data_reader::DataReaderAsync<()>, + status: dust_dds::infrastructure::status::RequestedDeadlineMissedStatus, + ) { + println!( + "on_requested_deadline_missed() topic: '{}' type: '{}' : (total = {}, change = {})\n", + the_reader.get_topicdescription().get_name(), + the_reader.get_topicdescription().get_type_name(), + status.total_count, + status.total_count_change + ); + } + + async fn on_liveliness_changed( + &mut self, + the_reader: dust_dds::dds_async::data_reader::DataReaderAsync<()>, + status: dust_dds::infrastructure::status::LivelinessChangedStatus, + ) { + println!( + "on_liveliness_changed() topic: '{}' type: '{}' : (alive = {}, not_alive = {})", + the_reader.get_topicdescription().get_name(), + the_reader.get_topicdescription().get_type_name(), + status.alive_count, + status.not_alive_count, + ); + } +} + +fn move_shape( + shape: &mut ShapeType, + x_vel: &mut i32, + y_vel: &mut i32, + da_width: i32, + da_height: i32, +) { + shape.x = shape.x + *x_vel; + shape.y = shape.y + *y_vel; + if shape.x < 0 { + shape.x = 0; + *x_vel = -*x_vel; + } + if shape.x > da_width { + shape.x = da_width; + *x_vel = -*x_vel; + } + if shape.y < 0 { + shape.y = 0; + *y_vel = -*y_vel; + } + if shape.y > da_height { + shape.y = da_height; + *y_vel = -*y_vel; + } +} + +fn init_publisher( + participant: &DomainParticipant, + options: Options, +) -> Result, InitializeError> { + let topic = participant + .lookup_topicdescription(&options.topic_name) + .expect("lookup_topicdescription succeeds") + .expect("topic existes"); + let publisher_qos = QosKind::Specific(PublisherQos { + partition: options.partition_qos_policy(), + ..Default::default() + }); + let publisher = participant.create_publisher(publisher_qos, NO_LISTENER, NO_STATUS)?; + println!( + "Create writer for topic: {} color: {}", + options.topic_name, + options.interpret_color() + ); + + let mut data_writer_qos = DataWriterQos { + durability: options.durability_qos_policy(), + reliability: options.reliability_qos_policy(), + representation: options.data_representation_qos_policy(), + ownership: options.ownership_qos_policy(), + history: options.history_depth_qos_policy(), + ..Default::default() + }; + if options.deadline_interval > 0 { + data_writer_qos.deadline.period = DurationKind::Finite( + core::time::Duration::from_millis(options.deadline_interval).into(), + ); + } + if options.ownership_qos_policy().kind == OwnershipQosPolicyKind::Exclusive { + data_writer_qos.ownership_strength = options.ownership_strength_qos_policy(); + } + + let data_writer = publisher.create_datawriter::( + &topic, + QosKind::Specific(data_writer_qos), + NO_LISTENER, + NO_STATUS, + )?; + + Ok(data_writer) +} + +fn run_publisher( + data_writer: &DataWriter, + options: Options, + all_done: Receiver<()>, +) -> Result<(), RunningError> { + let mut random_gen = thread_rng(); + + let da_width = 240; + let da_height = 270; + let mut shape = ShapeType { + color: options.interpret_color(), + x: random::() % da_width, + y: random::() % da_height, + shapesize: options.shapesize, + additional_payload_size: vec![], + }; + + // get random non-zero velocity. + let mut x_vel = if random() { + random_gen.gen_range(1..5) + } else { + random_gen.gen_range(-5..-1) + }; + let mut y_vel = if random() { + random_gen.gen_range(1..5) + } else { + random_gen.gen_range(-5..-1) + }; + + while all_done.try_recv().is_err() { + if options.shapesize == 0 { + if let Some(size_modulo) = options.size_modulo { + // Size cannot be 0, so increase it after modulo operation + shape.shapesize = (shape.shapesize % size_modulo) + 1; + } else { + shape.shapesize += 1; + } + } + + move_shape(&mut shape, &mut x_vel, &mut y_vel, da_width, da_height); + if options.print_writer_samples { + println!( + "{:10} {:10} {:03} {:03} [{:}]", + options.topic_name.as_str(), + shape.color, + shape.x, + shape.y, + shape.shapesize + ); + } + data_writer.write(shape.clone(), None).ok(); + std::thread::sleep(std::time::Duration::from_millis( + options.write_period_ms as u64, + )); + } + Ok(()) +} + +fn init_subscriber( + participant: &DomainParticipant, + options: Options, +) -> Result, InitializeError> { + let topic = participant + .lookup_topicdescription(&options.topic_name) + .expect("lookup_topicdescription succeeds") + .expect("topic existes"); + let subscriber_qos = QosKind::Specific(SubscriberQos { + partition: options.partition_qos_policy(), + ..Default::default() + }); + let subscriber = participant.create_subscriber(subscriber_qos, NO_LISTENER, NO_STATUS)?; + + let mut data_reader_qos = DataReaderQos { + durability: options.durability_qos_policy(), + reliability: options.reliability_qos_policy(), + representation: options.data_representation_qos_policy(), + ownership: options.ownership_qos_policy(), + history: options.history_depth_qos_policy(), + ..Default::default() + }; + if options.deadline_interval > 0 { + data_reader_qos.deadline.period = DurationKind::Finite( + core::time::Duration::from_millis(options.deadline_interval).into(), + ); + } + + let data_reader = match options.cft_expression { + Some(cft_expression) => { + let filtered_topic_name = options.topic_name + "_filtered"; + println!("ContentFilterTopic = \"{cft_expression}\""); + println!("Create reader for topic: {} ", filtered_topic_name); + let color = cft_expression + .split("=") + .nth(1) + .unwrap() + .trim_matches(&[' ', '\'']) + .to_string(); + let content_filtered_topic = participant.create_contentfilteredtopic( + &filtered_topic_name, + &topic, + cft_expression, + vec![color], + )?; + + subscriber.create_datareader::( + &content_filtered_topic, + QosKind::Specific(data_reader_qos), + NO_LISTENER, + NO_STATUS, + )? + } + None => { + println!("Create reader for topic: {} ", options.topic_name); + subscriber.create_datareader::( + &topic, + QosKind::Specific(data_reader_qos), + NO_LISTENER, + NO_STATUS, + )? + } + }; + + Ok(data_reader) +} + +fn run_subscriber( + data_reader: &DataReader, + options: Options, + all_done: Receiver<()>, +) -> Result<(), RunningError> { + while all_done.try_recv().is_err() { + let mut previous_handle = None; + loop { + let max_samples = i32::MAX; + let read_result = if options.use_read { + data_reader.read_next_instance( + max_samples, + previous_handle, + ANY_SAMPLE_STATE, + ANY_VIEW_STATE, + ANY_INSTANCE_STATE, + ) + } else { + data_reader.take_next_instance( + max_samples, + previous_handle, + ANY_SAMPLE_STATE, + ANY_VIEW_STATE, + ANY_INSTANCE_STATE, + ) + }; + match read_result { + Ok(samples) => { + for sample in samples { + if sample.sample_info.valid_data { + let smaple_data = sample.data.as_ref().expect("data present"); + println!( + "{:10} {:10} {:03} {:03} [{}]", + data_reader.get_topicdescription().get_name(), + smaple_data.color, + smaple_data.x, + smaple_data.y, + smaple_data.shapesize + ); + std::io::stdout().flush().expect("flush stdout succeeds"); + } + previous_handle = Some(sample.sample_info.instance_handle); + } + + std::thread::sleep(std::time::Duration::from_millis( + options.read_period_ms as u64, + )); + } + Err(_) => break, + } + } + } + Ok(()) +} + +fn initialize(options: &Options) -> Result { + let participant_factory = DomainParticipantFactory::get_instance(); + let participant = participant_factory.create_participant( + options.domain_id, + QosKind::Default, + Some(Listener), + &[ + StatusKind::InconsistentTopic, + StatusKind::OfferedIncompatibleQos, + StatusKind::PublicationMatched, + StatusKind::OfferedDeadlineMissed, + StatusKind::LivelinessLost, + StatusKind::RequestedIncompatibleQos, + StatusKind::SubscriptionMatched, + StatusKind::RequestedDeadlineMissed, + StatusKind::LivelinessChanged, + ], + )?; + println!("Create topic: {}", options.topic_name); + let _topic = participant.create_topic::( + &options.topic_name, + "ShapeType", + QosKind::Default, + NO_LISTENER, + NO_STATUS, + )?; + + Ok(participant) +} + +struct ParsingError(String); +struct InitializeError(String); +struct RunningError(String); + +impl From for InitializeError { + fn from(value: DdsError) -> Self { + Self(format!("DdsError: {:?}", value)) + } +} +impl From for RunningError { + fn from(value: DdsError) -> Self { + Self(format!("DdsError: {:?}", value)) + } +} + +struct Return { + code: u8, + describtion: String, +} +impl Debug for Return { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("code {}: {}", self.code, self.describtion)) + } +} + +impl Termination for Return { + fn report(self) -> ExitCode { + self.code.into() + } +} + +impl From for Return { + fn from(value: ParsingError) -> Self { + Self { + code: 1, + describtion: value.0, + } + } +} + +impl From for Return { + fn from(value: InitializeError) -> Self { + Self { + code: 2, + describtion: value.0, + } + } +} + +impl From for Return { + fn from(value: RunningError) -> Self { + Self { + code: 3, + describtion: value.0, + } + } +} + +fn main() -> Result<(), Return> { + let (tx, rx) = std::sync::mpsc::channel(); + + ctrlc::set_handler(move || tx.send(()).expect("Could not send signal on channel.")) + .expect("Error setting Ctrl-C handler"); + + let options = Options::parse(); + options.validate()?; + let participant = initialize(&options)?; + if options.publish { + let data_writer = init_publisher(&participant, options.clone())?; + run_publisher(&data_writer, options.clone(), rx)?; + } else { + let data_reader = init_subscriber(&participant, options.clone())?; + run_subscriber(&data_reader, options.clone(), rx)?; + } + participant + .delete_contained_entities() + .expect("Entites beeing deleted"); + println!("Done."); + Ok(()) +} From 1e00246a44ec11f9717437395bb4d19f4c875149 Mon Sep 17 00:00:00 2001 From: Stefan Kimmer Date: Wed, 18 Mar 2026 18:53:57 +0100 Subject: [PATCH 4/5] fix version --- srcRs/DustDDS/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srcRs/DustDDS/Cargo.toml b/srcRs/DustDDS/Cargo.toml index c7b28466..2189620f 100644 --- a/srcRs/DustDDS/Cargo.toml +++ b/srcRs/DustDDS/Cargo.toml @@ -17,7 +17,7 @@ publish = false clap = { version = "4.5.47", features = ["derive", "string"] } rand = "0.8.5" ctrlc = "3.4" -dust_dds = version="0.15.0" +dust_dds = { version = "0.15.0" } [build-dependencies] dust_dds_gen = "0.15.0" From 102aafcd98c599e4542fa7011a12510bb6406456 Mon Sep 17 00:00:00 2001 From: Stefan Kimmer Date: Thu, 19 Mar 2026 14:58:22 +0100 Subject: [PATCH 5/5] add preRelease: true --- .github/workflows/1_run_interoperability_tests.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/1_run_interoperability_tests.yml b/.github/workflows/1_run_interoperability_tests.yml index 720883cf..0e8ed2d6 100644 --- a/.github/workflows/1_run_interoperability_tests.yml +++ b/.github/workflows/1_run_interoperability_tests.yml @@ -45,6 +45,7 @@ jobs: latest: true fileName: "*" out-file-path: zipped_executables + preRelease: true - name: Unzip executables run: unzip 'zipped_executables/*.zip' -d executables - name: Install Python requirements