Cohort

Cohort Workflow

Initialization

The following snippet initializes a cohort using the CUB-HDP data source.

from corr_vars.core.cohort import Cohort

cohort = Cohort(
    obs_level="icu_stay",  # One of: "hospital_stay", "icu_stay", "procedure"
    load_default_vars=False,  # Optional, defaults to True
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "password_file": True, # Specify path to file, True assumes ~/password.txt
            "merge_consecutive": False
        }
    },
)

To combine cohorts from several data sources, add one entry per source to sources.

cohort = Cohort(
    obs_level="icu_stay",  # One of: "hospital_stay", "icu_stay", "procedure"
    load_default_vars=False,  # Optional, defaults to True
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "password_file": True, # Specify path to file, True assumes ~/password.txt
            "merge_consecutive": False
        },
        "reprodicu": {
            "path": "/data02/projects/reprodicubility/reprodICU/reprodICU_files"
        }
    },
)

Adding Variables

import polars as pl

# Use pre-defined variables
cohort.add_variable("pf_ratio")

# Create a custom variable on the fly
cohort.add_variable(
    NativeStatic(
        var_name="median_sodium_before_hn",
        select="!median value",
        base_var="blood_sodium",
        tmin="hospital_admission",
        tmax=cohort.t_eligible  # requires cohort.set_t_eligible() to have been called
    )
)

# Perform manual operations on the dataframe
cohort.obs = cohort.obs.with_columns([
    pl.col('first_sodium_recordtime').eq(pl.col('first_severe_hypernatremia_recordtime'))
    .alias('idx_hypernatremia_was_on_admission')
])

cohort.obs = cohort.obs.with_columns([
    pl.when(pl.col('idx_hypernatremia_was_on_admission'))
    .then(pl.lit('community_acquired'))
    .otherwise(pl.lit('hospital_acquired'))
    .alias('hn_origin')
])

Inclusion/Exclusion

# Add multiple inclusion criteria (cohort.t_eligible must be set first, see set_t_eligible())
cohort.include_list([
    {
        "variable": "age",
        "operation": ">= 18",
        "label": "Adult patients"
    },
    {
        "variable": "icu_length_of_stay",
        "operation": "> 2",
        "label": "ICU stay > 2 days"
    }
])

# Add exclusion criteria
cohort.exclude_list([
    {
        "variable": "any_dx_covid_19",
        "operation": "== True",
        "label": "Exclude COVID-19 patients"
    }
])

# Visualize the inclusion/exclusion criteria
cohort.changetracker.create_flowchart(output_file="inclusion_flowchart.svg")
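Conceptually, the flowchart records how many observations remain after each criterion is applied in order. The sketch below reproduces that bookkeeping on plain Python dicts instead of a polars DataFrame. The helper apply_criteria, the "mode" key, and the sample rows are hypothetical; this is not the library's change-tracking implementation.

```python
import operator

# Comparison tokens supported by this sketch (assumption: "<op> <value>" strings only)
OPS = {">=": operator.ge, "<=": operator.le, ">": operator.gt,
       "<": operator.lt, "==": operator.eq, "!=": operator.ne}


def parse_value(raw):
    """Convert the right-hand side of an operation string to bool or float."""
    if raw in ("True", "False"):
        return raw == "True"
    return float(raw)


def apply_criteria(rows, criteria):
    """Hypothetical helper: apply steps in order, recording the remaining count after each."""
    steps = [("Initial cohort", len(rows))]
    for crit in criteria:
        op_token, raw_value = crit["operation"].split(maxsplit=1)
        test, value = OPS[op_token], parse_value(raw_value)
        hit = [test(row[crit["variable"]], value) for row in rows]
        # "include" keeps matching rows, "exclude" drops them ("mode" is a hypothetical key)
        keep = hit if crit["mode"] == "include" else [not h for h in hit]
        rows = [row for row, k in zip(rows, keep) if k]
        steps.append((crit["label"], len(rows)))
    return rows, steps


patients = [
    {"age": 70, "icu_length_of_stay": 5, "any_dx_covid_19": False},
    {"age": 16, "icu_length_of_stay": 3, "any_dx_covid_19": False},
    {"age": 55, "icu_length_of_stay": 1, "any_dx_covid_19": False},
    {"age": 40, "icu_length_of_stay": 4, "any_dx_covid_19": True},
]
criteria = [
    {"variable": "age", "operation": ">= 18", "label": "Adult patients", "mode": "include"},
    {"variable": "icu_length_of_stay", "operation": "> 2", "label": "ICU stay > 2 days", "mode": "include"},
    {"variable": "any_dx_covid_19", "operation": "== True", "label": "Exclude COVID-19 patients", "mode": "exclude"},
]
remaining, steps = apply_criteria(patients, criteria)
for label, n in steps:
    print(f"{label}: {n}")
```

Each printed line corresponds to one box in a CONSORT-style flowchart: 4, 3, 2, and finally 1 remaining observation.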

Exploration

# Display the cohort dataframe
print(cohort.obs)

# Create a TableOne object
tableone = cohort.to_tableone(ignore_cols=["icu_id"])
print(tableone)

Data Export

# Save to file
cohort.save("my_cohort.corr2")

# Load from file
cohort = Cohort.load("my_cohort.corr2")

# Export to CSV
cohort.to_csv("path/to/output_folder")

# Export to a Stata .dta file
cohort.to_stata(to_file="path/to/output_folder/my_cohort.dta")

Class Reference

class corr_vars.core.cohort.Cohort(obs_level='icu_stay', sources={'cub_hdp': {'conn_args': {'password_file': True}, 'database': 'db_hypercapnia_prepared', 'icu_stay': {'merge_consecutive': True}}}, project_vars={}, load_default_vars=True, logger_args={})

Bases: object

Class to build a cohort in the CORR database.

Parameters:
  • obs_level (Literal["icu_stay", "hospital_stay", "procedure"]) – Observation level (default: “icu_stay”).

  • sources (dict[str, dict]) –

    Dictionary of data sources to use for data extraction. Available options are “cub_hdp”, “reprodicu”, “co5”. Source configurations:

    • cub_hdp: Accepted keys: database, password_file, merge_consecutive, filters, conn_args [remote_hostname (str), username (str)]

    • reprodicu: Accepted keys: path, exclude_datasets

    • co5: Not implemented yet. Data request is still pending.

    Note: reprodicu does not yet implement variable extraction, only cohort data.

  • project_vars (dict) – Dictionary with local variable definitions (default: {}).

  • load_default_vars (bool) – Whether to load the default variables (default: True).

  • logger_args (dict) – Dictionary of logging configuration options [level (int), file_path (str), file_mode (str), verbose_fmt (bool), colored_output (bool), formatted_numbers (bool)] (default: {}).

obs

Static data for each observation. Contains one row per observation (e.g., ICU stay) with columns for static variables like demographics and outcomes.

Example

>>> cohort.obs
patient_id  case_id icu_stay_id            icu_admission        icu_discharge sex   ... inhospital_death
0  P001         C001    C001_1       2023-01-01 08:30:00  2023-01-03 12:00:00   M   ...  False
1  P001         C001    C001_2       2023-01-03 14:20:00  2023-01-05 16:30:00   M   ...  False
2  P002         C002    C002_1       2023-01-02 09:15:00  2023-01-04 10:30:00   F   ...  False
3  P003         C003    C003_1       2023-01-04 11:45:00  2023-01-07 13:20:00   F   ...  True
...
Type:

pl.DataFrame

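obsm

Dynamic data stored as a dictionary of DataFrames keyed by variable name. Each DataFrame contains the time series of one variable, with the observation's primary key column (e.g., icu_stay_id) and the following columns: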

  • recordtime: Timestamp of the measurement

  • value: Value of the measurement

  • recordtime_end: End time (only for duration-based variables like therapies)

  • description: Additional information (e.g., medication names)

Example

>>> cohort.obsm["blood_sodium"]
   icu_stay_id          recordtime  value
0  C001_1      2023-01-01 09:30:00   138
1  C001_1      2023-01-02 10:15:00   141
2  C001_2      2023-01-03 15:00:00   137
3  C002_1      2023-01-02 10:00:00   142
4  C003_1      2023-01-04 12:30:00   139
...
Type:

dict of pl.DataFrame
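Static variables derived from time series, such as a NativeStatic with select="!median value", reduce each observation's records to a single value. A minimal sketch of that reduction on long-format tuples shaped like the cohort.obsm["blood_sodium"] example, assuming one time window shared by all observations (the library resolves tmin and tmax per observation). The helper median_per_observation is hypothetical, and plain Python stands in for polars.

```python
from collections import defaultdict
from datetime import datetime
from statistics import median

# Long-format records: (primary key, recordtime, value), as in cohort.obsm["blood_sodium"]
records = [
    ("C001_1", datetime(2023, 1, 1, 9, 30), 138),
    ("C001_1", datetime(2023, 1, 2, 10, 15), 141),
    ("C001_2", datetime(2023, 1, 3, 15, 0), 137),
    ("C002_1", datetime(2023, 1, 2, 10, 0), 142),
    ("C003_1", datetime(2023, 1, 4, 12, 30), 139),
]


def median_per_observation(records, tmin=None, tmax=None):
    """Hypothetical helper: median value per observation within [tmin, tmax]."""
    grouped = defaultdict(list)
    for key, recordtime, value in records:
        # Skip measurements outside the optional (shared) time window
        if tmin is not None and recordtime < tmin:
            continue
        if tmax is not None and recordtime > tmax:
            continue
        grouped[key].append(value)
    return {key: median(values) for key, values in grouped.items()}


print(median_per_observation(records))
# {'C001_1': 139.5, 'C001_2': 137, 'C002_1': 142, 'C003_1': 139}
```

Passing tmax=datetime(2023, 1, 2) keeps only the first C001_1 measurement, so every other observation drops out of the result.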

Notes

  • For large cohorts, set load_default_vars=False to speed up the extraction. You can use pre-extracted cohorts as starting points and load them using Cohort.load().

  • Variables can be added using cohort.add_variable(). Static variables will be added to obs, dynamic variables to obsm.

  • sources["cub_hdp"]["filters"] also accepts the special shorthand “_dx”, which extracts only the hospital admissions of the last x months; this is useful for debugging and prototyping. For example, “_d2” extracts every admission of the last 2 months.
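The “_dx” shorthand amounts to a date cutoff x months before today. A minimal sketch of that interpretation, assuming calendar months and clamping to the last valid day of the target month. The helpers parse_recent_months_filter and months_before are hypothetical, and the library may compute the cutoff differently.

```python
import calendar
import re
from datetime import date


def months_before(day, months):
    """Return the date `months` calendar months before `day`, clamping the day of month."""
    total = day.year * 12 + (day.month - 1) - months
    year, month_index = divmod(total, 12)
    month = month_index + 1
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, min(day.day, last_day))


def parse_recent_months_filter(filter_str, today):
    """Hypothetical parser: '_d2' -> earliest admission date to keep."""
    match = re.fullmatch(r"_d(\d+)", filter_str)
    if match is None:
        raise ValueError(f"Not a '_dx' shorthand: {filter_str!r}")
    return months_before(today, int(match.group(1)))


print(parse_recent_months_filter("_d2", today=date(2024, 4, 30)))  # 2024-02-29
```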

Examples

Create a new cohort:

>>> cohort = Cohort(
...                 obs_level="icu_stay",
...                 load_default_vars=False,
...                 sources={
...                     "cub_hdp": {
...                         "database": "db_hypercapnia_prepared",
...                         "password_file": True,
...                         "merge_consecutive": False},
...                     "reprodicu": {
...                         "path": "/data02/projects/reprodicubility/reprodICU/reprodICU_files"}
...                 })

Access static data:

>>> cohort.obs.select("age_on_admission")  # Get age for all patients
>>> cohort.obs.filter(pl.col("sex") == "M")  # Filter for male patients

Access time-series data:

>>> cohort.obsm["blood_sodium"]  # Get all blood sodium measurements
>>> # Get blood sodium measurements for a specific observation
>>> cohort.obsm["blood_sodium"].filter(pl.col(cohort.primary_key) == "12345")

constant_vars: list[str]
obs_level: ObsLevel
primary_key: str
t_min: str
t_max: str
t_eligible: str
t_outcome: str
logger_args: dict[str, Any]
sources: Final[SourceDict]
project_vars: dict[str, dict[str, Any]]
tmpdir_manager: Final[TemporaryDirectoryManager]

load_default_vars()

Load the default variables defined in vars.json. Call this after filtering your cohort for eligibility: extracting variables for fewer observations is faster.

Returns:

Variables are loaded into the cohort.

Return type:

None

Examples

>>> # Load default variables for an ICU cohort
>>> cohort = Cohort(obs_level="icu_stay", load_default_vars=False)
>>> # Apply filters first (faster)
>>> cohort.include_list([
...     {"variable": "age_on_admission", "operation": ">= 18", "label": "Adults"}
... ])
>>> # Then load default variables
>>> cohort.load_default_vars()

add_variable(variable, save_as=None, tmin=None, tmax=None)

Add a variable to the cohort.

You may specify tmin and tmax as a tuple of a column name and a time offset (e.g. (“hospital_admission”, “+1d”)). The bound is then the time in that column shifted by the offset; in this example, one day after the patient's hospital admission.
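A tuple bound can be read as an anchor column plus an offset. The sketch below resolves such a bound for a single observation, assuming offsets use the "+1d" / "-48h" form seen in the examples with only days (d) and hours (h) as units. The unit set and the helper resolve_bound are assumptions, not the library's parser.

```python
import re
from datetime import datetime, timedelta

UNITS = {"d": "days", "h": "hours"}  # assumption: only days and hours are supported


def resolve_bound(row, bound):
    """Hypothetical: resolve a column name or (column, offset) tuple to a timestamp."""
    if isinstance(bound, str):
        return row[bound]
    column, offset = bound
    match = re.fullmatch(r"([+-])(\d+)([dh])", offset)
    if match is None:
        raise ValueError(f"Unsupported offset: {offset!r}")
    sign, amount, unit = match.groups()
    delta = timedelta(**{UNITS[unit]: int(amount)})
    return row[column] + delta if sign == "+" else row[column] - delta


row = {
    "hospital_admission": datetime(2023, 1, 1, 8, 0),
    "spo2_lt_90": datetime(2023, 1, 3, 14, 0),  # e.g. the column set as t_eligible
}
print(resolve_bound(row, ("hospital_admission", "+1d")))  # 2023-01-02 08:00:00
print(resolve_bound(row, ("spo2_lt_90", "-48h")))         # 2023-01-01 14:00:00
```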

Parameters:
  • variable (str | Variable | MultiSourceVariable) – Variable to add. Either a string with the variable name (from vars.json) or a Variable object.

  • save_as (Optional[str]) – Name of the column to save the variable as. Defaults to variable name.

  • tmin (Union[str, tuple[str, str], None]) – Name of the column to use as tmin or tuple (see description).

  • tmax (Union[str, tuple[str, str], None]) – Name of the column to use as tmax or tuple (see description).

Returns:

The variable object.

Return type:

Variable

Examples

>>> cohort.add_variable("blood_sodium")
>>> cohort.add_variable(
...    variable="any_dx_covid_19",
...    tmin=("hospital_admission", "-1d"),
...    tmax=cohort.t_eligible
... )
>>> cohort.add_variable(
...    NativeStatic(
...        var_name="highest_hct_before_eligible",
...        select="!max value",
...        base_var='blood_hematokrit',
...        tmax=cohort.t_eligible
...    )
... )
>>> cohort.add_variable(
...    variable='any_med_glu',
...    save_as="glucose_prior_eligible",
...    tmin=(cohort.t_eligible, "-48h"),
...    tmax=cohort.t_eligible
... )

set_t_eligible(t_eligible, drop_ineligible=True)

Set the time anchor for eligibility. This can be referenced as cohort.t_eligible throughout the process and is required to add inclusion or exclusion criteria.

Parameters:
  • t_eligible (str) – Name of the column to use as t_eligible.

  • drop_ineligible (bool) – Whether to drop ineligible patients. Defaults to True.
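One plausible reading of drop_ineligible=True is that observations with no value in the t_eligible column never became eligible and are removed. A sketch of that reading on plain rows; the helper drop_ineligible is hypothetical, and the library may apply additional rules.

```python
from datetime import datetime


def drop_ineligible(rows, t_eligible):
    """Hypothetical helper: keep only observations whose eligibility time is set."""
    return [row for row in rows if row.get(t_eligible) is not None]


rows = [
    {"icu_stay_id": "C001_1", "spo2_lt_90": datetime(2023, 1, 1, 10, 0)},
    {"icu_stay_id": "C002_1", "spo2_lt_90": None},  # never had SpO2 < 90
    {"icu_stay_id": "C003_1", "spo2_lt_90": datetime(2023, 1, 5, 6, 45)},
]
eligible = drop_ineligible(rows, "spo2_lt_90")
print([row["icu_stay_id"] for row in eligible])  # ['C001_1', 'C003_1']
```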

Returns:

t_eligible is set.

Return type:

None

Examples

>>> # Add a suitable time-anchor variable
>>> cohort.add_variable(NativeStatic(
...    var_name="spo2_lt_90",
...    base_var="spo2",
...    select="!first recordtime",
...    where="value < 90",
... ))
>>> # Set the time anchor for eligibility
>>> cohort.set_t_eligible("spo2_lt_90")

set_t_outcome(t_outcome)

Set the time anchor for outcome. This can be referenced as cohort.t_outcome throughout the process and should be specified for your study.

Parameters:

t_outcome (str) – Name of the column to use as t_outcome.

Returns:

t_outcome is set.

Return type:

None

Examples

>>> cohort.set_t_outcome("hospital_discharge")

change_tracker(description, mode='include')

Return a context manager to group cohort edits and record a single ChangeTracker state on exit.

Example

>>> with cohort.change_tracker("Adults", mode="include") as track:
...     track.filter(pl.col("age_on_admission") >= 18)

Parameters:
  • description (str)

  • mode (Literal['include', 'exclude'])
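The pattern behind change_tracker() is a context manager that lets several edits run and then records one snapshot when the block exits. A minimal sketch of that pattern with contextlib; Tracker, GroupedEdit, and the recorded tuple format are hypothetical and only illustrate the idea.

```python
from contextlib import contextmanager


class GroupedEdit:
    """Hypothetical handle passed into the with-block."""

    def __init__(self, tracker):
        self.tracker = tracker

    def filter(self, predicate):
        self.tracker.rows = [r for r in self.tracker.rows if predicate(r)]


class Tracker:
    """Hypothetical tracker storing one (description, mode, n_before, n_after) entry per block."""

    def __init__(self, rows):
        self.rows = list(rows)
        self.states = []

    @contextmanager
    def change_tracker(self, description, mode="include"):
        n_before = len(self.rows)
        yield GroupedEdit(self)
        # Record a single state once every edit in the block has run
        self.states.append((description, mode, n_before, len(self.rows)))


tracker = Tracker([{"age": 70}, {"age": 16}, {"age": 45}, {"age": 90}])
with tracker.change_tracker("Adults under 80", mode="include") as track:
    track.filter(lambda r: r["age"] >= 18)
    track.filter(lambda r: r["age"] < 80)
print(tracker.states)  # [('Adults under 80', 'include', 4, 2)]
```

Grouping both filters into one block yields one flowchart step instead of two.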

include(*args, **kwargs)

Add an inclusion criterion to the cohort. It is recommended to use Cohort.include_list() and add all of your inclusion criteria at once. However, if you need to specify criteria at a later stage, you can use this method.

Warning

You should call Cohort.include_list() before calling Cohort.include() to ensure that the inclusion criteria are properly tracked.

Parameters:
  • variable (str | Variable)

  • operation (str)

  • label (str)

  • operations_done (str)

  • [Optional – tmin, tmax]

Returns:

Criterion is added to the cohort.

Return type:

None

Note

operation is passed to pandas.DataFrame.query, which uses a slightly modified Python syntax. Also, if you specify “true”/”True” or “false”/”False” as a value for operation, it will be converted to “== True” or “== False”, respectively.
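The boolean shorthand can be pictured as a small normalization step applied before the condition is evaluated. A sketch under that assumption; normalize_operation is a hypothetical name, and the actual conversion happens inside the library.

```python
def normalize_operation(operation):
    """Hypothetical: expand 'true'/'True'/'false'/'False' to explicit comparisons."""
    stripped = operation.strip()
    if stripped in ("true", "True"):
        return "== True"
    if stripped in ("false", "False"):
        return "== False"
    return stripped  # anything else is passed through unchanged


for op in ["true", "False", ">= 18", "== True"]:
    print(repr(op), "->", repr(normalize_operation(op)))
```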

Examples

>>> cohort.include(
...    variable="age_on_admission",
...    operation=">= 18",
...    label="Adult",
...    operations_done="Include only adult patients"
... )

exclude(*args, **kwargs)

Add an exclusion criterion to the cohort. It is recommended to use Cohort.exclude_list() and add all of your exclusion criteria at once. However, if you need to specify criteria at a later stage, you can use this method.

Warning

You should call Cohort.exclude_list() before calling Cohort.exclude() to ensure that the exclusion criteria are properly tracked.

Parameters:
  • variable (str | Variable)

  • operation (str)

  • label (str)

  • operations_done (str)

  • [Optional – tmin, tmax]

Returns:

Criterion is added to the cohort.

Return type:

None

Note

operation is passed to pandas.DataFrame.query, which uses a slightly modified Python syntax. Also, if you specify “true”/”True” or “false”/”False” as a value for operation, it will be converted to “== True” or “== False”, respectively.

Examples

>>> cohort.exclude(
...    variable="elix_total",
...    operation="> 20",
...    operations_done="Exclude patients with high Elixhauser score"
... )

include_list(inclusion_list=[])

Add inclusion criteria to the cohort.

Parameters:

inclusion_list (list) –

List of inclusion criteria. Each criterion is a dictionary containing:

  • variable (str | Variable): Variable to use for inclusion

  • operation (str): Operation to apply (e.g., “> 5”, “== True”)

  • label (str): Short label for the inclusion step

  • operations_done (str): Detailed description of what this inclusion does

  • tmin (str, optional): Start time for variable extraction

  • tmax (str, optional): End time for variable extraction

Returns:

A CohortTracker object that can be used to plot the inclusion flowchart.

Return type:

ct (CohortTracker)

Note

By default, all inclusion criteria are evaluated from tmin=cohort.t_min to tmax=cohort.t_eligible. This is recommended to avoid introducing immortal time bias. In some cases, however, you might want to set custom time bounds via the tmin and tmax keys of a criterion.
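Restricting criteria to the window between the cohort's start time and t_eligible means only information available at the eligibility time point is used. The sketch below evaluates an "any event" criterion with and without that window to show the difference; the helper any_event_in_window and the timestamps are hypothetical.

```python
from datetime import datetime


def any_event_in_window(event_times, tmin, tmax):
    """Hypothetical helper: True if any event falls within [tmin, tmax]."""
    return any(tmin <= t <= tmax for t in event_times)


# One ICU stay: admitted Jan 1, eligible Jan 3, renal replacement therapy started Jan 6
t_min = datetime(2023, 1, 1, 8, 0)
t_eligible = datetime(2023, 1, 3, 12, 0)
hospital_discharge = datetime(2023, 1, 10, 9, 0)
rrt_times = [datetime(2023, 1, 6, 15, 0)]

# Default window: RRT after eligibility does not trigger the criterion
print(any_event_in_window(rrt_times, t_min, t_eligible))          # False
# Whole stay: the same patient would be excluded based on a future event
print(any_event_in_window(rrt_times, t_min, hospital_discharge))  # True
```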

Examples

>>> ct = cohort.include_list([
...    {
...        "variable": "age_on_admission",
...        "operation": ">= 18",
...        "label": "Adult patients",
...        "operations_done": "Excluded patients under 18 years old"
...    }
...  ])
>>> ct.create_flowchart()

exclude_list(exclusion_list=[])

Add exclusion criteria to the cohort.

Parameters:

exclusion_list (list) –

List of exclusion criteria. Each criterion is a dictionary containing:

  • variable (str | Variable): Variable to use for exclusion

  • operation (str): Operation to apply (e.g., “> 5”, “== True”)

  • label (str): Short label for the exclusion step

  • operations_done (str): Detailed description of what this exclusion does

  • tmin (str, optional): Start time for variable extraction

  • tmax (str, optional): End time for variable extraction

Returns:

A CohortTracker object that can be used to plot the exclusion flowchart.

Return type:

ct (CohortTracker)

Note

By default, all exclusion criteria are evaluated from tmin=cohort.t_min to tmax=cohort.t_eligible. This is recommended to avoid introducing immortal time bias. In some cases, however, you might want to set custom time bounds via the tmin and tmax keys of a criterion.

Examples

>>> ct = cohort.exclude_list([
...    {
...        "variable": "any_rrt_icu",
...        "operation": "true",
...        "label": "No RRT",
...        "operations_done": "Excluded RRT before hypernatremia"
...    },
...    {
...        "variable": "any_dx_tbi",
...        "operation": "true",
...        "label": "No TBI",
...        "operations_done": "Excluded TBI before hypernatremia"
...    },
...    {
...        "variable": NativeStatic(
...            var_name="sodium_count",
...            select="!count value",
...            base_var="blood_sodium"),
...        "operation": "< 1",
...        "label": "Final cohort",
...        "operations_done": "Excluded cases with less than 1 sodium measurement after hypernatremia",
...        "tmin": cohort.t_eligible,
...        "tmax": "hospital_discharge"
...    }
...  ])
>>> ct.create_flowchart() # Plot the exclusion flowchart

add_variable_definition(var_name, var_dict)

Add or update a local variable definition.

Parameters:
  • var_name (str) – Name of the variable.

  • var_dict (dict) – Dictionary containing variable definition. Can be partial - missing fields will be inherited from global definition.

Return type:

None

Examples

Add a completely new variable:

>>> cohort.add_variable_definition("my_new_var", {
...     "type": "native_dynamic",
...     "table": "it_ishmed_labor",
...     "where": "c_katalog_leistungtext LIKE '%new%'",
...     "value_dtype": "DOUBLE",
...     "cleaning": {"value": {"low": 100, "high": 150}}
... })

Partially override existing variable:

>>> cohort.add_variable_definition("blood_sodium", {
...     "where": "c_katalog_leistungtext LIKE '%custom_sodium%'"
... })
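Partial definitions inherit missing fields from the global definition. A sketch of that behavior as a shallow dictionary merge; whether nested fields such as cleaning are merged key by key is not stated here, so the shallow merge, the helper resolve_definition, and the sample global definition are assumptions for illustration.

```python
def resolve_definition(global_defs, local_defs, var_name):
    """Hypothetical: local fields override global ones; missing fields are inherited."""
    merged = dict(global_defs.get(var_name, {}))
    merged.update(local_defs.get(var_name, {}))  # shallow merge (assumption)
    return merged


global_defs = {
    "blood_sodium": {  # illustrative global definition, not the real vars.json entry
        "type": "native_dynamic",
        "table": "it_ishmed_labor",
        "where": "c_katalog_leistungtext LIKE '%sodium%'",
        "value_dtype": "DOUBLE",
    }
}
local_defs = {"blood_sodium": {"where": "c_katalog_leistungtext LIKE '%custom_sodium%'"}}

resolved = resolve_definition(global_defs, local_defs, "blood_sodium")
print(resolved["where"])  # c_katalog_leistungtext LIKE '%custom_sodium%'
print(resolved["table"])  # it_ishmed_labor
```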

add_inclusion()
Return type:

None

add_exclusion()
Return type:

None

to_csv(folder)

Save the cohort to CSV files.

Parameters:

folder (str) – Path to the folder where CSV files will be saved.

Return type:

None

Examples

>>> cohort.to_csv("output_data")
>>> # Creates:
>>> # output_data/_obs.csv
>>> # output_data/blood_sodium.csv
>>> # output_data/heart_rate.csv
>>> # ... (one file per variable)
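The file layout above (one _obs.csv plus one CSV per obsm variable) can be reproduced with the standard csv module. A sketch writing plain Python rows to that layout; write_cohort_csvs is hypothetical, and the library's own writer may differ in column order and timestamp formatting.

```python
import csv
import os
import tempfile


def write_cohort_csvs(folder, obs_rows, obsm):
    """Hypothetical: write obs to _obs.csv and each obsm variable to <name>.csv.

    Assumes every table has at least one row (its keys become the CSV header).
    """
    os.makedirs(folder, exist_ok=True)
    tables = {"_obs": obs_rows, **obsm}
    for name, rows in tables.items():
        with open(os.path.join(folder, f"{name}.csv"), "w", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=list(rows[0]))
            writer.writeheader()
            writer.writerows(rows)


obs_rows = [{"icu_stay_id": "C001_1", "sex": "M"}, {"icu_stay_id": "C002_1", "sex": "F"}]
obsm = {"blood_sodium": [
    {"icu_stay_id": "C001_1", "recordtime": "2023-01-01 09:30:00", "value": 138},
]}

with tempfile.TemporaryDirectory() as tmp:
    write_cohort_csvs(tmp, obs_rows, obsm)
    files = sorted(os.listdir(tmp))
    with open(os.path.join(tmp, "_obs.csv"), newline="") as fh:
        obs_lines = fh.read().splitlines()
print(files)  # ['_obs.csv', 'blood_sodium.csv']
```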

save(filename)

Save the cohort to a single compressed .corr2 archive (equivalent to a .tar.zst file). Internally, the cohort state (cohort.__dict__, excluding obs, obsm, variables, and conn) is written to state.pkl, obs to obs.parquet, and each obsm DataFrame to obsm_<var_name>.parquet in a temporary directory, which is then packed into the archive.

Parameters:

filename (str) – Path to the .corr2 archive

Return type:

None

Returns:

None
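The archive layout described for save() can be sketched with the standard library. In this sketch gzip stands in for zstd and JSON stands in for Parquet so that it runs without extra packages; the helper save_archive and both substitutions are assumptions, not the library's implementation.

```python
import json
import os
import pickle
import tarfile
import tempfile


def save_archive(filename, state, obs_rows, obsm):
    """Hypothetical: pack state, obs, and each obsm table into one compressed tar archive."""
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "state.pkl"), "wb") as fh:
            pickle.dump(state, fh)
        # JSON stands in for Parquet in this sketch
        with open(os.path.join(tmp, "obs.json"), "w") as fh:
            json.dump(obs_rows, fh)
        for var_name, rows in obsm.items():
            with open(os.path.join(tmp, f"obsm_{var_name}.json"), "w") as fh:
                json.dump(rows, fh)
        # gzip stands in for zstd in this sketch
        with tarfile.open(filename, "w:gz") as tar:
            for member in sorted(os.listdir(tmp)):
                tar.add(os.path.join(tmp, member), arcname=member)


with tempfile.TemporaryDirectory() as out:
    path = os.path.join(out, "my_cohort.tar.gz")
    save_archive(path, {"obs_level": "icu_stay"}, [{"icu_stay_id": "C001_1"}],
                 {"blood_sodium": [{"icu_stay_id": "C001_1", "value": 138}]})
    with tarfile.open(path) as tar:
        names = tar.getnames()
print(names)  # ['obs.json', 'obsm_blood_sodium.json', 'state.pkl']
```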

classmethod load(filename)

Load a cohort from a .corr2 archive created with Cohort.save().

Parameters:

filename (str) – Path to the .corr2 archive

Return type:

Cohort

property obs: DataFrame
property obsm: ObsmDict
property tmpdir_path
debug_print()

Print debug information about the cohort. Please use this if you are creating a GitHub issue.

Return type:

None

Returns:

None

to_stata(df=None, convert_dates=None, write_index=True, to_file=None)

Convert the cohort to a Stata DataFrame. You may use cohort.stata to access the dataframe directly. Note that you need to save it to a top-level variable to access it via %%stata.

Parameters:
  • df (pd.DataFrame) – The DataFrame to be converted to Stata format. Will default to the obs DataFrame if unspecified (default: None)

  • convert_dates (dict[Hashable, str]) – Dictionary of columns to convert to Stata date format.

  • write_index (bool) – Whether to write the index as a column.

  • to_file (str | None) – Path to save as .dta file. If left unspecified, the DataFrame will not be saved.

Returns:

A pandas DataFrame compatible with Stata if to_file is None.

Return type:

pd.DataFrame

property stata: DataFrame | None

Convert the cohort to a Stata DataFrame. You may use cohort.stata to access the dataframe directly. Note that you need to save it to a top-level variable to access it via %%stata.


to_tableone(ignore_cols=[], groupby=None, filter=None, pval=False, normal_cols=[], **kwargs)

Create a TableOne object for the cohort.

Parameters:
  • ignore_cols (list | str) – Column(s) to ignore.

  • groupby (str) – Column to group by.

  • filter (str) – Filter to apply to the data.

  • pval (bool) – Whether to calculate p-values.

  • normal_cols (list[str]) – Columns to treat as normally distributed.

  • **kwargs – Additional arguments to pass to TableOne.

Returns:

A TableOne object.

Return type:

TableOne

Examples

>>> tableone = cohort.to_tableone()
>>> print(tableone)
>>> tableone.to_csv("tableone.csv")
>>> tableone = cohort.to_tableone(groupby="sex", pval=False)
>>> print(tableone)
>>> tableone.to_csv("tableone_sex.csv")
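TableOne-style summaries typically report normally distributed columns as mean (SD) and skewed continuous columns as median [Q1,Q3]. A sketch of that per-column choice, assuming columns outside normal_cols are summarized as median [Q1,Q3]. That assumption, the helper summarize, and the output formatting are illustrative, not confirmed by this reference.

```python
from statistics import mean, median, quantiles, stdev


def summarize(values, normal):
    """Hypothetical: 'mean (SD)' for normal columns, 'median [Q1,Q3]' otherwise."""
    if normal:
        return f"{mean(values):.1f} ({stdev(values):.1f})"
    # "inclusive" uses linear interpolation between data points
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    return f"{median(values):.1f} [{q1:.1f},{q3:.1f}]"


ages = [45, 52, 60, 67, 71]
print(summarize(ages, normal=True))   # 59.0 (10.7)
print(summarize(ages, normal=False))  # 60.0 [52.0,67.0]
```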

property tableone: TableOne

Create a TableOne object for the cohort.
