from __future__ import annotations
import copy
from datetime import datetime, date, timezone, timedelta
from enum import Flag, auto
import textwrap
import warnings
import pandas as pd
import polars as pl
from corr_vars import logger
import corr_vars.utils as utils
from corr_vars.sources.var_loader import load_variable
from corr_vars.definitions.constants import DYN_COLUMNS, COL_ORDER, PRIMARY_KEYS
from corr_vars.definitions.typing import (
ExtractedVariable,
PolarsFrame,
is_time_anchor_column,
)
from polars._typing import ClosedInterval
from typing import Any, TypeAlias, TYPE_CHECKING
if TYPE_CHECKING:
from corr_vars.core import Cohort
from corr_vars.definitions.typing import (
TimeAnchorColumn,
RequirementsIterable,
RequirementsDict,
CleaningDict,
VariableCallable,
)
DateLiteral: TypeAlias = datetime | date | int
class TimeAnchor:
    """A single point in time used as a window boundary.

    Wraps one of three accepted forms into a polars expression aliased ``"time"``:

    - a literal ``datetime``/``date``  -> ``pl.lit(...)``
    - a column name (``str``)          -> ``pl.col(...)``
    - a ``(column, delta)`` tuple      -> ``pl.col(...).dt.offset_by(delta)``

    Attributes:
        column: Source column name, or None for literal anchors.
        delta: Offset string (leading "+" stripped), or None when not relative.
        expr: The polars expression for this anchor, aliased "time".
    """

    column: str | None
    delta: str | None
    expr: pl.Expr

    def __init__(self, bound: TimeAnchorColumn | DateLiteral) -> None:
        # NOTE: the previous error message claimed a polars expression was an
        # accepted input and omitted datetime/date; it now matches the checks.
        if not is_time_anchor_column(bound) and not isinstance(bound, (datetime, date)):
            raise TypeError(
                "Expected a column name (str), a (column, delta) tuple of two "
                "strings, or a datetime/date literal."
            )
        if isinstance(bound, (datetime, date)):
            # Literal anchor: a fixed point in time, no source column.
            self.column = None
            self.delta = None
            self.expr = pl.lit(bound)
        elif isinstance(bound, str):
            # Plain column anchor.
            self.column = bound
            self.delta = None
            self.expr = pl.col(self.column)
        else:
            # Relative anchor: a column shifted by a polars duration string.
            self.column, delta = bound
            # Normalize "+2h" -> "2h"; offset_by has no unary "+" notation.
            self.delta = delta[1:] if delta[0] == "+" else delta
            self.expr = pl.col(self.column).dt.offset_by(self.delta)
        self.expr = self.expr.alias("time")

    def __repr__(self) -> str:
        if self.column is None or self.delta is None:
            # Plain column anchor -> the column name; literal anchor -> its
            # value; fallback -> the expression's root column names.
            return self.column or (
                f"Literal: {pl.select(self.expr).item()}"
                if self.expr.meta.is_literal()
                else ", ".join(self.expr.meta.root_names())
            )
        else:
            # Relative anchor rendered as "column +/- delta".
            op = "-" if self.delta[0] == "-" else "+"
            rest = self.delta[1:] if self.delta[0] == "-" else self.delta
            return f"{self.column} {op} {rest}"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, TimeAnchor):
            return False
        return (
            self.column == other.column
            and self.delta == other.delta
            and self.expr.meta.eq(other.expr)
        )

    @property
    def is_relative(self) -> bool:
        """True if this anchor is a column shifted by a delta."""
        return self.column is not None and self.delta is not None

    @property
    def is_literal(self) -> bool:
        """True if this anchor is a fixed datetime/date literal."""
        return self.column is None and self.delta is None
