Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
323 changes: 218 additions & 105 deletions crates/countersyncd/src/actor/ipfix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,15 @@ impl IpfixActor {
self.applied_templates_map.insert(msg_key, template_ids);
}

fn get_template_key(&self, template_id: u16) -> Option<&String> {
self.temporary_templates_map.get(&template_id).or_else(|| {
self.applied_templates_map
.iter()
.find(|(_, template_ids)| template_ids.contains(&template_id))
.map(|(msg_key, _)| msg_key)
})
}

/// Processes IPFIX template messages and stores them for later use.
///
/// # Arguments
Expand Down Expand Up @@ -621,10 +630,14 @@ impl IpfixActor {
}
}

// Store object names if provided
// Update object name mapping for the session key.
// A missing object_names field means the latest template update no longer
// provides object name mapping, so any stale value must be cleared.
if let Some(object_names) = &templates.object_names {
self.object_names_map
.insert(templates.key.clone(), object_names.clone());
} else {
self.object_names_map.remove(&templates.key);
}

let cache_ref = Self::get_cache();
Expand Down Expand Up @@ -777,126 +790,115 @@ impl IpfixActor {
read_size += len as usize;
continue;
}
let datarecords: Vec<&DataRecord> = data_message.iter_data_records().collect();
let mut observation_time: Option<u64>;

