Utils
This page lists general-purpose helper functions that are used across the module and may also be used in individual variable definitions.
- corr_vars.utils.helpers.filter_by_condition(df, expression, description='filter_by_condition call', verbose=True, mode='drop')
Drop or keep rows of a DataFrame based on a boolean condition.
- Parameters:
df (pl.DataFrame) – The input DataFrame.
expression (Union[Expr, Series, str, Iterable[Union[Expr, Series, str]], bool, list[bool], ndarray[Any, Any]]) – Expression(s) that evaluate to a boolean Series.
description (str) – Description of the condition.
mode (str) – Whether to drop or keep the rows. Can be “drop” or “keep”.
verbose (bool)
- Returns:
The DataFrame with rows dropped based on the condition.
- Return type:
pl.DataFrame
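The difference between the two mode values can be illustrated without the library. The following is a minimal sketch in plain Python, assuming that “drop” removes the rows where the condition is true and “keep” retains only those rows. The function filter_rows and the sample rows are hypothetical and are not part of corr_vars.

```python
from typing import Any


def filter_rows(
    rows: list[dict[str, Any]],
    mask: list[bool],
    mode: str = "drop",
) -> list[dict[str, Any]]:
    """Hypothetical stand-in for the drop/keep semantics (not the corr_vars code)."""
    if mode not in ("drop", "keep"):
        raise ValueError("mode must be 'drop' or 'keep'")
    if len(rows) != len(mask):
        raise ValueError("mask must have one entry per row")
    if mode == "drop":
        # Assumption: 'drop' removes every row where the condition is True.
        return [row for row, hit in zip(rows, mask) if not hit]
    # Assumption: 'keep' retains only the rows where the condition is True.
    return [row for row, hit in zip(rows, mask) if hit]


rows = [
    {"case_id": 1, "age": 17},
    {"case_id": 2, "age": 45},
    {"case_id": 3, "age": 80},
]
is_minor = [row["age"] < 18 for row in rows]

adults = filter_rows(rows, is_minor, mode="drop")  # cases 2 and 3 remain
minors = filter_rows(rows, is_minor, mode="keep")  # only case 1 remains
```

The same boolean mask yields complementary results under the two modes, which is why a single condition can serve both exclusion and inclusion criteria.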
- corr_vars.utils.helpers.df_find_closest(group, to_col, tdelta='0', pm_before='52w', pm_after='52w')
Find the closest record to the target time.
- Parameters:
group (pd.DataFrame) – The input dataframe.
to_col (str) – The column containing the target time.
tdelta (str) – The time delta. Defaults to “0”.
pm_before (str) – The time range before the target time. Defaults to “52w”.
pm_after (str) – The time range after the target time. Defaults to “52w”.
- Returns:
The closest record.
- Return type:
pd.Series[Any] | pd.DataFrame
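As an illustration of a closest-record lookup with a bounded search window, here is a minimal sketch using only the standard library. It assumes that tdelta shifts the target time and that pm_before and pm_after bound the search window around the shifted target. The function find_closest is hypothetical and works on a plain list of timestamps rather than a DataFrame.

```python
from datetime import datetime, timedelta
from typing import Optional


def find_closest(
    record_times: list[datetime],
    target: datetime,
    tdelta: timedelta = timedelta(0),
    pm_before: timedelta = timedelta(weeks=52),
    pm_after: timedelta = timedelta(weeks=52),
) -> Optional[datetime]:
    """Hypothetical helper: closest record time inside the search window."""
    shifted = target + tdelta  # assumption: tdelta offsets the target time
    window_start = shifted - pm_before
    window_end = shifted + pm_after
    candidates = [t for t in record_times if window_start <= t <= window_end]
    if not candidates:
        return None  # nothing recorded inside the window
    # Pick the candidate with the smallest absolute distance to the target.
    return min(candidates, key=lambda t: abs(t - shifted))


target = datetime(2024, 1, 1, 12, 0)
records = [
    datetime(2024, 1, 1, 10, 0),
    datetime(2024, 1, 1, 12, 45),
    datetime(2024, 1, 1, 15, 0),
]

closest = find_closest(records, target)
only_before = find_closest(records, target, pm_after=timedelta(0))
```

With the default window, the 12:45 record is the closest. Setting pm_after to zero restricts the search to records at or before the target, so the 10:00 record is selected instead.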
- corr_vars.utils.helpers.get_cb(df, join_key='case_id', primary_key='case_id', tmin_col='tmin', tmax_col='tmax', ttarget_col=None)
Get the case bounds DataFrame. This is used to apply a time filter to a native dynamic variable.
- Parameters:
df (pd.DataFrame | pl.DataFrame) – The cohort dataframe.
join_key (str) – The join key column. Defaults to “case_id”.
primary_key (str) – The primary key column. Defaults to “case_id”.
tmin_col (str) – The column containing the tmin. Defaults to “tmin”.
tmax_col (str) – The column containing the tmax. Defaults to “tmax”.
ttarget_col (str | None) – The column containing the target time. Defaults to None. (For !closest)
- Returns:
The case bounds DataFrame, of the same type as df.
- Return type:
pd.DataFrame | pl.DataFrame
- corr_vars.utils.helpers.pl_parse_time_args(obs, tmin=None, tmax=None)
Parse the time arguments to get the time series.
- Parameters:
obs (pd.DataFrame | pl.DataFrame) – The observation dataframe.
tmin (TimeBoundColumn | None) – The tmin argument. Defaults to None.
tmax (TimeBoundColumn | None) – The tmax argument. Defaults to None.
- Returns:
The observation dataframe, of the same type as obs, with tmin and tmax columns.
- Return type:
pd.DataFrame | pl.DataFrame
- corr_vars.utils.helpers.extract_df_data(df, col_dict=None, filter_dict=None, exact_match=False, remove_prefix=False, drop=False)
Extracts data from a DataFrame.
- Parameters:
df (pandas.DataFrame) – The DataFrame to operate on.
col_dict (dict, optional) – A dictionary mapping column names to new names. Defaults to None.
filter_dict (dict[str, list[str]], optional) – A dictionary where keys are column names and values are lists of values to filter rows by (may include regex patterns when exact_match=False). Defaults to None.
exact_match (bool, optional) – If True, performs exact matching when filtering. Defaults to False.
remove_prefix (bool, optional) – If True, removes prefix from default_key. Defaults to False.
drop (bool, optional) – If True, drop all columns not specified in col_dict.
- Returns:
A DataFrame containing the extracted data from the original DataFrame.
- Return type:
pandas.DataFrame
- corr_vars.utils.helpers.merge_consecutive(data, primary_key, recordtime='recordtime', recordtime_end='recordtime_end', time_threshold=Timedelta('0 days 00:30:00'))
Combine consecutive sessions (<30min separation) of ecmo_vv_icu into a single session.
- Parameters:
data (pd.DataFrame) – The data to merge.
primary_key (str) – The primary key column. Deprecated: this argument is ignored and case_id is used instead.
recordtime (str) – The recordtime column. Defaults to “recordtime”.
recordtime_end (str) – The recordtime_end column. Defaults to “recordtime_end”.
time_threshold (pd.Timedelta) – The time threshold. Defaults to 30 minutes.
- Returns:
The merged data.
- Return type:
pd.DataFrame
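The merging rule can be sketched for a single case with the standard library alone. This is a minimal illustration, assuming that two sessions are merged when the gap between the end of one and the start of the next is strictly below the threshold. The function merge_sessions is hypothetical, operates on a plain list of (start, end) tuples, and is not the corr_vars implementation, which works on a DataFrame.

```python
from datetime import datetime, timedelta

Session = tuple[datetime, datetime]


def merge_sessions(
    sessions: list[Session],
    threshold: timedelta = timedelta(minutes=30),
) -> list[Session]:
    """Hypothetical helper: merge sessions separated by less than `threshold`."""
    merged: list[Session] = []
    for start, end in sorted(sessions):
        if merged and start - merged[-1][1] < threshold:
            # Gap to the previous session is small: extend that session.
            prev_start, prev_end = merged[-1]
            merged[-1] = (prev_start, max(prev_end, end))
        else:
            # First session, or gap too large: start a new session.
            merged.append((start, end))
    return merged


day = datetime(2024, 1, 1)
sessions = [
    (day.replace(hour=9, minute=20), day.replace(hour=10)),
    (day.replace(hour=8), day.replace(hour=9)),
    (day.replace(hour=11), day.replace(hour=12)),
]

merged = merge_sessions(sessions)
```

Here the 08:00 to 09:00 and 09:20 to 10:00 sessions are 20 minutes apart and collapse into one session from 08:00 to 10:00, while the 11:00 session starts a full hour later and stays separate.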
- corr_vars.utils.helpers.interval_bucket_agg(data, primary_key='case_id', recordtime='recordtime', recordtime_end=None, recordtime_end_gap_treshold=None, recordtime_end_gap_behavior='drop', duration_empty_impute_duration=None, value='value', tmin=None, tmax=None, t0=None, t_missing_behavior='drop', granularity='1h', same_bin_aggregation=None)
Bins intervals clipped between tmin and tmax into bins aligned to t0 and aggregates on these bins.
- Parameters:
data (pl.DataFrame | pl.LazyFrame) – The data to transform.
primary_key (str) – The primary key column. Defaults to “case_id”.
recordtime (str) – The recordtime column. Defaults to “recordtime”.
recordtime_end (str | None) – The recordtime_end column. Defaults to None. Will impute recordtime_end from the next recordtime if set to None.
recordtime_end_gap_treshold (str | None) – Maximum allowed gap for recordtime_end. Only applies if recordtime_end is set to None.
recordtime_end_gap_behavior (str | None) – Determines whether to impute or drop rows when the recordtime_end_gap_treshold is exceeded.
duration_empty_impute_duration (str | None) – The duration to be imputed by offsetting recordtime_end. Drops rows if set to default of None or ≤0s.
value (str) – The value column. Defaults to “value”.
tmin (str | None) – The tmin column. Defaults to None.
tmax (str | None) – The tmax column. Defaults to None.
t0 (str | None) – The t0 column. Bins will align to this datetime col if specified. Defaults to None.
t_missing_behavior (str | None) – Determines whether to drop or keep rows when t0, tmin or tmax columns are specified and values are missing in data. Defaults to “drop”.
granularity (str | None) – The size of the bucket intervals. Defaults to 1 hour.
same_bin_aggregation (Callable[[pl.Expr], pl.Expr] | None) – The aggregation to perform on value for each bin. Uses pl.sum() if set to None.
Note
The granularity, recordtime_end_gap_treshold and duration_empty_impute_duration arguments are specified using the following string language:
1ns (1 nanosecond) # nanoseconds are not supported by our DataFrames.
1us (1 microsecond)
1ms (1 millisecond)
1s (1 second)
1m (1 minute)
1h (1 hour)
1d (1 calendar day)
1w (1 calendar week)
1mo (1 calendar month)
1q (1 calendar quarter)
1y (1 calendar year)
Or combine them: “3d12h4m25s” # 3 days, 12 hours, 4 minutes, and 25 seconds
By “calendar day”, we mean the corresponding time on the next day (which may not be 24 hours, due to daylight savings). Similarly for “calendar week”, “calendar month”, “calendar quarter”, and “calendar year”.
If you do not wish to use “calendar day” or any other “calendar” durations, specify the duration in hours or smaller units instead.
- Returns:
The clipped and binned data. Same type as data.
- Return type:
clip_bin_data (pl.DataFrame | pl.LazyFrame)
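The duration string language described in the note above can be made concrete with a small parser. This is a sketch for illustration only: split_duration and fixed_length are hypothetical names, and the library itself presumably hands these strings to Polars rather than parsing them in Python. The second function converts only units with a fixed length (hours and smaller) and rejects calendar units, whose length depends on the date. Nanoseconds are also rejected because datetime.timedelta cannot represent them.

```python
import re
from datetime import timedelta

# Longer unit names come first so that "ms" and "mo" are not read as "m".
UNITS = ("ns", "us", "ms", "mo", "s", "m", "h", "d", "w", "q", "y")
_UNIT_PATTERN = "|".join(UNITS)
_TOKEN = re.compile(rf"(\d+)({_UNIT_PATTERN})")
_FULL = re.compile(rf"(?:\d+(?:{_UNIT_PATTERN}))+")

# Units whose length does not depend on the calendar.
_FIXED = {
    "us": timedelta(microseconds=1),
    "ms": timedelta(milliseconds=1),
    "s": timedelta(seconds=1),
    "m": timedelta(minutes=1),
    "h": timedelta(hours=1),
}


def split_duration(text: str) -> list[tuple[int, str]]:
    """Split a duration string such as '3d12h4m25s' into (count, unit) pairs."""
    if not _FULL.fullmatch(text):
        raise ValueError(f"not a valid duration string: {text!r}")
    return [(int(count), unit) for count, unit in _TOKEN.findall(text)]


def fixed_length(text: str) -> timedelta:
    """Convert a duration string that uses only fixed-length units."""
    total = timedelta(0)
    for count, unit in split_duration(text):
        if unit not in _FIXED:
            raise ValueError(f"unit {unit!r} has no fixed length")
        total += count * _FIXED[unit]
    return total


parts = split_duration("3d12h4m25s")
ninety_minutes = fixed_length("1h30m")
```

A string such as “1d” splits without error but is rejected by fixed_length, which mirrors the advice to use hours or smaller units when calendar semantics are not wanted.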
- corr_vars.utils.helpers.clean_timeseries(data, max_roc, time_col='recordtime', value_col='value', identifier_col='icu_stay_id')
Clean a time series by removing values that imply impossible rates of change. Uses vectorized Polars operations for performance.
- Parameters:
data (pl.DataFrame) – The data to clean.
max_roc (float) – The maximum rate of change.
time_col (str) – The time column.
value_col (str) – The value column.
identifier_col (str) – The identifier column.
- Returns:
The cleaned data with the same type as data.
- Return type:
pl.DataFrame
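The idea of a rate-of-change filter can be shown with a simple sequential version. This sketch rests on several assumptions that the documentation does not confirm: max_roc is taken to be in value units per second, the first point is trusted, and each point is compared against the last accepted point. The function drop_impossible_jumps is hypothetical and handles a single identifier; the library's vectorized Polars implementation may differ in all of these respects.

```python
from datetime import datetime, timedelta

Point = tuple[datetime, float]


def drop_impossible_jumps(points: list[Point], max_roc: float) -> list[Point]:
    """Hypothetical helper: drop points whose rate of change exceeds max_roc.

    Assumption: max_roc is expressed in value units per second.
    """
    kept: list[Point] = []
    for time, value in sorted(points):
        if kept:
            last_time, last_value = kept[-1]
            elapsed = (time - last_time).total_seconds()
            # Skip duplicate timestamps and changes that are too steep.
            if elapsed <= 0 or abs(value - last_value) / elapsed > max_roc:
                continue
        kept.append((time, value))
    return kept


start = datetime(2024, 1, 1, 8, 0)
heart_rate = [
    (start, 80.0),
    (start + timedelta(seconds=60), 82.0),
    (start + timedelta(seconds=120), 200.0),  # implausible spike
    (start + timedelta(seconds=180), 84.0),
]

cleaned = drop_impossible_jumps(heart_rate, max_roc=0.5)
```

The jump from 82 to 200 within 60 seconds is roughly 2 units per second, above the limit of 0.5, so that point is removed. The following value of 84 is then compared against 82 and kept.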
- corr_vars.utils.helpers.harmonize_str_list_cols(dfs)
- Parameters:
dfs (Iterable[DataFrame])
- Return type:
list[DataFrame]
- corr_vars.utils.helpers.guess_variable_source(variable)
Guess the source of a variable.
- Parameters:
variable (Variable)
- Return type:
str | None
- corr_vars.utils.helpers.find_unknown_keys(user_cfg, schema, prefix='', warnings=None)
- Parameters:
user_cfg (Mapping[str, Any])
schema (Mapping[str, Any])
prefix (str)
warnings (Optional[list[str]])
- Return type:
list[str]
- corr_vars.utils.helpers.deep_merge(base, override)
Recursively merge override into base, with override taking precedence. Works on normal dictionaries and is compatible with TypedDict runtime dicts.
- Parameters:
base (TypeVar(T, bound=MutableMapping[str, Any]))
override (Mapping[str, Any])
- Return type:
TypeVar(T, bound=MutableMapping[str, Any])
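A recursive merge with override precedence can be sketched in a few lines. This version is an illustration under stated assumptions, not the library's code: deep_merge_sketch is a hypothetical name, it returns a new dictionary instead of modifying base (whether the real function mutates base is not stated here), and it recurses only when both sides hold a mapping for the same key.

```python
from collections.abc import Mapping
from copy import deepcopy
from typing import Any


def deep_merge_sketch(base: dict[str, Any], override: Mapping[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: recursive merge where `override` wins on conflicts."""
    merged = deepcopy(base)  # assumption: leave the caller's `base` untouched
    for key, value in override.items():
        if isinstance(value, Mapping) and isinstance(merged.get(key), dict):
            # Both sides are mappings: merge them key by key.
            merged[key] = deep_merge_sketch(merged[key], value)
        else:
            # Scalars, lists and type mismatches: the override value replaces.
            merged[key] = deepcopy(value)
    return merged


base = {"a": 1, "b": {"x": 1, "y": 2}}
override = {"b": {"y": 20, "z": 30}, "c": 3}

merged = deep_merge_sketch(base, override)
```

Nested keys that appear only in base (such as "x") survive the merge, while keys present on both sides take the override value.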