class TimeWindow:
    """A time interval [tmin, tmax] delimited by two :class:`TimeAnchor` bounds.

    Args:
        tmin: Lower bound; defaults to ``datetime.min`` when omitted.
        tmax: Upper bound; defaults to "now" when omitted.
        user_specified: Whether the bounds were explicitly given by the user —
            either one flag applied to both, or a ``(tmin, tmax)`` pair of flags.
    """

    tmin: TimeAnchor
    tmax: TimeAnchor
    user_specified_tmin: bool
    user_specified_tmax: bool

    def __init__(
        self,
        tmin: TimeAnchor | TimeAnchorColumn | DateLiteral | None = None,
        tmax: TimeAnchor | TimeAnchorColumn | DateLiteral | None = None,
        user_specified: bool | tuple[bool, bool] = False,
    ) -> None:
        self.tmin = (
            tmin if isinstance(tmin, TimeAnchor) else TimeAnchor(tmin or datetime.min)
        )
        self.tmax = (
            tmax
            if isinstance(tmax, TimeAnchor)
            # NOTE(review): the default upper bound is "now" in a fixed UTC+1
            # zone — presumably the deployment's local offset; confirm before
            # relying on it across DST changes.
            else TimeAnchor(tmax or datetime.now(tz=timezone(timedelta(hours=1))))
        )
        self.user_specified_tmin = (
            user_specified[0] if isinstance(user_specified, tuple) else user_specified
        )
        self.user_specified_tmax = (
            user_specified[1] if isinstance(user_specified, tuple) else user_specified
        )

    def __repr__(self) -> str:
        return f"TimeWindow(tmin={self.tmin}, tmax={self.tmax})"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, TimeWindow):
            return False
        return self.tmin == other.tmin and self.tmax == other.tmax

    @property
    def user_specified(self) -> bool:
        """True if either bound was explicitly provided by the user."""
        return self.user_specified_tmin or self.user_specified_tmax

    ####################
    #   Time Anchors   #
    ####################
    # NOTE: these getters previously declared an ``alias`` parameter. A
    # ``@property`` getter can never receive arguments, so it was dead code
    # and has been removed; the aliases are the fixed strings below.

    @property
    def tmin_expr(self) -> pl.Expr:
        """Lower-bound expression, aliased "tmin"."""
        return self.tmin.expr.alias("tmin")

    @property
    def tmax_expr(self) -> pl.Expr:
        """Upper-bound expression, aliased "tmax"."""
        return self.tmax.expr.alias("tmax")

    ####################
    #   Time Concepts  #
    ####################

    # TODO: Use or discard
    @property
    def tmid_expr(self) -> pl.Expr:
        """Midpoint of the window, aliased "tmid"."""
        return self.tmin_expr.add(self.duration_expr.truediv(2)).alias("tmid")

    # TODO: Use or discard
    @property
    def duration_expr(self) -> pl.Expr:
        """Length of the window (tmax - tmin), aliased "duration"."""
        return (self.tmax_expr - self.tmin_expr).alias("duration")

    @property
    def is_relative(self) -> bool:
        """True if either bound is a column shifted by a delta."""
        return self.tmin.is_relative or self.tmax.is_relative

    @property
    def is_literal(self) -> bool:
        """True if both bounds are fixed datetime/date literals."""
        return self.tmin.is_literal and self.tmax.is_literal

    ####################
    #     Validity     #
    ####################

    @property
    def any_timebound_in_year_9999_expr(self) -> pl.Expr:
        """True where either bound falls in the sentinel year 9999."""
        return pl.any_horizontal(
            self.tmin_expr.dt.year().eq(pl.lit(9999)).fill_null(False),
            self.tmax_expr.dt.year().eq(pl.lit(9999)).fill_null(False),
        )

    @property
    def any_timebound_null_expr(self) -> pl.Expr:
        """True where either bound evaluates to null."""
        return pl.any_horizontal(
            self.tmin_expr.is_null(),
            self.tmax_expr.is_null(),
        )

    @property
    def tmin_after_tmax_expr(self) -> pl.Expr:
        """True where the interval is inverted (tmin > tmax)."""
        return self.tmin_expr > self.tmax_expr

    @property
    def any_timebound_invalid_expr(self) -> pl.Expr:
        """True where any of the validity checks above fails."""
        return pl.any_horizontal(
            self.any_timebound_null_expr,
            self.any_timebound_in_year_9999_expr,
            self.tmin_after_tmax_expr,
        )
