CUB-HDP Data Source
The CUB-HDP (Charité University Berlin - Health Data Platform) source provides access to the clinical data warehouse based on Hadoop/Impala infrastructure. This is the primary data source for extracting patient data from Charité’s electronic health records.
Overview
The CUB-HDP source connects to the Charité Health Data Lake (HDL) and provides access to:
- Patient demographics and administrative data
- Laboratory values and vital signs
- Medication administration records
- Therapy and procedure documentation
- Diagnostic codes (ICD-10)
- Imaging and reporting data
The data is organized in a star schema with fact tables containing the measurements and dimension tables providing contextual information.
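To make the star-schema layout concrete, the following is a minimal sketch using an in-memory SQLite database. The table and column names (`fact_measurement`, `dim_parameter`, and so on) are hypothetical and chosen only for illustration; they are not the actual CUB-HDP tables, and the real warehouse is queried through Impala rather than SQLite.

```python
import sqlite3

# Hypothetical miniature star schema; names are illustrative only and do not
# correspond to actual CUB-HDP tables.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_parameter (
        parameter_id INTEGER PRIMARY KEY,
        name TEXT,
        unit TEXT
    );
    CREATE TABLE fact_measurement (
        case_id INTEGER,
        parameter_id INTEGER REFERENCES dim_parameter(parameter_id),
        recordtime TEXT,
        value REAL
    );
    INSERT INTO dim_parameter VALUES
        (1, 'sodium', 'mmol/L'),
        (2, 'heart_rate', '1/min');
    INSERT INTO fact_measurement VALUES
        (100, 1, '2023-01-01 08:00', 141.0),
        (100, 2, '2023-01-01 08:05', 88.0),
        (101, 1, '2023-01-02 09:30', 147.0);
""")

# The fact table holds the measurements; the dimension table supplies the
# context (parameter name and unit) through a join on the shared key.
rows = conn.execute("""
    SELECT f.case_id, d.name, f.value, d.unit
    FROM fact_measurement AS f
    JOIN dim_parameter AS d ON d.parameter_id = f.parameter_id
    WHERE d.name = 'sodium'
    ORDER BY f.recordtime
""").fetchall()
print(rows)
```

Keeping measurements in a narrow fact table and descriptive attributes in dimension tables means a filter such as "all sodium values" is expressed once against the dimension and applied to the facts through the join.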
Configuration
Database Access
from corr_vars import Cohort

# Basic configuration with password file
cohort = Cohort(
    obs_level="icu_stay",
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "password_file": True,  # Uses ~/password.txt
            "merge_consecutive": True
        }
    }
)
Password Management
For security, passwords should be stored in files rather than in code:
# Using custom password file location
cohort = Cohort(
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "password_file": "/path/to/my/password.txt"
        }
    }
)

# Using connection arguments for custom server
cohort = Cohort(
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "conn_args": {
                "remote_hostname": "custom-server.charite.de",
                "username": "myusername"
            }
        }
    }
)
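A password file is only safer than inline code if other users cannot read it. The sketch below shows one way to create such a file with owner-only permissions. It assumes the file holds nothing but the password as plain text; the exact format that `corr_vars` expects is not documented here, and `write_password_file` and `read_password_file` are hypothetical helpers, not part of the library.

```python
import os
from pathlib import Path


def write_password_file(path: Path, password: str) -> None:
    """Write the password to `path`, readable and writable by the owner only."""
    # Create the file with mode 0600 from the start so it is never
    # world-readable, even briefly.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        fh.write(password)
    # Tighten permissions in case the file already existed with a looser mode.
    os.chmod(path, 0o600)


def read_password_file(path: Path) -> str:
    """Return the file contents with surrounding whitespace removed."""
    return Path(path).read_text(encoding="utf-8").strip()
```

On POSIX systems, `ls -l` on the resulting file should show `-rw-------`. The permission bits have limited effect on Windows.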
Data Filtering
Apply initial filters to limit the cohort scope:
# Filter by admission date range
cohort = Cohort(
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "password_file": True,
            "filters": "c_aufnahme >= '2023-01-01' AND c_aufnahme < '2024-01-01'"
        }
    }
)

# Use shorthand for recent admissions (last 2 months)
cohort = Cohort(
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "password_file": True,
            "filters": "_d2"  # Special shorthand
        }
    }
)
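Hand-written date filters are easy to get subtly wrong (swapped bounds, inconsistent date formats). As a sketch, a small helper can build the half-open date-range string shown above from `datetime.date` objects. `admission_filter` is a hypothetical helper, not part of `corr_vars`; the column name `c_aufnahme` is taken from the example above.

```python
from datetime import date


def admission_filter(start: date, end: date, column: str = "c_aufnahme") -> str:
    """Build a half-open date-range filter: start <= column < end."""
    if start >= end:
        raise ValueError("start must be before end")
    return (
        f"{column} >= '{start.isoformat()}' "
        f"AND {column} < '{end.isoformat()}'"
    )


filters = admission_filter(date(2023, 1, 1), date(2024, 1, 1))
print(filters)
```

The resulting string can be passed as the `"filters"` value in the source configuration. Using a half-open interval avoids double-counting admissions that fall exactly on a boundary when consecutive ranges are queried.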
Variable Types
Native Dynamic Variables
Extract time-series data directly from database tables:
from corr_vars.sources.cub_hdp import Variable

# Extract blood pressure measurements
bp_var = Variable(
    var_name="blood_pressure_sys",
    table="it_copra6_hierachy_v2",
    where="c_katalog_leistungtext LIKE '%Blutdruck syst%'",
    value_dtype="DOUBLE",
    cleaning={"value": {"low": 50, "high": 300}},
    dynamic=True
)
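The `cleaning` argument above gives a plausible range for the `value` column. How `corr_vars` treats out-of-range values is not stated in this section, so the following is only a sketch of one possible interpretation: values outside the inclusive range `[low, high]` are replaced with `None`. `clean_range` is a hypothetical function written for illustration.

```python
from typing import List, Optional


def clean_range(
    values: List[Optional[float]], low: float, high: float
) -> List[Optional[float]]:
    """Replace values outside [low, high] with None (bounds inclusive).

    Assumption: out-of-range values are nulled rather than dropped or clipped;
    the library's actual behavior may differ.
    """
    return [v if v is not None and low <= v <= high else None for v in values]


readings = [120.0, 35.0, 301.0, 50.0, 300.0, None]
cleaned = clean_range(readings, low=50, high=300)
print(cleaned)
```

With the bounds from the blood pressure example, implausible readings such as 35 or 301 mmHg would be removed from analysis while boundary values are kept.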
Complex Variables
For variables requiring custom Python logic:
# Custom variable with Python function
complex_var = Variable(
    var_name="modified_sofa_score",
    requires=["blood_creatinine", "blood_bilirubin", "gcs_score"],
    complex=True,
    dynamic=False
)

# The Python function would be defined in variables.py:
# def modified_sofa_score(var, cohort):
#     # Custom calculation logic here
#     return calculated_data
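The source does not define what `modified_sofa_score` computes or what the `var` and `cohort` arguments contain. As an illustration of the kind of logic such a function might hold, the sketch below scores the three required inputs with the standard SOFA cut-offs for the renal (creatinine), liver (bilirubin), and central nervous system (GCS) components. It assumes creatinine and bilirubin are in mg/dL, ignores urine output, and uses hypothetical function names; it is not the library's implementation.

```python
def renal_points(creatinine_mg_dl: float) -> int:
    """SOFA renal component from creatinine only (urine output ignored)."""
    thresholds = [1.2, 2.0, 3.5, 5.0]
    # Each threshold reached adds one point, giving a score from 0 to 4.
    return sum(creatinine_mg_dl >= t for t in thresholds)


def liver_points(bilirubin_mg_dl: float) -> int:
    """SOFA liver component from total bilirubin."""
    thresholds = [1.2, 2.0, 6.0, 12.0]
    return sum(bilirubin_mg_dl >= t for t in thresholds)


def cns_points(gcs: int) -> int:
    """SOFA central nervous system component from the Glasgow Coma Scale."""
    if gcs >= 15:
        return 0
    if gcs >= 13:
        return 1
    if gcs >= 10:
        return 2
    if gcs >= 6:
        return 3
    return 4


def partial_sofa(creatinine_mg_dl: float, bilirubin_mg_dl: float, gcs: int) -> int:
    """Sum of the three component scores (range 0 to 12)."""
    return (
        renal_points(creatinine_mg_dl)
        + liver_points(bilirubin_mg_dl)
        + cns_points(gcs)
    )


score = partial_sofa(creatinine_mg_dl=2.4, bilirubin_mg_dl=0.8, gcs=13)
print(score)
```

In a real complex variable, the per-patient inputs would come from the variables listed in `requires`, and unit conversion would be needed if the database stores creatinine or bilirubin in µmol/L.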
Available Databases
| Database Name | Purpose | Description |
|---|---|---|
| db_hypercapnia_prepared | General ICU research | Main database with comprehensive ICU and hospital data |
| db_corror_prepared | Outcomes research | Specialized database for outcomes research projects |
Data Tables
Key tables available in the CUB-HDP source:
- Patient Management
  - it_ishmed_fall: Hospital admissions and demographics
  - it_ishmed_bewegung: Patient transfers and bed movements
- Laboratory Data
  - it_ishmed_labor: Laboratory test results
  - it_copra6_hierachy_v2: COPRA vital signs and monitoring data
- Medications
  - it_ishmed_verordnung: Medication orders
  - it_copra6_therapy: ICU therapy documentation
- Procedures
  - it_ishmed_ops: Procedure codes (OPS)
  - it_ishmed_icd: Diagnostic codes (ICD-10)
Complete Example
from corr_vars import Cohort
from corr_vars.sources.aggregation import NativeStatic

# Initialize cohort with CUB-HDP source
cohort = Cohort(
    obs_level="icu_stay",
    load_default_vars=False,
    sources={
        "cub_hdp": {
            "database": "db_hypercapnia_prepared",
            "password_file": True,
            "merge_consecutive": True,
            "filters": "c_aufnahme >= '2023-01-01'"
        }
    }
)
print(f"Loaded {len(cohort.obs)} ICU stays")

# Add some common variables
cohort.add_variable("blood_sodium")  # Dynamic variable
cohort.add_variable("age_on_admission")  # Static variable

# Create custom aggregated variable
cohort.add_variable(
    NativeStatic(
        var_name="first_sodium_value",
        select="!first value",
        base_var="blood_sodium"
    )
)

# Set eligibility time anchor
cohort.add_variable(
    NativeStatic(
        var_name="hypernatremia_onset",
        select="!first recordtime",
        base_var="blood_sodium",
        where="value > 145"
    )
)
cohort.set_t_eligible("hypernatremia_onset")

# Apply inclusion criteria
cohort.include_list([
    {
        "variable": "age_on_admission",
        "operation": ">= 18",
        "label": "Adult patients"
    }
])
print(f"Final cohort: {len(cohort.obs)} ICU stays")
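The two `NativeStatic` definitions in the example reduce a time series to a single value per observation. The sketch below mimics what the selectors `"!first value"` and `"!first recordtime"` with `where="value > 145"` appear to express, using plain Python on a list of `(recordtime, value)` pairs. The semantics are inferred from the selector names and are an assumption; `first_value` and `first_recordtime` are hypothetical functions, not the library's implementation.

```python
from datetime import datetime
from typing import Callable, List, Optional, Tuple

Record = Tuple[datetime, float]

# Illustrative sodium measurements (mmol/L) for a single ICU stay.
sodium: List[Record] = [
    (datetime(2023, 3, 1, 8, 0), 141.0),
    (datetime(2023, 3, 1, 14, 0), 146.0),
    (datetime(2023, 3, 2, 8, 0), 149.0),
]


def first_value(records: List[Record]) -> Optional[float]:
    """Value of the earliest record, or None if there are no records."""
    if not records:
        return None
    return min(records, key=lambda r: r[0])[1]


def first_recordtime(
    records: List[Record],
    predicate: Callable[[float], bool] = lambda v: True,
) -> Optional[datetime]:
    """Time of the earliest record whose value satisfies the predicate."""
    times = [t for t, v in records if predicate(v)]
    return min(times) if times else None


first_sodium = first_value(sodium)
onset = first_recordtime(sodium, lambda v: v > 145)
print(first_sodium, onset)
```

Under this reading, a stay with no sodium value above 145 has no onset time, which is why `hypernatremia_onset` can serve as an eligibility anchor: stays without an anchor time would not become eligible.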