Core Module#

The core module contains the fundamental classes for managing cohorts and variables in CORR-Vars.

Variable Architecture#

Variables in CORR-Vars follow a hierarchical structure that supports different types of data extraction and computation:

[Figure: CORR-Vars variable hierarchy (cv_var_hierarchy.png)]

Variable Types#

Base Variable Class

The foundational class that all variables inherit from. Provides common functionality for data processing, cleaning, and time filtering.

NativeVariable

Extended Variable class specifically for variables extracted directly from data sources. Includes caching capabilities and source-specific optimizations.

Variable Processing Pipeline#

Each variable follows a standardized processing pipeline:

  1. Extraction: Data is retrieved from the specified source

  2. Time Filtering: Data is filtered based on tmin/tmax constraints

  3. Cleaning: Invalid values are removed based on cleaning rules

  4. Column Ordering: Columns are standardized according to predefined order

  5. Relative Time Calculation: Relative timestamps are computed for dynamic variables
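The five steps above can be sketched in plain Python. This is only an illustration of the order of operations, not the CORR-Vars implementation: the record layout, the `run_pipeline` helper, the inclusive bounds, and the `recordtime_relative` field name are all assumptions made for the example.

```python
from datetime import datetime, timedelta

# Hypothetical raw records for a single ICU stay (field names are illustrative).
RAW = [
    {"value": 138.0, "recordtime": datetime(2024, 1, 1, 7, 0), "case_id": 1},
    {"value": 141.0, "recordtime": datetime(2024, 1, 1, 9, 0), "case_id": 1},
    {"value": 999.0, "recordtime": datetime(2024, 1, 1, 10, 0), "case_id": 1},
    {"value": 140.0, "recordtime": datetime(2024, 1, 2, 12, 0), "case_id": 1},
]


def run_pipeline(rows, tmin, tmax, low, high, column_order):
    # 1. Extraction: the "source" here is just an in-memory list.
    data = [dict(row) for row in rows]
    # 2. Time filtering: keep records inside [tmin, tmax] (inclusive by assumption).
    data = [row for row in data if tmin <= row["recordtime"] <= tmax]
    # 3. Cleaning: drop records whose value lies outside [low, high].
    data = [row for row in data if low <= row["value"] <= high]
    # 4. Column ordering: rebuild each record with a fixed key order.
    data = [{col: row[col] for col in column_order} for row in data]
    # 5. Relative time: seconds elapsed since tmin.
    for row in data:
        row["recordtime_relative"] = (row["recordtime"] - tmin).total_seconds()
    return data


admission = datetime(2024, 1, 1, 8, 0)
result = run_pipeline(
    RAW,
    tmin=admission,
    tmax=admission + timedelta(hours=24),
    low=100,
    high=200,
    column_order=["case_id", "recordtime", "value"],
)
print(result)
```

Only the 09:00 record survives: the 07:00 and next-day records fall outside the time window, and the 999.0 value is rejected by the cleaning bounds.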

Examples#

Working with Variables#

from corr_vars.core.variable import Variable
from corr_vars import Cohort

# Initialize a cohort
cohort = Cohort(obs_level="icu_stay", load_default_vars=False)

# Variables are typically added through the cohort interface
cohort.add_variable("blood_sodium")

# Access the variable data
sodium_data = cohort.obsm["blood_sodium"]
print(f"Sodium measurements: {len(sodium_data)} records")

Custom Variable Creation#

from corr_vars.sources.aggregation import NativeStatic

# Create a custom aggregated variable
max_temp_var = NativeStatic(
    var_name="max_temperature_24h",
    select="!max value",
    base_var="body_temperature",
    tmin="icu_admission",
    tmax=("icu_admission", "+24h")
)

# Add to cohort
cohort.add_variable(max_temp_var)

Time Constraints#

# Add variable with custom time constraints
cohort.add_variable(
    "blood_lactate",
    tmin=("icu_admission", "-2h"),  # 2 hours before ICU admission
    tmax=("icu_admission", "+6h")   # 6 hours after ICU admission
)
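To make the tuple form of tmin/tmax more concrete, the following sketch shows one way a (column name, offset) pair could be resolved into an absolute timestamp for a single case. The helpers `parse_offset` and `resolve_bound` are hypothetical and not part of CORR-Vars, and the supported offset grammar (an optional sign, an integer, and one of s/m/h/d) is an assumption; the library may accept other formats.

```python
import re
from datetime import datetime, timedelta

# Assumed unit suffixes; the real library may support a different set.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_offset(offset):
    """Parse strings such as "-2h" or "+6h" into a timedelta."""
    match = re.fullmatch(r"([+-]?)(\d+)([smhd])", offset.strip())
    if match is None:
        raise ValueError(f"Unsupported offset: {offset!r}")
    sign, amount, unit = match.groups()
    delta = timedelta(**{_UNITS[unit]: int(amount)})
    return -delta if sign == "-" else delta


def resolve_bound(bound, row):
    """Resolve a column name or a (column name, offset) tuple against one row."""
    if isinstance(bound, str):
        return row[bound]
    column, offset = bound
    return row[column] + parse_offset(offset)


row = {"icu_admission": datetime(2024, 1, 1, 8, 0)}
tmin = resolve_bound(("icu_admission", "-2h"), row)
tmax = resolve_bound(("icu_admission", "+6h"), row)
print(tmin, tmax)
```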

Class Reference#

class corr_vars.core.variable.Variable(var_name, dynamic, requires=[], tmin=None, tmax=None, py=None, py_ready_polars=False, cleaning=None)[source]#

Bases: object

Base class for all variables.

Parameters:
  • var_name (str) – The variable name.

  • dynamic (bool) – True if the variable is dynamic (time-series).

  • requires (RequirementsIterable) – List of variables or dict of variables with tmin/tmax required to calculate the variable (default: []).

  • tmin (TimeBoundColumn | None) – The tmin argument. Can either be a string (column name) or a tuple of (column name, timedelta).

  • tmax (TimeBoundColumn | None) – The tmax argument. Can either be a string (column name) or a tuple of (column name, timedelta).

  • py (Callable) – The function to call to calculate the variable.

  • py_ready_polars (bool) – True if the variable code can accept polars dataframes as input and return a polars dataframe.

  • cleaning (CleaningDict) – Dictionary with cleaning rules for the variable.

Note

tmin and tmax can be None when you create a Variable object, but they must be set before extraction. If you add the variable via cohort.add_variable(), they are automatically set to the cohort’s tmin and tmax.

This base class should not be used directly; use one of the subclasses instead. These are specified in the sources submodule.

Examples

Basic cleaning configuration:

>>> cleaning = {
...     "value": {
...         "low": 10,
...         "high": 80
...     }
... }
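As a rough illustration of how such a cleaning dictionary might be interpreted, the sketch below keeps only rows whose values fall within the configured bounds. The `apply_cleaning` helper is hypothetical, and two behaviours are assumptions: bounds are treated as inclusive, and out-of-range rows are dropped rather than set to null.

```python
def apply_cleaning(rows, cleaning):
    """Keep only rows whose cleaned columns fall within [low, high]."""

    def in_bounds(row):
        for column, bounds in cleaning.items():
            value = row[column]
            low = bounds.get("low")
            high = bounds.get("high")
            # A missing bound means that side is unconstrained.
            if low is not None and value < low:
                return False
            if high is not None and value > high:
                return False
        return True

    return [row for row in rows if in_bounds(row)]


cleaning = {"value": {"low": 10, "high": 80}}
rows = [{"value": 5}, {"value": 10}, {"value": 42}, {"value": 80}, {"value": 95}]
kept = apply_cleaning(rows, cleaning)
print([row["value"] for row in kept])
```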

Time constraints with relative offsets:

>>> # Extract data from 2 hours before to 6 hours after ICU admission
>>> tmin = ("icu_admission", "-2h")
>>> tmax = ("icu_admission", "+6h")

Variable with dependencies:

>>> # This variable requires other variables to be loaded first
>>> requires = ["blood_pressure_sys", "blood_pressure_dia"]
>>> # This variable requires other variables with fixed tmin/tmax to be loaded first
>>> requires = {
...     "blood_pressure_sys_hospital": {
...         "template": "blood_pressure_sys",
...         "tmin": "hospital_admission",
...         "tmax": "hospital_discharge"
...     },
...     "blood_pressure_dia_hospital": {
...         "template": "blood_pressure_dia",
...         "tmin": "hospital_admission",
...         "tmax": "hospital_discharge"
...     }
... }
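
Because requires accepts either a list or a dict, code that consumes it may find it convenient to normalize both forms into one mapping. The following is a minimal sketch of that idea; `normalize_requires` is a hypothetical helper, and treating a missing tmin/tmax as None (meaning "no fixed bound") is an assumption rather than documented behaviour.

```python
def normalize_requires(requires):
    """Return {name: {"template", "tmin", "tmax"}} for list or dict input."""
    if isinstance(requires, dict):
        return {
            name: {
                # Fall back to the key itself when no template is given.
                "template": spec.get("template", name),
                "tmin": spec.get("tmin"),
                "tmax": spec.get("tmax"),
            }
            for name, spec in requires.items()
        }
    # List form: each name is its own template with no fixed time bounds.
    return {
        name: {"template": name, "tmin": None, "tmax": None}
        for name in requires
    }


as_list = normalize_requires(["blood_pressure_sys", "blood_pressure_dia"])
as_dict = normalize_requires(
    {
        "blood_pressure_sys_hospital": {
            "template": "blood_pressure_sys",
            "tmin": "hospital_admission",
            "tmax": "hospital_discharge",
        }
    }
)
print(as_list)
print(as_dict)
```
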
var_name: str#
dynamic: bool#
requires: list[str] | dict[str, RequirementsDict]#
required_vars: dict[str, ExtractedVariable]#
data: DataFrame | None#
tmin: str | tuple[str, str] | None#
tmax: str | tuple[str, str] | None#
py: VariableCallable | None#
py_ready_polars: bool#
cleaning: dict[str, dict[Literal['low', 'high'], Any]] | None#
extract(cohort)[source]#

Extracts data from the datasource. Usually follows this pattern for dynamic (timeseries) data:

```
self._get_required_vars(cohort)

# This should change self.data either by returning or side effect
self.data = self._custom_extraction(cohort)

# Or convert_polars=False for pandas functions
self._call_var_function(cohort, convert_polars=True)

# Expects case_tmin, case_tmax for each primary key
self._timefilter(cohort, always=not self.complex)
self._apply_cleaning()
self._add_relative_times(cohort)
self._unify_and_order_columns(cohort.primary_key)
```

Parameters:

cohort (Cohort)

Return type:

DataFrame

class corr_vars.core.variable.NativeVariable(var_name, dynamic, requires=[], tmin=None, tmax=None, py=None, py_ready_polars=False, cleaning=None, allow_caching=True)[source]#

Bases: Variable

Extended Variable class for native variables from data sources.

Parameters:
  • var_name (str) – The variable name.

  • dynamic (bool) – True if the variable is dynamic (time-series). Required; the signature above defines no default.

  • requires (RequirementsIterable) – List of variables or dict of variables with tmin/tmax required to calculate the variable (default: []).

  • tmin (TimeBoundColumn | None) – The tmin argument. Can either be a string (column name) or a tuple of (column name, timedelta) (default: None).

  • tmax (TimeBoundColumn | None) – The tmax argument. Can either be a string (column name) or a tuple of (column name, timedelta) (default: None).

  • py (Callable) – The function to call to calculate the variable (default: None).

  • py_ready_polars (bool) – True if the variable code can accept polars dataframes as input and return a polars dataframe (default: False).

  • cleaning (CleaningDict) – Dictionary with cleaning rules for the variable (default: None).

  • allow_caching (bool) – Whether to allow caching of this variable (default: True).

build_attributes(data)[source]#

Combines all columns whose names start with attributes_ into a single struct column called attributes.

Parameters:

data (TypeVar(PolarsFrame, DataFrame, LazyFrame))

Return type:

TypeVar(PolarsFrame, DataFrame, LazyFrame)
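
The effect of build_attributes can be pictured with a plain-Python analogue that operates on one row represented as a dict. This is not the polars implementation: `build_attributes_row` is a hypothetical helper, and stripping the attributes_ prefix from the nested field names is an assumption about how the struct fields are named.

```python
PREFIX = "attributes_"


def build_attributes_row(row):
    """Fold all keys starting with PREFIX into one nested "attributes" dict."""
    attributes = {
        key[len(PREFIX):]: value
        for key, value in row.items()
        if key.startswith(PREFIX)
    }
    combined = {
        key: value for key, value in row.items() if not key.startswith(PREFIX)
    }
    # Only add the nested field when there is something to combine.
    if attributes:
        combined["attributes"] = attributes
    return combined


row = {
    "case_id": 1,
    "value": 7.4,
    "attributes_unit": "mmol/L",
    "attributes_device": "analyzer_a",
}
combined = build_attributes_row(row)
print(combined)
```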