class WindowRelation(Flag):
    """How a record's time span relates to a reference [tmin, tmax] window.

    Base members are single relations; the trailing members are convenience
    unions for common filter selections.
    """

    CONTAINED = auto()  # fully inside [tmin, tmax]
    OVERLAPS_LEFT = auto()  # starts before tmin and ends inside
    OVERLAPS_RIGHT = auto()  # starts inside and ends after tmax
    COVERS_WINDOW = auto()  # spans over both boundaries

    # Common combinations
    ANY_LEFT_OVERLAP = CONTAINED | OVERLAPS_LEFT | COVERS_WINDOW
    ANY_RIGHT_OVERLAP = CONTAINED | OVERLAPS_RIGHT | COVERS_WINDOW
    ANY_OVERLAP = CONTAINED | OVERLAPS_LEFT | OVERLAPS_RIGHT | COVERS_WINDOW
class WindowOverlap:
    """Builders for polars expressions relating records to a reference TimeWindow."""

    @staticmethod
    def contains_expr(
        record: TimeAnchor, window: TimeWindow, closed: ClosedInterval = "both"
    ) -> pl.Expr:
        """Boolean expression: the record instant lies inside the window."""
        return record.expr.is_between(window.tmin_expr, window.tmax_expr, closed=closed)

    @staticmethod
    def relation_expr(
        record: TimeWindow,
        window: TimeWindow,
    ) -> dict[WindowRelation, pl.Expr]:
        """Map each base WindowRelation to its boolean polars expression."""
        rec_start, rec_end = record.tmin_expr, record.tmax_expr
        win_lo, win_hi = window.tmin_expr, window.tmax_expr
        return {
            WindowRelation.CONTAINED: (rec_start >= win_lo) & (rec_end <= win_hi),
            WindowRelation.OVERLAPS_LEFT: (rec_start < win_lo)
            & (rec_end >= win_lo)
            & (rec_end <= win_hi),
            WindowRelation.OVERLAPS_RIGHT: (rec_start >= win_lo)
            & (rec_start <= win_hi)
            & (rec_end > win_hi),
            WindowRelation.COVERS_WINDOW: (rec_start < win_lo) & (rec_end > win_hi),
        }

    @staticmethod
    def overlap_expr(
        record: TimeWindow,
        window: TimeWindow,
        relation: WindowRelation,
    ) -> pl.Expr:
        """OR together the expressions of every base relation selected by ``relation``."""
        selected = [
            cond
            for flag, cond in WindowOverlap.relation_expr(record, window).items()
            if relation & flag
        ]
        if not selected:
            raise ValueError(f"Invalid relation: {relation}")
        combined = selected[0]
        for cond in selected[1:]:
            combined = combined | cond
        return combined

    # TODO: Use or discard
    @staticmethod
    def classify_expr(
        record: TimeWindow,
        window: TimeWindow,
        *,
        default: WindowRelation | None = None,
    ) -> pl.Expr:
        """when/then chain yielding the matching relation's value (CONTAINED wins first)."""
        exprs = WindowOverlap.relation_expr(record, window)
        priority = (
            WindowRelation.CONTAINED,
            WindowRelation.OVERLAPS_LEFT,
            WindowRelation.OVERLAPS_RIGHT,
            WindowRelation.COVERS_WINDOW,
        )
        chain = pl.when(exprs[priority[0]]).then(pl.lit(priority[0].value))
        for rel in priority[1:]:
            chain = chain.when(exprs[rel]).then(pl.lit(rel.value))
        fallback = pl.lit(default.value) if default is not None else pl.lit(None)
        return chain.otherwise(fallback)
def compatible_with_time_anchor(
    df: pl.DataFrame | pl.LazyFrame, time_anchor: TimeAnchor
) -> bool:
    """Return True if ``df`` provides the column the anchor needs.

    Literal anchors reference no column and are compatible with any frame.
    """
    if time_anchor.is_literal:
        return True
    if isinstance(df, pl.DataFrame):
        names = df.columns
    else:
        # LazyFrame: resolve names from the schema without collecting data.
        names = df.collect_schema().names()
    return (time_anchor.column or "") in names
