diff --git a/pyhealth/processors/audio_processor.py b/pyhealth/processors/audio_processor.py
index 17b4bf236..b938b0b44 100644
--- a/pyhealth/processors/audio_processor.py
+++ b/pyhealth/processors/audio_processor.py
@@ -134,6 +134,51 @@ def process(self, value: Union[str, Path]) -> Any:
 
         return waveform
 
+    def is_token(self) -> bool:
+        """Audio data is continuous (float-valued), not discrete tokens.
+
+        Returns:
+            False, since audio waveforms and spectrograms are continuous signals.
+        """
+        return False
+
+    def schema(self) -> tuple[str, ...]:
+        """Returns the schema of the processed audio feature.
+
+        The audio processor emits a single tensor (waveform or mel spectrogram).
+
+        Returns:
+            ("value",)
+        """
+        return ("value",)
+
+    def dim(self) -> tuple[int, ...]:
+        """Number of dimensions for the output tensor.
+
+        Returns:
+            (2,) for waveform output (channels, samples), or
+            (3,) for mel spectrogram output (channels, n_mels, time).
+        """
+        if self.n_mels is not None:
+            return (3,)
+        return (2,)
+
+    def spatial(self) -> tuple[bool, ...]:
+        """Whether each dimension of the output tensor is spatial.
+
+        For waveform (channels, samples): channels is not spatial, samples is.
+        For mel spectrogram (channels, n_mels, time): channels is not spatial,
+        n_mels and time are.
+
+        Returns:
+            Tuple of booleans for each axis.
+        """
+        if self.n_mels is not None:
+            # (channels, n_mels, time)
+            return (False, True, True)
+        # (channels, samples)
+        return (False, True)
+
     def __repr__(self) -> str:
         return (
             f"AudioProcessor(sample_rate={self.sample_rate}, "
diff --git a/pyhealth/processors/base_processor.py b/pyhealth/processors/base_processor.py
index fb489fb85..48cbe26ae 100644
--- a/pyhealth/processors/base_processor.py
+++ b/pyhealth/processors/base_processor.py
@@ -52,6 +52,55 @@ def process(self, value: Any) -> Any:
             Processed value.
         """
         pass
+
+    def is_token(self) -> bool:
+        """Returns whether the output (in particular, the value tensor) of the processor
+        represents discrete token indices (True) or continuous values (False). This is used to
+        determine whether to apply token-based transformations (e.g. `nn.Embedding`) or
+        value-based transformations (e.g. `nn.Linear`).
+
+        Returns:
+            True if the output of the processor represents discrete token indices, False otherwise.
+        """
+        raise NotImplementedError("is_token method is not implemented for this processor.")
+
+    def schema(self) -> tuple[str, ...]:
+        """Returns the schema of the processed feature. For a processor that emits a single tensor,
+        this should just return `("value",)`. For a processor that emits a tuple of tensors,
+        this should return a tuple of the same length as the tuple, with the semantic name of each tensor,
+        such as `("time", "value")`, `("value", "mask")`, etc.
+
+        Typical semantic names include:
+            - "value": the main processed tensor output of the processor
+            - "time": the time tensor output of the processor (mostly for StageNet)
+            - "mask": the mask tensor output of the processor (if applicable)
+
+        Returns:
+            Tuple of semantic names corresponding to the output of the processor.
+        """
+        raise NotImplementedError("Schema method is not implemented for this processor.")
+
+    def dim(self) -> tuple[int, ...]:
+        """Number of dimensions (`Tensor.dim()`) for each output
+        tensor, in the same order as the output tuple.
+
+        Returns:
+            Tuple of integers corresponding to the number of dimensions of each output tensor.
+        """
+        raise NotImplementedError("dim method is not implemented for this processor.")
+
+    def spatial(self) -> tuple[bool, ...]:
+        """Whether each dimension (axis) of the value tensor is spatial (i.e. corresponds to a spatial
+        axis like time, height, width, etc.) or not. This is used to determine how to apply
+        augmentations and other transformations that should only be applied to spatial dimensions.
+
+        E.g. for CNN or RNN features, this would help determine which dimensions to apply spatial augmentations to,
+        and which dimensions to treat as channels or features.
+
+        Returns:
+            Tuple of booleans corresponding to whether each axis of the value tensor is spatial or not.
+        """
+        raise NotImplementedError("spatial method is not implemented for this processor.")
 
 
 class SampleProcessor(Processor):
diff --git a/pyhealth/processors/deep_nested_sequence_processor.py b/pyhealth/processors/deep_nested_sequence_processor.py
index 6cedd10f0..2d67633eb 100644
--- a/pyhealth/processors/deep_nested_sequence_processor.py
+++ b/pyhealth/processors/deep_nested_sequence_processor.py
@@ -185,6 +185,22 @@ def __repr__(self):
             f"max_inner_len={self._max_inner_len})"
         )
 
+    def is_token(self) -> bool:
+        """Deep nested sequence codes are discrete token indices."""
+        return True
+
+    def schema(self) -> tuple[str, ...]:
+        """Output is a single tensor named "value"."""
+        return ("value",)
+
+    def dim(self) -> tuple[int, ...]:
+        """Output is a 3D tensor (groups, visits, codes)."""
+        return (3,)
+
+    def spatial(self) -> tuple[bool, ...]:
+        # Groups are not sequential; visits are temporal/spatial; codes-per-visit is an unordered set
+        return (False, True, False)
+
 
 @register_processor("deep_nested_sequence_floats")
 class DeepNestedFloatsProcessor(FeatureProcessor):
@@ -379,4 +395,20 @@ def __repr__(self):
             f"max_middle_len={self._max_middle_len}, "
             f"max_inner_len={self._max_inner_len}, "
             f"forward_fill={self.forward_fill})"
-        )
\ No newline at end of file
+        )
+
+    def is_token(self) -> bool:
+        """Deep nested float values are continuous, not discrete tokens."""
+        return False
+
+    def schema(self) -> tuple[str, ...]:
+        """Output is a single tensor named "value"."""
+        return ("value",)
+
+    def dim(self) -> tuple[int, ...]:
+        """Output is a 3D tensor (groups, visits, features)."""
+        return (3,)
+
+    def spatial(self) -> tuple[bool, ...]:
+        # Groups are not sequential; visits are temporal/spatial; features dimension is not
+        return (False, True, False)
diff --git
a/pyhealth/processors/image_processor.py b/pyhealth/processors/image_processor.py
index d174529a7..b0185c1ec 100644
--- a/pyhealth/processors/image_processor.py
+++ b/pyhealth/processors/image_processor.py
@@ -95,6 +95,40 @@ def process(self, value: Union[str, Path]) -> Any:
             img.load()  # Avoid "too many open files" errors
             return self.transform(img)
 
+    def is_token(self) -> bool:
+        """Image data is continuous (float-valued pixel intensities), not discrete tokens.
+
+        Returns:
+            False.
+        """
+        return False
+
+    def schema(self) -> tuple[str, ...]:
+        """Single tensor output.
+
+        Returns:
+            ("value",)
+        """
+        return ("value",)
+
+    def dim(self) -> tuple[int, ...]:
+        """Output tensor has 3 dimensions: (C, H, W).
+
+        Returns:
+            (3,)
+        """
+        return (3,)
+
+    def spatial(self) -> tuple[bool, ...]:
+        """Spatial axes for the output tensor (C, H, W).
+
+        Channels are not spatial; height and width are.
+
+        Returns:
+            (False, True, True)
+        """
+        return (False, True, True)
+
     def __repr__(self) -> str:
         return (
             f"ImageLoadingProcessor(image_size={self.image_size}, "
diff --git a/pyhealth/processors/label_processor.py b/pyhealth/processors/label_processor.py
index ae8d1f8aa..969721995 100644
--- a/pyhealth/processors/label_processor.py
+++ b/pyhealth/processors/label_processor.py
@@ -40,6 +40,22 @@ def process(self, value: Any) -> torch.Tensor:
     def size(self):
         return 1
 
+    def is_token(self) -> bool:
+        """Binary labels are continuous float targets for BCE loss."""
+        return False
+
+    def schema(self) -> tuple[str, ...]:
+        """Output is a single tensor named "value"."""
+        return ("value",)
+
+    def dim(self) -> tuple[int, ...]:
+        """Output shape is (1,), so 1 dimension."""
+        return (1,)
+
+    def spatial(self) -> tuple[bool, ...]:
+        """The single output axis is not spatial."""
+        return (False,)
+
     def __repr__(self):
         return f"BinaryLabelProcessor(label_vocab_size={len(self.label_vocab)})"
 
@@ -72,6 +88,22 @@ def process(self, value: Any) -> torch.Tensor:
     def size(self):
         return len(self.label_vocab)
 
+    def is_token(self) -> bool:
+        """Multi-class labels are discrete token indices."""
+        return True
+
+    def schema(self) -> tuple[str, ...]:
+        """Output is a single tensor named "value"."""
+        return ("value",)
+
+    def dim(self) -> tuple[int, ...]:
+        """Output is a scalar tensor (dim 0)."""
+        return (0,)
+
+    def spatial(self) -> tuple[bool, ...]:
+        """A scalar output has no axes, hence the empty tuple."""
+        return ()
+
     def __repr__(self):
         return f"MultiClassLabelProcessor(label_vocab_size={len(self.label_vocab)})"
 
@@ -115,6 +147,22 @@ def process(self, value: Any) -> torch.Tensor:
     def size(self):
         return len(self.label_vocab)
 
+    def is_token(self) -> bool:
+        """Multi-label indicators are continuous float targets for BCE loss."""
+        return False
+
+    def schema(self) -> tuple[str, ...]:
+        """Output is a single tensor named "value"."""
+        return ("value",)
+
+    def dim(self) -> tuple[int, ...]:
+        """Output shape is (num_classes,), so 1 dimension."""
+        return (1,)
+
+    def spatial(self) -> tuple[bool, ...]:
+        """The class axis is not spatial."""
+        return (False,)
+
     def __repr__(self):
         return f"MultiLabelProcessor(label_vocab_size={len(self.label_vocab)})"
 
@@ -131,5 +179,21 @@ def process(self, value: Any) -> torch.Tensor:
     def size(self):
         return 1
 
+    def is_token(self) -> bool:
+        """Regression labels are continuous, not discrete tokens."""
+        return False
+
+    def schema(self) -> tuple[str, ...]:
+        """Output is a single tensor named "value"."""
+        return ("value",)
+
+    def dim(self) -> tuple[int, ...]:
+        """Output shape is (1,), so 1 dimension."""
+        return (1,)
+
+    def spatial(self) -> tuple[bool, ...]:
+        """The single output axis is not spatial."""
+        return (False,)
+
     def __repr__(self):
         return "RegressionLabelProcessor()"
diff --git a/pyhealth/processors/nested_sequence_processor.py b/pyhealth/processors/nested_sequence_processor.py
index 89da66e37..461575621 100644
--- a/pyhealth/processors/nested_sequence_processor.py
+++ b/pyhealth/processors/nested_sequence_processor.py
@@ -162,6 +162,22 @@ def __repr__(self):
             f"padding={self._padding})"
         )
 
+    def is_token(self) -> bool:
+        """Nested sequence codes are discrete token indices."""
+        return True
+
+    def schema(self) -> tuple[str, ...]:
+        """Output is a single tensor named "value"."""
+        return ("value",)
+
+    def dim(self) -> tuple[int, ...]:
+        """Output is a 2D tensor (visits, codes_per_visit)."""
+        return (2,)
+
+    def spatial(self) -> tuple[bool, ...]:
+        # Visits (time) is spatial; codes-per-visit is an unordered set, not spatial
+        return (True, False)
+
 
 @register_processor("nested_sequence_floats")
 class NestedFloatsProcessor(FeatureProcessor):
@@ -341,3 +357,19 @@ def __repr__(self):
             f"forward_fill={self.forward_fill}, "
             f"padding={self._padding})"
         )
+
+    def is_token(self) -> bool:
+        """Nested float values are continuous, not discrete tokens."""
+        return False
+
+    def schema(self) -> tuple[str, ...]:
+        """Output is a single tensor named "value"."""
+        return ("value",)
+
+    def dim(self) -> tuple[int, ...]:
+        """Output is a 2D tensor (visits, features)."""
+        return (2,)
+
+    def spatial(self) -> tuple[bool, ...]:
+        # Visits (time) is spatial; features dimension is not
+        return (True, False)
diff --git a/pyhealth/processors/sequence_processor.py b/pyhealth/processors/sequence_processor.py
index 01162548e..d7a7b1ddf 100644
--- a/pyhealth/processors/sequence_processor.py
+++ b/pyhealth/processors/sequence_processor.py
@@ -71,5 +71,21 @@ def add(self, tokens: set[str]):
     def size(self):
         return len(self.code_vocab)
 
+    def is_token(self) -> bool:
+        """Sequence codes are discrete token indices."""
+        return True
+
+    def schema(self) -> tuple[str, ...]:
+        """Output is a single tensor named "value"."""
+        return ("value",)
+
+    def dim(self) -> tuple[int, ...]:
+        """Output is a 1D tensor of code indices."""
+        return (1,)
+
+    def spatial(self) -> tuple[bool, ...]:
+        """The single sequence axis is spatial."""
+        return (True,)
+
     def __repr__(self):
         return f"SequenceProcessor(code_vocab_size={len(self.code_vocab)})"
diff --git a/pyhealth/processors/stagenet_processor.py b/pyhealth/processors/stagenet_processor.py
index 0441ba869..cce8819c5 100644
--- a/pyhealth/processors/stagenet_processor.py
+++ b/pyhealth/processors/stagenet_processor.py
@@ -191,6 +191,8 @@ def _encode_nested_codes(self, nested_codes: List[List[str]]) -> torch.Tensor:
 
         Pads all inner sequences to self._max_nested_len (global max).
         """
+        assert self._max_nested_len is not None, "Max nested length must be set during fit()"
+
         # Handle empty nested codes (no visits/events)
         # Return single padding token with shape (1, max_len)
         if len(nested_codes) == 0:
@@ -219,6 +221,45 @@ def size(self) -> int:
         """Return vocabulary size."""
         return len(self.code_vocab)
 
+    def is_token(self) -> bool:
+        """Code indices are discrete token indices."""
+        return True
+
+    def schema(self) -> tuple[str, ...]:
+        """Output is a tuple of (time_tensor, value_tensor)."""
+        return ("time", "value")
+
+    def dim(self) -> tuple[int, ...]:
+        """Number of dimensions for each output tensor.
+
+        Time tensor is 1D. Value tensor is 1D (flat) or 2D (nested).
+        Must be called after fit().
+
+        Returns:
+            (1, 1) for flat codes or (1, 2) for nested codes.
+        """
+        if self._is_nested is None:
+            raise NotImplementedError(
+                "StageNetProcessor.dim() requires fit() to be called first "
+                "to determine whether codes are flat or nested."
+            )
+        if self._is_nested:
+            return (1, 2)
+        return (1, 1)
+
+    def spatial(self) -> tuple[bool, ...]:
+        """Whether each dimension of the value tensor is spatial."""
+        if self._is_nested is None:
+            raise NotImplementedError(
+                "StageNetProcessor.spatial() requires fit() to be called first."
+            )
+        if self._is_nested:
+            # (visits, codes_per_visit) - visits are sequential/spatial,
+            # codes_per_visit is an unordered set and not spatial
+            return (True, False)
+        # Flat codes: single sequence dimension is spatial
+        return (True,)
+
     def __repr__(self):
         if self._is_nested:
             return (
@@ -369,6 +410,44 @@ def size(self):
         """Return feature dimension."""
         return self._size
 
+    def is_token(self) -> bool:
+        """Numeric values are continuous, not discrete tokens."""
+        return False
+
+    def schema(self) -> tuple[str, ...]:
+        """Output is a tuple of (time_tensor, value_tensor)."""
+        return ("time", "value")
+
+    def dim(self) -> tuple[int, ...]:
+        """Number of dimensions for each output tensor.
+
+        Time tensor is 1D. Value tensor is 1D (flat) or 2D (nested).
+        Must be called after fit().
+
+        Returns:
+            (1, 1) for flat values or (1, 2) for nested values.
+        """
+        if self._is_nested is None:
+            raise NotImplementedError(
+                "StageNetTensorProcessor.dim() requires fit() to be called first "
+                "to determine whether values are flat or nested."
+            )
+        if self._is_nested:
+            return (1, 2)
+        return (1, 1)
+
+    def spatial(self) -> tuple[bool, ...]:
+        """Whether each dimension of the value tensor is spatial."""
+        if self._is_nested is None:
+            raise NotImplementedError(
+                "StageNetTensorProcessor.spatial() requires fit() to be called first."
+            )
+        if self._is_nested:
+            # (time_steps, features) - time is spatial, features are not
+            return (True, False)
+        # Flat: single sequence dimension is spatial
+        return (True,)
+
     def __repr__(self):
         return (
             f"StageNetTensorProcessor(is_nested={self._is_nested}, "
diff --git a/pyhealth/processors/tensor_processor.py b/pyhealth/processors/tensor_processor.py
index b74b98ac5..b1270051d 100644
--- a/pyhealth/processors/tensor_processor.py
+++ b/pyhealth/processors/tensor_processor.py
@@ -1,4 +1,4 @@
-from typing import Any
+from typing import Any, Dict, Iterable, Optional
 
 import torch
 
@@ -21,15 +21,56 @@ class TensorProcessor(FeatureProcessor):
         - torch.Tensor with appropriate shape and dtype
     """
 
-    def __init__(self, dtype: torch.dtype = torch.float32):
+    def __init__(
+        self,
+        dtype: torch.dtype = torch.float32,
+        spatial_dims: Optional[tuple[bool, ...]] = None,
+    ):
         """
         Initialize the TensorProcessor.
 
         Args:
             dtype: The desired torch data type for the output tensor.
                 Default is torch.float32.
+            spatial_dims: Tuple of booleans indicating which dimensions are spatial.
+                If None, defaults to all False. Default is None.
         """
         self.dtype = dtype
+        # Rank of the processed tensor; unknown until fit() sees a sample.
+        self._n_dim: Optional[int] = None
+        # Stored as a tuple so spatial() returns a tuple even if a list was passed.
+        self._spatial_dims = None if spatial_dims is None else tuple(spatial_dims)
+
+    def fit(self, samples: Iterable[Dict[str, Any]], field: str) -> None:
+        """Infer n_dim from the first sample whose ``field`` is present and not None.
+
+        Args:
+            samples: Iterable of sample dictionaries.
+            field: The field name to extract from samples.
+
+        Raises:
+            ValueError: If ``spatial_dims`` was given at init and its length
+                differs from the inferred number of dimensions.
+        """
+        for sample in samples:
+            if field not in sample or sample[field] is None:
+                continue
+            value = sample[field]
+            if isinstance(value, torch.Tensor):
+                # Only the rank is needed, so read it without copying the tensor.
+                self._n_dim = value.dim()
+            else:
+                self._n_dim = torch.tensor(value, dtype=self.dtype).dim()
+            break
+        if (
+            self._n_dim is not None
+            and self._spatial_dims is not None
+            and len(self._spatial_dims) != self._n_dim
+        ):
+            raise ValueError(
+                f"spatial_dims has {len(self._spatial_dims)} entries but the "
+                f"tensor has {self._n_dim} dimensions."
+            )
 
     def process(self, value: Any) -> torch.Tensor:
         """
@@ -57,6 +98,57 @@ def size(self) -> None:
         """
         return None
 
+    def is_token(self) -> bool:
+        """Whether the output tensor represents discrete token indices, inferred from dtype.
+
+        Returns:
+            True if dtype is an integer type (discrete tokens); False for
+            floating point, complex, and boolean dtypes.
+        """
+        dtype = self.dtype
+        # `not is_floating_point` alone would also classify complex and bool as tokens.
+        return not (dtype.is_floating_point or dtype.is_complex or dtype == torch.bool)
+
+    def schema(self) -> tuple[str, ...]:
+        """Output is a single tensor named "value"."""
+        return ("value",)
+
+    def dim(self) -> tuple[int, ...]:
+        """Number of dimensions for the output tensor.
+
+        Returns:
+            (n_dim,)
+
+        Raises:
+            NotImplementedError: If n_dim is unknown because fit() has not
+                inferred it from a sample yet.
+        """
+        if self._n_dim is None:
+            raise NotImplementedError(
+                "TensorProcessor cannot determine n_dim automatically. "
+                "Call fit() first."
+            )
+        return (self._n_dim,)
+
+    def spatial(self) -> tuple[bool, ...]:
+        """Whether each dimension of the output tensor is spatial.
+
+        If spatial_dims was provided at init, returns that. Otherwise defaults
+        to all False based on n_dim.
+
+        Raises:
+            NotImplementedError: If spatial_dims was not provided and n_dim is
+                unknown because fit() has not inferred it yet.
+        """
+        if self._spatial_dims is not None:
+            return self._spatial_dims
+        if self._n_dim is None:
+            raise NotImplementedError(
+                "TensorProcessor cannot determine spatial dims. "
+                "Call fit() first."
+            )
+        return (False,) * self._n_dim
+
     def __repr__(self) -> str:
         """
         String representation of the processor.
diff --git a/pyhealth/processors/timeseries_processor.py b/pyhealth/processors/timeseries_processor.py
index f5abcfae7..e761e8035 100644
--- a/pyhealth/processors/timeseries_processor.py
+++ b/pyhealth/processors/timeseries_processor.py
@@ -102,6 +102,22 @@ def size(self):
         # Size equals number of features, unknown until first process
         return self.n_features
 
+    def is_token(self) -> bool:
+        """Report that time series hold continuous values rather than tokens."""
+        return False
+
+    def schema(self) -> tuple[str, ...]:
+        """Name the single emitted tensor."""
+        return ("value",)
+
+    def dim(self) -> tuple[int, ...]:
+        """Give the rank of the emitted (time_steps, features) tensor."""
+        return (2,)
+
+    def spatial(self) -> tuple[bool, ...]:
+        """Flag the time axis as spatial and the feature axis as not."""
+        return (True, False)
+
     def __repr__(self):
         return (
             f"TimeSeriesProcessor(sampling_rate={self.sampling_rate}, "