diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3dce4f6..cf6df5b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,6 +1,7 @@ name: ci on: push: + branches: [ main ] pull_request: jobs: diff --git a/docs/design/read-planning.md b/docs/design/read-planning.md new file mode 100644 index 0000000..06d8b89 --- /dev/null +++ b/docs/design/read-planning.md @@ -0,0 +1,25 @@ +# Read Planning + +This document outlines the strategy for planning and executing reads from Tdms files, focusing on efficient operation. + +## Hierarchy of Plans + +The planning generates three levels of plans: + +* File Plans +* Data Block Plans +* Record Plans + +## File Plans + +The goal of the file plan is to determine which blocks need to be read and in what way. We may have alternative read methods based on the type of read we want to complete. + +The main case for that right now is if we want to skip some samples. + +## Data Block Plans + +A plan for a data block states which samples from which channels in the block need to be read. + +## Record Plans + +The record plan describes the size of the records in the block to allow for skipping records and unread data. diff --git a/src/file/channel_reader.rs b/src/file/channel_reader.rs index 21b2ce1..60b9d93 100644 --- a/src/file/channel_reader.rs +++ b/src/file/channel_reader.rs @@ -1,8 +1,15 @@ use crate::paths::ChannelPath; +use crate::raw_data::BlockReadChannelConfig; use crate::{TdmsFile, error::TdmsError, index::DataLocation, io::data_types::TdmsStorageType}; #[derive(Eq, PartialEq, Clone, Debug)] -struct MultiChannelLocation { +struct ChannelReadPlan { + index: usize, + samples_to_skip: u64, +} + +#[derive(Eq, PartialEq, Clone, Debug)] +struct BlockRead { ///The data block index/number. data_block: usize, ///The channel locations in this block. @@ -10,7 +17,7 @@ struct MultiChannelLocation { /// /// todo: can we avoid a vec here? It should be small /// so smallvec or array may work. 
- channel_indexes: Vec>, + channel_indexes: Vec>, } #[derive(Eq, PartialEq, Clone, Debug)] @@ -51,34 +58,37 @@ impl TdmsFile { &mut self, channel: &ChannelPath, output: &mut [D], + ) -> Result<(), TdmsError> { + self.read_channel_from(channel, 0, output) + } + + /// Read a single channel from the tdms file starting at a specific sample position. + /// + /// channel should provide a path to the channel. + /// start is the number of samples to skip before reading. + /// output is a mutable slice for the data to be written into. + /// + /// If there is more data in the file than the size of the slice, we will stop reading at the end of the slice. + /// + /// # Performance + /// + /// This method optimizes reading by skipping entire data blocks when possible. + /// For example, if you want to start reading at sample 1500 and the first block contains + /// 1000 samples, it will skip the entire first block and start reading from sample 500 + /// of the second block. + pub fn read_channel_from( + &mut self, + channel: &ChannelPath, + start: u64, + output: &mut [D], ) -> Result<(), TdmsError> { let data_positions = self .index .get_channel_data_positions(channel) .ok_or_else(|| TdmsError::MissingObject(channel.path().to_owned()))?; - let mut samples_read = 0; - - for location in data_positions { - let block = self - .index - .get_data_block(location.data_block) - .ok_or_else(|| { - TdmsError::DataBlockNotFound(channel.clone(), location.data_block) - })?; - - samples_read += block.read_single( - location.channel_index, - &mut self.file, - &mut output[samples_read..], - )?; - - if samples_read >= output.len() { - break; - } - } - - Ok(()) + let plan = read_plan(&[data_positions], &[start]); + self.execute_read_plan(plan, &mut [output]) } /// Read multiple channels from the tdms file. 
@@ -89,6 +99,29 @@ impl TdmsFile { &mut self, channels: &[impl AsRef], output: &mut [&mut [D]], + ) -> Result<(), TdmsError> { + self.read_channels_from(channels, 0, output) + } + + /// Read multiple channels from the tdms file starting at a specific sample position. + /// + /// All channels will start reading from the same sample offset. + /// This is efficient for time-aligned data where all channels share the same time base. + /// + /// channels should provide a slice of paths to the channels. + /// start is the number of samples to skip before reading (same for all channels). + /// output is a set of mutable slices for the data to be written into. + /// Each channel will be read for the length of its corresponding slice. + /// + /// # Performance + /// + /// This method optimizes reading by skipping entire data blocks when possible. + /// A block is only skipped if all channels have their start position beyond that block. + pub fn read_channels_from( + &mut self, + channels: &[impl AsRef], + start: u64, + output: &mut [&mut [D]], ) -> Result<(), TdmsError> { let channel_positions = channels .iter() @@ -99,14 +132,33 @@ impl TdmsFile { }) .collect::, TdmsError>>()?; - let read_plan = read_plan(&channel_positions[..]); + let start_skips: Vec = vec![start; channels.len()]; + let plan = read_plan(&channel_positions[..], &start_skips); + + self.execute_read_plan(plan, output) + } + /// Execute a read plan, reading data from blocks into the output slices. + /// + /// This is the core read execution logic used by all read methods. + /// The plan specifies which blocks to read and any per-channel skip amounts. 
+ fn execute_read_plan( + &mut self, + plan: Vec, + output: &mut [&mut [D]], + ) -> Result<(), TdmsError> { let mut channel_progress: Vec = output .iter() .map(|out_slice| ChannelProgress::new(out_slice.len())) .collect(); - for location in read_plan { + for location in plan { + // Check if any channel needs to skip at the start of this block + let any_skip_needed = location + .channel_indexes + .iter() + .any(|plan| plan.is_some() && plan.as_ref().unwrap().samples_to_skip > 0); + let block = self .index .get_data_block(location.data_block) @@ -117,14 +169,29 @@ impl TdmsFile { ) })?; - let mut channels_to_read = get_block_read_data(&location, output, &channel_progress); - - let location_samples_read = block.read(&mut self.file, &mut channels_to_read)?; - - let read_complete = - update_progress(location, &mut channel_progress, location_samples_read); + // Use fast path if no skip needed, slow path otherwise + let location_samples_read = if any_skip_needed { + let mut channels_with_skip = + get_block_read_data_with_skip(&location, output, &channel_progress); + block.read_with_per_channel_skip(&mut self.file, &mut channels_with_skip)? + } else { + let mut channels_to_read = + get_block_read_data(&location, output, &channel_progress); + block.read(&mut self.file, &mut channels_to_read)? + }; + + // Update progress + for (plan, progress) in location + .channel_indexes + .iter() + .zip(channel_progress.iter_mut()) + { + if plan.is_some() { + progress.add_samples(location_samples_read); + } + } - if read_complete { + if all_channels_complete(&channel_progress) { break; } } @@ -135,7 +202,7 @@ impl TdmsFile { /// Get the read parameters and output for this particular block. 
fn get_block_read_data<'a, 'b: 'o, 'c: 'o, 'o, D: TdmsStorageType>( - location: &'a MultiChannelLocation, + location: &'a BlockRead, output: &'b mut [&'c mut [D]], channel_progress: &[ChannelProgress], ) -> Vec<(usize, &'o mut [D])> { @@ -144,34 +211,43 @@ fn get_block_read_data<'a, 'b: 'o, 'c: 'o, 'o, D: TdmsStorageType>( .iter() .zip(output.iter_mut()) .zip(channel_progress.iter()) - .filter_map(|((channel_id, output), progress)| { - match (channel_id, progress) { - // If we have it our target, ignore this channel. + .filter_map(|((plan, output), progress)| { + match (plan, progress) { + // If we have hit our target, ignore this channel. (Some(_), progress) if progress.is_complete() => None, // More to read - include this channel. - (Some(idx), progress) => Some((*idx, &mut output[progress.samples_read..])), + (Some(plan), progress) => Some((plan.index, &mut output[progress.samples_read..])), _ => None, } }) .collect::>() } -/// Update the progress of the channels we have read. -/// -/// Returns true if all are complete. -fn update_progress( - location: MultiChannelLocation, - channel_progress: &mut [ChannelProgress], - iteration_samples: usize, -) -> bool { - assert_eq!(channel_progress.len(), location.channel_indexes.len()); - - for (ch_idx, block_idx) in location.channel_indexes.iter().enumerate() { - if block_idx.is_some() { - channel_progress[ch_idx].add_samples(iteration_samples); - } - } - all_channels_complete(channel_progress) +/// Get the read parameters, output, and skip amounts for this particular block. +fn get_block_read_data_with_skip<'a, 'b: 'o, 'c: 'o, 'o, D: TdmsStorageType>( + location: &'a BlockRead, + output: &'b mut [&'c mut [D]], + channel_progress: &[ChannelProgress], +) -> Vec> { + location + .channel_indexes + .iter() + .zip(output.iter_mut()) + .zip(channel_progress.iter()) + .filter_map(|((plan, output), progress)| { + match (plan, progress) { + // If we have hit our target, ignore this channel. 
+ (Some(_), progress) if progress.is_complete() => None, + // More to read - include this channel with its skip amount. + (Some(plan), progress) => Some(BlockReadChannelConfig { + channel_index: plan.index, + output: &mut output[progress.samples_read..], + samples_to_skip: plan.samples_to_skip, + }), + _ => None, + } + }) + .collect::>() } fn all_channels_complete(channel_progress: &[ChannelProgress]) -> bool { @@ -182,55 +258,72 @@ fn all_channels_complete(channel_progress: &[ChannelProgress]) -> bool { /// Plan the locations that we need to visit for each channel. /// +/// Blocks are skipped entirely when all channels can skip them. +/// The first block that needs reading for each channel includes the partial skip amount. +/// /// todo:: Can we make this an iterator to avoid the vec allocation. /// todo: pretty sure we can use iterators more effectively here. -fn read_plan(channel_positions: &[&[DataLocation]]) -> Vec { +fn read_plan(channel_positions: &[&[DataLocation]], start_skips: &[u64]) -> Vec { let channels = channel_positions.len(); let mut next_location = vec![0usize; channels]; - let mut blocks: Vec = Vec::new(); + let mut remaining_skips: Vec = start_skips.to_vec(); + let mut blocks: Vec = Vec::new(); loop { + // Find the minimum data block among all channels' next locations let next_block = channel_positions .iter() .zip(next_location.iter()) - .map(|(locations, &index)| { - if let Some(location) = locations.get(index) { - location.data_block - } else { - usize::MAX - } - }) + .filter_map(|(locations, &index)| locations.get(index).map(|loc| loc.data_block)) .min(); - // Empty iterator check. let Some(next_block) = next_block else { return blocks; }; - //All out of range check. 
- if next_block == usize::MAX { - return blocks; - }; - - let channel_indexes: Vec> = channel_positions - .iter() - .zip(next_location.iter_mut()) - .map(|(locations, index)| { - let next_location = locations.get(*index)?; - - if next_location.data_block == next_block { - *index += 1; - Some(next_location.channel_index) - } else { - None + // Build channel read plans for this block + let mut channel_read_plans: Vec> = Vec::with_capacity(channels); + let mut any_needs_read = false; + + for ch_idx in 0..channels { + let locations = &channel_positions[ch_idx]; + let loc_idx = next_location[ch_idx]; + + match locations.get(loc_idx) { + Some(loc) if loc.data_block == next_block => { + let block_samples = loc.number_of_samples; + let skip = remaining_skips[ch_idx]; + + if skip >= block_samples { + // Can skip entire block for this channel - don't include in read + channel_read_plans.push(None); + } else { + // Need to read from this block (possibly after partial skip) + any_needs_read = true; + channel_read_plans.push(Some(ChannelReadPlan { + index: loc.channel_index, + samples_to_skip: skip, + })); + } + + // Advance to next location and update remaining skip + next_location[ch_idx] += 1; + remaining_skips[ch_idx] = remaining_skips[ch_idx].saturating_sub(block_samples); } - }) - .collect(); + _ => { + // Channel not in this block + channel_read_plans.push(None); + } + } + } - blocks.push(MultiChannelLocation { - data_block: next_block, - channel_indexes, - }) + // Only add the block if at least one channel needs to read + if any_needs_read { + blocks.push(BlockRead { + data_block: next_block, + channel_indexes: channel_read_plans, + }); + } } } @@ -256,16 +349,65 @@ mod tests { }, ]; - let plan = read_plan(&[&channel_locations[..]]); + let plan = read_plan(&[&channel_locations[..]], &[0]); let expected_plan = vec![ - MultiChannelLocation { + BlockRead { data_block: 20, - channel_indexes: vec![Some(1)], + channel_indexes: vec![Some(ChannelReadPlan { + index: 1, + 
samples_to_skip: 0, + })], }, - MultiChannelLocation { + BlockRead { data_block: 21, - channel_indexes: vec![Some(1)], + channel_indexes: vec![Some(ChannelReadPlan { + index: 1, + samples_to_skip: 0, + })], + }, + ]; + + assert_eq!(plan, expected_plan); + } + + #[test] + fn test_read_plan_single_channel_with_skip() { + let channel_locations = vec![ + DataLocation { + data_block: 20, + channel_index: 1, + number_of_samples: 1000, + }, + DataLocation { + data_block: 21, + channel_index: 1, + number_of_samples: 1000, + }, + DataLocation { + data_block: 22, + channel_index: 1, + number_of_samples: 1000, + }, + ]; + + // Skip 1500 samples: skip entire first block (1000), partial skip on second (500) + let plan = read_plan(&[&channel_locations[..]], &[1500]); + + let expected_plan = vec![ + BlockRead { + data_block: 21, + channel_indexes: vec![Some(ChannelReadPlan { + index: 1, + samples_to_skip: 500, + })], + }, + BlockRead { + data_block: 22, + channel_indexes: vec![Some(ChannelReadPlan { + index: 1, + samples_to_skip: 0, + })], }, ]; @@ -300,19 +442,154 @@ mod tests { }, ]; - let plan = read_plan(&[&channel_location_1[..], &channel_location_2[..]]); + let plan = read_plan(&[&channel_location_1[..], &channel_location_2[..]], &[0, 0]); + + let expected_plan = vec![ + BlockRead { + data_block: 20, + channel_indexes: vec![ + Some(ChannelReadPlan { + index: 1, + samples_to_skip: 0, + }), + Some(ChannelReadPlan { + index: 2, + samples_to_skip: 0, + }), + ], + }, + BlockRead { + data_block: 21, + channel_indexes: vec![ + Some(ChannelReadPlan { + index: 1, + samples_to_skip: 0, + }), + Some(ChannelReadPlan { + index: 0, + samples_to_skip: 0, + }), + ], + }, + ]; + + assert_eq!(plan, expected_plan); + } + + #[test] + fn test_read_plan_multi_channel_with_skip() { + let channel_location_1 = vec![ + DataLocation { + data_block: 20, + channel_index: 1, + number_of_samples: 1000, + }, + DataLocation { + data_block: 21, + channel_index: 1, + number_of_samples: 1000, + }, + ]; + + 
let channel_location_2 = vec![ + DataLocation { + data_block: 20, + channel_index: 2, + number_of_samples: 1000, + }, + DataLocation { + data_block: 21, + channel_index: 0, + number_of_samples: 1000, + }, + ]; + + // Skip 500 for both channels - partial skip on first block + let plan = read_plan( + &[&channel_location_1[..], &channel_location_2[..]], + &[500, 500], + ); let expected_plan = vec![ - MultiChannelLocation { + BlockRead { + data_block: 20, + channel_indexes: vec![ + Some(ChannelReadPlan { + index: 1, + samples_to_skip: 500, + }), + Some(ChannelReadPlan { + index: 2, + samples_to_skip: 500, + }), + ], + }, + BlockRead { + data_block: 21, + channel_indexes: vec![ + Some(ChannelReadPlan { + index: 1, + samples_to_skip: 0, + }), + Some(ChannelReadPlan { + index: 0, + samples_to_skip: 0, + }), + ], + }, + ]; + + assert_eq!(plan, expected_plan); + } + + #[test] + fn test_read_plan_multi_channel_skip_entire_block() { + let channel_location_1 = vec![ + DataLocation { + data_block: 20, + channel_index: 1, + number_of_samples: 1000, + }, + DataLocation { + data_block: 21, + channel_index: 1, + number_of_samples: 1000, + }, + ]; + + let channel_location_2 = vec![ + DataLocation { data_block: 20, - channel_indexes: vec![Some(1), Some(2)], + channel_index: 2, + number_of_samples: 1000, }, - MultiChannelLocation { + DataLocation { data_block: 21, - channel_indexes: vec![Some(1), Some(0)], + channel_index: 0, + number_of_samples: 1000, }, ]; + // Skip 1000 for both channels - skip entire first block + let plan = read_plan( + &[&channel_location_1[..], &channel_location_2[..]], + &[1000, 1000], + ); + + let expected_plan = vec![BlockRead { + data_block: 21, + channel_indexes: vec![ + Some(ChannelReadPlan { + index: 1, + samples_to_skip: 0, + }), + Some(ChannelReadPlan { + index: 0, + samples_to_skip: 0, + }), + ], + }]; + assert_eq!(plan, expected_plan); } @@ -354,24 +631,54 @@ mod tests { }, ]; - let plan = read_plan(&[&channel_location_1[..], 
&channel_location_2[..]]); + let plan = read_plan(&[&channel_location_1[..], &channel_location_2[..]], &[0, 0]); let expected_plan = vec![ - MultiChannelLocation { + BlockRead { data_block: 20, - channel_indexes: vec![Some(1), Some(2)], + channel_indexes: vec![ + Some(ChannelReadPlan { + index: 1, + samples_to_skip: 0, + }), + Some(ChannelReadPlan { + index: 2, + samples_to_skip: 0, + }), + ], }, - MultiChannelLocation { + BlockRead { data_block: 21, - channel_indexes: vec![Some(1), Some(0)], + channel_indexes: vec![ + Some(ChannelReadPlan { + index: 1, + samples_to_skip: 0, + }), + Some(ChannelReadPlan { + index: 0, + samples_to_skip: 0, + }), + ], }, - MultiChannelLocation { + BlockRead { data_block: 22, - channel_indexes: vec![None, Some(1)], + channel_indexes: vec![ + None, + Some(ChannelReadPlan { + index: 1, + samples_to_skip: 0, + }), + ], }, - MultiChannelLocation { + BlockRead { data_block: 25, - channel_indexes: vec![Some(0), None], + channel_indexes: vec![ + Some(ChannelReadPlan { + index: 0, + samples_to_skip: 0, + }), + None, + ], }, ]; diff --git a/src/io/data_types/mod.rs b/src/io/data_types/mod.rs index 38d229c..cf11837 100644 --- a/src/io/data_types/mod.rs +++ b/src/io/data_types/mod.rs @@ -119,7 +119,7 @@ impl Display for DataType { type StorageResult = std::result::Result; -pub trait TdmsStorageType: Sized { +pub trait TdmsStorageType: Sized + 'static { /// The [`DataType`] that can be read as this storage type. const SUPPORTED_TYPES: &'static [DataType]; /// The [`DataType`] that this storage type is naturally written as. diff --git a/src/raw_data/contigious_multi_channel_read.rs b/src/raw_data/contigious_multi_channel_read.rs index 88eda57..a724010 100644 --- a/src/raw_data/contigious_multi_channel_read.rs +++ b/src/raw_data/contigious_multi_channel_read.rs @@ -2,7 +2,7 @@ //! //! 
-use super::records::{RecordEntryPlan, RecordStructure}; +use super::records::{RecordEntryPlan, RecordPlan}; use crate::io::reader::TdmsReader; use crate::{error::TdmsError, io::data_types::TdmsStorageType}; use std::num::NonZeroU64; @@ -11,17 +11,17 @@ use std::{ marker::PhantomData, }; -/// The multichannel contigious reader will read from an contigous block. +/// The multichannel contiguous reader will read from a contiguous block. /// /// We will assume a single datatype as it is unclear if multiple types exist in the wild. -pub struct MultiChannelContigousReader> { +pub struct MultiChannelContiguousReader> { reader: T, _marker: PhantomData, block_size: NonZeroU64, block_start: u64, } -impl> MultiChannelContigousReader { +impl> MultiChannelContiguousReader { pub fn new(reader: T, block_start: u64, block_size: NonZeroU64) -> Self { Self { reader, @@ -39,36 +39,179 @@ impl> MultiChannelContigousReader { /// pub fn read( &mut self, - mut channels: RecordStructure, + channels: RecordPlan, + ) -> Result { + // Since the skip is highly efficient for contiguous data, we can use a + // single implementation. + self.read_from(channels, 0) + } + + /// Read the data from the block starting at a specific sample offset. + /// + /// For contiguous data, samples are stored sequentially per channel: + /// [Ch1 S0][Ch1 S1]...[Ch1 SN][Ch2 S0][Ch2 S1]... + /// + /// To skip samples, we need to skip within each channel's contiguous data. 
+ pub fn read_from( + &mut self, + mut channels: RecordPlan, + start_sample: u64, + ) -> Result { + self.reader.to_file_position(self.block_start)?; + + let total_sub_blocks = self.block_size.get() / channels.block_size() as u64; + + // Calculate how many complete sub-blocks to skip and the remainder + let sub_block_length = channels.read_instructions()[0].length as u64; + let sub_blocks_to_skip = start_sample / sub_block_length; + let remainder_skip = start_sample % sub_block_length; + + let mut length = 0; + + for sub_block_idx in 0..total_sub_blocks { + if sub_block_idx < sub_blocks_to_skip { + // Skip entire sub-block by seeking past it + let skip_bytes = channels.block_size() as i64; + self.reader.move_position(skip_bytes)?; + } else if sub_block_idx == sub_blocks_to_skip { + // First sub-block to read - apply remainder skip + for channel in channels.read_instructions() { + if let RecordEntryPlan::Read { + block_skip: skip_first_samples, + .. + } = &mut channel.plan + { + *skip_first_samples = remainder_skip; + } + } + length += self.read_sub_block(&mut channels)?; + } else { + for channel in channels.read_instructions() { + if let RecordEntryPlan::Read { + block_skip: skip_first_samples, + .. + } = &mut channel.plan + { + *skip_first_samples = 0; + } + } + // Subsequent sub-blocks - no skip + length += self.read_sub_block(&mut channels)?; + } + } + + Ok(length) + } + + /// Read the data from the block with per-channel skip amounts. + /// + /// For contiguous data, each channel can skip independently by seeking. 
+ pub fn read_with_per_channel_skip( + &mut self, + mut channels: RecordPlan, + skip_amounts: &[u64], ) -> Result { self.reader.to_file_position(self.block_start)?; let total_sub_blocks = self.block_size.get() / channels.block_size() as u64; + // Calculate per-channel sub-blocks to skip and remainders + let sub_block_length = channels.read_instructions()[0].length as u64; + let sub_blocks_to_skip: Vec = skip_amounts + .iter() + .map(|&skip| skip / sub_block_length) + .collect(); + let remainder_skips: Vec = skip_amounts + .iter() + .map(|&skip| skip % sub_block_length) + .collect(); + let mut length = 0; - for _ in 0..total_sub_blocks { - length += self.read_sub_block(&mut channels)?; + for sub_block_idx in 0..total_sub_blocks { + // Check if any channel needs to read from this sub-block + let any_channel_reads = sub_blocks_to_skip.iter().all(|&skip| sub_block_idx >= skip); + + if !any_channel_reads { + // Skip entire sub-block + let skip_bytes = channels.block_size() as i64; + self.reader.move_position(skip_bytes)?; + } else { + // Build skip amounts for this sub-block + let channel_skip_values = + channels + .read_instructions() + .iter_mut() + .filter_map(|instruction| { + if let RecordEntryPlan::Read { + block_skip: skip_first_samples, + .. 
+ } = &mut instruction.plan + { + Some(skip_first_samples) + } else { + None + } + }); + for ((blocks_to_skip, remainder_skip), sub_block_skip) in sub_blocks_to_skip + .iter() + .zip(remainder_skips.iter()) + .zip(channel_skip_values) + { + *sub_block_skip = Self::calculate_skip_for_this_block( + sub_block_idx, + *blocks_to_skip, + *remainder_skip, + ); + } + length += self.read_sub_block(&mut channels)?; + } } Ok(length) } + fn calculate_skip_for_this_block( + sub_block_idx: u64, + blocks_to_skip: u64, + remainder_skip: u64, + ) -> u64 { + if sub_block_idx == blocks_to_skip { + // First sub-block to read for this channel - use remainder + remainder_skip + } else if sub_block_idx > blocks_to_skip { + // Subsequent sub-blocks - no skip + 0 + } else { + // Should not happen if any_channel_reads is correct + 0 + } + } + fn read_sub_block( &mut self, - channels: &mut RecordStructure<'_, D>, + channels: &mut RecordPlan<'_, D>, ) -> Result { let mut length = 0; + for read_instruction in channels.read_instructions().iter_mut() { match &mut read_instruction.plan { - RecordEntryPlan::Read(output) => { - for _ in 0..read_instruction.length { - let read_value = self.reader.read_value()?; - if let Some(value) = output.next() { - *value = read_value; - } + RecordEntryPlan::Read { + output, + block_skip: skip_first_samples, + } => { + let skip = (*skip_first_samples).min(read_instruction.length as u64) as usize; + + let samples_to_read = read_instruction.length.saturating_sub(skip); + + // Skip samples by seeking + if skip > 0 { + let skip_bytes = skip as i64 * D::SIZE_BYTES as i64; + self.reader.move_position(skip_bytes)?; } - length = read_instruction.length; + + // Read the remaining samples + length = self.read_sequential_samples(output, samples_to_read)?; } RecordEntryPlan::Skip(bytes) => { let skip_bytes = *bytes * read_instruction.length as i64; @@ -79,6 +222,26 @@ impl> MultiChannelContigousReader { Ok(length) } + + /// Reads the samples until the specified value or the 
output ends. + fn read_sequential_samples<'a, D: TdmsStorageType, I: Iterator>( + &mut self, + output: &mut I, + samples_to_read: usize, + ) -> Result { + let mut length = 0; + for output_value in output.take(samples_to_read) { + *output_value = self.reader.read_value()?; + length += 1; + } + // Skip to end of unread samples. + let unread_samples = samples_to_read - length; + if unread_samples > 0 { + self.reader + .move_position(unread_samples as i64 * D::SIZE_BYTES as i64)?; + } + Ok(length) + } } #[cfg(test)] @@ -118,15 +281,14 @@ mod tests { let mut buffer = create_test_buffer(); let meta = create_test_meta_data(2); - let mut reader = MultiChannelContigousReader::<_, _>::new( + let mut reader = MultiChannelContiguousReader::<_, _>::new( BigEndianReader::from_reader(&mut buffer), 0, 800.try_into().unwrap(), ); let mut output: Vec = vec![0.0; 3]; let mut channels = vec![(0usize, &mut output[..])]; - let read_plan = - RecordStructure::::build_record_plan(&meta, &mut channels[..]).unwrap(); + let read_plan = RecordPlan::::build_record_plan(&meta, &mut channels[..]).unwrap(); reader.read(read_plan).unwrap(); assert_eq!(output, vec![0.0, 1.0, 2.0]); } @@ -137,7 +299,7 @@ mod tests { let meta = create_test_meta_data(4); let length = meta.first().unwrap().number_of_values as f64; - let mut reader = MultiChannelContigousReader::<_, _>::new( + let mut reader = MultiChannelContiguousReader::<_, _>::new( BigEndianReader::from_reader(&mut buffer), 0, 800.try_into().unwrap(), @@ -145,8 +307,7 @@ mod tests { let mut output_1: Vec = vec![0.0; 3]; let mut output_2: Vec = vec![0.0; 3]; let mut channels = vec![(0usize, &mut output_1[..]), (2usize, &mut output_2[..])]; - let read_plan = - RecordStructure::::build_record_plan(&meta, &mut channels[..]).unwrap(); + let read_plan = RecordPlan::::build_record_plan(&meta, &mut channels[..]).unwrap(); let output_2_start = length * 2.0; reader.read(read_plan).unwrap(); @@ -173,7 +334,7 @@ mod tests { // ch3: 4, 5, 12, 13 // ch4: 6, 7, 14, 
15 - let mut reader = MultiChannelContigousReader::<_, _>::new( + let mut reader = MultiChannelContiguousReader::<_, _>::new( BigEndianReader::from_reader(&mut buffer), 0, 800.try_into().unwrap(), @@ -181,8 +342,7 @@ mod tests { let mut output_1: Vec = vec![0.0; 3]; let mut output_2: Vec = vec![0.0; 3]; let mut channels = vec![(0usize, &mut output_1[..]), (2usize, &mut output_2[..])]; - let read_plan = - RecordStructure::::build_record_plan(&meta, &mut channels[..]).unwrap(); + let read_plan = RecordPlan::::build_record_plan(&meta, &mut channels[..]).unwrap(); reader.read(read_plan).unwrap(); assert_eq!(output_1, vec![0.0, 1.0, 8.0]); @@ -195,7 +355,7 @@ mod tests { let meta = create_test_meta_data(4); let length = meta.first().unwrap().number_of_values as f64; - let mut reader = MultiChannelContigousReader::<_, _>::new( + let mut reader = MultiChannelContiguousReader::<_, _>::new( BigEndianReader::from_reader(&mut buffer), 0, 800.try_into().unwrap(), @@ -203,8 +363,7 @@ mod tests { let mut output_1: Vec = vec![0.0; 3]; let mut output_2: Vec = vec![0.0; 2]; let mut channels = vec![(0usize, &mut output_1[..]), (2usize, &mut output_2[..])]; - let read_plan = - RecordStructure::::build_record_plan(&meta, &mut channels[..]).unwrap(); + let read_plan = RecordPlan::::build_record_plan(&meta, &mut channels[..]).unwrap(); reader.read(read_plan).unwrap(); @@ -212,4 +371,67 @@ mod tests { assert_eq!(output_1, vec![0.0, 1.0, 2.0]); assert_eq!(output_2, vec![output2_start, output2_start + 1.0]); } + + #[test] + fn read_data_contigious_with_skip() { + let mut buffer = create_test_buffer(); + let meta = create_test_meta_data(4); + let length = meta.first().unwrap().number_of_values as f64; + + let mut reader = MultiChannelContiguousReader::<_, _>::new( + BigEndianReader::from_reader(&mut buffer), + 0, + 800.try_into().unwrap(), + ); + let mut output_1: Vec = vec![0.0; 3]; + let mut output_2: Vec = vec![0.0; 3]; + let mut channels = vec![(0usize, &mut output_1[..]), (2usize, &mut 
output_2[..])]; + let read_plan = RecordPlan::::build_record_plan(&meta, &mut channels[..]).unwrap(); + + // Skip first 2 samples from each channel + reader.read_from(read_plan, 2).unwrap(); + + let output_2_start = length * 2.0; + assert_eq!(output_1, vec![2.0, 3.0, 4.0]); + assert_eq!( + output_2, + vec![ + output_2_start + 2.0, + output_2_start + 3.0, + output_2_start + 4.0 + ] + ); + } + + #[test] + fn read_data_contigious_with_skip_and_multiple_blocks() { + let mut buffer = create_test_buffer(); + let mut meta = create_test_meta_data(2); + + // Set up for multiple sub-blocks + for channel in meta.iter_mut() { + channel.number_of_values = 3; + } + + let mut reader = MultiChannelContiguousReader::<_, _>::new( + BigEndianReader::from_reader(&mut buffer), + 0, + 800.try_into().unwrap(), + ); + let mut output_1: Vec = vec![0.0; 3]; + let mut output_2: Vec = vec![0.0; 3]; + let mut channels = vec![(0usize, &mut output_1[..]), (1usize, &mut output_2[..])]; + let read_plan = RecordPlan::::build_record_plan(&meta, &mut channels[..]).unwrap(); + + // Skip first sample from each channel + let values_read = reader.read_from(read_plan, 1).unwrap(); + + // ch1, block 1: 0, 1, 2 + // ch2, block 1: 3, 4, 5 + // ch1, block 2: 6, 7, 8 + // ch2, block 2: 9, 10, 11 + assert_eq!(output_1, vec![1.0, 2.0, 6.0]); + assert_eq!(output_2, vec![4.0, 5.0, 9.0]); + assert_eq!(values_read, 3); + } } diff --git a/src/raw_data/interleaved_multi_channel_read.rs b/src/raw_data/interleaved_multi_channel_read.rs index 67c3f64..e39e548 100644 --- a/src/raw_data/interleaved_multi_channel_read.rs +++ b/src/raw_data/interleaved_multi_channel_read.rs @@ -2,7 +2,7 @@ //! //! 
-use super::records::{RecordEntryPlan, RecordStructure}; +use super::records::{RecordEntryPlan, RecordPlan}; use crate::io::reader::TdmsReader; use crate::{error::TdmsError, io::data_types::TdmsStorageType}; use std::num::NonZeroU64; @@ -31,26 +31,54 @@ impl> MultiChannelInterleavedReader { } } - /// Read the data from the block for the channels specified into the output slices. + /// Read the data from the block for the channels specified into the output slices, + /// using the skip amounts provided in the plan. /// /// Returns the number of values read in this block. /// /// *ASSUMPTION*: All channels have the same number of values available. The spec - /// allows for different lengths but all clients have I have seen do not. + /// allows for different lengths, but all clients I have seen do not. + /// + /// For interleaved data, we skip the minimum across all channels (entire rows), + /// then read rows while discarding samples for channels that need more skipping. pub fn read( &mut self, - mut channels: RecordStructure, + mut channels: RecordPlan, ) -> Result { self.reader.to_file_position(self.block_start)?; - let row_count = self.block_size.get() as usize / channels.row_size(); + let total_row_count = self.block_size.get() / channels.row_size() as u64; + + // Find minimum skip (we can skip entire rows up to this point) + let min_skip = channels.block_skips().min().unwrap_or(0); + + // Skip entire rows + if min_skip > 0 { + let skip_bytes = min_skip as i64 * channels.row_size() as i64; + self.reader.move_position(skip_bytes)?; + } - for _ in 0..row_count { + // Calculate the remaining skip per channel + for skip in channels.block_skips_mut() { + *skip = skip.saturating_sub(min_skip); + } + + // Read rows, discarding samples for channels that still need to skip + let rows_to_process = total_row_count.saturating_sub(min_skip); + + let mut samples_read = 0; + for row in 0..rows_to_process { + let mut any_values_read = false; + for read_instruction in 
channels.read_instructions().iter_mut() { match &mut read_instruction.plan { - RecordEntryPlan::Read(output) => { + RecordEntryPlan::Read { output, block_skip } => { let read_value = self.reader.read_value()?; - if let Some(value) = output.next() { + + // Only write if we've skipped enough for this channel + if row >= *block_skip + && let Some(value) = output.next() + { *value = read_value; + any_values_read = true; } } RecordEntryPlan::Skip(bytes) => { @@ -58,9 +86,24 @@ impl> MultiChannelInterleavedReader { } }; } + if !any_values_read { + break; + } + samples_read += 1; } - Ok(row_count) + Ok(samples_read) + } + + pub fn read_from( + &mut self, + mut channels: RecordPlan, + samples_to_skip: u64, + ) -> Result { + for channel_skip in channels.block_skips_mut() { + *channel_skip = samples_to_skip; + } + self.read(channels) } } @@ -106,8 +149,7 @@ mod tests { ); let mut output: Vec = vec![0.0; 3]; let mut channels = vec![(0usize, &mut output[..])]; - let read_plan = - RecordStructure::::build_record_plan(&meta, &mut channels[..]).unwrap(); + let read_plan = RecordPlan::::build_record_plan(&meta, &mut channels[..]).unwrap(); reader.read(read_plan).unwrap(); assert_eq!(output, vec![0.0, 2.0, 4.0]); } @@ -125,8 +167,7 @@ mod tests { let mut output_1: Vec = vec![0.0; 3]; let mut output_2: Vec = vec![0.0; 3]; let mut channels = vec![(0usize, &mut output_1[..]), (2usize, &mut output_2[..])]; - let read_plan = - RecordStructure::::build_record_plan(&meta, &mut channels[..]).unwrap(); + let read_plan = RecordPlan::::build_record_plan(&meta, &mut channels[..]).unwrap(); reader.read(read_plan).unwrap(); assert_eq!(output_1, vec![0.0, 4.0, 8.0]); assert_eq!(output_2, vec![2.0, 6.0, 10.0]); @@ -145,10 +186,34 @@ mod tests { let mut output_1: Vec = vec![0.0; 3]; let mut output_2: Vec = vec![0.0; 2]; let mut channels = vec![(0usize, &mut output_1[..]), (2usize, &mut output_2[..])]; - let read_plan = - RecordStructure::::build_record_plan(&meta, &mut channels[..]).unwrap(); + let 
read_plan = RecordPlan::::build_record_plan(&meta, &mut channels[..]).unwrap(); reader.read(read_plan).unwrap(); assert_eq!(output_1, vec![0.0, 4.0, 8.0]); assert_eq!(output_2, vec![2.0, 6.0]); } + + #[test] + fn read_data_interleaved_with_skip() { + let mut buffer = create_test_buffer(); + let meta = create_test_meta_data(4); + + let mut reader = MultiChannelInterleavedReader::<_, _>::new( + BigEndianReader::from_reader(&mut buffer), + 0, + 800.try_into().unwrap(), + ); + let mut output_1: Vec = vec![0.0; 3]; + let mut output_2: Vec = vec![0.0; 3]; + let mut channels = vec![(0usize, &mut output_1[..]), (2usize, &mut output_2[..])]; + let read_plan = RecordPlan::::build_record_plan(&meta, &mut channels[..]).unwrap(); + + // Skip first 2 rows (samples) + let rows_read = reader.read_from(read_plan, 2).unwrap(); + + // Interleaved: [0,1,2,3][4,5,6,7][8,9,10,11]... + // After skipping 2 rows: starts at row 2 which is [8,9,10,11] + assert_eq!(output_1, vec![8.0, 12.0, 16.0]); + assert_eq!(output_2, vec![10.0, 14.0, 18.0]); + assert_eq!(rows_read, 3); + } } diff --git a/src/raw_data/mod.rs b/src/raw_data/mod.rs index 6805af7..a437311 100644 --- a/src/raw_data/mod.rs +++ b/src/raw_data/mod.rs @@ -7,11 +7,11 @@ mod interleaved_multi_channel_read; mod records; mod write; -use records::RecordStructure; +use records::RecordPlan; pub use write::{MultiChannelSlice, WriteBlock}; use self::{ - contigious_multi_channel_read::MultiChannelContigousReader, + contigious_multi_channel_read::MultiChannelContiguousReader, interleaved_multi_channel_read::MultiChannelInterleavedReader, }; use crate::{ @@ -94,6 +94,13 @@ impl ChunkSize { } } +#[derive(Debug)] +pub struct BlockReadChannelConfig<'a, T: TdmsStorageType> { + pub channel_index: usize, + pub samples_to_skip: u64, + pub output: &'a mut [T], +} + /// Represents a block of data inside the file for fast random access. 
#[derive(Clone, PartialEq, Debug)] pub struct DataBlock { @@ -192,18 +199,18 @@ impl DataBlock { reader: &mut (impl Read + Seek), channels_to_read: &'b mut [(usize, &'b mut [D])], ) -> Result { - let record_plan = RecordStructure::build_record_plan(&self.channels, channels_to_read)?; + let record_plan = RecordPlan::build_record_plan(&self.channels, channels_to_read)?; match (self.layout, self.byte_order) { // No multichannel implementation for contiguous data yet. - (DataLayout::Contigious, Endianess::Big) => MultiChannelContigousReader::<_, _>::new( + (DataLayout::Contigious, Endianess::Big) => MultiChannelContiguousReader::<_, _>::new( BigEndianReader::from_reader(reader), self.start, self.length, ) .read(record_plan), (DataLayout::Contigious, Endianess::Little) => { - MultiChannelContigousReader::<_, _>::new( + MultiChannelContiguousReader::<_, _>::new( LittleEndianReader::from_reader(reader), self.start, self.length, @@ -241,6 +248,145 @@ impl DataBlock { //first is element size, second is total size. self.read(reader, &mut [(channel_index, output)]) } + + /// Read a single channel from the block starting at a specific sample offset. + /// + /// This method allows skipping a specified number of samples before reading. + /// The start_sample parameter indicates how many samples to skip in this block. + /// + /// Returns the number of samples actually read. + pub fn read_single_from( + &self, + channel_index: usize, + start_sample: u64, + reader: &mut (impl Read + Seek), + output: &mut [D], + ) -> Result { + self.read_from(reader, &mut [(channel_index, output)], start_sample) + } + + /// Read multiple channels from the block starting at a specific sample offset. + /// + /// This is the core implementation that supports reading with an offset. + /// The start_sample parameter indicates how many samples to skip in this block. 
+ pub fn read_from<'b, D: TdmsStorageType>( + &self, + reader: &mut (impl Read + Seek), + channels_to_read: &'b mut [(usize, &'b mut [D])], + start_sample: u64, + ) -> Result { + let record_plan = RecordPlan::build_record_plan(&self.channels, channels_to_read)?; + + match (self.layout, self.byte_order) { + (DataLayout::Contigious, Endianess::Big) => MultiChannelContiguousReader::<_, _>::new( + BigEndianReader::from_reader(reader), + self.start, + self.length, + ) + .read_from(record_plan, start_sample), + (DataLayout::Contigious, Endianess::Little) => { + MultiChannelContiguousReader::<_, _>::new( + LittleEndianReader::from_reader(reader), + self.start, + self.length, + ) + .read_from(record_plan, start_sample) + } + (DataLayout::Interleaved, Endianess::Big) => { + MultiChannelInterleavedReader::<_, _>::new( + BigEndianReader::from_reader(reader), + self.start, + self.length, + ) + .read_from(record_plan, start_sample) + } + (DataLayout::Interleaved, Endianess::Little) => { + MultiChannelInterleavedReader::<_, _>::new( + LittleEndianReader::from_reader(reader), + self.start, + self.length, + ) + .read_from(record_plan, start_sample) + } + } + } + + /// Read multiple channels with per-channel skip amounts. + /// + /// Each element in channels_to_read is a tuple of (channel_index, output_buffer, skip_amount). + /// The skip_amount specifies how many samples to skip for that channel in this block. + /// + /// This is used when channels have different amounts of data in a block or were + /// written in separate blocks, requiring independent skip tracking per channel. + pub fn read_with_per_channel_skip<'b, D: TdmsStorageType>( + &self, + reader: &mut (impl Read + Seek), + channels_to_read: &'b mut [BlockReadChannelConfig<'b, D>], + ) -> Result { + // Extract skip amounts first (before mutable borrow) + let skip_amounts: Vec = channels_to_read + .iter() + .map( + |BlockReadChannelConfig { + samples_to_skip: skip, + .. 
+ }| *skip, + ) + .collect(); + + // Extract the channel indices and buffers for the record plan + let mut channel_refs: Vec<(usize, &mut [D])> = channels_to_read + .iter_mut() + .map( + |BlockReadChannelConfig { + channel_index, + output, + .. + }| (*channel_index, &mut output[..]), + ) + .collect(); + + let mut record_plan = RecordPlan::build_record_plan(&self.channels, &mut channel_refs)?; + + match (self.layout, self.byte_order) { + (DataLayout::Contigious, Endianess::Big) => MultiChannelContiguousReader::<_, _>::new( + BigEndianReader::from_reader(reader), + self.start, + self.length, + ) + .read_with_per_channel_skip(record_plan, &skip_amounts), + (DataLayout::Contigious, Endianess::Little) => { + MultiChannelContiguousReader::<_, _>::new( + LittleEndianReader::from_reader(reader), + self.start, + self.length, + ) + .read_with_per_channel_skip(record_plan, &skip_amounts) + } + (DataLayout::Interleaved, Endianess::Big) => { + for (plan_skip, skip_amount) in record_plan.block_skips_mut().zip(skip_amounts) { + *plan_skip = skip_amount; + } + MultiChannelInterleavedReader::<_, _>::new( + BigEndianReader::from_reader(reader), + self.start, + self.length, + ) + .read(record_plan) + } + (DataLayout::Interleaved, Endianess::Little) => { + for (plan_skip, skip_amount) in record_plan.block_skips_mut().zip(skip_amounts) { + *plan_skip = skip_amount; + } + MultiChannelInterleavedReader::<_, _>::new( + LittleEndianReader::from_reader(reader), + self.start, + self.length, + ) + .read(record_plan) + } + } + } } #[cfg(test)] diff --git a/src/raw_data/records.rs b/src/raw_data/records.rs index cef940e..913cd21 100644 --- a/src/raw_data/records.rs +++ b/src/raw_data/records.rs @@ -13,15 +13,20 @@ use crate::{error::TdmsError, io::data_types::TdmsStorageType, meta_data::RawDat pub enum RecordEntryPlan<'a, T: 'a, I: Iterator> { // An entry that isn't to be read Skip(i64), - // Read entry to output index. - Read(I), + // Read entry to output index. 
The + Read { + output: I, + /// If set, skip these many samples before reading the entry. + /// To be set by the reader rather than generated in the plan. + block_skip: u64, + }, } impl<'a, T: TdmsStorageType, I: Iterator> RecordEntryPlan<'a, T, I> { fn entry_size_bytes(&self) -> Option { match self { RecordEntryPlan::Skip(bytes) => Some(*bytes as usize), - RecordEntryPlan::Read(_) => Some(T::SIZE_BYTES), + RecordEntryPlan::Read { .. } => Some(T::SIZE_BYTES), } } } @@ -39,9 +44,9 @@ pub struct RecordEntry<'a, T: 'a> { /// ready for reading. Marking sizes and positions of readable /// records and their outputs. #[derive(Debug)] -pub struct RecordStructure<'a, T>(Vec>); +pub struct RecordPlan<'a, T>(Vec>); -impl<'o, 'b: 'o, T: TdmsStorageType> RecordStructure<'o, T> { +impl<'o, 'b: 'o, T: TdmsStorageType> RecordPlan<'o, T> { /// Build a record structure for the channels specified. /// /// `channels` - This is the structure of the data segment. @@ -51,7 +56,7 @@ impl<'o, 'b: 'o, T: TdmsStorageType> RecordStructure<'o, T> { pub fn build_record_plan( channels: &[RawDataMeta], outputs: &'b mut [(usize, &'b mut [T])], - ) -> Result, TdmsError> { + ) -> Result, TdmsError> { let mut plan = Self::build_base_record(channels); validate_types_match(outputs, channels)?; @@ -65,6 +70,28 @@ impl<'o, 'b: 'o, T: TdmsStorageType> RecordStructure<'o, T> { &mut self.0[..] } + /// Get the block skips per read channel in the read plan. + pub fn block_skips(&self) -> impl Iterator + '_ { + self.0.iter().filter_map(|entry| { + if let RecordEntryPlan::Read { block_skip, .. } = &entry.plan { + Some(*block_skip) + } else { + None + } + }) + } + + /// Get the block skips as mut. + pub fn block_skips_mut(&mut self) -> impl Iterator + '_ { + self.0.iter_mut().filter_map(|entry| { + if let RecordEntryPlan::Read { block_skip, .. } = &mut entry.plan { + Some(block_skip) + } else { + None + } + }) + } + /// Get the size of a single record in bytes. 
/// /// ## Panics @@ -113,7 +140,10 @@ impl<'o, 'b: 'o, T: TdmsStorageType> RecordStructure<'o, T> { /// Set which records should be read in the plan. fn set_readable_records(&mut self, outputs: &'b mut [(usize, &'b mut [T])]) { for output in outputs { - self.0[output.0].plan = RecordEntryPlan::Read(output.1.iter_mut()); + self.0[output.0].plan = RecordEntryPlan::Read { + output: output.1.iter_mut(), + block_skip: 0, + }; } } @@ -161,14 +191,13 @@ mod tests { let mut outputs: Vec<(usize, &mut [f64])> = vec![(0, &mut out1), (1, &mut out2)]; - let read_plan = - RecordStructure::::build_record_plan(&channels, &mut outputs[..]).unwrap(); + let read_plan = RecordPlan::::build_record_plan(&channels, &mut outputs[..]).unwrap(); assert_eq!(read_plan.0.len(), 2); assert_eq!(read_plan.0[0].length, 1000); assert_eq!(read_plan.0[1].length, 1000); - assert!(matches!(read_plan.0[0].plan, RecordEntryPlan::Read(_))); - assert!(matches!(read_plan.0[0].plan, RecordEntryPlan::Read(_))); + assert!(matches!(read_plan.0[0].plan, RecordEntryPlan::Read { .. })); + assert!(matches!(read_plan.0[0].plan, RecordEntryPlan::Read { .. })); } #[test] @@ -189,13 +218,12 @@ mod tests { let mut outputs: Vec<(usize, &mut [f64])> = vec![(1, &mut out1)]; - let read_plan = - RecordStructure::::build_record_plan(&channels, &mut outputs[..]).unwrap(); + let read_plan = RecordPlan::::build_record_plan(&channels, &mut outputs[..]).unwrap(); assert_eq!(read_plan.0.len(), 2); assert_eq!(read_plan.0[1].length, 1000); assert!(matches!(read_plan.0[0].plan, RecordEntryPlan::Skip(8))); - assert!(matches!(read_plan.0[1].plan, RecordEntryPlan::Read(_))); + assert!(matches!(read_plan.0[1].plan, RecordEntryPlan::Read { .. 
})); } #[test] @@ -216,8 +244,7 @@ mod tests { let mut outputs: Vec<(usize, &mut [u32])> = vec![(1, &mut out1)]; - let read_plan_result = - RecordStructure::::build_record_plan(&channels, &mut outputs[..]); + let read_plan_result = RecordPlan::::build_record_plan(&channels, &mut outputs[..]); assert!(matches!( read_plan_result, @@ -246,8 +273,7 @@ mod tests { let mut outputs: Vec<(usize, &mut [f64])> = vec![(1, &mut out1)]; - let read_plan_result = - RecordStructure::::build_record_plan(&channels, &mut outputs[..]); + let read_plan_result = RecordPlan::::build_record_plan(&channels, &mut outputs[..]); assert!(matches!(read_plan_result, Ok(_))); } @@ -276,13 +302,12 @@ mod tests { let mut outputs: Vec<(usize, &mut [f64])> = vec![(1, &mut out1)]; - let read_plan = - RecordStructure::::build_record_plan(&channels, &mut outputs[..]).unwrap(); + let read_plan = RecordPlan::::build_record_plan(&channels, &mut outputs[..]).unwrap(); assert_eq!(read_plan.0.len(), 2); assert_eq!(read_plan.0[1].length, 1000); assert!(matches!(read_plan.0[0].plan, RecordEntryPlan::Skip(12))); - assert!(matches!(read_plan.0[1].plan, RecordEntryPlan::Read(_))); + assert!(matches!(read_plan.0[1].plan, RecordEntryPlan::Read { .. 
})); } #[test] @@ -308,8 +333,7 @@ mod tests { let mut outputs: Vec<(usize, &mut [i32])> = vec![(1, &mut out1)]; - let read_plan = - RecordStructure::::build_record_plan(&channels, &mut outputs[..]).unwrap(); + let read_plan = RecordPlan::::build_record_plan(&channels, &mut outputs[..]).unwrap(); assert_eq!(read_plan.row_size(), 20); } @@ -337,8 +361,7 @@ mod tests { let mut outputs: Vec<(usize, &mut [i32])> = vec![(1, &mut out1)]; - let read_plan = - RecordStructure::::build_record_plan(&channels, &mut outputs[..]).unwrap(); + let read_plan = RecordPlan::::build_record_plan(&channels, &mut outputs[..]).unwrap(); assert_eq!(read_plan.block_size(), 20000); } diff --git a/tests/read_with_offset.rs b/tests/read_with_offset.rs new file mode 100644 index 0000000..db135c0 --- /dev/null +++ b/tests/read_with_offset.rs @@ -0,0 +1,346 @@ +//! Tests for reading channels with start position offset + +use tedium::ChannelPath; + +mod common; + +#[test] +fn test_read_channel_from_start_zero() { + // Reading from start position 0 should be identical to read_channel + let mut file = common::open_test_file(); + let channel = ChannelPath::new("structure", "ch1"); + + let mut output_normal = vec![0.0f64; 10]; + let mut output_from = vec![0.0f64; 10]; + + // Reset file position for second read + let mut file2 = common::open_test_file(); + + file.read_channel(&channel, &mut output_normal).unwrap(); + file2 + .read_channel_from(&channel, 0, &mut output_from) + .unwrap(); + + assert_eq!( + output_normal, output_from, + "Reading from position 0 should match normal read" + ); +} + +#[test] +fn test_read_channel_from_middle() { + // Read first 20 samples normally, then read from position 10 and compare + let mut file = common::open_test_file(); + let channel = ChannelPath::new("structure", "ch1"); + + let mut full_read = vec![0.0f64; 20]; + file.read_channel(&channel, &mut full_read).unwrap(); + + // Now read from position 10 + let mut file2 = common::open_test_file(); + let mut offset_read 
= vec![0.0f64; 10]; + file2 + .read_channel_from(&channel, 10, &mut offset_read) + .unwrap(); + + // The offset read should match the second half of the full read + assert_eq!( + &full_read[10..20], + &offset_read[..], + "Offset read should match corresponding portion of full read" + ); +} + +#[test] +fn test_read_channel_from_with_small_output() { + // Test that reading with offset respects output buffer size + let mut file = common::open_test_file(); + let channel = ChannelPath::new("structure", "ch1"); + + let mut full_read = vec![0.0f64; 30]; + file.read_channel(&channel, &mut full_read).unwrap(); + + // Read 5 samples starting from position 10 + let mut file2 = common::open_test_file(); + let mut offset_read = vec![0.0f64; 5]; + file2 + .read_channel_from(&channel, 10, &mut offset_read) + .unwrap(); + + assert_eq!( + &full_read[10..15], + &offset_read[..], + "Should read correct samples with small buffer" + ); +} + +#[test] +fn test_read_channel_from_beyond_data() { + // Test reading with start position beyond available data + let mut file = common::open_test_file(); + let channel = ChannelPath::new("structure", "ch1"); + + // Get channel length first + let channel_length = file.channel_length(&channel).unwrap(); + + // Try to read from beyond the end + let mut output = vec![0.0f64; 10]; + let result = file.read_channel_from(&channel, channel_length + 100, &mut output); + + // Should succeed but read 0 samples (output should remain unchanged) + assert!(result.is_ok(), "Reading beyond data should not error"); +} + +#[test] +fn test_read_channel_from_at_boundary() { + // Test reading starting exactly at a block boundary + // This test assumes we know the block structure, which we might not + // So we'll just test that it works correctly + let mut file = common::open_test_file(); + let channel = ChannelPath::new("structure", "ch1"); + + let mut full_read = vec![0.0f64; 100]; + file.read_channel(&channel, &mut full_read).unwrap(); + + // Read from various 
positions + for start_pos in [0, 10, 25, 50, 75] { + let mut file2 = common::open_test_file(); + let mut offset_read = vec![0.0f64; 10]; + file2 + .read_channel_from(&channel, start_pos, &mut offset_read) + .unwrap(); + + let end_pos = (start_pos as usize + 10).min(full_read.len()); + assert_eq!( + &full_read[start_pos as usize..end_pos], + &offset_read[..(end_pos - start_pos as usize)], + "Reading from position {} should match", + start_pos + ); + } +} + +#[test] +fn test_read_channel_from_different_types() { + // Test with different data types + let mut file = common::open_test_file(); + + // Test with f64 + let channel = ChannelPath::new("structure", "ch1"); + + // Read normally + let mut full_read = vec![0.0f64; 20]; + file.read_channel(&channel, &mut full_read).unwrap(); + + // Read with offset + let mut file2 = common::open_test_file(); + let mut offset_read = vec![0.0f64; 10]; + file2 + .read_channel_from(&channel, 5, &mut offset_read) + .unwrap(); + + assert_eq!(&full_read[5..15], &offset_read[..]); +} + +#[test] +fn test_read_channel_from_preserves_existing_behavior() { + // Ensure that the refactored read_channel still works correctly + let mut file = common::open_test_file(); + let channel = ChannelPath::new("structure", "ch1"); + + let mut output = vec![0.0f64; 50]; + let result = file.read_channel(&channel, &mut output); + + assert!( + result.is_ok(), + "read_channel should still work after refactoring" + ); +} + +// ============================================================================ +// Multi-channel offset tests +// ============================================================================ + +#[test] +fn test_read_channels_from_start_zero() { + // Reading from start position 0 should be identical to read_channels + let mut file = common::open_test_file(); + let channel1 = ChannelPath::new("structure", "ch1"); + let channel2 = ChannelPath::new("structure", "ch2"); + let channels = [&channel1, &channel2]; + + let mut output1_normal = 
vec![0.0f64; 10]; + let mut output2_normal = vec![0.0f64; 10]; + let mut outputs_normal: Vec<&mut [f64]> = vec![&mut output1_normal, &mut output2_normal]; + + let mut output1_from = vec![0.0f64; 10]; + let mut output2_from = vec![0.0f64; 10]; + let mut outputs_from: Vec<&mut [f64]> = vec![&mut output1_from, &mut output2_from]; + + let mut file2 = common::open_test_file(); + + file.read_channels(&channels, &mut outputs_normal).unwrap(); + file2 + .read_channels_from(&channels, 0, &mut outputs_from) + .unwrap(); + + assert_eq!( + output1_normal, output1_from, + "Channel 1: Reading from position 0 should match normal read" + ); + assert_eq!( + output2_normal, output2_from, + "Channel 2: Reading from position 0 should match normal read" + ); +} + +#[test] +fn test_read_channels_from_middle() { + // Read first 20 samples normally, then read from position 10 and compare + let mut file = common::open_test_file(); + let channel1 = ChannelPath::new("structure", "ch1"); + let channel2 = ChannelPath::new("structure", "ch2"); + let channels = [&channel1, &channel2]; + + let mut full_read1 = vec![0.0f64; 20]; + let mut full_read2 = vec![0.0f64; 20]; + let mut full_outputs: Vec<&mut [f64]> = vec![&mut full_read1, &mut full_read2]; + file.read_channels(&channels, &mut full_outputs).unwrap(); + + // Now read from position 10 + let mut file2 = common::open_test_file(); + let mut offset_read1 = vec![0.0f64; 10]; + let mut offset_read2 = vec![0.0f64; 10]; + let mut offset_outputs: Vec<&mut [f64]> = vec![&mut offset_read1, &mut offset_read2]; + file2 + .read_channels_from(&channels, 10, &mut offset_outputs) + .unwrap(); + + // The offset read should match the second half of the full read + assert_eq!( + &full_read1[10..20], + &offset_read1[..], + "Channel 1: Offset read should match corresponding portion of full read" + ); + assert_eq!( + &full_read2[10..20], + &offset_read2[..], + "Channel 2: Offset read should match corresponding portion of full read" + ); +} + +#[test] +fn 
test_read_channels_from_with_small_output() { + // Test that reading with offset respects output buffer size + let mut file = common::open_test_file(); + let channel1 = ChannelPath::new("structure", "ch1"); + let channel2 = ChannelPath::new("structure", "ch2"); + let channels = [&channel1, &channel2]; + + let mut full_read1 = vec![0.0f64; 30]; + let mut full_read2 = vec![0.0f64; 30]; + let mut full_outputs: Vec<&mut [f64]> = vec![&mut full_read1, &mut full_read2]; + file.read_channels(&channels, &mut full_outputs).unwrap(); + + // Read 5 samples starting from position 10 + let mut file2 = common::open_test_file(); + let mut offset_read1 = vec![0.0f64; 5]; + let mut offset_read2 = vec![0.0f64; 5]; + let mut offset_outputs: Vec<&mut [f64]> = vec![&mut offset_read1, &mut offset_read2]; + file2 + .read_channels_from(&channels, 10, &mut offset_outputs) + .unwrap(); + + assert_eq!( + &full_read1[10..15], + &offset_read1[..], + "Channel 1: Should read correct samples with small buffer" + ); + assert_eq!( + &full_read2[10..15], + &offset_read2[..], + "Channel 2: Should read correct samples with small buffer" + ); +} + +#[test] +fn test_read_channels_from_preserves_existing_behavior() { + // Ensure that the refactored read_channels still works correctly + let mut file = common::open_test_file(); + let channel1 = ChannelPath::new("structure", "ch1"); + let channel2 = ChannelPath::new("structure", "ch2"); + let channels = [&channel1, &channel2]; + + let mut output1 = vec![0.0f64; 50]; + let mut output2 = vec![0.0f64; 50]; + let mut outputs: Vec<&mut [f64]> = vec![&mut output1, &mut output2]; + + let result = file.read_channels(&channels, &mut outputs); + + assert!( + result.is_ok(), + "read_channels should still work after refactoring" + ); +} + +#[test] +fn test_read_channels_from_separate_blocks() { + // Test the specific scenario where channels are written in separate blocks + // This ensures per-channel skip tracking works correctly + use tedium::DataLayout; + + let mut 
file = common::get_empty_file(); + let mut writer = file.writer().unwrap(); + + let channel1 = tedium::ChannelPath::new("test", "ch1"); + let channel2 = tedium::ChannelPath::new("test", "ch2"); + + // Write channel 1 in first block (1000 samples) + let data1: Vec = (0..1000).map(|i| i as f64).collect(); + writer + .write_channels(&[&channel1], &data1, DataLayout::Contigious) + .unwrap(); + + // Write channel 2 in second block (1000 samples) + let data2: Vec = (1000..2000).map(|i| i as f64).collect(); + writer + .write_channels(&[&channel2], &data2, DataLayout::Contigious) + .unwrap(); + + // Write both channels together in third block (1000 samples each) + let data3: Vec = (2000..4000).map(|i| i as f64).collect(); + writer + .write_channels(&[&channel1, &channel2], &data3, DataLayout::Contigious) + .unwrap(); + + drop(writer); + + // Now test reading with offset + // Ch1 has: Block 0 (1000 samples), Block 2 (1000 samples) = 2000 total + // Ch2 has: Block 1 (1000 samples), Block 2 (1000 samples) = 2000 total + + // Read from position 500 + let mut output1 = vec![0.0f64; 100]; + let mut output2 = vec![0.0f64; 100]; + let mut outputs: Vec<&mut [f64]> = vec![&mut output1, &mut output2]; + + file.read_channels_from(&[&channel1, &channel2], 500, &mut outputs) + .unwrap(); + + // Ch1 should skip 500 in Block 0, read samples 500-599 from Block 0 + // Ch2 should skip 500 in Block 1, read samples 500-599 from Block 1 + assert_eq!( + &output1[0..10], + &[ + 500.0, 501.0, 502.0, 503.0, 504.0, 505.0, 506.0, 507.0, 508.0, 509.0 + ], + "Ch1 should read from position 500 in its data stream" + ); + assert_eq!( + &output2[0..10], + &[ + 1500.0, 1501.0, 1502.0, 1503.0, 1504.0, 1505.0, 1506.0, 1507.0, 1508.0, 1509.0 + ], + "Ch2 should read from position 500 in its data stream (which is 1500 in the original data)" + ); +}