def compatible_with_time_window(
    df: pl.DataFrame | pl.LazyFrame, time_window: TimeWindow
) -> bool:
    """Return True if ``df`` is compatible with both window boundaries."""
    return all(
        compatible_with_time_anchor(df, anchor)
        for anchor in (time_window.tmin, time_window.tmax)
    )
class Variable:
    """Base class for all variables.

    Args:
        var_name (str): The variable name.
        dynamic (bool): True if the variable is dynamic (time-series).
        requires (RequirementsIterable): List of variables or dict of variables with tmin/tmax required to calculate the variable (default: ()).
        tmin (TimeAnchorColumn | None): The tmin argument. Can either be a string (column name) or a tuple of (column name, timedelta).
        tmax (TimeAnchorColumn | None): The tmax argument. Can either be a string (column name) or a tuple of (column name, timedelta).
        py (Callable): The function to call to calculate the variable.
        py_ready_polars (bool): True if the variable code can accept polars dataframes as input and return a polars dataframe.
        cleaning (CleaningDict): Dictionary with cleaning rules for the variable.

    Note:
        tmin and tmax can be None when you create a Variable object, but must be set before extraction.
        If you add the variable via cohort.add_variable(), it will be automatically set to the cohort's tmin and tmax.
        This base class should not be used directly; use one of the subclasses instead. These are specified in the sources submodule.

    Examples:
        Basic cleaning configuration:

        >>> cleaning = {
        ...     "value": {
        ...         "low": 10,
        ...         "high": 80
        ...     }
        ... }

        Time constraints with relative offsets:

        >>> # Extract data from 2 hours before to 6 hours after ICU admission
        >>> tmin = ("icu_admission", "-2h")
        >>> tmax = ("icu_admission", "+6h")

        Variable with dependencies:

        >>> # This variable requires other variables to be loaded first
        >>> requires = ["blood_pressure_sys", "blood_pressure_dia"]
        >>> # This variable requires other variables with fixed tmin/tmax to be loaded first
        >>> requires = {
        ...     "blood_pressure_sys_hospital": {
        ...         "template": "blood_pressure_sys",
        ...         "tmin": "hospital_admission",
        ...         "tmax": "hospital_discharge"
        ...     },
        ...     "blood_pressure_dia_hospital": {
        ...         "template": "blood_pressure_dia",
        ...         "tmin": "hospital_admission",
        ...         "tmax": "hospital_discharge"
        ...     }
        ... }
    """

    # Attribute declarations (all assigned in __init__).
    var_name: str  # the variable's name
    dynamic: bool  # True for time-series variables
    requirements: dict[str, RequirementsDict]  # normalized dependency configs
    required_vars: dict[str, ExtractedVariable]  # filled by _get_required_vars
    data: pl.DataFrame | None  # extracted data; None before extraction
    time_window: TimeWindow  # the [tmin, tmax] extraction window
    py: VariableCallable | None  # optional post-processing function
    py_ready_polars: bool  # True if ``py`` consumes/returns polars frames
    cleaning: CleaningDict | None  # optional per-column cleaning bounds

    def __init__(
        self,
        var_name: str,
        dynamic: bool,
        tmin: TimeAnchorColumn | None,
        tmax: TimeAnchorColumn | None,
        # Immutable default — a shared mutable [] default is an anti-pattern.
        requires: RequirementsIterable = (),
        py: VariableCallable | None = None,
        py_ready_polars: bool = False,
        cleaning: CleaningDict | None = None,
    ) -> None:
        # Metadata
        self.var_name = var_name
        self.dynamic = dynamic  # True if the variable is dynamic (time-series)
        # Tmin / Tmax
        self.time_window = TimeWindow(tmin=tmin, tmax=tmax, user_specified=True)
        # Requirements: normalize the iterable form into the dict form so
        # downstream code only handles one shape.
        self.requirements = (
            requires
            if isinstance(requires, dict)
            else {r: {"template": r, "tmin": None, "tmax": None} for r in requires}
        )
        self.required_vars = {}
        # Variable function
        self.py = py
        self.py_ready_polars = py_ready_polars
        # Cleaning
        self.cleaning = cleaning
        # Data
        self.data = None
