From 7544b4d1f38eae828e63c04c691cda5ed077842a Mon Sep 17 00:00:00 2001 From: Yongda Fan Date: Thu, 5 Feb 2026 21:50:04 -0600 Subject: [PATCH 1/2] Change UNK to 1 --- pyhealth/processors/base_processor.py | 11 +++-- .../deep_nested_sequence_processor.py | 42 +++++++++--------- .../processors/nested_sequence_processor.py | 43 +++++++++---------- pyhealth/processors/sequence_processor.py | 39 +++++++++-------- pyhealth/processors/stagenet_processor.py | 41 +++++++++--------- 5 files changed, 89 insertions(+), 87 deletions(-) diff --git a/pyhealth/processors/base_processor.py b/pyhealth/processors/base_processor.py index fb9841f8c..fb489fb85 100644 --- a/pyhealth/processors/base_processor.py +++ b/pyhealth/processors/base_processor.py @@ -93,24 +93,27 @@ def process(self, samples: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ pass -class VocabMixin(ABC): +class TokenProcessorInterface(ABC): """ Base class for feature processors that build a vocabulary. Provides a common interface for accessing vocabulary-related information. """ + + PAD = 0 + UNK = 1 @abstractmethod - def remove(self, vocabularies: set[str]): + def remove(self, tokens: set[str]): """Remove specified vocabularies from the processor.""" pass @abstractmethod - def retain(self, vocabularies: set[str]): + def retain(self, tokens: set[str]): """Retain only the specified vocabularies in the processor.""" pass @abstractmethod - def add(self, vocabularies: set[str]): + def add(self, tokens: set[str]): """Add specified vocabularies to the processor.""" pass \ No newline at end of file diff --git a/pyhealth/processors/deep_nested_sequence_processor.py b/pyhealth/processors/deep_nested_sequence_processor.py index 321a7334d..6cedd10f0 100644 --- a/pyhealth/processors/deep_nested_sequence_processor.py +++ b/pyhealth/processors/deep_nested_sequence_processor.py @@ -3,11 +3,11 @@ import torch from . 
import register_processor -from .base_processor import FeatureProcessor, VocabMixin +from .base_processor import FeatureProcessor, TokenProcessorInterface @register_processor("deep_nested_sequence") -class DeepNestedSequenceProcessor(FeatureProcessor, VocabMixin): +class DeepNestedSequenceProcessor(FeatureProcessor, TokenProcessorInterface): """ Feature processor for deeply nested categorical sequences with vocabulary. @@ -30,8 +30,8 @@ class DeepNestedSequenceProcessor(FeatureProcessor, VocabMixin): (num_groups, max_visits_per_group, max_codes_per_visit) Special tokens: - - <unk>: -1 for unknown codes - <pad>: 0 for padding + - <unk>: 1 for unknown codes Examples: >>> processor = DeepNestedSequenceProcessor() @@ -45,9 +45,8 @@ class DeepNestedSequenceProcessor(FeatureProcessor, VocabMixin): """ def __init__(self): - # -1 for <unk> for ease of boolean arithmetic > 0, > -1, etc. - self.code_vocab: Dict[Any, int] = {"<unk>": -1, "<pad>": 0} - self._next_index = 1 + self.code_vocab: Dict[Any, int] = {"<pad>": self.PAD, "<unk>": self.UNK} + self._next_index = 2 self._max_middle_len = 1 # Maximum length of middle sequences (e.g. visits) self._max_inner_len = 1 # Maximum length of inner sequences (e.g. 
codes per visit) @@ -86,26 +85,27 @@ def fit(self, samples: Iterable[Dict[str, Any]], field: str) -> None: self._max_middle_len = max(1, max_middle_len) self._max_inner_len = max(1, max_inner_len) - def remove(self, vocabularies: set[str]): + def remove(self, tokens: set[str]): """Remove specified vocabularies from the processor.""" - vocab = list(set(self.code_vocab.keys()) - vocabularies - {"<pad>", "<unk>"}) - self.code_vocab = {"<pad>": 0, "<unk>": -1} - for i, v in enumerate(vocab): - self.code_vocab[v] = i + 1 + keep = set(self.code_vocab.keys()) - tokens | {"<pad>", "<unk>"} + order = [k for k, v in sorted(self.code_vocab.items(), key=lambda x: x[1]) if k in keep] + + self.code_vocab = { k : i for i, k in enumerate(order) } - def retain(self, vocabularies: set[str]): + def retain(self, tokens: set[str]): """Retain only the specified vocabularies in the processor.""" - vocab = list(set(self.code_vocab.keys()) & vocabularies) - self.code_vocab = {"<pad>": 0, "<unk>": -1} - for i, v in enumerate(vocab): - self.code_vocab[v] = i + 1 + keep = set(self.code_vocab.keys()) & tokens | {"<pad>", "<unk>"} + order = [k for k, v in sorted(self.code_vocab.items(), key=lambda x: x[1]) if k in keep] + + self.code_vocab = { k : i for i, k in enumerate(order) } - def add(self, vocabularies: set[str]): + def add(self, tokens: set[str]): """Add specified vocabularies to the processor.""" - vocab = list(set(self.code_vocab.keys()) | vocabularies - {"<pad>", "<unk>"}) - self.code_vocab = {"<pad>": 0, "<unk>": -1} - for i, v in enumerate(vocab): - self.code_vocab[v] = i + 1 + i = len(self.code_vocab) + for token in tokens: + if token not in self.code_vocab: + self.code_vocab[token] = i + i += 1 def process(self, value: List[List[List[Any]]]) -> torch.Tensor: """Process deep nested sequence into padded 3D tensor. 
diff --git a/pyhealth/processors/nested_sequence_processor.py b/pyhealth/processors/nested_sequence_processor.py index a0cde5e03..89da66e37 100644 --- a/pyhealth/processors/nested_sequence_processor.py +++ b/pyhealth/processors/nested_sequence_processor.py @@ -3,11 +3,11 @@ import torch from . import register_processor -from .base_processor import FeatureProcessor, VocabMixin +from .base_processor import FeatureProcessor, TokenProcessorInterface @register_processor("nested_sequence") -class NestedSequenceProcessor(FeatureProcessor, VocabMixin): +class NestedSequenceProcessor(FeatureProcessor, TokenProcessorInterface): """ Feature processor for nested categorical sequences with vocabulary. @@ -22,8 +22,8 @@ class NestedSequenceProcessor(FeatureProcessor, VocabMixin): 4. Returns a 2D tensor of shape (num_visits, max_codes_per_visit) Special tokens: - - <unk>: -1 for unknown codes - <pad>: 0 for padding + - <unk>: 1 for unknown codes Args: padding: Additional padding to add on top of the observed maximum inner @@ -45,9 +45,8 @@ class NestedSequenceProcessor(FeatureProcessor, VocabMixin): """ def __init__(self, padding: int = 0): - # <unk> will be set to len(vocab) after fit - self.code_vocab: Dict[Any, int] = {"<unk>": None, "<pad>": 0} - self._next_index = 1 + self.code_vocab: Dict[Any, int] = {"<pad>": self.PAD, "<unk>": self.UNK} + self._next_index = 2 self._max_inner_len = 1 # Maximum length of inner sequences self._padding = padding # Additional padding beyond observed max @@ -82,27 +81,27 @@ def fit(self, samples: Iterable[Dict[str, Any]], field: str) -> None: observed_max = max(1, max_inner_len) self._max_inner_len = observed_max + self._padding - # Set <unk> token to len(vocab) - 1 after building vocabulary - # (-1 because <unk> is already in vocab) - self.code_vocab["<unk>"] = len(self.code_vocab) - 1 - - def remove(self, vocabularies: set[str]): + def remove(self, tokens: set[str]): """Remove specified vocabularies from the processor.""" - vocab = list(set(self.code_vocab.keys()) - vocabularies - {"<pad>", "<unk>"}) - vocab 
= ["<pad>"] + vocab + ["<unk>"] - self.code_vocab = {v: i for i, v in enumerate(vocab)} + keep = set(self.code_vocab.keys()) - tokens | {"<pad>", "<unk>"} + order = [k for k, v in sorted(self.code_vocab.items(), key=lambda x: x[1]) if k in keep] + + self.code_vocab = { k : i for i, k in enumerate(order) } - def retain(self, vocabularies: set[str]): + def retain(self, tokens: set[str]): """Retain only the specified vocabularies in the processor.""" - vocab = list(set(self.code_vocab.keys()) & vocabularies) - vocab = ["<pad>"] + vocab + ["<unk>"] - self.code_vocab = {v: i for i, v in enumerate(vocab)} + keep = set(self.code_vocab.keys()) & tokens | {"<pad>", "<unk>"} + order = [k for k, v in sorted(self.code_vocab.items(), key=lambda x: x[1]) if k in keep] + + self.code_vocab = { k : i for i, k in enumerate(order) } - def add(self, vocabularies: set[str]): + def add(self, tokens: set[str]): """Add specified vocabularies to the processor.""" - vocab = list(set(self.code_vocab.keys()) | vocabularies - {"<pad>", "<unk>"}) - vocab = ["<pad>"] + vocab + ["<unk>"] - self.code_vocab = {v: i for i, v in enumerate(vocab)} + i = len(self.code_vocab) + for token in tokens: + if token not in self.code_vocab: + self.code_vocab[token] = i + i += 1 def process(self, value: List[List[Any]]) -> torch.Tensor: """Process nested sequence into padded 2D tensor. diff --git a/pyhealth/processors/sequence_processor.py b/pyhealth/processors/sequence_processor.py index 4355e7b8b..01162548e 100644 --- a/pyhealth/processors/sequence_processor.py +++ b/pyhealth/processors/sequence_processor.py @@ -3,11 +3,11 @@ import torch from . import register_processor -from .base_processor import FeatureProcessor, VocabMixin +from .base_processor import FeatureProcessor, TokenProcessorInterface @register_processor("sequence") -class SequenceProcessor(FeatureProcessor, VocabMixin): +class SequenceProcessor(FeatureProcessor, TokenProcessorInterface): """ Feature processor for encoding categorical sequences (e.g., medical codes) into numerical indices. 
@@ -16,9 +16,8 @@ class SequenceProcessor(FeatureProcessor, VocabMixin): """ def __init__(self): - # <unk> will be set to len(vocab) after fit - self.code_vocab: Dict[Any, int] = {"<pad>": 0} - self._next_index = 1 + self.code_vocab: Dict[Any, int] = {"<pad>": self.PAD, "<unk>": self.UNK} + self._next_index = 2 def fit(self, samples: Iterable[Dict[str, Any]], field: str) -> None: for sample in samples: @@ -29,8 +28,6 @@ def fit(self, samples: Iterable[Dict[str, Any]], field: str) -> None: self.code_vocab[token] = self._next_index self._next_index += 1 - self.code_vocab["<unk>"] = len(self.code_vocab) - def process(self, value: Any) -> torch.Tensor: """Process token value(s) into tensor of indices. @@ -49,23 +46,27 @@ def process(self, value: Any) -> torch.Tensor: return torch.tensor(indices, dtype=torch.long) - def remove(self, vocabularies: set[str]): + def remove(self, tokens: set[str]): """Remove specified vocabularies from the processor.""" - vocab = list(set(self.code_vocab.keys()) - vocabularies - {"<pad>", "<unk>"}) - vocab = ["<pad>"] + vocab + ["<unk>"] - self.code_vocab = {v: i for i, v in enumerate(vocab)} + keep = set(self.code_vocab.keys()) - tokens | {"<pad>", "<unk>"} + order = [k for k, v in sorted(self.code_vocab.items(), key=lambda x: x[1]) if k in keep] + + self.code_vocab = { k : i for i, k in enumerate(order) } - def retain(self, vocabularies: set[str]): + def retain(self, tokens: set[str]): """Retain only the specified vocabularies in the processor.""" - vocab = list(set(self.code_vocab.keys()) & vocabularies) - vocab = ["<pad>"] + vocab + ["<unk>"] - self.code_vocab = {v: i for i, v in enumerate(vocab)} + keep = set(self.code_vocab.keys()) & tokens | {"<pad>", "<unk>"} + order = [k for k, v in sorted(self.code_vocab.items(), key=lambda x: x[1]) if k in keep] + + self.code_vocab = { k : i for i, k in enumerate(order) } - def add(self, vocabularies: set[str]): + def add(self, tokens: set[str]): """Add specified vocabularies to the processor.""" - vocab = list(set(self.code_vocab.keys()) | vocabularies - {"<pad>", "<unk>"}) - 
vocab = ["<pad>"] + vocab + ["<unk>"] - self.code_vocab = {v: i for i, v in enumerate(vocab)} + i = len(self.code_vocab) + for token in tokens: + if token not in self.code_vocab: + self.code_vocab[token] = i + i += 1 def size(self): return len(self.code_vocab) diff --git a/pyhealth/processors/stagenet_processor.py b/pyhealth/processors/stagenet_processor.py index 56a4b762f..0441ba869 100644 --- a/pyhealth/processors/stagenet_processor.py +++ b/pyhealth/processors/stagenet_processor.py @@ -3,11 +3,11 @@ import torch from . import register_processor -from .base_processor import FeatureProcessor, VocabMixin +from .base_processor import FeatureProcessor, TokenProcessorInterface @register_processor("stagenet") -class StageNetProcessor(FeatureProcessor, VocabMixin): +class StageNetProcessor(FeatureProcessor, TokenProcessorInterface): """ Feature processor for StageNet CODE inputs with coupled value/time data. @@ -55,9 +55,8 @@ class StageNetProcessor(FeatureProcessor, VocabMixin): """ def __init__(self, padding: int = 0): - # <unk> will be set to len(vocab) after fit - self.code_vocab: Dict[Any, int] = {"<unk>": None, "<pad>": 0} - self._next_index = 1 + self.code_vocab: Dict[Any, int] = {"<pad>": self.PAD, "<unk>": self.UNK} + self._next_index = 2 self._is_nested = None # Will be determined during fit # Max inner sequence length for nested codes self._max_nested_len = None @@ -118,27 +117,27 @@ def fit(self, samples: Iterable[Dict[str, Any]], field: str) -> None: observed_max = max(1, max_inner_len) self._max_nested_len = observed_max + self._padding - # Set <unk> token to the next available index - # Since <unk> is already in the vocab dict, we use _next_index - self.code_vocab["<unk>"] = self._next_index - - def remove(self, vocabularies: set[str]): + def remove(self, tokens: set[str]): """Remove specified vocabularies from the processor.""" - vocab = list(set(self.code_vocab.keys()) - vocabularies - {"<pad>", "<unk>"}) - vocab = ["<pad>"] + vocab + ["<unk>"] - self.code_vocab = {v: i for i, v in enumerate(vocab)} + keep = 
set(self.code_vocab.keys()) - tokens | {"<pad>", "<unk>"} + order = [k for k, v in sorted(self.code_vocab.items(), key=lambda x: x[1]) if k in keep] + + self.code_vocab = { k : i for i, k in enumerate(order) } - def retain(self, vocabularies: set[str]): + def retain(self, tokens: set[str]): """Retain only the specified vocabularies in the processor.""" - vocab = list(set(self.code_vocab.keys()) & vocabularies) - vocab = ["<pad>"] + vocab + ["<unk>"] - self.code_vocab = {v: i for i, v in enumerate(vocab)} + keep = set(self.code_vocab.keys()) & tokens | {"<pad>", "<unk>"} + order = [k for k, v in sorted(self.code_vocab.items(), key=lambda x: x[1]) if k in keep] + + self.code_vocab = { k : i for i, k in enumerate(order) } - def add(self, vocabularies: set[str]): + def add(self, tokens: set[str]): """Add specified vocabularies to the processor.""" - vocab = list(set(self.code_vocab.keys()) | vocabularies - {"<pad>", "<unk>"}) - vocab = ["<pad>"] + vocab + ["<unk>"] - self.code_vocab = {v: i for i, v in enumerate(vocab)} + i = len(self.code_vocab) + for token in tokens: + if token not in self.code_vocab: + self.code_vocab[token] = i + i += 1 def process( self, value: Tuple[Optional[List], List] From 43540c59c8800272ef13d08f2ee2e5d8b1277868 Mon Sep 17 00:00:00 2001 From: Yongda Fan Date: Thu, 5 Feb 2026 21:58:43 -0600 Subject: [PATCH 2/2] Fix test --- tests/core/test_stagenet_processor.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/core/test_stagenet_processor.py b/tests/core/test_stagenet_processor.py index 0eda7d438..6e217dc7d 100644 --- a/tests/core/test_stagenet_processor.py +++ b/tests/core/test_stagenet_processor.py @@ -21,7 +21,7 @@ class TestStageNetProcessor(unittest.TestCase): """Tests for StageNetProcessor (categorical codes).""" def test_unknown_token_index(self): - """Test that <unk> token is len(vocab) - 1, not -1.""" + """Test that <unk> token is at index 1.""" processor = StageNetProcessor() samples = [ {"data": ([0.0, 1.0], [["A", "B"], ["C", "D", "E"]])}, @@ -29,9 +29,8 @@ 
def test_unknown_token_index(self): ] processor.fit(samples, "data") - # <unk> should be len(vocab) - 1 (last index) - expected_unk_idx = len(processor.code_vocab) - 1 - self.assertEqual(processor.code_vocab["<unk>"], expected_unk_idx) + # <unk> should be at index 1 (following <pad> at 0) + self.assertEqual(processor.code_vocab["<unk>"], 1) # <unk> must be >= 0 for nn.Embedding compatibility self.assertGreaterEqual(processor.code_vocab["<unk>"], 0) @@ -40,9 +39,8 @@ def test_unknown_token_index(self): self.assertEqual(processor.code_vocab["<pad>"], 0) # Verify vocab size includes both special tokens - # Vocab: <unk>, <pad>, A, B, C, D, E, F = 8 tokens + # Vocab: <pad>, <unk>, A, B, C, D, E, F = 8 tokens self.assertEqual(len(processor.code_vocab), 8) - self.assertEqual(processor.code_vocab["<unk>"], 7) def test_unknown_token_embedding_compatibility(self): """Test that <unk> index works with nn.Embedding."""