Airflow DAG for processing OMOP EHR deliveries in Cloud Composer.
The DAG id is `ehr-pipeline`.
For each site delivery, the pipeline:
- Finds the most recent delivery folder in the site's GCS bucket.
- Exports Connect reference data for the site.
- Converts source files to Parquet.
- Validates and normalizes OMOP files.
- Upgrades delivered CDM versions when required.
- Filters participants using Connect status data.
- Runs vocabulary harmonization for supported clinical tables.
- Generates derived tables.
- Loads the processed data to BigQuery.
- Runs DQD, Achilles, PASS, and delivery reporting.
This repository contains the Composer DAG and task helpers. The heavy processing is executed by external services:
- `OMOP_PROCESSOR_ENDPOINT` handles file conversion, validation, normalization, CDM upgrades, vocabulary work, BigQuery preparation, and related OMOP operations.
- `OMOP_ANALYZER_ENDPOINT` handles DQD, Achilles, Atlas table creation, and report generation.
The DAG coordinates site discovery, task ordering, retries, and pipeline logging.
- Google Cloud Composer
- BigQuery datasets for CDM and analytics results
- Per-site GCS buckets with deliveries stored as top-level `YYYY-MM-DD` folders
- Reachable processor and analyzer services
- Composer worker permissions to:
- read and write the configured GCS buckets
- invoke the processor and analyzer services
- create, truncate, and load BigQuery tables
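Picking the latest `YYYY-MM-DD` delivery folder can be sketched as a small pure function. `latest_delivery_folder` is a hypothetical helper, not the repository's actual implementation; it assumes the caller has already listed the bucket's top-level prefixes (e.g. with a delimiter-based GCS listing):

```python
from datetime import datetime

def latest_delivery_folder(prefixes):
    """Return the most recent top-level YYYY-MM-DD delivery folder.

    `prefixes` is a list of top-level folder names (for example the
    prefixes returned by a GCS listing with delimiter="/"); names that
    are not valid dates are ignored.
    """
    deliveries = []
    for name in prefixes:
        folder = name.strip("/")
        try:
            deliveries.append((datetime.strptime(folder, "%Y-%m-%d"), folder))
        except ValueError:
            continue  # not a delivery folder (e.g. "logs/")
    if not deliveries:
        return None
    # max() on (date, name) tuples picks the newest delivery
    return max(deliveries)[1]
```

For example, `latest_delivery_folder(["2024-01-15/", "2024-03-02/", "logs/"])` returns `"2024-03-02"`.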
The DAG reads site configuration from:
`/home/airflow/gcs/dags/dependencies/ehr/config/site_config.yml`
Example:
```yaml
site:
  site_name:
    display_name: "Site Display Name"
    gcs_bucket: "my-site-bucket"
    file_delivery_format: ".csv"
    project_id: "gcp-project-id"
    cdm_bq_dataset: "omop_cdm"
    analytics_bq_dataset: "omop_analytics"
    omop_version: "5.3"
    date_format: "%Y-%m-%d"
    datetime_format: "%Y-%m-%d %H:%M:%S"
    overwrite_site_vocab_with_standard: true
    site_connect_id: 123456789
```

Fields used by the DAG:
- `display_name`: human-readable site name used in logs and reports
- `gcs_bucket`: bucket name only, without `gs://`
- `file_delivery_format`: expected source file format, typically `.csv` or `.csv.gz`
- `project_id`: GCP project for BigQuery operations
- `cdm_bq_dataset`: target OMOP dataset
- `analytics_bq_dataset`: target analytics dataset for DQD, Achilles, PASS, and Atlas outputs
- `omop_version`: delivered CDM version
- `date_format` and `datetime_format`: source formatting hints for normalization
- `overwrite_site_vocab_with_standard`: when `true`, standard vocabulary is loaded and site vocab tables are skipped
- `site_connect_id`: Connect identifier used for site-level participant export
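A quick way to catch an incomplete site block before the pipeline runs is to diff it against the required field names. This is a minimal sketch, not code from the repository; `missing_site_fields` is a hypothetical helper operating on one site's parsed config dict:

```python
# Required keys for one site block, per the field list above.
REQUIRED_SITE_FIELDS = {
    "display_name", "gcs_bucket", "file_delivery_format", "project_id",
    "cdm_bq_dataset", "analytics_bq_dataset", "omop_version",
    "date_format", "datetime_format", "overwrite_site_vocab_with_standard",
    "site_connect_id",
}

def missing_site_fields(site_config):
    """Return the required fields absent from one site's config block."""
    return sorted(REQUIRED_SITE_FIELDS - set(site_config))
```

A config that only sets `display_name` would report every other required field as missing, which makes misconfigured sites easy to reject at parse time.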
Defined in `dags/dependencies/ehr/constants.py`:

- `OMOP_PROCESSOR_ENDPOINT`
- `OMOP_ANALYZER_ENDPOINT`
- `OMOP_TARGET_VOCAB_VERSION`
- `OMOP_TARGET_CDM_VERSION`
- `OMOP_VOCAB_GCS_PATH`
- `CONNECT_DATASET_ID`
The main flow in `dags/ehr_pipeline.py` is:

- `check_api_health`, `id_sites_to_process`, `get_unprocessed_files`
- Per-site: `retrieve_connect_data`
- Per-file: `convert_file`, `validate_file`, `normalize_file`, `cdm_upgrade`, `filter_participants`
- Per-site: `populate_cdm_source_file`, the `vocab_harmonization` task group, the `generate_derived_tables` task group, the `load_to_bigquery` task group, `cleanup`
- Per-site: `generate_report_csv`
- Per-site: `dqd`, `achilles`, `pass_analysis`
- Per-site: `atlas_results_tables` and `generate_delivery_report`
- `mark_delivery_complete`, `log_done`
The vocab_harmonization task group runs these steps in order:
1. `harmonize_vocab_source_target`
2. `harmonize_vocab_target_remap`
3. `harmonize_vocab_target_replacement`
4. `harmonize_vocab_domain_check`
5. `harmonize_vocab_omop_etl`
6. `harmonize_vocab_consolidate`
7. `harmonize_vocab_discover_tables`
8. `flatten_table_configs`
9. `harmonize_vocab_deduplicate_table`
Only tables listed in `VOCAB_HARMONIZED_TABLES` are processed by these steps.
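Restricting harmonization to supported tables amounts to intersecting the site's delivered tables with the allow-list. A minimal sketch; the contents of `VOCAB_HARMONIZED_TABLES` below are an assumed placeholder, not the repository's actual list:

```python
# Stand-in for VOCAB_HARMONIZED_TABLES in constants.py (assumed values).
VOCAB_HARMONIZED_TABLES = [
    "condition_occurrence", "drug_exposure",
    "procedure_occurrence", "measurement", "observation",
]

def tables_to_harmonize(delivered_tables):
    """Intersect delivered tables with the supported allow-list,
    preserving the configured harmonization order."""
    delivered = set(delivered_tables)
    return [t for t in VOCAB_HARMONIZED_TABLES if t in delivered]
```

Tables outside the allow-list (for example `person`) pass through the pipeline untouched by the harmonization steps.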
Derived tables are generated after vocabulary harmonization and before BigQuery loading.
The current derived tables are defined by `DERIVED_DATA_TABLES` in `dags/dependencies/ehr/constants.py`.
- Upload `dags/` to the Composer DAGs bucket, preserving paths.
- Ensure `site_config.yml` is present at the configured path in the DAGs bucket.
- Set the required environment variables in Composer.
- Grant Composer access to the configured GCS buckets, BigQuery datasets, and service endpoints.
- Trigger `ehr-pipeline` from Airflow.
- A site is processed when its latest delivery has no log row or its most recent log row is in an error state.
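The selection rule above can be sketched as a single predicate. This is an illustrative helper under assumed log-row field names (`status` and the `"error"` value are assumptions, not the repository's actual schema):

```python
def site_needs_processing(latest_log_row):
    """Decide whether a site's latest delivery should be processed.

    `latest_log_row` is the most recent pipeline-log entry for the
    delivery, or None when the delivery has never been logged.
    """
    if latest_log_row is None:
        return True  # new delivery: no log row yet
    # Re-process only when the last attempt ended in an error state.
    return latest_log_row.get("status") == "error"
```

Successful deliveries are therefore skipped on subsequent runs, while failed ones are retried automatically.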
- `load_harmonized_tables` skips cleanly when no harmonized clinical tables were produced.
- `load_remaining` skips vocabulary tables already handled elsewhere and skips `cdm_source`, which is loaded during `cleanup`.
- `log_done` inspects task states and fails the DAG if any upstream task failed, even if a mapped task failure was otherwise easy to miss.
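The final failure check can be sketched as scanning every task's terminal states, including each mapped instance, which is what makes otherwise-silent mapped-task failures visible. `any_upstream_failed` is a hypothetical stand-in for the check inside `log_done`, not the repository's code:

```python
def any_upstream_failed(task_states):
    """Return True when any upstream task ended in a failed state.

    `task_states` maps task ids to lists of terminal states, one entry
    per mapped task instance (a plain task has a single-element list).
    """
    failed = {"failed", "upstream_failed"}
    return any(state in failed
               for states in task_states.values()
               for state in states)
```

For example, a per-file task mapped over ten files where one instance failed still trips the check even though the other nine succeeded.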
```
dags/
  ehr_pipeline.py
  dependencies/
    ehr/
      analysis.py
      bq.py
      constants.py
      dag_helpers.py
      file_config.py
      omop.py
      participant_filter.py
      processing.py
      processing_jobs.py
      storage_backend.py
      utils.py
      validation.py
      vocab.py
      config/
        site_config.yml
```