@classmethod
def from_time_window(cls, time_window: TimeWindow, *args, **kwargs) -> Variable:
var = cls(*args, **kwargs)
var.time_window = time_window
return var
    def _get_required_vars(self, cohort: Cohort) -> None:
        """Load required variables for this variable.

        Constant variables already present on the cohort are wrapped directly
        from ``cohort._obs``; all other requirements are extracted via
        :func:`load_variable` with their own (or this variable's) time window.

        Args:
            cohort (Cohort): The cohort instance to load variables from.

        Returns:
            None: Sets self.required_vars with loaded Variable objects.
        """
        if len(self.requirements) == 0:
            logger.debug(f"No dependencies loaded for '{self.var_name}'.")
            return
        # Print dependency tree before extraction
        logger.info(f"Loading dependencies for '{self.var_name}':")
        utils.log_collection(logger=logger, collection=self.requirements)
        for save_as, config in self.requirements.items():
            var_name = config["template"]
            if save_as in cohort.constant_vars:
                # Constant variables carry no time dimension, so any explicit
                # per-requirement bounds would silently be ignored — warn.
                if (config["tmin"] and self.time_window.user_specified_tmin) or (
                    config["tmax"] and self.time_window.user_specified_tmax
                ):
                    warnings.warn(
                        f"tmin/tmax are ignored for the constant variable {var_name}. "
                        "Please remove tmin/tmax.",
                        UserWarning,
                    )
                # Take the value straight from the observation table.
                self.required_vars[save_as] = ExtractedVariable(
                    var_name=var_name,
                    dynamic=False,
                    data=cohort._obs.clone().select(cohort.primary_key, var_name),
                )
            else:
                # Fall back to this variable's own bounds when the requirement
                # does not specify its own tmin/tmax.
                var = load_variable(
                    var_name=var_name,
                    cohort=cohort,
                    time_window=TimeWindow(
                        config["tmin"] or self.time_window.tmin,
                        config["tmax"] or self.time_window.tmax,
                    ),
                )
                data = var.extract(cohort)
                self.required_vars[save_as] = ExtractedVariable(
                    var_name=var.var_name, dynamic=var.dynamic, data=data
                )
    def _call_var_function(self, cohort: Cohort) -> bool:
        """
        Call the variable function if it exists.

        Args:
            cohort (Cohort)

        Returns:
            True if the function was called,
            False otherwise.

        Operations will be applied to Variable.data directly.
        """
        if self.py is None:
            logger.debug(f"Variable function not called for {self.var_name}")
            return False
        code = self.py
        logger.debug(f"Calling Variable function {self.var_name}")
        if self.py_ready_polars:
            # Polars-native path: the cohort is deep-copied so the user
            # function cannot mutate shared cohort state.
            data = code(var=self, cohort=copy.deepcopy(cohort))
            # Give type checker a hint
            if not isinstance(data, pl.DataFrame):
                raise TypeError("Variable function did not return a polars DataFrame")
            self.data = data
        else:
            # Legacy pandas code path
            logger.warning(
                f"Converting {self.var_name} to pandas. This is slow, consider rewriting your variable function to accept polars dataframes and then set py_ready_polars=True"
            )
            # Ignore all type errors to keep the rest of the code clean.
            # Only operate on copies of Cohort and Variable to avoid
            # pandas Dataframes if an error occurs inside this code block.
            # Convert variable data to pandas (None for complex variables)
            pd_var = copy.deepcopy(self)
            if pd_var.data is not None:
                pd_var.data = utils.convert_to_pandas_df(pd_var.data)  # type: ignore
            # Convert obs data to pandas
            pd_cohort = copy.deepcopy(cohort)
            pd_cohort._obs = utils.convert_to_pandas_df(pd_cohort._obs)  # type: ignore
            # Convert required variables to pandas
            for reqvar in pd_var.required_vars.values():
                if reqvar.data is not None:
                    reqvar.data = utils.convert_to_pandas_df(reqvar.data)  # type: ignore
            data = code(var=pd_var, cohort=pd_cohort)
            # Give type checker a hint
            if not isinstance(data, pd.DataFrame):
                raise TypeError("Variable function did not return a pandas DataFrame")
            # Convert the result back to polars before storing it.
            self.data = utils.convert_to_polars_df(data)
        return True
    def _add_time_window(self, cohort: Cohort, join_key: str | None = None) -> None:
        """Adds tmin / tmax / join key / primary key from cohort.obs to self.data.

        This might duplicate data if the join_key (default: cohort.primary_key) is not unique in obs.

        Args:
            cohort (Cohort): The cohort instance containing time constraint information.
            join_key (str | None): Column used to join the per-case time-window
                frame onto self.data (default: cohort.primary_key if set to None).

        Returns:
            None: Modifies self.data to add tmin / tmax / join key / primary key columns.
        """
        if self.data is None:
            warnings.warn(
                "_add_time_window was called before extraction, skipping.", UserWarning
            )
            return
        obs = cohort._obs.clone()
        # Remove NA tmin/tmax and tmin/tmax in year 9999, etc.
        obs = utils.remove_invalid_time_window(obs, time_window=self.time_window)
        # Evaluate the window bounds into "tmin" and "tmax" columns on obs
        obs = utils.add_time_window_expr(
            obs, time_window=self.time_window, aliases=("tmin", "tmax")
        )
        # Build the frame to join back onto self.data — presumably the keys
        # plus the window columns; confirm against utils.get_cb.
        join_key = join_key or cohort.primary_key
        cb = utils.get_cb(
            obs,
            join_key=join_key,
            primary_key=cohort.primary_key,
            tmin_col="tmin",
            tmax_col="tmax",
        )
        self.data = self.data.join(cb, on=join_key)
    def _timefilter(
        self,
        time_window: TimeWindow | None = None,
        relation: WindowRelation = WindowRelation.ANY_OVERLAP,
    ) -> None:
        """Apply time-based filtering to variable data based on tmin/tmax constraints.

        Args:
            time_window (TimeWindow | None): Window to filter against; defaults
                to the per-case "case_tmin"/"case_tmax" columns.
            relation (WindowRelation): Which record/window overlap relations to
                keep for interval records (default: any overlap).

        Returns:
            None: Filters self.data in place and drops the window columns.
        """
        if self.data is None:
            warnings.warn(
                "_timefilter was called before extraction, skipping.", UserWarning
            )
            return
        window = time_window or TimeWindow("case_tmin", "case_tmax")
        tmin = window.tmin
        tmax = window.tmax
        # If these columns do not exist, we can assume that a filter is not intended for this variable
        has_time_columns = compatible_with_time_window(self.data, window)
        if not has_time_columns:
            logger.warning(
                f"Time-filter not applied to {self.var_name}, no {tmin} or {tmax} in data and not enforced"
            )
            return
        start_col = "recordtime"
        if "recordtime_end" in self.data.columns:
            # Interval records: filter on the [recordtime, recordtime_end] span.
            # Prepare data
            end_col = "recordtime_end"
            end_cleaned_col = "recordtime_end_clean"
            self.data = (
                self.data
                # Ensure valid interval (end >= start)
                # Will also fill missing in end_col with values from start_col
                .with_columns(
                    pl.max_horizontal(start_col, end_col).alias(end_cleaned_col),
                )
            )
            # Interval filter on recordtime and recordtime_end
            record = TimeWindow(start_col, end_cleaned_col)
            self.data = self.data.filter(
                WindowOverlap.overlap_expr(
                    record=record,
                    window=window,
                    relation=relation,
                )
            )
            # Remove end_cleaned column
            self.data = self.data.drop(end_cleaned_col)
        else:
            # Simple filter on recordtime
            record = TimeAnchor(start_col)
            self.data = self.data.filter(WindowOverlap.contains_expr(record, window))
        # Remove tmin / tmax columns
        if tmin.column:
            self.data = self.data.drop(tmin.column)
        if tmax.column:
            self.data = self.data.drop(tmax.column)
        logger.info(f"Time-filter selected {len(self.data)} rows, column names:")
        logger.info(utils.pretty_join(self.data.columns, indent="\t"))
