Utils

General-purpose helper functions used across the module. They may also be used in individual variable definitions.

corr_vars.utils.helpers.filter_by_condition(df, expression, description='filter_by_condition call', verbose=True, mode='drop')

Drop or keep rows of a DataFrame based on a boolean condition.

Parameters:
  • df (pl.DataFrame) – The input DataFrame.

  • expression (Union[Expr, Series, str, Iterable[Union[Expr, Series, str]], bool, list[bool], ndarray[Any, Any]]) – Expression(s) that evaluate to a boolean Series.

  • description (str) – Description of the condition.

  • mode (str) – Whether to drop or keep the rows. Can be “drop” or “keep”.

  • verbose (bool)

Returns:

The DataFrame with rows dropped or kept according to the condition and mode.

Return type:

pl.DataFrame

corr_vars.utils.helpers.df_find_closest(group, to_col, tdelta='0', pm_before='52w', pm_after='52w')

Find the closest record to the target time.

Parameters:
  • group (pd.DataFrame) – The input dataframe.

  • to_col (str) – The column containing the target time.

  • tdelta (str) – The time delta. Defaults to “0”.

  • pm_before (str) – The time range before the target time. Defaults to “52w”.

  • pm_after (str) – The time range after the target time. Defaults to “52w”.

Returns:

The closest record.

Return type:

pd.Series[Any] | pd.DataFrame
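
The closest-record lookup described above can be illustrated with a minimal, standard-library sketch. It is not the library's implementation. It assumes that the reference time is the target time plus tdelta and that the search window is measured around that reference. The function find_closest, the record layout, and the "recordtime" key are hypothetical, and timedelta objects stand in for duration strings such as "52w".

```python
from datetime import datetime, timedelta


def find_closest(records, target, tdelta=timedelta(0),
                 pm_before=timedelta(weeks=52), pm_after=timedelta(weeks=52)):
    """Return the record nearest to target + tdelta, or None if the window is empty.

    Hypothetical helper for illustration only; the window placement is an assumption.
    """
    reference = target + tdelta
    # Keep only records inside [reference - pm_before, reference + pm_after].
    in_window = [
        r for r in records
        if reference - pm_before <= r["recordtime"] <= reference + pm_after
    ]
    if not in_window:
        return None
    # Pick the record with the smallest absolute distance to the reference time.
    return min(in_window, key=lambda r: abs(r["recordtime"] - reference))


target = datetime(2024, 1, 10, 12, 0)
records = [
    {"recordtime": datetime(2024, 1, 10, 8, 0), "value": 1.2},
    {"recordtime": datetime(2024, 1, 10, 13, 0), "value": 1.5},
    {"recordtime": datetime(2024, 1, 12, 0, 0), "value": 0.9},
]
closest = find_closest(records, target)
closest_before = find_closest(records, target, pm_after=timedelta(0))
```

With the default window, the record one hour after the target is selected. Setting pm_after to zero restricts the search to records at or before the target, so the 08:00 record is selected instead.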

corr_vars.utils.helpers.aggregate_column(group, agg_func, column, params=[])

corr_vars.utils.helpers.get_cb(df, join_key='case_id', primary_key='case_id', tmin_col='tmin', tmax_col='tmax', ttarget_col=None, aliases=('tmin', 'tmax'))

Get the case bounds DataFrame. This is used to apply a time filter to a native dynamic variable.

Parameters:
  • df (pd.DataFrame | pl.DataFrame) – The cohort dataframe.

  • join_key (str) – The join key column. Defaults to “case_id”.

  • primary_key (str) – The primary key column. Defaults to “case_id”.

  • tmin_col (str) – The column containing the tmin. Defaults to “tmin”.

  • tmax_col (str) – The column containing the tmax. Defaults to “tmax”.

  • ttarget_col (str | None) – The column containing the target time. Defaults to None. (For !closest)

  • aliases (tuple[str, str]) – The aliases for tmin and tmax. Defaults to (“tmin”, “tmax”).

Returns:

The case bounds DataFrame with the same type as df.

Return type:

pd.DataFrame | pl.DataFrame

corr_vars.utils.helpers.pl_parse_time_args(obs, time_window=None)

Parse the time arguments to get the time series.

Parameters:
  • obs (pd.DataFrame | pl.DataFrame) – The observation dataframe.

  • time_window (TimeWindow) – The time_window argument. Defaults to None.

Returns:

The observation dataframe with the same type as obs with tmin and tmax columns.

Return type:

pd.DataFrame | pl.DataFrame

corr_vars.utils.helpers.remove_invalid_time_window(obs, time_window)

Parameters:
  • obs (TypeVar(PolarsFrame, DataFrame, LazyFrame))

  • time_window (TimeWindow)

Return type:

TypeVar(PolarsFrame, DataFrame, LazyFrame)

corr_vars.utils.helpers.add_time_window_expr(obs, time_window, aliases=('tmin', 'tmax'))

Parameters:
  • obs (TypeVar(PolarsFrame, DataFrame, LazyFrame))

  • time_window (TimeWindow)

  • aliases (tuple[str, str])

Return type:

TypeVar(PolarsFrame, DataFrame, LazyFrame)

corr_vars.utils.helpers.extract_df_data(df, col_dict=None, filter_dict=None, exact_match=False, remove_prefix=False, drop=False)

Extracts data from a DataFrame.

Parameters:
  • df (pandas.DataFrame) – The DataFrame to operate on.

  • col_dict (dict, optional) – A dictionary mapping column names to new names. Defaults to None.

  • filter_dict (dict[str, list[str]], optional) – A dictionary where keys are column names and values are lists of values to filter rows by (may include regex patterns when exact_match=False). Defaults to None.

  • exact_match (bool, optional) – If True, performs exact matching when filtering. Defaults to False.

  • remove_prefix (bool, optional) – If True, removes prefix from default_key. Defaults to False.

  • drop (bool, optional) – If True, drop all columns not specified in col_dict.

Returns:

A DataFrame containing the extracted data from the original DataFrame.

Return type:

pandas.DataFrame

corr_vars.utils.helpers.merge_consecutive(data, primary_key, recordtime='recordtime', recordtime_end='recordtime_end', time_threshold=Timedelta('0 days 00:30:00'))

Combine consecutive sessions (for example of ecmo_vv_icu) that are separated by less than time_threshold (30 minutes by default) into a single session.

Parameters:
  • data (pd.DataFrame) – The data to merge.

  • primary_key (str) – The primary key column. Deprecated and ignored; case_id is used instead.

  • recordtime (str) – The recordtime column. Defaults to “recordtime”.

  • recordtime_end (str) – The recordtime_end column. Defaults to “recordtime_end”.

  • time_threshold (pd.Timedelta) – The time threshold. Defaults to 30 minutes.

Returns:

The merged data.

Return type:

pd.DataFrame
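
The merging rule can be sketched with the standard library alone. This is an illustration of the idea, not the library's implementation: it handles the sessions of a single case as plain (start, end) tuples, whereas the real function operates on a DataFrame. The name merge_sessions is hypothetical.

```python
from datetime import datetime, timedelta


def merge_sessions(sessions, threshold=timedelta(minutes=30)):
    """Merge (start, end) sessions whose gap to the previous session is below threshold.

    Hypothetical helper for illustration only; covers a single case.
    """
    merged = []
    for start, end in sorted(sessions):
        if merged and start - merged[-1][1] < threshold:
            # Gap is short enough: extend the previous session.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            # First session, or the gap is too long: start a new session.
            merged.append((start, end))
    return merged


day = datetime(2024, 1, 1)
sessions = [
    (day.replace(hour=8), day.replace(hour=9)),
    (day.replace(hour=9, minute=20), day.replace(hour=10)),  # 20 min gap: merged
    (day.replace(hour=11), day.replace(hour=12)),            # 60 min gap: kept separate
]
result = merge_sessions(sessions)
```

Here the first two sessions collapse into one session from 08:00 to 10:00, and the third stays separate.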

corr_vars.utils.helpers.clean_timeseries(data, max_roc, time_col='recordtime', value_col='value', identifier_col='icu_stay_id')

Clean a time series by removing values that imply impossible rates of change. Uses vectorized Polars operations for performance.

Parameters:
  • data (pl.DataFrame) – The data to clean.

  • max_roc (float) – The maximum rate of change.

  • time_col (str) – The time column.

  • value_col (str) – The value column.

  • identifier_col (str) – The identifier column.

Returns:

The cleaned data with the same type as data.

Return type:

pl.DataFrame
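
The idea of a rate-of-change filter can be shown with a sequential, standard-library sketch. It is not the library's vectorized implementation, and the exact rule the library applies is not documented here. The sketch assumes that each point is compared with the last retained point and that max_roc is expressed in value units per hour; both are assumptions. The name drop_impossible_jumps is hypothetical.

```python
from datetime import datetime, timedelta


def drop_impossible_jumps(points, max_roc):
    """Keep (time, value) points whose rate of change from the last kept point is <= max_roc.

    Hypothetical helper; the rate is in value units per hour (an assumption).
    """
    kept = []
    for time, value in sorted(points):
        if kept:
            last_time, last_value = kept[-1]
            hours = (time - last_time).total_seconds() / 3600
            # Skip duplicate timestamps and points that change too fast.
            if hours <= 0 or abs(value - last_value) / hours > max_roc:
                continue
        kept.append((time, value))
    return kept


t0 = datetime(2024, 1, 1, 8, 0)
points = [
    (t0, 36.8),
    (t0 + timedelta(hours=1), 37.0),
    (t0 + timedelta(hours=2), 45.0),  # implausible spike
    (t0 + timedelta(hours=3), 37.2),
]
cleaned = drop_impossible_jumps(points, max_roc=2.0)
```

With a limit of 2.0 units per hour, the spike at the third point is removed and the other three points are retained.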

corr_vars.utils.helpers.harmonize_str_list_cols(dfs)

Harmonises columns with str and list[str] dtypes in different DataFrames by converting str to list[str] as well.

Parameters:

dfs (Iterable[DataFrame])

Return type:

list[DataFrame]

corr_vars.utils.helpers.guess_variable_source(variable)

Guess the source of a variable.

Parameters:

variable (Variable)

Return type:

str | None

corr_vars.utils.helpers.deep_merge(base, override)

Recursively merge override into base with override taking precedence. Works on normal dictionaries, compatible with TypedDict runtime dicts.

Parameters:
  • base (TypeVar(T, bound= MutableMapping[str, Any]))

  • override (Mapping[str, Any])

Return type:

TypeVar(T, bound= MutableMapping[str, Any])
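
A recursive merge with override precedence might look like the following minimal sketch. It is not the library's code. Whether the real deep_merge mutates base in place or returns a copy is not stated here; this sketch mutates base in place and returns it, which is an assumption. The name deep_merge_sketch is hypothetical.

```python
from collections.abc import Mapping, MutableMapping
from typing import Any


def deep_merge_sketch(base: MutableMapping[str, Any],
                      override: Mapping[str, Any]) -> MutableMapping[str, Any]:
    """Recursively merge override into base in place; override wins on conflicts."""
    for key, value in override.items():
        current = base.get(key)
        if isinstance(current, MutableMapping) and isinstance(value, Mapping):
            # Both sides are mappings: descend instead of replacing wholesale.
            deep_merge_sketch(current, value)
        else:
            # Scalars, lists, and type mismatches: the override value replaces the base value.
            base[key] = value
    return base


defaults = {"db": {"host": "localhost", "port": 5432}, "debug": False}
merged = deep_merge_sketch(defaults, {"db": {"port": 6543}, "debug": True})
print(merged)  # {'db': {'host': 'localhost', 'port': 6543}, 'debug': True}
```

Nested keys that the override does not mention, such as "host" above, survive the merge; only the overridden leaves change.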