Aggregated Variables#

Aggregated variables let you create new variables by computing statistics or transformations over existing dynamic (time-series) variables. They turn raw measurement data into clinically meaningful indicators.

Overview#

The aggregation module provides several variable types:

  • NativeStatic: Simple aggregations of dynamic variables (e.g., first, last, mean, max)

  • DerivedStatic: Computed static variables based on expressions or custom functions

  • DerivedDynamic: Computed time-series variables from other variables

Variable Types#

NativeStatic Variables#

NativeStatic variables create single values from time-series data using aggregation functions.

Available Aggregation Functions:

  • !first [columns]: First recorded value

  • !last [columns]: Last recorded value

  • !mean [column]: Mean value

  • !median [column]: Median value

  • !max [column]: Maximum value

  • !min [column]: Minimum value

  • !count [column]: Count of non-null values

  • !any: True if any value exists

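  • !closest(reference, offset, tolerance) [columns]: Value closest to the reference time plus offset, within the given tolerance

  • !sum [column]: Sum of values

  • !perc(quantile) [column]: Specified percentile (e.g., !perc(75))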
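
To make the semantics of these functions concrete, here is a minimal pure-Python sketch of what each aggregation reduces a single case's time series to. It is not the library's implementation: the aggregate helper, the sample records, and the rule that null values are ignored by every function are assumptions made for illustration.

```python
from datetime import datetime
from statistics import mean, median

# Hypothetical records for one case: (recordtime, value); None marks a missing value.
records = [
    (datetime(2024, 1, 1, 8, 0), 118.0),
    (datetime(2024, 1, 1, 9, 0), None),
    (datetime(2024, 1, 1, 10, 0), 131.0),
    (datetime(2024, 1, 1, 11, 0), 124.0),
]


def aggregate(records, func):
    """Reduce one case's time series to a single value (illustrative only)."""
    ordered = sorted(records, key=lambda r: r[0])      # order by recordtime
    # Assumption: nulls are skipped by every aggregation function.
    values = [v for _, v in ordered if v is not None]
    if func == "any":
        return len(values) > 0
    if func == "count":
        return len(values)
    if not values:
        return None                                    # nothing to aggregate
    reducers = {
        "first": lambda v: v[0],
        "last": lambda v: v[-1],
        "mean": mean,
        "median": median,
        "max": max,
        "min": min,
    }
    return reducers[func](values)


first_value = aggregate(records, "first")   # 118.0
max_value = aggregate(records, "max")       # 131.0
n_values = aggregate(records, "count")      # 3
```

Each call collapses the whole series into one value per case, which is why the result of a NativeStatic variable is static.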

Examples:

from corr_vars.sources.aggregation import NativeStatic
from corr_vars import Cohort

cohort = Cohort(obs_level="icu_stay", load_default_vars=False)

# First blood pressure measurement
first_bp = NativeStatic(
    var_name="first_blood_pressure",
    select="!first value",
    base_var="blood_pressure_sys"
)
cohort.add_variable(first_bp)

# Maximum heart rate during ICU stay
max_hr = NativeStatic(
    var_name="max_heart_rate",
    select="!max value",
    base_var="heart_rate"
)
cohort.add_variable(max_hr)

# Blood pressure closest to admission
admission_bp = NativeStatic(
    var_name="admission_blood_pressure",
    select="!closest(icu_admission, 0, 2h) value",
    base_var="blood_pressure_sys"
)
cohort.add_variable(admission_bp)
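
The !closest selection can be pictured as "shift the reference time by the offset, keep records inside the tolerance window, then pick the one with the smallest time distance". The sketch below shows that logic in plain Python. The closest function, its parameter names, and the inclusive window bounds are assumptions for illustration and do not describe the library's internals.

```python
from datetime import datetime, timedelta


def closest(records, reference, offset=timedelta(0), before=None, after=None):
    """Return the (recordtime, value) pair nearest to reference + offset.

    before/after limit how far a record may lie before or after the target;
    None means unbounded. Returns None if no record falls inside the window.
    """
    target = reference + offset
    candidates = [
        (t, v)
        for t, v in records
        if (before is None or t >= target - before)
        and (after is None or t <= target + after)
    ]
    if not candidates:
        return None
    # Smallest absolute time distance to the target wins.
    return min(candidates, key=lambda r: abs(r[0] - target))


# Hypothetical systolic blood pressure records around an ICU admission at 12:00.
admission = datetime(2024, 1, 1, 12, 0)
bp = [
    (datetime(2024, 1, 1, 11, 15), 110.0),
    (datetime(2024, 1, 1, 12, 40), 125.0),
    (datetime(2024, 1, 1, 15, 0), 140.0),
]

two_hours = timedelta(hours=2)
at_admission = closest(bp, admission, before=two_hours, after=two_hours)
six_hours_later = closest(bp, admission, timedelta(hours=6), two_hours, two_hours)
```

Here at_admission is the 12:40 record (40 minutes away, versus 45 minutes for the 11:15 record), and six_hours_later is None because no record lies between 16:00 and 20:00.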

DerivedStatic Variables#

DerivedStatic variables compute new values using expressions or custom functions.

Expression-Based Variables:

from corr_vars.sources.aggregation import DerivedStatic

# Body Mass Index calculation
bmi_var = DerivedStatic(
    var_name="bmi",
    requires=["weight_on_admission", "height"],
    expression="weight_on_admission / (height / 100) ** 2"
)
cohort.add_variable(bmi_var)

# Hospital mortality
mortality_var = DerivedStatic(
    var_name="hospital_mortality",
    requires=["hospital_discharge", "death_timestamp"],
    expression="hospital_discharge >= death_timestamp"
)
cohort.add_variable(mortality_var)
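
As a quick sanity check of the two expressions above, the following plain-Python sketch evaluates them for single values. The function names are hypothetical, height is assumed to be recorded in centimetres and weight in kilograms (as the division by 100 implies), and treating a missing death timestamp as survival is an assumption; the library may yield a null instead.

```python
from datetime import datetime


def bmi(weight_kg, height_cm):
    """Body mass index: weight in kg divided by squared height in metres."""
    return weight_kg / (height_cm / 100) ** 2


def died_in_hospital(hospital_discharge, death_timestamp):
    """True if death occurred at or before hospital discharge."""
    if death_timestamp is None:
        return False  # assumption: no recorded death means the patient survived
    return hospital_discharge >= death_timestamp


example_bmi = bmi(80, 180)  # 80 / 1.8**2, roughly 24.7
example_death = died_in_hospital(
    datetime(2024, 1, 10), datetime(2024, 1, 8)
)  # death before discharge
```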

Custom Function Variables:

For complex calculations, supply a custom function instead of an expression, either through the py argument or as a function defined in variables.py:

# Custom SOFA score calculation
sofa_var = DerivedStatic(
    var_name="sofa_score_admission",
    requires=[
        "first_creatinine", "first_bilirubin",
        "first_pao2_fio2", "first_platelets",
        "first_gcs", "first_map"
    ]
)
# Custom function would be defined in variables.py

DerivedDynamic Variables#

DerivedDynamic variables create new time-series from existing ones.

from corr_vars.sources.aggregation import DerivedDynamic

# PaO2/FiO2 ratio calculation
pf_ratio = DerivedDynamic(
    var_name="pf_ratio",
    requires=["blood_pao2_arterial", "vent_fio2"],
    cleaning={"value": {"low": 50, "high": 800}}
)
# Custom calculation function defined in variables.py
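
A custom function for a ratio of two time series has to decide how to align samples that were recorded at different times. One possible approach, sketched below in plain Python, pairs each PaO2 sample with the most recent FiO2 value at or before it and then applies the cleaning bounds. This is an illustration, not the library's function: pf_ratio_series is a hypothetical name, FiO2 is assumed to be stored as a fraction (0.21 to 1.0), times are plain numbers (hours), and out-of-range ratios are assumed to be dropped.

```python
from bisect import bisect_right


def pf_ratio_series(pao2, fio2, low=50, high=800):
    """Pair each PaO2 sample with the latest FiO2 at or before it.

    pao2, fio2: lists of (time, value) sorted by time.
    Ratios outside [low, high] are dropped, mimicking the cleaning bounds.
    """
    fio2_times = [t for t, _ in fio2]
    result = []
    for t, p in pao2:
        idx = bisect_right(fio2_times, t) - 1  # last FiO2 sample with time <= t
        if idx < 0:
            continue                           # no FiO2 recorded yet
        f = fio2[idx][1]
        if not f:
            continue                           # skip missing or zero FiO2
        ratio = p / f
        if low <= ratio <= high:
            result.append((t, ratio))
    return result


# Hypothetical samples: time in hours since admission.
fio2 = [(0, 0.5), (4, 0.25)]
pao2 = [(-1, 80.0), (1, 90.0), (5, 100.0), (6, 10.0)]

pf = pf_ratio_series(pao2, fio2)
```

With these inputs the sample at hour -1 is skipped (no FiO2 yet), the samples at hours 1 and 5 give ratios of 180.0 and 400.0, and the sample at hour 6 gives 40.0, which falls below the lower cleaning bound and is removed.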

Time Constraints and Filtering#

All aggregation variables support time constraints:

# Values only from first 24 hours
early_lactate = NativeStatic(
    var_name="max_lactate_24h",
    select="!max value",
    base_var="blood_lactate",
    tmin="icu_admission",
    tmax=("icu_admission", "+24h")
)

# Values before a specific event
pre_intubation_spo2 = NativeStatic(
    var_name="last_spo2_before_intubation",
    select="!last value",
    base_var="spo2",
    tmax="first_intubation_dtime"
)
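
The tmin and tmax arguments name a per-case anchor time, optionally shifted by an offset. The sketch below shows one way such bounds could be resolved and applied. It is a simplified illustration: parse_offset, resolve_bound, and in_window are hypothetical helpers, the offset grammar (sign, integer, and one of m/h/d) is an assumption rather than the library's documented syntax, and both bounds are assumed to be inclusive.

```python
import re
from datetime import datetime, timedelta

_UNITS = {"m": "minutes", "h": "hours", "d": "days"}


def parse_offset(text):
    """Parse a signed offset such as "+24h" or "-30m" (assumed grammar)."""
    match = re.fullmatch(r"([+-]?)(\d+)([mhd])", text)
    if match is None:
        raise ValueError(f"unsupported offset: {text!r}")
    sign, amount, unit = match.groups()
    delta = timedelta(**{_UNITS[unit]: int(amount)})
    return -delta if sign == "-" else delta


def resolve_bound(bound, anchors):
    """Turn "anchor" or ("anchor", "+24h") into an absolute datetime."""
    if isinstance(bound, tuple):
        name, offset = bound
        return anchors[name] + parse_offset(offset)
    return anchors[bound]


def in_window(records, anchors, tmin=None, tmax=None):
    """Keep (recordtime, value) pairs between the resolved bounds."""
    lo = resolve_bound(tmin, anchors) if tmin else None
    hi = resolve_bound(tmax, anchors) if tmax else None
    return [
        (t, v)
        for t, v in records
        if (lo is None or t >= lo) and (hi is None or t <= hi)
    ]


# Hypothetical anchor times and lactate records for one case.
anchors = {"icu_admission": datetime(2024, 1, 1, 12, 0)}
lactate = [
    (datetime(2024, 1, 1, 10, 0), 5.1),   # before admission
    (datetime(2024, 1, 1, 18, 0), 3.2),   # within the first 24 hours
    (datetime(2024, 1, 2, 14, 0), 6.0),   # 26 hours after admission
]

first_24h = in_window(
    lactate, anchors, tmin="icu_admission", tmax=("icu_admission", "+24h")
)
```

Only the 18:00 record survives the window, so a maximum taken afterwards would ignore the higher values measured before admission and after the first day.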

Filtering with WHERE Clauses#

Filter source data before aggregation:

# Only abnormal values
high_temp = NativeStatic(
    var_name="max_fever_temperature",
    select="!max value",
    base_var="body_temperature",
    where="value > 38.0"
)

# Specific medication doses
max_norepinephrine = NativeStatic(
    var_name="max_norepinephrine_dose",
    select="!max value",
    base_var="med_norepinephrine",
    where="!isin(description, ['Norepinephrine', 'Noradrenaline'])"
)

Advanced Examples#

Complex Clinical Indicators#

# Shock index calculation
shock_index = DerivedStatic(
    var_name="shock_index_admission",
    requires=["first_heart_rate", "first_blood_pressure_sys"],
    expression="first_heart_rate / first_blood_pressure_sys"
)

# APACHE II acute physiology score components
apache_temp = NativeStatic(
    var_name="apache_temperature",
    select="!closest(icu_admission, 0, 24h) value",
    base_var="body_temperature"
)

apache_map = NativeStatic(
    var_name="apache_mean_arterial_pressure",
    select="!closest(icu_admission, 0, 24h) value",
    base_var="blood_pressure_mean"
)

Outcome Variables#

# ICU length of stay
icu_los = DerivedStatic(
    var_name="icu_length_of_stay_days",
    requires=["icu_admission", "icu_discharge"],
    expression="(icu_discharge - icu_admission).dt.total_seconds() / 86400"
)

# Ventilator-free days
vent_free_days = DerivedStatic(
    var_name="ventilator_free_days_28",
    requires=["icu_admission", "last_extubation_dtime", "icu_discharge"],
    # Custom function required for complex logic
)

Quality Indicators#

# Total number of blood pressure measurements during the ICU stay
# (divide by the length of stay in days to obtain a per-day rate)
bp_count = NativeStatic(
    var_name="bp_measurement_count",
    select="!count value",
    base_var="blood_pressure_sys",
    tmin="icu_admission",
    tmax="icu_discharge"
)

# Timestamp of the first antibiotic administration
# (subtract icu_admission in a DerivedStatic to obtain the time in hours)
first_abx_time = NativeStatic(
    var_name="first_antibiotic_recordtime",
    select="!first recordtime",
    base_var="any_antibiotic_icu"
)
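
Note that !count yields a total and !first recordtime yields a timestamp, so a per-day rate or an elapsed time needs one further derivation step. The plain-Python sketch below shows the arithmetic such a derived variable would perform; rate_per_day and hours_between are hypothetical helper names, not part of the library.

```python
from datetime import datetime


def rate_per_day(count, start, end):
    """Convert a total count into events per day over [start, end]."""
    days = (end - start).total_seconds() / 86400
    return count / days if days > 0 else None


def hours_between(start, event):
    """Elapsed hours from start to event, or None if the event never occurred."""
    if event is None:
        return None
    return (event - start).total_seconds() / 3600


# Hypothetical values for one ICU stay.
icu_admission = datetime(2024, 1, 1, 12, 0)
icu_discharge = datetime(2024, 1, 4, 12, 0)

bp_per_day = rate_per_day(36, icu_admission, icu_discharge)
abx_delay_h = hours_between(icu_admission, datetime(2024, 1, 1, 15, 30))
```

With 36 measurements over a three-day stay, bp_per_day is 12.0, and an antibiotic given at 15:30 on the admission day gives abx_delay_h of 3.5.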

Best Practices#

  1. Use Appropriate Aggregation Functions: Choose the right function for your clinical question

  2. Set Time Constraints: Always specify appropriate tmin/tmax to avoid temporal biases

  3. Apply Cleaning Rules: Use cleaning parameters to filter out physiologically impossible values

  4. Document Clinical Rationale: Include clear variable names and documentation

  5. Validate Results: Always check aggregated values for clinical plausibility

Class Reference#

class corr_vars.sources.cub_hdp.extract.NativeStatic(var_name, select, base_var, where=None, tmin=None, tmax=None, cleaning=None)[source]#

Bases: Variable

Aggregated variables represent simple aggregations of dynamic variables.

Parameters:
  • var_name (str) – Name of the variable.

  • select (str) – Select clause specifying aggregation function and columns.

  • base_var (str) – Name of the base variable (must be a native_dynamic variable).

  • where (str, optional) – Optional WHERE clause (in format for polars).

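  • tmin (str | tuple[str, str], optional) – Minimum time for the extraction, given as a time column name or a (column, offset) tuple such as ("icu_admission", "+24h").

  • tmax (str | tuple[str, str], optional) – Maximum time for the extraction, in the same format as tmin.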

The select argument supports several aggregation functions:

  • !first [columns]: Returns the first row within this case
    >>> "!first value"  # Single column
    >>> "!first value, recordtime"  # Multiple columns
    
  • !last [columns]: Returns the last row within this case
    >>> "!last value"
    >>> "!last value, recordtime"
    
  • !any: Returns True if any value exists
    >>> "!any"
    >>> "!any value"
    
  • !sum [column]: Calculates sum of values
    >>> "!sum value"
    
  • !closest(to_column, timedelta, plusminus) [columns]: Selects value closest to specified column
    Args:

      to_column: Column to compare “recordtime” against.
      timedelta: Time to add to “to_column” for comparison.
      plusminus: Allowed time mismatch (different before/after values can be given, separated by a space).

    >>> "!closest(hospital_admission) value, recordtime"  # Closest to admission
    >>> "!closest(hospital_admission, 0, 2h 3h) value"  # 2h before to 3h after
    >>> "!closest(first_intubation_dtime, 6h, 2h) value"  # 6h after intubation ±2h
    
  • !mean [column]: Calculates mean value
    >>> "!mean value"
    
  • !median [column]: Calculates median value
    >>> "!median value"
    
  • !perc(quantile) [column]: Calculates specified percentile
    >>> "!perc(75) value"  # 75th percentile
    

The where argument supports SQL-style boolean expressions. These are evaluated in the context of the base variable by column_selector(). Where also supports magic commands (starting with !) to filter the data. Supported commands are:

  • !isin(column, [values]): Filters rows where the value in column is in values

  • !startswith(column, [values]): Filters rows where the value in column starts with any of the values

  • !endswith(column, [values]): Filters rows where the value in column ends with any of the values
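
The three magic commands behave like row filters on a single column. The following plain-Python sketch mirrors their described behaviour on a list of row dictionaries. The helper functions and sample rows are hypothetical, and exact, case-sensitive matching is an assumption rather than documented behaviour.

```python
def isin(rows, column, values):
    """Keep rows whose column value equals one of the given values."""
    return [r for r in rows if r[column] in values]


def startswith(rows, column, prefixes):
    """Keep rows whose column value starts with any of the prefixes."""
    return [r for r in rows if str(r[column]).startswith(tuple(prefixes))]


def endswith(rows, column, suffixes):
    """Keep rows whose column value ends with any of the suffixes."""
    return [r for r in rows if str(r[column]).endswith(tuple(suffixes))]


# Hypothetical medication rows.
rows = [
    {"description": "Norepinephrine", "value": 0.1},
    {"description": "Noradrenaline 10mg", "value": 0.3},
    {"description": "Epinephrine", "value": 0.05},
]

exact = isin(rows, "description", ["Norepinephrine", "Noradrenaline"])
by_prefix = startswith(rows, "description", ["Nor"])
by_suffix = endswith(rows, "description", ["phrine"])
```

In this example isin keeps only the first row because "Noradrenaline 10mg" is not an exact match, which is the kind of case where !startswith is the better choice.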

Parameters:

cleaning (dict[str, dict[Literal['low', 'high'], Any]] | None)

base_var: tuple[str, TimeWindow] | VariableProtocol | MultiSourceVariable#
extract(cohort)[source]#

Extract the variable. You do not need to call this yourself, as it is called internally when you add the variable to a cohort. However, you may call it directly to obtain variable data independently of the cohort. You still need a cohort object for case ids and other metadata.

Parameters:

cohort (Cohort) – Cohort object.

Return type:

DataFrame

Returns:

Extracted variable.

After extraction, you may also access the data as Variable.data.

Examples

>>> var = NativeStatic(
...     var_name="first_sodium_recordtime",
...     select="!first recordtime",
...     base_var="blood_sodium",
...     tmin="hospital_admission"
... )
>>> var.extract(cohort) # With var.extract(), the data will not be added to the cohort.
>>> var.data # You can access the data as a polars dataframe.

class corr_vars.sources.cub_hdp.extract.DerivedStatic(var_name, requires=[], expression=None, tmin=None, tmax=None, py=None, py_ready_polars=False, dynamic=False, cleaning=None)[source]#

Bases: Variable

DerivedStatic: These variables are derivations on existing columns in the cohort.obs dataframe based on the expression argument.

Parameters:
  • var_name – Name of the variable.

  • requires (list[str] | dict[str, RequirementsDict]) – List of required variables.

  • expression (str | None) – Expression to extract the variable.

  • tmin (str | tuple[str, str] | None) – Minimum time for the extraction.

  • tmax (str | tuple[str, str] | None) – Maximum time for the extraction.

Note that DerivedStatic variables are executed on the cohort.obs dataframe and must reference existing columns in cohort.obs.

For DerivedStatic variables, you may either provide an SQL-Like expression (which will be parsed by column_selector) or a custom function in variables.py. Use expressions where possible, but custom functions if you require more complex logic.

Examples

>>> DerivedStatic(
...     var_name="inhospital_death",
...     requires=["hospital_discharge", "death_timestamp"],
...     expression="hospital_discharge <= death_timestamp"
... )
>>> DerivedStatic(
...     var_name="any_va_ecmo_icu",
...     requires=["ecmo_va_icu_ops", "ecmo_va_icu"],
...     expression="ecmo_va_icu_ops | ecmo_va_icu"
... )
Parameters:
  • py (VariableCallable | None)

  • py_ready_polars (bool)

  • dynamic (bool)

  • cleaning (dict[str, dict[Literal['low', 'high'], Any]] | None)

extract(cohort)[source]#

Extracts data from the datasource. Usually follows this pattern.

# Load & Extract required variables
self._get_required_vars(cohort)

# This should change self.data either by returning or side effect
self.data = self._custom_extraction(cohort)

# Calls variable function to transform extracted data
self._call_var_function(cohort)

if self.dynamic:
    self._add_time_window(cohort)
    # Expects case_tmin, case_tmax for each primary key
    self._timefilter()
    self._apply_cleaning()
Parameters:

cohort (Cohort)

Return type:

DataFrame

class corr_vars.sources.cub_hdp.extract.DerivedDynamic(var_name, requires, cleaning=None, tmin=None, tmax=None, py=None, py_ready_polars=False, dynamic=True)[source]#

Bases: Variable

Derived dynamic variables are extracted using a custom function.

Parameters:
  • var_name – Name of the variable.

  • requires (list[str] | dict[str, RequirementsDict]) – List of required variables.

  • cleaning (dict[str, dict[Literal['low', 'high'], Any]] | None) – Cleaning parameters ({column_name: {low: int, high: int}})

  • tmin (str | tuple[str, str] | None) – Minimum time for the extraction.

  • tmax (str | tuple[str, str] | None) – Maximum time for the extraction.

  • py (VariableCallable | None) – Custom function.

  • py_ready_polars (bool) – Whether the custom function is already prepared for polars. Default is False (input and output are pandas dataframes).

Examples

>>> def var_func(var, cohort):
...     # Note: this simplified example only works if blood_pao2_arterial and vent_fio2 have the same length (which is probably not the case).
...     return var.with_columns(
...         (var.required_vars["blood_pao2_arterial"] / var.required_vars["vent_fio2"]).alias("pf_ratio")
...     )
>>> DerivedDynamic(
...     var_name="pf_ratio",
...     requires=["blood_pao2_arterial", "vent_fio2"],
...     py=var_func,
...     py_ready_polars=True)
Parameters:

dynamic (bool)

extract(cohort)[source]#

Extract the variable.

Parameters:

cohort (Cohort) – Cohort object.

Returns:

Extracted variable.

Return type:

pl.DataFrame