def _apply_cleaning(self) -> bool:
"""Apply cleaning rules to filter out invalid values.
Returns:
True if the data was cleaned,
False otherwise.
Operations will be applied to Variable.data directly.
"""
if self.data is None:
warnings.warn(
"_apply_cleaning was called before extraction, skipping.", UserWarning
)
return False
if not self.cleaning:
return False
for col, bounds in self.cleaning.items():
if (
col == "value"
and "value" not in self.data.columns
and self.var_name in self.data.columns
):
col = self.var_name
if "low" in bounds:
self.data = self.data.filter(pl.col(col).ge(bounds["low"]))
if "high" in bounds:
self.data = self.data.filter(pl.col(col).le(bounds["high"]))
return True
def _add_relative_times(self, cohort: Cohort) -> None:
"""Add relative time columns to dynamic variable data.
Args:
cohort (Cohort): The cohort instance containing reference time information.
Returns:
None: Modifies self.data to include recordtime_relative and recordtime_end_relative columns.
"""
if self.data is None:
warnings.warn(
"_add_relative_times was called before extraction, skipping.",
UserWarning,
)
return
# Join with cohort data to get the reference time
self.data = utils.convert_to_polars_df(self.data)
self.data = self.data.join(
cohort._obs.select(cohort.primary_key, cohort.t_min),
on=cohort.primary_key,
how="left",
)
def relative_column(
df: pl.DataFrame, time_col: str, reference_col: str
) -> pl.DataFrame:
return df.with_columns(
pl.col(time_col)
.sub(pl.col(reference_col))
.dt.total_seconds()
.alias(f"{time_col}_relative")
)
if "recordtime" in self.data.columns:
# Calculate relative times from icu_admission in seconds
self.data = self.data.pipe(
relative_column, time_col="recordtime", reference_col=cohort.t_min
)
if (
"recordtime_end" in self.data.columns
and not self.data["recordtime_end"].is_null().all()
):
self.data = self.data.pipe(
relative_column, time_col="recordtime_end", reference_col=cohort.t_min
)
# Drop the reference time column
self.data = self.data.drop(cohort.t_min)
def _unify_and_order_columns(self, primary_key: str) -> None:
"""Order columns according to predefined COL_ORDER and add missing columns with null values.
Args:
primary_key (str): The primary key column name.
Returns:
None: Modifies self.data to have standardized column order.
"""
if self.data is None:
warnings.warn(
"_unify_and_order_columns was called before extraction, skipping.",
UserWarning,
)
return
assert (
primary_key in PRIMARY_KEYS
), f"Primary key {primary_key} must be one of {PRIMARY_KEYS}."
assert (
primary_key in self.data.columns
), f"Primary key {primary_key} not in data columns."
# Add missing dynamic columns (no keys) with null values
missing_cols = (col for col in DYN_COLUMNS if col not in self.data.columns)
if missing_cols:
self.data = self.data.with_columns(
pl.lit(None).alias(col) for col in missing_cols
)
# Order columns according to COL_ORDER, then add remaining columns
data_cols = set(self.data.columns)
ordered_cols = [col for col in COL_ORDER if col in data_cols]
remaining_cols = [col for col in data_cols if col not in COL_ORDER]
if len(remaining_cols) > 0:
warnings.warn(
"🚨 DEPRECATION ALERT 🚨\n"
"Unknown columns found.\n"
"- Please update the variable definition to move these to attributes.\n"
"- Unknown columns are no longer supported and might be deprecated soon!\n"
"- Columns: " + ", ".join(remaining_cols),
UserWarning,
)
self.data = self.data.select(ordered_cols + remaining_cols)
# Uncomment after user warning
# self.data = self.data.select(ordered_cols)
    # System methods

    def __repr__(self) -> str:
        """Delegate to __str__ so repr() shows the same tree summary."""
        return self.__str__()