for record in datarecords {
observation_time = get_observation_time(record);
if observation_time.is_none() {
debug!(
"No observation time in record, use the last observer time {:?}",
cache.last_observer_time
);
observation_time = cache.last_observer_time;
} else if let (Some(obs_time), Some(last_time)) =
(observation_time, cache.last_observer_time)
{
if obs_time > last_time {
for set in &data_message.sets {
let (template_id, datarecords) = match &set.records {
ipfixrw::parser::Records::Data { set_id, data } => (*set_id, data),
_ => continue,
};

let object_names = self
.get_template_key(template_id)
.and_then(|key| self.object_names_map.get(key))
.map(|names| names.as_slice())
.unwrap_or(&[]);

let mut observation_time: Option<u64>;

for record in datarecords {
observation_time = get_observation_time(record);
if observation_time.is_none() {
debug!(
"No observation time in record, use the last observer time {:?}",
cache.last_observer_time
);
observation_time = cache.last_observer_time;
} else if let (Some(obs_time), Some(last_time)) =
(observation_time, cache.last_observer_time)
{
if obs_time > last_time {
cache.last_observer_time = observation_time;
}
} else {
// If we have observation time but no last time, update it
cache.last_observer_time = observation_time;
}
} else {
// If we have observation time but no last time, update it
cache.last_observer_time = observation_time;
}

// If we still don't have observation time, skip this record
if observation_time.is_none() {
warn!("No observation time available for record, skipping");
continue;
}

// Collect final stats directly
let mut final_stats: Vec<SAIStat> = Vec::new();
let mut template_key: Option<String> = None;

// Debug: Log all fields in the record to understand what we're getting
debug!("Processing record with {} fields:", record.values.len());
for (key, val) in record.values.iter() {
match key {
DataRecordKey::Unrecognized(field_spec) => {
debug!(
" Field ID: {}, Enterprise: {:?}, Length: {}, Value: {:?}",
field_spec.information_element_identifier,
field_spec.enterprise_number,
field_spec.field_length,
val
);
}
_ => {
debug!(" Key: {:?}, Value: {:?}", key, val);
}
// If we still don't have observation time, skip this record
if observation_time.is_none() {
warn!("No observation time available for record, skipping");
continue;
}
}

for (key, val) in record.values.iter() {
// Check if this is the observation time field or system time field
let is_time_field = match key {
DataRecordKey::Unrecognized(field_spec) => {
let field_id = field_spec.information_element_identifier;
let is_standard_field = field_spec.enterprise_number.is_none();
// Collect final stats directly
let mut final_stats: Vec<SAIStat> = Vec::new();

(field_id == OBSERVATION_TIME_NANOSECONDS
|| field_id == OBSERVATION_TIME_SECONDS)
&& is_standard_field
}
_ => false,
};

if is_time_field {
if let DataRecordKey::Unrecognized(field_spec) = key {
debug!(
"Skipping time field (ID: {})",
field_spec.information_element_identifier
);
// Debug: Log all fields in the record to understand what we're getting
debug!(
"Processing record for template_id {} with {} fields:",
template_id,
record.values.len()
);
for (key, val) in record.values.iter() {
match key {
DataRecordKey::Unrecognized(field_spec) => {
debug!(
" Field ID: {}, Enterprise: {:?}, Length: {}, Value: {:?}",
field_spec.information_element_identifier,
field_spec.enterprise_number,
field_spec.field_length,
val
);
}
_ => {
debug!(" Key: {:?}, Value: {:?}", key, val);
}
}
continue;
}

match key {
DataRecordKey::Unrecognized(field_spec) => {
// Try to find the template key for this record to get object_names
if template_key.is_none() {
// Look up the template key from the field
// We need to find which template this field belongs to
for (_tid, msg_key) in &self.temporary_templates_map {
// This is a simplification - in reality we'd need to check
// if this specific field belongs to this template
template_key = Some(msg_key.clone());
break;
}
// Also check applied templates
if template_key.is_none() {
for (msg_key, _) in &self.applied_templates_map {
template_key = Some(msg_key.clone());
break;
}
}
for (key, val) in record.values.iter() {
// Check if this is the observation time field or system time field
let is_time_field = match key {
DataRecordKey::Unrecognized(field_spec) => {
let field_id = field_spec.information_element_identifier;
let is_standard_field = field_spec.enterprise_number.is_none();

(field_id == OBSERVATION_TIME_NANOSECONDS
|| field_id == OBSERVATION_TIME_SECONDS)
&& is_standard_field
}
_ => false,
};

if is_time_field {
if let DataRecordKey::Unrecognized(field_spec) = key {
debug!(
"Skipping time field (ID: {})",
field_spec.information_element_identifier
);
}
continue;
}

// Get object names for this template key
let object_names = template_key
.as_ref()
.and_then(|key| self.object_names_map.get(key))
.map(|names| names.as_slice())
.unwrap_or(&[]);

// Create SAIStat directly
let stat = SAIStat::from_ipfix(field_spec, val, object_names);
debug!("Created SAIStat: {:?}", stat);
final_stats.push(stat);
match key {
DataRecordKey::Unrecognized(field_spec) => {
let stat = SAIStat::from_ipfix(field_spec, val, object_names);
debug!("Created SAIStat: {:?}", stat);
final_stats.push(stat);
}
_ => continue,
}
_ => continue,
}
}

let saistats = SAIStatsMessage::new(SAIStats {
observation_time: observation_time
.expect("observation_time should be Some at this point"),
stats: final_stats,
});
let saistats = SAIStatsMessage::new(SAIStats {
observation_time: observation_time
.expect("observation_time should be Some at this point"),
stats: final_stats,
});

messages.push(saistats.clone());
debug!("Record parsed {:?}", saistats);
messages.push(saistats.clone());
debug!("Record parsed {:?}", saistats);
}
}
read_size += len as usize;
debug!(
Expand Down Expand Up @@ -1167,6 +1169,117 @@ mod test {
);
}

#[test]
fn test_object_names_follow_template_id() {
let (_template_sender, template_receiver) = tokio::sync::mpsc::channel(1000);
let (_buffer_sender, buffer_receiver) = tokio::sync::mpsc::channel(1000);
let mut actor = IpfixActor::new(template_receiver, buffer_receiver);

let template_256_bytes: [u8; 44] = [
0x00, 0x0A, 0x00, 0x2C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
0x00, 0x00, 0x00, 0x02, 0x00, 0x1C, 0x01, 0x00, 0x00, 0x03, 0x01, 0x45, 0x00, 0x08,
0x80, 0x01, 0x00, 0x08, 0x00, 0x01, 0x00, 0x02, 0x80, 0x02, 0x00, 0x08, 0x80, 0x03,
0x80, 0x04,
];

let template_257_bytes: [u8; 44] = [
0x00, 0x0A, 0x00, 0x2C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
0x00, 0x00, 0x00, 0x02, 0x00, 0x1C, 0x01, 0x01, 0x00, 0x03, 0x01, 0x45, 0x00, 0x08,
0x80, 0x01, 0x00, 0x08, 0x00, 0x01, 0x00, 0x02, 0x80, 0x02, 0x00, 0x08, 0x80, 0x03,
0x80, 0x04,
];

actor.handle_template(IPFixTemplatesMessage::new(
String::from("session_a"),
Arc::new(Vec::from(template_256_bytes)),
Some(vec!["Ethernet0".to_string(), "Ethernet1".to_string()]),
));
actor.handle_template(IPFixTemplatesMessage::new(
String::from("session_b"),
Arc::new(Vec::from(template_257_bytes)),
Some(vec!["Ethernet8".to_string(), "Ethernet12".to_string()]),
));

let valid_records_bytes: [u8; 144] = [
0x00, 0x0A, 0x00, 0x48, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00,
0x00, 0x00, 0x01, 0x00, 0x00, 0x1C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x01, 0x01, 0x00, 0x00, 0x1C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x03, 0x00, 0x0A, 0x00, 0x48, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02,
0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x1C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x04, 0x01, 0x01, 0x00, 0x1C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x07,
];

let stats = actor.handle_record(Arc::new(Vec::from(valid_records_bytes)));
let stats: Vec<_> = stats
.into_iter()
.map(|msg| Arc::try_unwrap(msg).expect("single-owner test stats"))
.collect();

let session_a_names = ["Ethernet0", "Ethernet1"];
let session_b_names = ["Ethernet8", "Ethernet12"];
let mut saw_session_a = false;
let mut saw_session_b = false;

for msg in &stats {
let names: Vec<&str> = msg.stats.iter().map(|s| s.object_name.as_str()).collect();

let only_session_a = names.iter().all(|name| session_a_names.contains(name));
let only_session_b = names.iter().all(|name| session_b_names.contains(name));

assert!(
only_session_a || only_session_b,
"record mixes object names from multiple templates: {:?}",
names
);

saw_session_a |= only_session_a;
saw_session_b |= only_session_b;
}

assert!(saw_session_a, "did not observe any stats for session A/template 256");
assert!(saw_session_b, "did not observe any stats for session B/template 257");
}

#[test]
fn test_template_update_without_object_names_clears_stale_mapping() {
let (_template_sender, template_receiver) = tokio::sync::mpsc::channel(1000);
let (_buffer_sender, buffer_receiver) = tokio::sync::mpsc::channel(1000);
let mut actor = IpfixActor::new(template_receiver, buffer_receiver);

let template_bytes: [u8; 44] = [
0x00, 0x0A, 0x00, 0x2C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
0x00, 0x00, 0x00, 0x02, 0x00, 0x1C, 0x01, 0x00, 0x00, 0x03, 0x01, 0x45, 0x00, 0x08,
0x80, 0x01, 0x00, 0x08, 0x00, 0x01, 0x00, 0x02, 0x80, 0x02, 0x00, 0x08, 0x80, 0x03,
0x80, 0x04,
];

actor.handle_template(IPFixTemplatesMessage::new(
String::from("session_a"),
Arc::new(Vec::from(template_bytes)),
Some(vec!["Ethernet0".to_string(), "Ethernet1".to_string()]),
));
assert_eq!(
actor.object_names_map.get("session_a"),
Some(&vec!["Ethernet0".to_string(), "Ethernet1".to_string()])
);

actor.handle_template(IPFixTemplatesMessage::new(
String::from("session_a"),
Arc::new(Vec::from(template_bytes)),
None,
));

assert!(
actor.object_names_map.get("session_a").is_none(),
"stale object_names should be cleared when a template update omits them"
);
}

#[tokio::test]
async fn test_ipfix() {
clear_logs(); // Clear any previous logs to ensure clean test state
Expand Down
Loading