# Source code for pyhealth.processors.nested_sequence_processor

from typing import Any, Dict, List, Iterable

import torch

from . import register_processor
from .base_processor import FeatureProcessor, TokenProcessorInterface


@register_processor("nested_sequence")
class NestedSequenceProcessor(FeatureProcessor, TokenProcessorInterface):
    """Feature processor for nested categorical sequences with vocabulary.

    Handles nested sequences like drug recommendation history where each
    sample contains a list of visits, and each visit contains a list of codes:
        [["code1", "code2"], ["code3"], ["code4", "code5", "code6"]]

    The processor:
    1. Builds a vocabulary from all codes across all samples
    2. Encodes codes to indices
    3. Pads inner sequences to the maximum sequence length found during fit
    4. Returns a 2D tensor of shape (num_visits, max_codes_per_visit)

    Special tokens:
        - <pad>: 0 for padding
        - <unk>: 1 for unknown codes

    Args:
        padding: Additional padding to add on top of the observed maximum
            inner sequence length. The actual padding length will be
            observed_max + padding. This gives headroom for sequences longer
            than those in the training data. Default: 0 (no extra padding).

    Examples:
        >>> processor = NestedSequenceProcessor()
        >>> # During fit, determines max inner sequence length
        >>> samples = [
        ...     {"codes": [["A", "B"], ["C", "D", "E"]]},
        ...     {"codes": [["F"]]}
        ... ]
        >>> processor.fit(samples, "codes")
        >>> # Process nested sequence (observed_max=3, default padding=0, total=3)
        >>> result = processor.process([["A", "B"], ["C"]])
        >>> result.shape  # (2, 3) - 2 visits, padded to observed_max
    """

    def __init__(self, padding: int = 0):
        # PAD / UNK are not defined in this class; they are looked up on
        # ``self`` and therefore come from one of the base classes.
        self.code_vocab: Dict[Any, int] = {"<pad>": self.PAD, "<unk>": self.UNK}
        # Index handed to the next previously unseen code. Every method that
        # changes the vocabulary keeps this in sync so indices never collide.
        self._next_index = 2
        self._max_inner_len = 1  # Maximum length of inner sequences
        self._padding = padding  # Additional padding beyond observed max

    def fit(self, samples: Iterable[Dict[str, Any]], field: str) -> None:
        """Build vocabulary and determine maximum inner sequence length.

        Samples where ``field`` is absent, ``None`` or not a list are skipped,
        as are visits that are not lists. ``None`` codes are never added to
        the vocabulary.

        Args:
            samples: Iterable of sample dictionaries.
            field: The field name containing nested sequences.
        """
        max_inner_len = 0
        for sample in samples:
            if field not in sample or sample[field] is None:
                continue
            nested_seq = sample[field]
            # Nested sequences: [["A", "B"], ["C"], ...]
            if not isinstance(nested_seq, list):
                continue
            for inner_seq in nested_seq:
                if not isinstance(inner_seq, list):
                    continue
                max_inner_len = max(max_inner_len, len(inner_seq))
                for code in inner_seq:
                    if code is not None and code not in self.code_vocab:
                        self.code_vocab[code] = self._next_index
                        self._next_index += 1

        # The inner length is at least 1 even when no visit was observed;
        # the user-specified padding is added on top of the observed maximum.
        self._max_inner_len = max(1, max_inner_len) + self._padding

    def _reindex(self, keep: set) -> None:
        """Keep only ``keep`` tokens and renumber them densely from 0.

        Relative order of the surviving tokens (by their previous index) is
        preserved. ``_next_index`` is reset so that a later ``fit`` or ``add``
        continues right after the last surviving token.
        """
        by_index = sorted(self.code_vocab.items(), key=lambda item: item[1])
        survivors = [token for token, _ in by_index if token in keep]
        self.code_vocab = {token: index for index, token in enumerate(survivors)}
        self._next_index = len(self.code_vocab)

    def remove(self, tokens: set[str]):
        """Remove specified vocabularies from the processor.

        The special tokens ``<pad>`` and ``<unk>`` are always kept. Remaining
        tokens are renumbered densely, so their indices may change.

        Args:
            tokens: Set of tokens to drop from the vocabulary.
        """
        self._reindex((set(self.code_vocab) - tokens) | {"<pad>", "<unk>"})

    def retain(self, tokens: set[str]):
        """Retain only the specified vocabularies in the processor.

        The special tokens ``<pad>`` and ``<unk>`` are always kept. Remaining
        tokens are renumbered densely, so their indices may change.

        Args:
            tokens: Set of tokens to keep in the vocabulary.
        """
        self._reindex((set(self.code_vocab) & tokens) | {"<pad>", "<unk>"})

    def add(self, tokens: set[str]):
        """Add specified vocabularies to the processor.

        Tokens already present keep their index. New tokens get consecutive
        indices after the current largest index.

        Args:
            tokens: Set of tokens to add to the vocabulary.
        """
        # Start after the largest index in use rather than at len(vocab), so
        # no existing index can be reused even if the vocabulary has gaps.
        next_index = max(self.code_vocab.values(), default=-1) + 1
        # NOTE: set iteration order is arbitrary, so the relative order of
        # the indices given to the new tokens is not guaranteed to be stable.
        for token in tokens:
            if token not in self.code_vocab:
                self.code_vocab[token] = next_index
                next_index += 1
        # Keep fit() in step so it does not hand out an index used above.
        self._next_index = next_index

    def tokens(self) -> set[str]:
        """Return the set of tokens in the processor's vocabulary."""
        return set(self.code_vocab.keys())

    def process(self, value: List[List[Any]]) -> torch.Tensor:
        """Process nested sequence into padded 2D tensor.

        Empty or None visits are filled with padding tokens. ``None`` codes
        and codes missing from the vocabulary are encoded as ``<unk>``.
        Visits longer than the fitted inner length are not truncated.

        Args:
            value: Nested list of codes [[code1, code2], [code3], ...]

        Returns:
            2D tensor of shape (num_visits, max_inner_len) with code indices.
            An empty or None ``value`` yields a single all-padding row.
        """
        pad_token = self.code_vocab["<pad>"]
        unk_token = self.code_vocab["<unk>"]
        width = self._max_inner_len

        # Handle empty nested sequence
        if not value:
            return torch.tensor([[pad_token] * width], dtype=torch.long)

        encoded_sequences = []
        for inner_seq in value:
            # Empty/null visit: a full row of padding tokens
            if inner_seq is None or len(inner_seq) == 0:
                encoded_sequences.append([pad_token] * width)
                continue

            indices = [
                unk_token if code is None else self.code_vocab.get(code, unk_token)
                for code in inner_seq
            ]
            # Right-pad up to the fitted width (no-op when already long enough)
            indices.extend([pad_token] * (width - len(indices)))
            encoded_sequences.append(indices)

        return torch.tensor(encoded_sequences, dtype=torch.long)

    def size(self) -> int:
        """Return max inner length (embedding dimension) for unified API."""
        return self._max_inner_len

    def vocab_size(self) -> int:
        """Return the size of the processor's vocabulary."""
        return len(self.code_vocab)

    def __repr__(self):
        return (
            f"NestedSequenceProcessor("
            f"vocab_size={len(self.code_vocab)}, "
            f"max_inner_len={self._max_inner_len}, "
            f"padding={self._padding})"
        )

    def is_token(self) -> bool:
        """Nested sequence codes are discrete token indices."""
        return True

    def schema(self) -> tuple[str, ...]:
        """Return the names of the outputs produced by ``process``."""
        return ("value",)

    def dim(self) -> tuple[int, ...]:
        """Output is a 2D tensor (visits, codes_per_visit)."""
        return (2,)

    def spatial(self) -> tuple[bool, ...]:
        """Flag which output axes are spatial."""
        # Visits (time) is spatial; codes-per-visit is an unordered set, not spatial
        return (True, False)