def __str__(self) -> str:
return textwrap.dedent(
f"""\
Variable: {self.var_name} ({self.__class__.__name__}, {'dynamic' if self.dynamic else 'static'})
├── time_window: {self.time_window}
└── data: {'Not extracted' if self.data is None else self.data.shape}
"""
).strip()
    def __getstate__(self) -> dict[str, Any]:
        """Support pickling/deepcopy: expose the full instance dict as state."""
        return self.__dict__

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Restore pickled state by updating the instance dict in place."""
        self.__dict__.update(state)
class NativeVariable(Variable):
    """Extended Variable class for native variables from data sources.

    Args:
        var_name (str): The variable name.
        dynamic (bool): True if the variable is dynamic (time-series) (default: True).
        requires (RequirementsIterable): List of variables or dict of variables with tmin/tmax required to calculate the variable (default: ()).
        tmin (TimeBoundColumn | None): The tmin argument. Can either be a string (column name) or a tuple of (column name, timedelta) (default: None).
        tmax (TimeBoundColumn | None): The tmax argument. Can either be a string (column name) or a tuple of (column name, timedelta) (default: None).
        py (Callable): The function to call to calculate the variable (default: None).
        py_ready_polars (bool): True if the variable code can accept polars dataframes as input and return a polars dataframe (default: False).
        cleaning (CleaningDict): Dictionary with cleaning rules for the variable (default: None).
        allow_caching (bool): Whether to allow caching of this variable (default: True).
    """

    def __init__(
        self,
        # Parent class parameters
        var_name: str,
        dynamic: bool,
        # Immutable default — a shared mutable [] default is an anti-pattern.
        requires: RequirementsIterable = (),
        tmin: TimeAnchorColumn | None = None,
        tmax: TimeAnchorColumn | None = None,
        py: VariableCallable | None = None,
        py_ready_polars: bool = False,
        cleaning: CleaningDict | None = None,
        # This class parameters
        allow_caching: bool = True,
    ) -> None:
        super().__init__(
            var_name,
            dynamic=dynamic,
            requires=requires,
            tmin=tmin,
            tmax=tmax,
            py=py,
            py_ready_polars=py_ready_polars,
            cleaning=cleaning,
        )
        # Whether extraction results for this variable may be cached.
        self.allow_caching = allow_caching
def build_attributes(self, data: PolarsFrame) -> PolarsFrame:
"""Combines columns prefixed by attributes_ into a struct column called attributes"""
if not self.dynamic:
logger.debug("Skipping attribtues for non-dynamic variables")
return data
attr_cols = [
col
for col in data.collect_schema().names()
if col.startswith("attributes_")
]
if attr_cols:
struct_expr = {col[len("attributes_") :]: pl.col(col) for col in attr_cols}
# Mypy does not recognise the kwargs correctly as an overload
data = data.with_columns(pl.struct(**struct_expr).alias("attributes")) # type: ignore[call-overload]
data = data.drop(attr_cols)
return data
    # NOTE(review): stale commented-out draft — it references self.tmin and
    # self.tmax, which no longer exist on Variable (the time bounds now live in
    # self.time_window); update or delete before re-enabling.
    # def on_admission(self, select: str = "!first value") -> NativeStatic:
    #     """Create a new NativeStatic variable based on the current variable that extracts the value on admission.

    #     Args:
    #         select: Select clause specifying aggregation function and columns. Defaults to ``!first value``.

    #     Returns:
    #         NativeStatic

    #     Examples:
    #         >>> # Return the first value
    #         >>> var_adm = variable.on_admission()
    #         >>> cohort.add_variable(var_adm)

    #         >>> # Be more specific with your selection
    #         >>> var_adm = variable.on_admission("!closest(hospital_admission,0,2h) value")
    #         >>> cohort.add_variable(var_adm)
    #     """
    #     if not self.dynamic:
    #         raise ValueError("on_admission is only supported for dynamic variables")
    #     return NativeStatic(
    #         var_name=f"{self.var_name}_adm",
    #         select=select,
    #         base_var=self.var_name,
    #         tmin=self.tmin,
    #         tmax=self.tmax,
    #     )