Cohort#
Cohort Workflow#
Initialization#
With this snippet, you can initialize a cohort object using the CUB-HDP data source.
cohort = Cohort(
    obs_level="icu_stay",  # One of: "hospital_stay", "icu_stay", "procedure"
    load_default_vars=False,  # Optional, defaults to True
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "password_file": True,  # Specify path to file, True assumes ~/password.txt
            "merge_consecutive": False,
        }
    },
)
Specify multiple data sources to combine cohorts.
cohort = Cohort(
    obs_level="icu_stay",  # One of: "hospital_stay", "icu_stay", "procedure"
    load_default_vars=False,  # Optional, defaults to True
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "password_file": True,  # Specify path to file, True assumes ~/password.txt
            "merge_consecutive": False,
        },
        "reprodicu": {
            "path": "/data02/projects/reprodicubility/reprodICU/reprodICU_files",
        },
    },
)
Adding Variables#
# Use pre-defined variables
cohort.add_variable("pf_ratio")
# Create a custom variable on the fly
cohort.add_variable(
    NativeStatic(
        var_name="median_sodium_before_hn",
        select="!median value",
        base_var="blood_sodium",
        tmin="hospital_admission",
        tmax=cohort.t_eligible,
    )
)
# Perform manual operations on the dataframe
cohort.obs = cohort.obs.with_columns([
    pl.col('first_sodium_recordtime').eq(pl.col('first_severe_hypernatremia_recordtime'))
    .alias('idx_hypernatremia_was_on_admission')
])
cohort.obs = cohort.obs.with_columns([
    pl.when(pl.col('idx_hypernatremia_was_on_admission'))
    .then(pl.lit('community_acquired'))
    .otherwise(pl.lit('hospital_acquired'))
    .alias('hn_origin')
])
Inclusion/Exclusion#
# Add multiple inclusion criteria
cohort.include_list([
    {
        "variable": "age",
        "operation": ">= 18",
        "label": "Adult patients",
    },
    {
        "variable": "icu_length_of_stay",
        "operation": "> 2",
        "label": "ICU stay > 2 days",
    },
])
# Add exclusion criteria
cohort.exclude_list([
    {
        "variable": "any_dx_covid_19",
        "operation": "== True",
        "label": "Exclude COVID-19 patients",
    },
])
# Visualize the inclusion/exclusion criteria
cohort.changetracker.create_flowchart(output_file="inclusion_flowchart.svg")
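To make the bookkeeping behind such a flowchart concrete, the following is a minimal, library-independent sketch of how criteria applied in sequence shrink a cohort and how the per-step counts could be recorded. It does not use corr_vars: the `apply_criteria` helper, the toy rows, and the restriction to simple `<operator> <number>` operations are assumptions made for illustration only.

```python
import operator

# Hypothetical helper for illustration; not part of corr_vars.
OPS = {
    ">=": operator.ge,
    "<=": operator.le,
    ">": operator.gt,
    "<": operator.lt,
    "==": operator.eq,
}


def apply_criteria(rows, criteria):
    """Apply inclusion criteria in order; return surviving rows and per-step counts."""
    steps = [("Initial cohort", len(rows))]
    for crit in criteria:
        # Only operations of the form "<operator> <number>" are handled here.
        symbol, raw_value = crit["operation"].split()
        compare = OPS[symbol]
        threshold = float(raw_value)
        rows = [r for r in rows if compare(r[crit["variable"]], threshold)]
        steps.append((crit["label"], len(rows)))
    return rows, steps


# Toy data standing in for cohort.obs (values are made up).
rows = [
    {"age": 17, "icu_length_of_stay": 5.0},
    {"age": 45, "icu_length_of_stay": 1.0},
    {"age": 63, "icu_length_of_stay": 4.5},
    {"age": 80, "icu_length_of_stay": 3.0},
]
criteria = [
    {"variable": "age", "operation": ">= 18", "label": "Adult patients"},
    {"variable": "icu_length_of_stay", "operation": "> 2", "label": "ICU stay > 2 days"},
]

remaining, steps = apply_criteria(rows, criteria)
for label, n in steps:
    print(f"{label}: n={n}")
```

Each entry in `steps` corresponds to one box of a flowchart: the label of the criterion and the number of observations left after applying it.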
Exploration#
# Display the cohort dataframe
print(cohort.obs)
# Create a TableOne object
tableone = cohort.to_tableone(ignore_cols=["icu_id"])
print(tableone)
Data Export#
# Save to file
cohort.save("my_cohort.corr2")
# Load from file
cohort = Cohort.load("my_cohort.corr2")
# Export to CSV
cohort.to_csv("path/to/output_folder")
# Export to a Stata .dta file
cohort.to_stata(to_file="path/to/output_folder/my_cohort.dta")
Class Reference#
- class corr_vars.core.cohort.Cohort(obs_level='icu_stay', sources={'cub_hdp': {'conn_args': {'password_file': True}, 'database': 'db_hypercapnia_prepared', 'icu_stay': {'merge_consecutive': True}}}, project_vars={}, load_default_vars=True, logger_args={})[source]#
Bases: object
Class to build a cohort in the CORR database.
- Parameters:
obs_level (Literal["icu_stay", "hospital_stay", "procedure"]) – Observation level (default: “icu_stay”).
sources (dict[str, dict]) –
Dictionary of data sources to use for data extraction. Available options are “cub_hdp”, “reprodicu”, “co5”. Source configurations:
cub_hdp: Accepted keys: database, password_file, merge_consecutive, filters, conn_args [remote_hostname (str), username (str)]
reprodicu: Accepted keys: path, exclude_datasets
co5: Not implemented yet. Data request is still pending.
Note: reprodicu does not yet implement variable extraction, only cohort data.
project_vars (dict) – Dictionary with local variable definitions (default: {}).
load_default_vars (bool) – Whether to load the default variables (default: True).
logger_args (dict) – Dictionary of Logging configurations [level (int), file_path (str), file_mode (str), verbose_fmt (bool), colored_output (bool), formatted_numbers (bool)] (default: {}).
- obs#
Static data for each observation. Contains one row per observation (e.g., ICU stay) with columns for static variables like demographics and outcomes.
Example
>>> cohort.obs
   patient_id  case_id  icu_stay_id  icu_admission        icu_discharge        sex  ...  inhospital_death
0  P001        C001     C001_1       2023-01-01 08:30:00  2023-01-03 12:00:00  M    ...  False
1  P001        C001     C001_2       2023-01-03 14:20:00  2023-01-05 16:30:00  M    ...  False
2  P002        C002     C002_1       2023-01-02 09:15:00  2023-01-04 10:30:00  F    ...  False
3  P003        C003     C003_1       2023-01-04 11:45:00  2023-01-07 13:20:00  F    ...  True
...
- Type:
pl.DataFrame
- obsm#
Dynamic data stored as dictionary of DataFrames. Each DataFrame contains time-series data for a variable with columns:
recordtime: Timestamp of the measurement
value: Value of the measurement
recordtime_end: End time (only for duration-based variables like therapies)
description: Additional information (e.g., medication names)
Example
>>> cohort.obsm["blood_sodium"]
   icu_stay_id  recordtime           value
0  C001_1       2023-01-01 09:30:00  138
1  C001_1       2023-01-02 10:15:00  141
2  C001_2       2023-01-03 15:00:00  137
3  C002_1       2023-01-02 10:00:00  142
4  C003_1       2023-01-04 12:30:00  139
...
- Type:
dict of pl.DataFrame
Notes
For large cohorts, set load_default_vars=False to speed up the extraction. You can use pre-extracted cohorts as starting points and load them using Cohort.load().
Variables can be added using cohort.add_variable(). Static variables will be added to obs, dynamic variables to obsm.
sources["cub_hdp"]["filters"] also allows a special shorthand “_dx” to extract the hospital admissions of the last x months, which is useful for debugging and prototyping. For example, use “_d2” to extract every admission of the last 2 months.
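As an illustration of the “_dx” shorthand, here is a minimal sketch of how such a string could be parsed and turned into a cutoff date. This is not the corr_vars implementation: the helper names are hypothetical, and rounding the cutoff to the first day of the month is a simplifying assumption, since the source does not say how the window is computed.

```python
import re
from datetime import date


def months_from_shorthand(shorthand):
    """Return x for a "_dx" shorthand such as "_d2", or None if it does not match."""
    match = re.fullmatch(r"_d(\d+)", shorthand)
    return int(match.group(1)) if match else None


def cutoff_date(today, months):
    """First day of the month lying `months` months before `today` (assumed rounding)."""
    # Count months since year 0 so that year boundaries are handled uniformly.
    index = today.year * 12 + (today.month - 1) - months
    return date(index // 12, index % 12 + 1, 1)


months = months_from_shorthand("_d2")
print(months, cutoff_date(date(2024, 1, 15), months))
```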
Examples
Create a new cohort:
>>> cohort = Cohort(
...     obs_level="icu_stay",
...     load_default_vars=False,
...     sources={
...         "cub_hdp": {
...             "database": "db_hypercapnia_prepared",
...             "password_file": True,
...             "merge_consecutive": False},
...         "reprodicu": {
...             "path": "/data02/projects/reprodicubility/reprodICU/reprodICU_files"}
...     })
Access static data:
>>> cohort.obs.select("age_on_admission")  # Get age for all patients
>>> cohort.obs.filter(pl.col("sex") == "M")  # Filter for male patients
Access time-series data:
>>> cohort.obsm["blood_sodium"]  # Get all blood sodium measurements
>>> # Get blood sodium measurements for a specific observation
>>> cohort.obsm["blood_sodium"].filter(pl.col(cohort.primary_key) == "12345")
- constant_vars: list[str]#
- obs_level: ObsLevel#
- primary_key: str#
- t_min: str#
- t_max: str#
- t_eligible: str#
- t_outcome: str#
- logger_args: dict[str, Any]#
- sources: Final[SourceDict]#
- project_vars: dict[str, dict[str, Any]]#
- tmpdir_manager: Final[TemporaryDirectoryManager]#
- load_default_vars()[source]#
Load the default variables defined in vars.json. It is recommended to use this after filtering your cohort for eligibility to speed up the process.
- Returns:
Variables are loaded into the cohort.
- Return type:
None
Examples
>>> # Load default variables for an ICU cohort
>>> cohort = Cohort(obs_level="icu_stay", load_default_vars=False)
>>> # Apply filters first (faster)
>>> cohort.include_list([
...     {"variable": "age_on_admission", "operation": ">= 18", "label": "Adults"}
... ])
>>> # Then load default variables
>>> cohort.load_default_vars()
- add_variable(variable, save_as=None, tmin=None, tmax=None)[source]#
Add a variable to the cohort.
You may specify tmin and tmax as a tuple of a column name and an offset, e.g. (“hospital_admission”, “+1d”), in which case the bound is computed relative to that column (here, the hospital admission time of the patient).
- Parameters:
variable (str | Variable | MultiSourceVariable) – Variable to add. Either a string with the variable name (from vars.json) or a Variable object.
save_as (Optional[str]) – Name of the column to save the variable as. Defaults to variable name.
tmin (Union[str, tuple[str, str], None]) – Name of the column to use as tmin or tuple (see description).
tmax (Union[str, tuple[str, str], None]) – Name of the column to use as tmax or tuple (see description).
- Returns:
The variable object.
- Return type:
Examples
>>> cohort.add_variable("blood_sodium")
>>> cohort.add_variable(
...     variable="any_dx_covid_19",
...     tmin=("hospital_admission", "-1d"),
...     tmax=cohort.t_eligible
... )
>>> cohort.add_variable(
...     NativeStatic(
...         var_name="highest_hct_before_eligible",
...         select="!max value",
...         base_var='blood_hematokrit',
...         tmax=cohort.t_eligible
...     )
... )
>>> cohort.add_variable(
...     variable='any_med_glu',
...     save_as="glucose_prior_eligible",
...     tmin=(cohort.t_eligible, "-48h"),
...     tmax=cohort.t_eligible
... )
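The tuple form of tmin and tmax can be read as "take the timestamp in this column and shift it by this offset". The sketch below shows one way such a bound could be resolved for a single observation. It is not the corr_vars implementation: `parse_offset` and `resolve_bound` are hypothetical helpers, and only the `h` and `d` unit suffixes seen in the examples above are assumed to exist.

```python
import re
from datetime import datetime, timedelta

# Assumed unit suffixes, taken from the "+1d" / "-48h" examples.
UNITS = {"h": "hours", "d": "days"}


def parse_offset(offset):
    """Turn a string such as "+1d" or "-48h" into a timedelta."""
    match = re.fullmatch(r"([+-])(\d+)([hd])", offset)
    if match is None:
        raise ValueError(f"Unsupported offset: {offset!r}")
    sign, amount, unit = match.groups()
    delta = timedelta(**{UNITS[unit]: int(amount)})
    return delta if sign == "+" else -delta


def resolve_bound(row, bound):
    """Resolve a column name or a (column, offset) tuple to a timestamp."""
    if isinstance(bound, tuple):
        column, offset = bound
        return row[column] + parse_offset(offset)
    return row[bound]


# One toy observation (values are made up).
row = {
    "hospital_admission": datetime(2023, 1, 1, 8, 30),
    "t_eligible": datetime(2023, 1, 4, 12, 0),
}

tmin = resolve_bound(row, ("t_eligible", "-48h"))
tmax = resolve_bound(row, "t_eligible")
print(tmin, tmax)
```

Because the bound is resolved per observation, each patient gets an extraction window anchored to their own timeline rather than to a fixed calendar date.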
- set_t_eligible(t_eligible, drop_ineligible=True)[source]#
Set the time anchor for eligibility. This can be referenced as cohort.t_eligible throughout the process and is required to add inclusion or exclusion criteria.
- Parameters:
t_eligible (str) – Name of the column to use as t_eligible.
drop_ineligible (bool) – Whether to drop ineligible patients. Defaults to True.
- Returns:
t_eligible is set.
- Return type:
None
Examples
>>> # Add a suitable time-anchor variable
>>> cohort.add_variable(NativeStatic(
...     var_name="spo2_lt_90",
...     base_var="spo2",
...     select="!first recordtime",
...     where="value < 90",
... ))
>>> # Set the time anchor for eligibility
>>> cohort.set_t_eligible("spo2_lt_90")
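The effect of drop_ineligible can be pictured with a small sketch. It assumes that "ineligible" means the observation has no value in the chosen time-anchor column (for instance, SpO2 never fell below 90); the source does not spell this out, so treat both the assumption and the `drop_ineligible` helper below as illustrative rather than as the library's behavior.

```python
from datetime import datetime


def drop_ineligible(rows, t_eligible):
    """Keep only observations that have a value in the time-anchor column."""
    return [row for row in rows if row.get(t_eligible) is not None]


# Toy observations (values are made up); None means the anchor event never occurred.
rows = [
    {"icu_stay_id": "C001_1", "spo2_lt_90": datetime(2023, 1, 1, 10, 0)},
    {"icu_stay_id": "C002_1", "spo2_lt_90": None},
    {"icu_stay_id": "C003_1", "spo2_lt_90": datetime(2023, 1, 4, 13, 15)},
]

eligible = drop_ineligible(rows, "spo2_lt_90")
print([row["icu_stay_id"] for row in eligible])
```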
- set_t_outcome(t_outcome)[source]#
Set the time anchor for outcome. This can be referenced as cohort.t_outcome throughout the process and is recommended to specify for your study.
- Parameters:
t_outcome (str) – Name of the column to use as t_outcome.
- Returns:
t_outcome is set.
- Return type:
None
Examples
>>> cohort.set_t_outcome("hospital_discharge")
- change_tracker(description, mode='include')[source]#
Return a context manager to group cohort edits and record a single ChangeTracker state on exit.
Example
with cohort.change_tracker("Adults", mode="include") as track:
    track.filter(pl.col("age_on_admission") >= 18)
- Parameters:
description (str)
mode (Literal['include', 'exclude'])
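The general pattern of grouping several edits and recording a single state on exit can be sketched with a plain context manager. The `ToyCohort` class below is a hypothetical stand-in written for this example; it only mirrors the shape of the call shown above and makes no claim about how ChangeTracker is implemented.

```python
from contextlib import contextmanager


class ToyCohort:
    """Hypothetical stand-in for a cohort; holds rows and a history of recorded states."""

    def __init__(self, rows):
        self.rows = rows
        self.history = []

    def filter(self, predicate):
        self.rows = [row for row in self.rows if predicate(row)]

    @contextmanager
    def change_tracker(self, description, mode="include"):
        n_before = len(self.rows)
        yield self
        # Reached only if the block finished without raising: record ONE state,
        # no matter how many filters were applied inside the block.
        self.history.append({
            "description": description,
            "mode": mode,
            "n_before": n_before,
            "n_after": len(self.rows),
        })


cohort = ToyCohort([
    {"age": 17, "los": 3},
    {"age": 45, "los": 1},
    {"age": 63, "los": 4},
])

with cohort.change_tracker("Adults with ICU stay > 2 days") as track:
    track.filter(lambda row: row["age"] >= 18)
    track.filter(lambda row: row["los"] > 2)

print(cohort.history)
```

Two filters run inside the block, but the history gains only one entry, which is what keeps a flowchart readable when a single logical step needs several operations.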
- include(*args, **kwargs)[source]#
Add an inclusion criterion to the cohort. It is recommended to use Cohort.include_list() and add all of your inclusion criteria at once. However, if you need to specify criteria at a later stage, you can use this method.
Warning
You should call Cohort.include_list() before calling Cohort.include() to ensure that the inclusion criteria are properly tracked.
- Parameters:
variable (str | Variable)
operation (str)
label (str)
operations_done (str)
[Optional – tmin, tmax]
- Returns:
Criterion is added to the cohort.
- Return type:
None
Note
operation is passed to pandas.DataFrame.query, which uses a slightly modified Python syntax. Also, if you specify “true”/“True” or “false”/“False” as a value for operation, it will be converted to “== True” or “== False”, respectively.
Examples
>>> cohort.include(
...     variable="age_on_admission",
...     operation=">= 18",
...     label="Adult",
...     operations_done="Include only adult patients"
... )
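The boolean shorthand described in the note amounts to a small normalization step before the operation string is evaluated. A minimal sketch of that step follows; `normalize_operation` is a hypothetical name, and the function covers only the four spellings the note mentions.

```python
def normalize_operation(operation):
    """Expand the boolean shorthand; leave every other operation unchanged."""
    stripped = operation.strip()
    if stripped in ("true", "True"):
        return "== True"
    if stripped in ("false", "False"):
        return "== False"
    return operation


for op in ("true", "False", ">= 18"):
    print(repr(op), "->", repr(normalize_operation(op)))
```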
- exclude(*args, **kwargs)[source]#
Add an exclusion criterion to the cohort. It is recommended to use Cohort.exclude_list() and add all of your exclusion criteria at once. However, if you need to specify criteria at a later stage, you can use this method.
Warning
You should call Cohort.exclude_list() before calling Cohort.exclude() to ensure that the exclusion criteria are properly tracked.
- Parameters:
variable (str | Variable)
operation (str)
label (str)
operations_done (str)
[Optional – tmin, tmax]
- Returns:
Criterion is added to the cohort.
- Return type:
None
Note
operation is passed to pandas.DataFrame.query, which uses a slightly modified Python syntax. Also, if you specify “true”/“True” or “false”/“False” as a value for operation, it will be converted to “== True” or “== False”, respectively.
Examples
>>> cohort.exclude(
...     variable="elix_total",
...     operation="> 20",
...     operations_done="Exclude patients with high Elixhauser score"
... )
- include_list(inclusion_list=[])[source]#
Add a list of inclusion criteria to the cohort.
- Parameters:
inclusion_list (list) –
List of inclusion criteria. Each criterion is a dictionary containing:
variable (str | Variable): Variable to use for inclusion
operation (str): Operation to apply (e.g., “> 5”, “== True”)
label (str): Short label for the inclusion step
operations_done (str): Detailed description of what this inclusion does
tmin (str, optional): Start time for variable extraction
tmax (str, optional): End time for variable extraction
- Returns:
CohortTracker object, can be used to plot inclusion chart
- Return type:
ct (CohortTracker)
Note
By default, all inclusion criteria are applied from tmin=cohort.t_min to tmax=cohort.t_eligible. This is recommended to avoid introducing immortal time bias. However, in some cases you might want to set custom time bounds.
Examples
>>> ct = cohort.include_list([
...     {
...         "variable": "age_on_admission",
...         "operation": ">= 18",
...         "label": "Adult patients",
...         "operations_done": "Excluded patients under 18 years old"
...     }
... ])
>>> ct.create_flowchart()
- exclude_list(exclusion_list=[])[source]#
Add a list of exclusion criteria to the cohort.
- Parameters:
exclusion_list (list) –
List of exclusion criteria. Each criterion is a dictionary containing:
variable (str | Variable): Variable to use for exclusion
operation (str): Operation to apply (e.g., “> 5”, “== True”)
label (str): Short label for the exclusion step
operations_done (str): Detailed description of what this exclusion does
tmin (str, optional): Start time for variable extraction
tmax (str, optional): End time for variable extraction
- Returns:
CohortTracker object, can be used to plot exclusion chart
- Return type:
ct (CohortTracker)
Note
By default, all exclusion criteria are applied from tmin=cohort.t_min to tmax=cohort.t_eligible. This is recommended to avoid introducing immortal time bias. However, in some cases you might want to set custom time bounds.
Examples
>>> ct = cohort.exclude_list([
...     {
...         "variable": "any_rrt_icu",
...         "operation": "true",
...         "label": "No RRT",
...         "operations_done": "Excluded RRT before hypernatremia"
...     },
...     {
...         "variable": "any_dx_tbi",
...         "operation": "true",
...         "label": "No TBI",
...         "operations_done": "Excluded TBI before hypernatremia"
...     },
...     {
...         "variable": NativeStatic(
...             var_name="sodium_count",
...             select="!count value",
...             base_var="blood_sodium"),
...         "operation": "< 1",
...         "label": "Final cohort",
...         "operations_done": "Excluded cases with less than 1 sodium measurement after hypernatremia",
...         "tmin": cohort.t_eligible,
...         "tmax": "hospital_discharge"
...     }
... ])
>>> ct.create_flowchart()  # Plot the exclusion flowchart
- add_variable_definition(var_name, var_dict)[source]#
Add or update a local variable definition.
- Parameters:
var_name (str) – Name of the variable.
var_dict (dict) – Dictionary containing variable definition. Can be partial - missing fields will be inherited from global definition.
- Return type:
None
Examples
Add a completely new variable:
>>> cohort.add_variable_definition("my_new_var", {
...     "type": "native_dynamic",
...     "table": "it_ishmed_labor",
...     "where": "c_katalog_leistungtext LIKE '%new%'",
...     "value_dtype": "DOUBLE",
...     "cleaning": {"value": {"low": 100, "high": 150}}
... })
Partially override existing variable:
>>> cohort.add_variable_definition("blood_sodium", {
...     "where": "c_katalog_leistungtext LIKE '%custom_sodium%'"
... })
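The idea that a partial definition inherits its missing fields from the global one can be illustrated with a dictionary merge. The sketch below assumes a shallow, top-level merge in which local keys win. Whether corr_vars merges nested keys such as "cleaning" more deeply is not stated in the source, and both the `resolve_definition` helper and the contents of `GLOBAL_DEFINITIONS` are made up for this example.

```python
# Hypothetical global definitions; the field values are illustrative only.
GLOBAL_DEFINITIONS = {
    "blood_sodium": {
        "type": "native_dynamic",
        "table": "it_ishmed_labor",
        "where": "c_katalog_leistungtext LIKE '%sodium%'",
        "value_dtype": "DOUBLE",
    }
}


def resolve_definition(var_name, local_defs, global_defs):
    """Merge a local (possibly partial) definition over the global one."""
    merged = dict(global_defs.get(var_name, {}))  # copy, so globals stay untouched
    merged.update(local_defs.get(var_name, {}))   # local keys take precedence
    return merged


local_defs = {
    "blood_sodium": {"where": "c_katalog_leistungtext LIKE '%custom_sodium%'"}
}

definition = resolve_definition("blood_sodium", local_defs, GLOBAL_DEFINITIONS)
print(definition)
```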
- to_csv(folder)[source]#
Save the cohort to CSV files.
- Parameters:
folder (str) – Path to the folder where CSV files will be saved.
- Return type:
None
Examples
>>> cohort.to_csv("output_data")
>>> # Creates:
>>> # output_data/_obs.csv
>>> # output_data/blood_sodium.csv
>>> # output_data/heart_rate.csv
>>> # ... (one file per variable)
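The folder layout listed above (one `_obs.csv` plus one file per dynamic variable) can be reproduced with the standard library alone. The sketch below is an illustration of that layout, not the corr_vars implementation: `write_cohort_csvs` is a hypothetical helper, the data are toy values, and every table is assumed to be non-empty.

```python
import csv
import tempfile
from pathlib import Path


def write_table(path, rows):
    """Write a non-empty list of dict rows to a CSV file with a header line."""
    with open(path, "w", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)


def write_cohort_csvs(folder, obs_rows, obsm):
    """Write static data to _obs.csv and each dynamic variable to <name>.csv."""
    folder = Path(folder)
    folder.mkdir(parents=True, exist_ok=True)
    write_table(folder / "_obs.csv", obs_rows)
    for var_name, rows in obsm.items():
        write_table(folder / f"{var_name}.csv", rows)
    return sorted(path.name for path in folder.iterdir())


# Toy data standing in for cohort.obs and cohort.obsm (values are made up).
obs_rows = [{"icu_stay_id": "C001_1", "sex": "M"}]
obsm = {
    "blood_sodium": [{"icu_stay_id": "C001_1", "recordtime": "2023-01-01 09:30:00", "value": 138}],
    "heart_rate": [{"icu_stay_id": "C001_1", "recordtime": "2023-01-01 09:35:00", "value": 82}],
}

with tempfile.TemporaryDirectory() as tmp:
    created = write_cohort_csvs(Path(tmp) / "output_data", obs_rows, obsm)

print(created)
```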
- save(filename)[source]#
Save the cohort to a single compressed .corr2 archive (.tar.zst equivalent). Saves cohort.__dict__ (excluding obs, obsm, variables, conn) to state.pkl, obs to obs.parquet, and each obsm DataFrame to obsm_<var_name>.parquet in a temp dir.
- Parameters:
filename (str) – Path to the .corr2 archive
- Return type:
None
- Returns:
None
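The member naming described for the archive (state.pkl, obs.parquet, and one obsm_<var_name>.parquet per dynamic variable) can be sketched as a pair of small helpers. Both functions are hypothetical and deal only with file names; they do not read or write Parquet, pickle, or Zstandard data, and they are not the corr_vars implementation.

```python
OBSM_PREFIX = "obsm_"
PARQUET_SUFFIX = ".parquet"


def archive_member_names(obsm_keys):
    """List the file names an archive with these dynamic variables would contain."""
    names = ["state.pkl", "obs.parquet"]
    names += [f"{OBSM_PREFIX}{key}{PARQUET_SUFFIX}" for key in obsm_keys]
    return names


def obsm_keys_from_members(member_names):
    """Recover the dynamic variable names from archive member names."""
    keys = []
    for name in member_names:
        if name.startswith(OBSM_PREFIX) and name.endswith(PARQUET_SUFFIX):
            keys.append(name[len(OBSM_PREFIX):-len(PARQUET_SUFFIX)])
    return keys


members = archive_member_names(["blood_sodium", "heart_rate"])
print(members)
print(obsm_keys_from_members(members))
```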
- property obs: DataFrame#
- property obsm: ObsmDict#
- property tmpdir_path#
- debug_print()[source]#
Print debug information about the cohort. Please use this if you are creating a GitHub issue.
- Return type:
None
- Returns:
None
- to_stata(df=None, convert_dates=None, write_index=True, to_file=None)[source]#
Convert the cohort to a Stata DataFrame. You may use cohort.stata to access the dataframe directly. Note that you need to save it to a top-level variable to access it via %%stata.
- Parameters:
df (pd.DataFrame) – The DataFrame to be converted to Stata format. Will default to the obs DataFrame if unspecified (default: None)
convert_dates (dict[Hashable, str]) – Dictionary of columns to convert to Stata date format.
write_index (bool) – Whether to write the index as a column.
to_file (str | None) – Path to save as .dta file. If left unspecified, the DataFrame will not be saved.
- Returns:
A Pandas Dataframe compatible with Stata if to_file is None.
- Return type:
pd.DataFrame
- property stata: DataFrame | None#
- to_tableone(ignore_cols=[], groupby=None, filter=None, pval=False, normal_cols=[], **kwargs)[source]#
Create a TableOne object for the cohort.
- Parameters:
ignore_cols (list | str) – Column(s) to ignore.
groupby (str) – Column to group by.
filter (str) – Filter to apply to the data.
pval (bool) – Whether to calculate p-values.
normal_cols (list[str]) – Columns to treat as normally distributed.
**kwargs – Additional arguments to pass to TableOne.
- Returns:
A TableOne object.
- Return type:
TableOne
Examples
>>> tableone = cohort.to_tableone()
>>> print(tableone)
>>> tableone.to_csv("tableone.csv")
>>> tableone = cohort.to_tableone(groupby="sex", pval=False)
>>> print(tableone)
>>> tableone.to_csv("tableone_sex.csv")
- property tableone: TableOne#