@register_processor("nested_sequence_floats")
class NestedFloatsProcessor(FeatureProcessor):
    """Feature processor for nested numerical sequences without vocabulary.

    Each sample is a list of visits and each visit is a list of numeric
    values, e.g.:
        [[1.5, 2.3], [4.1], [0.9, 1.2, 3.4]]

    The processor:
    1. Determines the maximum inner sequence length during fit
    2. Optionally applies forward-fill for missing values
    3. Returns a 2D tensor of shape (num_visits, max_values_per_visit)

    Args:
        forward_fill: If True, applies forward fill for NaN values across
            time steps and empty visits. If False, sets null values to 0.
            Default is True.
        padding: Additional padding to add on top of the observed maximum
            inner sequence length. The actual padding length will be
            observed_max + padding. This gives headroom for sequences longer
            than those in the training data. Default: 0 (no extra padding).

    Examples:
        >>> processor = NestedFloatsProcessor()
        >>> # During fit, determines max inner sequence length
        >>> samples = [
        ...     {"values": [[1.0, 2.0], [3.0, 4.0, 5.0]]},
        ...     {"values": [[6.0]]}
        ... ]
        >>> processor.fit(samples, "values")
        >>> # Process nested sequence (observed_max=3, default padding=0, total=3)
        >>> result = processor.process([[1.0, 2.0], [3.0]])
        >>> result.shape  # (2, 3) - 2 visits, padded to observed_max
    """

    def __init__(self, forward_fill: bool = True, padding: int = 0):
        self._max_inner_len = 1  # Width of every output row until fit() runs
        self.forward_fill = forward_fill
        self._padding = padding  # Extra columns added on top of the observed max

    def fit(self, samples: Iterable[Dict[str, Any]], field: str) -> None:
        """Determine maximum inner sequence length.

        Samples where ``field`` is absent, ``None`` or not a list are skipped,
        as are visits that are not lists.

        Args:
            samples: Iterable of sample dictionaries.
            field: The field name containing nested sequences.
        """
        longest = 0
        for sample in samples:
            if field not in sample:
                continue
            visits = sample[field]
            # A None field is not a list either, so this one check covers both.
            if not isinstance(visits, list):
                continue
            longest = max(
                longest,
                max((len(v) for v in visits if isinstance(v, list)), default=0),
            )

        # Width is never below 1; user-requested padding is added on top.
        self._max_inner_len = max(1, longest) + self._padding

    def process(self, value: List[List[float]]) -> torch.Tensor:
        """Process nested numerical sequence with optional forward fill.

        For missing values (None, unconvertible entries, or empty visits):
            - If forward_fill=True: they are filled from earlier data, first
              from the previous visit and then from earlier positions in the
              same visit; anything still missing becomes 0.0
            - If forward_fill=False: they are set to 0.0 (for masking)

        Args:
            value: Nested list of floats [[1.0, 2.0], [3.0], ...]

        Returns:
            2D tensor of shape (num_visits, max_inner_len) with float values.
            For an empty or None ``value`` a single row is returned: all NaN
            when forward_fill is enabled, all zeros otherwise.
        """
        import numpy as np

        width = self._max_inner_len
        carry = self.forward_fill

        # Nothing to encode: emit one placeholder row
        if not value or len(value) == 0:
            if carry:
                return torch.full((1, width), float("nan"), dtype=torch.float)
            return torch.zeros((1, width), dtype=torch.float)

        # Placeholder for anything missing: NaN marks a cell for the
        # forward-fill passes below, 0.0 is the final value otherwise.
        missing = float("nan") if carry else 0.0

        def as_float(raw):
            """Convert one entry to float, mapping None/unconvertible to ``missing``."""
            if raw is None:
                return missing
            try:
                return float(raw)
            except (ValueError, TypeError):
                return missing

        def carry_forward(line):
            """Overwrite NaN cells of a 1-D array view with the last non-NaN cell before them."""
            previous = None
            for pos in range(line.shape[0]):
                if np.isnan(line[pos]):
                    if previous is not None:
                        line[pos] = previous
                else:
                    previous = line[pos]

        rows = []
        previous_row = None
        for visit in value:
            # Empty/null visit: repeat the last encoded visit, or fall back to zeros
            if visit is None or len(visit) == 0:
                if carry and previous_row is not None:
                    rows.append(list(previous_row))
                else:
                    rows.append([0.0] * width)
                continue

            row = [as_float(raw) for raw in visit]
            # Right-pad up to the fitted width (no-op when already long enough)
            row.extend([missing] * (width - len(row)))
            previous_row = row
            rows.append(row)

        grid = np.array(rows, dtype=float)

        if carry:
            # Pass 1 - across visits: each column inherits from the previous visit
            for column in grid.T:
                carry_forward(column)
            # Pass 2 - within a visit: remaining gaps inherit from the cell to the left
            for visit_row in grid:
                carry_forward(visit_row)
            # Pass 3 - whatever is still NaN (nothing earlier to copy) becomes 0.0
            grid = np.nan_to_num(grid, nan=0.0)

        return torch.tensor(grid, dtype=torch.float)

    def size(self) -> int:
        """Return max inner length (embedding dimension) for unified API."""
        return self._max_inner_len

    def __repr__(self):
        return (
            f"NestedFloatsProcessor("
            f"max_inner_len={self._max_inner_len}, "
            f"forward_fill={self.forward_fill}, "
            f"padding={self._padding})"
        )

    def is_token(self) -> bool:
        """Nested float values are continuous, not discrete tokens."""
        return False

    def schema(self) -> tuple[str, ...]:
        """Return the names of the outputs produced by ``process``."""
        return ("value",)

    def dim(self) -> tuple[int, ...]:
        """Output is a 2D tensor (visits, features)."""
        return (2,)

    def spatial(self) -> tuple[bool, ...]:
        """Flag which output axes are spatial."""
        # Visits (time) is spatial; features dimension is not
        return (True, False)