"""TemporalTimeseriesProcessor — a TemporalFeatureProcessor that wraps the
existing TimeseriesProcessor but **preserves** timestamps in the output dict
instead of discarding them after resampling.
Why: The original TimeseriesProcessor returns a plain ``Tensor (S, F)`` with
no timestamps — the temporal information is silently consumed during uniform
resampling. This wrapper exposes those resampled timestamps so that
``UnifiedMultimodalEmbeddingModel`` can sort and align events across modalities
on a shared timeline.
"""
from datetime import datetime, timedelta
from typing import Any, Iterable, Dict, List, Tuple
import numpy as np
import torch
from . import register_processor
from .base_processor import ModalityType, TemporalFeatureProcessor
@register_processor("temporal_timeseries")
class TemporalTimeseriesProcessor(TemporalFeatureProcessor):
    """Temporal-aware wrapper around the classic TimeseriesProcessor.

    Performs uniform resampling plus forward-fill imputation, like
    ``TimeseriesProcessor``. It returns a **dict** ``{"value": Tensor,
    "time": Tensor}`` instead of a bare tensor, which makes it compatible with
    ``UnifiedMultimodalEmbeddingModel``.

    Input tuple format:
        ``(timestamps: List[datetime], values: np.ndarray[T, F] or [T])``

    Output dict:
        ``{"value": FloatTensor (S, F), "time": FloatTensor (S,)}``

        - ``S`` is determined by ``sampling_rate`` and the observation window
          (earliest to latest timestamp).
        - ``time`` contains hours elapsed from the earliest observation.

    Observations are stable-sorted by timestamp before binning. For
    chronologically sorted input this changes nothing. For unsorted input it
    prevents observations earlier than ``timestamps[0]`` from being dropped.

    Args:
        sampling_rate: Uniform re-sampling interval; must be positive.
            Defaults to 1 hour.
        impute_strategy: Only ``"forward_fill"`` is supported.

    Raises:
        ValueError: At construction, if ``sampling_rate`` is not positive or
            ``impute_strategy`` is unsupported.

    Example::

        from datetime import datetime, timedelta
        import numpy as np

        proc = TemporalTimeseriesProcessor(sampling_rate=timedelta(hours=2))
        ts = [datetime(2023, 1, 1, 0), datetime(2023, 1, 1, 4),
              datetime(2023, 1, 1, 8)]
        val = np.array([[120.0, 80.0], [115.0, 78.0], [118.0, 82.0]])
        out = proc.process_temporal((ts, val))
        # out["value"].shape -> (5, 2)   5 two-hour steps over 8 h
        # out["time"]        -> [0., 2., 4., 6., 8.] hours
    """

    # Imputation strategies this processor implements.
    _SUPPORTED_IMPUTE_STRATEGIES = ("forward_fill",)

    def __init__(
        self,
        sampling_rate: timedelta = timedelta(hours=1),
        impute_strategy: str = "forward_fill",
    ):
        # Fail fast: a zero or negative step would otherwise surface later as
        # ZeroDivisionError or a negative-dimension error inside process().
        if sampling_rate <= timedelta(0):
            raise ValueError(
                f"sampling_rate must be a positive timedelta, got {sampling_rate!r}."
            )
        # Previously any value was accepted and silently ignored.
        if impute_strategy not in self._SUPPORTED_IMPUTE_STRATEGIES:
            raise ValueError(
                f"Unsupported impute_strategy {impute_strategy!r}; "
                f"supported: {self._SUPPORTED_IMPUTE_STRATEGIES}."
            )
        self.sampling_rate = sampling_rate
        self.impute_strategy = impute_strategy
        # Feature count per time step; set by fit(), None until then.
        self.n_features: int | None = None

    # ── FeatureProcessor interface ─────────────────────────────────────────

    def fit(self, samples: Iterable[Dict[str, Any]], field: str) -> None:
        """Infer ``n_features`` from the first sample with a non-None ``field``.

        Only that first sample is inspected:

        - 2-D values set ``n_features = values.shape[1]``.
        - 1-D values set ``n_features = 1``.
        - Any other rank leaves ``n_features`` unchanged.

        Args:
            samples: Iterable of sample dicts.
            field: Key whose value is a ``(timestamps, values)`` tuple.
        """
        for sample in samples:
            if field not in sample or sample[field] is None:
                continue
            _, values = sample[field]
            arr = np.asarray(values)
            if arr.ndim == 2:
                self.n_features = arr.shape[1]
            elif arr.ndim == 1:
                self.n_features = 1
            break

    def process(self, value: Tuple[List[datetime], np.ndarray]) -> dict:
        """Resample a timeseries onto a uniform grid and keep its time axis.

        Args:
            value: ``(timestamps, values)``.

                - ``timestamps`` is a sequence of ``datetime`` objects.
                - ``values`` is array-like of shape ``(T, F)`` or ``(T,)``,
                  with one row per timestamp.

        Returns:
            ``{"value": FloatTensor (S, F), "time": FloatTensor (S,)}``.

            - Each grid cell holds the observation that falls in that bin.
              When several fall in one bin, the chronologically last wins.
            - Empty bins and NaN inputs are forward-filled per feature.
            - Leading gaps (no prior observation) become 0.0.

        Raises:
            ValueError: If ``timestamps`` is empty, if ``values`` is not 1-D
                or 2-D, or if the two lengths differ.
        """
        timestamps, values = value
        if len(timestamps) == 0:
            raise ValueError("Timestamps list is empty.")

        values = np.asarray(values, dtype=float)
        if values.ndim == 1:
            values = values[:, None]  # (T,) -> (T, 1)
        if values.ndim != 2:
            raise ValueError(
                f"Expected values of shape (T,) or (T, F), got shape {values.shape}."
            )
        # zip() would otherwise silently truncate to the shorter input.
        if len(values) != len(timestamps):
            raise ValueError(
                f"Got {len(timestamps)} timestamps but {len(values)} value rows."
            )

        # Stable chronological sort. For sorted input this is the identity.
        order = sorted(range(len(timestamps)), key=timestamps.__getitem__)
        timestamps = [timestamps[i] for i in order]
        values = values[order]

        start_time = timestamps[0]
        total_steps = int((timestamps[-1] - start_time) / self.sampling_rate) + 1

        grid = np.full((total_steps, values.shape[1]), np.nan)
        for t, row in zip(timestamps, values):
            # Input is sorted and the step is positive, so the index always
            # lies in [0, total_steps).
            grid[int((t - start_time) / self.sampling_rate)] = row
        grid = self._forward_fill(grid)

        # Hours since the earliest observation for each grid step. The
        # products are computed in float64 and cast to float32 once.
        hours_per_step = self.sampling_rate.total_seconds() / 3600.0
        time_hours = np.arange(total_steps, dtype=np.float64) * hours_per_step

        return {
            "value": torch.tensor(grid, dtype=torch.float32),
            "time": torch.tensor(time_hours, dtype=torch.float32),
        }

    @staticmethod
    def _forward_fill(grid: np.ndarray) -> np.ndarray:
        """Forward-fill NaNs down each column of a 2-D array.

        Leading NaNs become 0.0.
        """
        n_steps, n_feats = grid.shape
        # A zero row on top acts as the "last seen" value for columns that
        # have no observation yet.
        padded = np.vstack([np.zeros((1, n_feats)), grid])
        # Each non-NaN cell records its own row index; NaN cells record 0.
        # A running max down the rows then yields, per cell, the row of the
        # most recent non-NaN value at or above it.
        src = np.where(np.isnan(padded), 0, np.arange(n_steps + 1)[:, None])
        np.maximum.accumulate(src, axis=0, out=src)
        return padded[src, np.arange(n_feats)][1:]

    def process_temporal(self, value) -> dict:
        """Same as :meth:`process`, which already returns the temporal dict."""
        return self.process(value)

    def is_token(self) -> bool:
        """Outputs are continuous float tensors, not token indices."""
        return False

    def schema(self) -> tuple[str, ...]:
        """Keys of the dict returned by :meth:`process`, in order."""
        return ("value", "time")

    def dim(self) -> tuple[int, ...]:
        """Rank of each output tensor, in :meth:`schema` order.

        ``value`` is (S, F) and ``time`` is (S,).
        """
        return (2, 1)

    def spatial(self) -> tuple[bool, ...]:
        """Per-output flags in :meth:`schema` order.

        The semantics are defined by the base FeatureProcessor interface,
        which is not shown in this file.
        """
        return (True, False)

    # ── TemporalFeatureProcessor interface ────────────────────────────────

    def modality(self) -> ModalityType:
        """Continuous vitals / lab timeseries → NUMERIC modality."""
        return ModalityType.NUMERIC

    def value_dim(self) -> int:
        """Number of features per time step (e.g. an ``nn.Linear`` input size).

        Call after :meth:`fit`. Before that it falls back to 1.
        """
        return self.n_features if self.n_features is not None else 1

    def size(self) -> int | None:
        """Feature count per time step; mirrors the TimeseriesProcessor API.

        Unlike :meth:`value_dim`, this returns ``None`` before :meth:`fit`.
        """
        return self.n_features

    def __repr__(self) -> str:
        return (
            f"TemporalTimeseriesProcessor("
            f"sampling_rate={self.sampling_rate}, "
            f"n_features={self.n_features})"
        )