CUB-HDP Data Source#

The CUB-HDP (Charité University Berlin - Health Data Platform) source provides access to Charité’s clinical data warehouse, which runs on Hadoop/Impala infrastructure. It is the primary source for extracting patient data from Charité’s electronic health records.

Overview#

The CUB-HDP source connects to the Charité Health Data Lake (HDL) and provides access to:

  • Patient demographics and administrative data

  • Laboratory values and vital signs

  • Medication administration records

  • Therapy and procedure documentation

  • Diagnostic codes (ICD-10)

  • Imaging and reporting data

The data is organized in a star schema with fact tables containing the measurements and dimension tables providing contextual information.
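
As an illustration of this layout, the sketch below builds a toy star schema in an in-memory SQLite database and joins a fact table to a dimension table. The table and column names (`fact_lab`, `dim_parameter`, `parameter_id`) are hypothetical and do not correspond to actual CUB-HDP tables, and the real warehouse is queried through Impala rather than SQLite.

```python
import sqlite3

# Toy star schema: one dimension table and one fact table (hypothetical names).
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dim_parameter (parameter_id INTEGER PRIMARY KEY, name TEXT, unit TEXT);
    CREATE TABLE fact_lab (case_id INTEGER, parameter_id INTEGER, recordtime TEXT, value REAL);
    INSERT INTO dim_parameter VALUES (1, 'sodium', 'mmol/l'), (2, 'potassium', 'mmol/l');
    INSERT INTO fact_lab VALUES
        (100, 1, '2023-01-01 08:00', 139.0),
        (100, 2, '2023-01-01 08:00', 4.1),
        (101, 1, '2023-01-02 09:30', 147.0);
    """
)

# The fact table holds the measurements; the dimension table supplies
# the human-readable parameter name and unit.
rows = conn.execute(
    """
    SELECT f.case_id, d.name, f.value, d.unit
    FROM fact_lab AS f
    JOIN dim_parameter AS d ON d.parameter_id = f.parameter_id
    WHERE d.name = 'sodium'
    ORDER BY f.case_id
    """
).fetchall()
print(rows)
```

The fact rows stay narrow (IDs, timestamp, value), while descriptive attributes live once in the dimension table.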

Configuration#

Database Access#

from corr_vars import Cohort

# Basic configuration with password file
cohort = Cohort(
    obs_level="icu_stay",
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "conn_args": {"password_file": True},  # True uses ~/password.txt
            "icu_stay": {"merge_consecutive": True},
        }
    }
)

Password Management#

Passwords and connection settings go in the conn_args sub-dict:

# Using a custom password file path
cohort = Cohort(
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "conn_args": {"password_file": "/path/to/my/password.txt"},
        }
    }
)

# Custom server with explicit credentials
cohort = Cohort(
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "conn_args": {
                "remote_hostname": "custom-server.charite.de",
                "username": "myusername",
                "password_file": True,
            },
        }
    }
)
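
The two accepted forms of password_file (True or an explicit path) can be pictured with the following minimal sketch. The helper `resolve_password_file` is hypothetical and is not part of corr_vars; it only mirrors the behaviour described above, where True stands for ~/password.txt.

```python
from pathlib import Path
from typing import Union


def resolve_password_file(password_file: Union[bool, str]) -> Path:
    """Hypothetical helper: map a password_file setting to a concrete path."""
    if password_file is True:
        # True is shorthand for the default location in the home directory.
        return Path.home() / "password.txt"
    if isinstance(password_file, str) and password_file:
        # An explicit path is used as given ("~" is expanded if present).
        return Path(password_file).expanduser()
    raise ValueError("password_file must be True or a non-empty path string")


print(resolve_password_file(True))
print(resolve_password_file("/path/to/my/password.txt"))
```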

Data Filtering#

Apply initial filters via the filter sub-dict:

# Filter by admission date range
cohort = Cohort(
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "conn_args": {"password_file": True},
            "filter": {
                "extraction_start_date": "2023-01-01",
                "extraction_end_date": "2024-01-01",
            },
        }
    }
)

# Add an arbitrary SQL predicate on top of date filters
cohort = Cohort(
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "conn_args": {"password_file": True},
            "filter": {
                "extraction_start_date": "2023-01-01",
                "additional_filters": "c_station LIKE '%ITS%'",
                # "_d2" shorthand (last 2 months) is also valid in additional_filters
            },
        }
    }
)

# Convenience filters (applied on top of date range)
cohort = Cohort(
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "conn_args": {"password_file": True},
            "filter": {
                "include_adults_only": True,
                "exclude_dhzb": True,
                "exclude_brain_death": True,
            },
        }
    }
)
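
How date filters and an additional SQL predicate might combine can be sketched as below. This is not the query corr_vars actually generates: the function `build_where`, the column name `admission_date`, and the choice to treat the end date as exclusive are all assumptions made for illustration.

```python
from typing import Optional


def build_where(
    extraction_start_date: Optional[str] = None,
    extraction_end_date: Optional[str] = None,
    additional_filters: Optional[str] = None,
    date_column: str = "admission_date",  # hypothetical column name
) -> str:
    """Hypothetical sketch: combine filter settings into one WHERE clause."""
    clauses = []
    if extraction_start_date:
        clauses.append(f"{date_column} >= '{extraction_start_date}'")
    if extraction_end_date:
        # Assumption: the end date is exclusive.
        clauses.append(f"{date_column} < '{extraction_end_date}'")
    if additional_filters:
        # Parenthesize the raw predicate so an OR inside it cannot
        # change the meaning of the surrounding AND chain.
        clauses.append(f"({additional_filters})")
    return " AND ".join(clauses) if clauses else "1 = 1"


where = build_where(
    extraction_start_date="2023-01-01",
    additional_filters="c_station LIKE '%ITS%'",
)
print(where)
```

Because additional_filters is raw SQL, it should only ever come from trusted configuration, never from untrusted input.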

Variable Types#

Native Dynamic Variables#

Extract time-series data directly from database tables:

from corr_vars.sources.cub_hdp import Variable

# Extract blood pressure measurements
bp_var = Variable(
    var_name="blood_pressure_sys",
    table="it_copra6_hierachy_v2",
    where="c_katalog_leistungtext LIKE '%Blutdruck syst%'",
    value_dtype="DOUBLE",
    cleaning={"value": {"low": 50, "high": 300}},
    dynamic=True
)
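
The effect of the cleaning bounds can be pictured with a small stand-alone sketch. It assumes that values outside the low/high range are dropped and that both bounds are inclusive; the library may instead null such values or treat the bounds differently, and `apply_bounds` is a hypothetical helper, not a corr_vars function.

```python
from typing import List, Optional


def apply_bounds(
    values: List[float],
    low: Optional[float] = None,
    high: Optional[float] = None,
) -> List[float]:
    """Hypothetical helper: keep only values within [low, high]."""
    return [
        v
        for v in values
        if (low is None or v >= low) and (high is None or v <= high)
    ]


# Systolic blood pressure readings, including two implausible ones.
readings = [120.0, 35.0, 310.0, 95.0, 300.0]
cleaned = apply_bounds(readings, low=50, high=300)
print(cleaned)
```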

Complex Variables#

For variables requiring custom Python logic:

# Custom variable with Python function
complex_var = Variable(
    var_name="modified_sofa_score",
    requires=["blood_creatinine", "blood_bilirubin", "gcs_score"],
    complex=True,
    dynamic=False
)

# The Python function would be defined in variables.py:
# def modified_sofa_score(var, cohort):
#     # Custom calculation logic here
#     return calculated_data
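
The scoring logic inside such a function might look like the stand-alone sketch below. It is only an illustration: it uses the standard SOFA cut-offs for creatinine and bilirubin (both assumed to be in mg/dL) and for the Glasgow Coma Scale, ignores urine output, and sums just these three components. The actual definition of modified_sofa_score in variables.py is not shown in this section and may differ, and all function names here are hypothetical.

```python
from bisect import bisect_right

# Standard SOFA cut-offs (mg/dL); a "modified" score may use different ones.
CREATININE_CUTOFFS = [1.2, 2.0, 3.5, 5.0]  # renal component
BILIRUBIN_CUTOFFS = [1.2, 2.0, 6.0, 12.0]  # liver component


def points_ascending(value: float, cutoffs: list) -> int:
    """Return 0-4 points: one point per cut-off the value reaches or exceeds."""
    return bisect_right(cutoffs, value)


def gcs_points(gcs: int) -> int:
    """Neurological component: lower GCS gives more points."""
    if gcs >= 15:
        return 0
    if gcs >= 13:
        return 1
    if gcs >= 10:
        return 2
    if gcs >= 6:
        return 3
    return 4


def partial_sofa(creatinine: float, bilirubin: float, gcs: int) -> int:
    """Sum of the renal, liver, and neurological components (0-12)."""
    return (
        points_ascending(creatinine, CREATININE_CUTOFFS)
        + points_ascending(bilirubin, BILIRUBIN_CUTOFFS)
        + gcs_points(gcs)
    )


print(partial_sofa(creatinine=2.4, bilirubin=0.8, gcs=14))
```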

Available Databases#

| Database Name | Purpose | Description |
|---|---|---|
| db_hypercapnia_prepared | General ICU research | Main database with comprehensive ICU and hospital data |
| db_corror_prepared | Outcomes research | Specialized database for outcomes research projects |

Data Tables#

Key tables available in the CUB-HDP source:

Patient Management
  • it_ishmed_fall: Hospital admissions and demographics

  • it_ishmed_bewegung: Patient transfers and bed movements

Laboratory and Monitoring Data
  • it_ishmed_labor: Laboratory test results

  • it_copra6_hierachy_v2: COPRA vital signs and monitoring data

Medications
  • it_ishmed_verordnung: Medication orders

  • it_copra6_therapy: ICU therapy documentation

Procedures and Diagnoses
  • it_ishmed_ops: Procedure codes (OPS)

  • it_ishmed_icd: Diagnostic codes (ICD-10)

Complete Example#

from corr_vars import Cohort
from corr_vars.sources.aggregation import NativeStatic

# Initialize cohort with CUB-HDP source
cohort = Cohort(
    obs_level="icu_stay",
    load_default_vars=False,
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "conn_args": {"password_file": True},
            "icu_stay": {"merge_consecutive": True},
            "filter": {"extraction_start_date": "2023-01-01"},
        }
    }
)

print(f"Loaded {len(cohort.obs)} ICU stays")

# Add some common variables
cohort.add_variable("blood_sodium")  # Dynamic variable
cohort.add_variable("age_on_admission")  # Static variable

# Create custom aggregated variable
cohort.add_variable(
    NativeStatic(
        var_name="first_sodium_value",
        select="!first value",
        base_var="blood_sodium"
    )
)

# Set eligibility time anchor
cohort.add_variable(
    NativeStatic(
        var_name="hypernatremia_onset",
        select="!first recordtime",
        base_var="blood_sodium",
        where="value > 145"
    )
)
cohort.set_t_eligible("hypernatremia_onset")

# Apply inclusion criteria
cohort.include_list([
    {
        "variable": "age_on_admission",
        "operation": ">= 18",
        "label": "Adult patients"
    }
])

print(f"Final cohort: {len(cohort.obs)} ICU stays")
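
The two aggregations used above ("!first value" and "!first recordtime" with a where condition) can be pictured with the following pure-Python sketch on toy data. It illustrates the intended semantics only and makes no claim about how corr_vars evaluates these expressions; the tuple layout and the helper `first_per_stay` are hypothetical.

```python
from datetime import datetime

# Toy long-format sodium measurements: (icu_stay_id, recordtime, value)
blood_sodium = [
    (1, datetime(2023, 1, 1, 8), 141.0),
    (1, datetime(2023, 1, 1, 14), 147.0),
    (1, datetime(2023, 1, 2, 8), 150.0),
    (2, datetime(2023, 2, 1, 9), 138.0),
]


def first_per_stay(rows, field, predicate=lambda value: True):
    """Per stay, return `field` ("value" or "recordtime") of the earliest matching row."""
    result = {}
    for stay_id, recordtime, value in sorted(rows, key=lambda r: (r[0], r[1])):
        if predicate(value) and stay_id not in result:
            result[stay_id] = value if field == "value" else recordtime
    return result


# Analogue of select="!first value"
first_sodium_value = first_per_stay(blood_sodium, "value")

# Analogue of select="!first recordtime" with where="value > 145"
hypernatremia_onset = first_per_stay(
    blood_sodium, "recordtime", lambda value: value > 145
)

print(first_sodium_value)
print(hypernatremia_onset)
```

In this toy data, stay 2 never exceeds 145 and therefore has no onset time, so it would have no eligibility anchor.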

Class Reference#

class corr_vars.sources.cub_hdp.extract.NativeDynamic(var_name, dynamic, requires=[], tmin=None, tmax=None, py=None, py_ready_polars=False, cleaning=None, allow_caching=True, screened_obs_level='hospital_stay', table=None, where=None, value_dtype=None, columns=None)[source]#

Bases: CubHDPDynamic

SQL-backed dynamic variable with the full CubHDPDynamic pipeline.

Wraps a NativeExtractor to provide caching, time-filtering, cleaning, and optional py post-processing.

Parameters:
  • table (str | None) – HDP table name.

  • where (str | None) – SQL WHERE clause appended to the generated query.

  • value_dtype (str | None) – Optional polars dtype name applied to the value column after extraction (e.g. "Float64").

  • columns (dict[str, str | dict[str, str]] | None) – Column alias/cast mapping for the query.

All other args are forwarded to CubHDPDynamic.

Note

Only use this class for defining custom variables. For CORR-specified variables, use cohort.add_variable() directly.

Parameters:
  • var_name (str)

  • dynamic (bool)

  • requires (list[str] | dict[str, RequirementsDict])

  • tmin (str | tuple[str, str] | None)

  • tmax (str | tuple[str, str] | None)

  • py (VariableCallable | None)

  • py_ready_polars (bool)

  • cleaning (dict[str, dict[Literal['low', 'high'], Any]] | None)

  • allow_caching (bool)

  • screened_obs_level (Literal['patient', 'hospital_stay'])

extract_from_db(cohort, id_column)[source]#

Return raw variable data; called by extract() on a cache miss.

The returned DataFrame must not yet have time-window columns or cleaning applied — those are handled by the shared pipeline.

Parameters:
  • cohort (Cohort)

  • id_column (Literal['case_id', 'patient_id'])

Return type:

DataFrame
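
The division of labour between extract() and extract_from_db() described above can be sketched with a toy class. `ToyDynamicVariable` is a hypothetical stand-in that uses plain lists of dicts instead of DataFrames; it is not the CubHDPDynamic implementation and only illustrates the contract that extract_from_db() returns raw data while the shared pipeline handles caching and cleaning.

```python
class ToyDynamicVariable:
    """Hypothetical stand-in; not the corr_vars CubHDPDynamic class."""

    def __init__(self, low=None, high=None):
        self.low = low
        self.high = high
        self._cache = None
        self.db_calls = 0  # counts how often the "database" is queried

    def extract_from_db(self):
        # Returns raw rows only: no cleaning and no time-window handling.
        self.db_calls += 1
        return [
            {"case_id": 1, "value": 120.0},
            {"case_id": 1, "value": 999.0},  # implausible raw value
        ]

    def _in_bounds(self, value):
        return (self.low is None or value >= self.low) and (
            self.high is None or value <= self.high
        )

    def extract(self):
        # Shared pipeline: query the database only on a cache miss,
        # then apply cleaning to the raw rows.
        if self._cache is None:
            self._cache = self.extract_from_db()
        return [row for row in self._cache if self._in_bounds(row["value"])]


var = ToyDynamicVariable(low=50, high=300)
first = var.extract()
second = var.extract()  # served from the cache
print(first, var.db_calls)
```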