Rahul06x1/omnipipe

OmniPipe

An end-to-end, governed data platform on Google Cloud

License: MIT

OmniPipe is a miniature but complete data platform for a fictional city-mobility & weather analytics product. It replicates an operational database to BigQuery via Datastream CDC, enriches it with a public weather API, orchestrates ingest→transform→validate→activate with Cloud Workflows, enforces PII governance, and reverse-syncs a curated KPI table to an external destination — all deployed across dev/prod with reusable Terraform modules and reusable CI. The capstone of a four-project GCP data-engineering portfolio.

Standalone learning project. Public weather API + a self-seeded mock OLTP database; no proprietary data, no secrets in the repo.


Architecture

                  ┌──────────────────── Cloud Workflows (daily) ────────────────────┐
Cloud Scheduler ─►│ init → load_weather(CF) → dbt(job) → validate(DVT job)          │
                  │      → reverse_sync(CF) → finish (run summary)                   │
                  └──────────────────────────────────────────────────────────────────┘
Cloud SQL (Postgres, seeded) ──Datastream CDC──► BigQuery raw.customers / raw.trips
Open-Meteo API ──► Weather Cloud Function ─────► BigQuery raw.weather
dbt (Cloud Run): staging → prod (PII: SHA-256 + policy tags) → analysis.kpis
Validation (DVT Cloud Run job): config-driven checks → ops.validation_results (fails on error)
analysis.kpis ──► Reverse-sync Cloud Function ──► external webhook (watermarked, idempotent)
Governance: Data Catalog taxonomy + policy tags · Observability: per-service alerts · Audit: ops.*
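
The validation stage in the diagram is described as config-driven and as failing on error. A minimal Python sketch of that idea follows. It is not the DVT API or the repository's actual job: the Check class, the rule names not_null and min_rows, and the sample column trip_id are all hypothetical, chosen only to show declarative checks producing result records plus a gate that raises.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Check:
    """One declarative check (hypothetical shape; a real job might load these from YAML)."""
    name: str
    column: str
    rule: str  # "not_null" or "min_rows" (hypothetical rule names)
    threshold: int = 0


def run_checks(rows: list[dict], checks: list[Check]) -> list[dict]:
    """Evaluate every check and return one result record per check."""
    results = []
    for check in checks:
        if check.rule == "not_null":
            passed = all(row.get(check.column) is not None for row in rows)
        elif check.rule == "min_rows":
            passed = len(rows) >= check.threshold
        else:
            raise ValueError(f"unknown rule: {check.rule}")
        results.append({"check": check.name, "passed": passed})
    return results


def gate(results: list[dict]) -> None:
    """Raise if any check failed, so an orchestrator can stop before activation."""
    failed = [r["check"] for r in results if not r["passed"]]
    if failed:
        raise RuntimeError(f"validation failed: {', '.join(failed)}")
```

In this sketch the result records could be written to an audit table such as ops.validation_results before gate() is called, so a failed run still leaves a trace.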

What it demonstrates

Capability                  How
Change Data Capture         Datastream: Cloud SQL Postgres → BigQuery raw
Multi-source ingestion      CDC + scheduled weather API Cloud Function
Orchestration               Cloud Workflows (init→load→transform→validate→activate) + Scheduler
ELT + layering              dbt staging → prod → analysis, partitioned/clustered facts
PII governance              SHA-256 pseudonymization + Data Catalog policy tags on sensitive columns
Data validation             config-driven DVT job; failing checks halt the workflow
Reverse-sync / activation   curated KPIs → external webhook, watermarked & idempotent
Reusable IaC                Terraform modules (platform, pii_policy, monitoring, workload_identity) across envs/dev + envs/prod
Reusable CI                 workflow_call templates (_python-ci, _terraform-ci) consumed per component
Observability               per-service log-based error metric + alert; audit tables in ops
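
To make the PII governance row concrete, here is a small Python sketch of SHA-256 pseudonymization. It is an illustration only: in the project this step presumably lives in the dbt models rather than in Python, and the pseudonymize function, the whitespace/lowercase normalization, and the optional salt parameter are assumptions not taken from the repository.

```python
import hashlib


def pseudonymize(value: str, salt: str = "") -> str:
    """Return a hex SHA-256 digest of a normalized value.

    Normalizing (trim + lowercase) is an assumption; it makes values that
    differ only in case or surrounding spaces map to the same pseudonym.
    """
    normalized = value.strip().lower()
    return hashlib.sha256((salt + normalized).encode("utf-8")).hexdigest()


# The same input always yields the same 64-character token, so joins on the
# pseudonym still work while the clear-text value is not stored.
token = pseudonymize("  Alice@Example.com ")
same_token = pseudonymize("alice@example.com")
```

An unsalted hash of a low-entropy value (for example a phone number) can be reversed by enumeration, which is one reason to pair hashing with column-level policy tags rather than rely on it alone.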

Repository layout

omnipipe/
├── ingestion/weather/        # A1 Cloud Function: Open-Meteo -> raw.weather
├── reverse_sync/kpi_export/  # A2 Cloud Function: analysis.kpis -> webhook (watermarked)
├── validation/dvt_job/       # Cloud Run job: config-driven data checks -> ops.validation_results
├── dbt/                      # staging -> prod (PII) -> analysis, macros, runner.sh
├── orchestration/            # Cloud Workflows definition (workflow.yaml.tftpl)
├── seed/postgres/            # mock OLTP schema + seed (the CDC source)
├── infra/terraform/
│   ├── modules/{platform,pii_policy,monitoring,workload_identity}/
│   └── envs/{dev,prod}/      # thin roots composing the platform module
└── .github/workflows/        # reusable templates + ci/deploy callers

Run the tests locally

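# Lint and test each Python service (run from the repo root)
for svc in ingestion/weather reverse_sync/kpi_export validation/dvt_job; do
  (cd "$svc" && pip install -e . pytest ruff && ruff check . && pytest -q)
done

# Parse the dbt project without touching a real GCP project
(cd dbt && dbt deps && GCP_PROJECT=dummy dbt parse --profiles-dir . --target dev)

# Validate the dev Terraform root without configuring a backend
(cd infra/terraform/envs/dev && terraform init -backend=false && terraform validate)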

Deploy (per environment)

  1. Bootstrap once: a deployer SA + the workload_identity module (WIF pool bound to this repo). Create the Artifact Registry repo omnipipe, a function-source bucket, and a Terraform-state bucket per env.
  2. Set repo Actions variables: GCP_PROJECT_ID, WIF_PROVIDER, DEPLOYER_SA, FUNCTION_SOURCE_BUCKET, TFSTATE_BUCKET.
  3. Push to main: deploy.yaml builds the dvt/dbt images, uploads the function zips, and runs terraform plan/apply for envs/dev.
  4. Seed the OLTP source: psql "$POSTGRES_URL" -f seed/postgres/schema.sql && psql "$POSTGRES_URL" -f seed/postgres/seed.sql (see seed/README.md).

Design notes

  • CDC over polling — Datastream streams row changes continuously; dbt dedups to the latest version per key.
  • Governance by default — PII never leaves staging in the clear: prod.dim_customer stores SHA-256 hashes, and the name column carries a Data Catalog policy tag minted by the pii_policy module.
  • Validate before activate — the DVT job runs after dbt and fails the workflow on a bad check, so only validated data is reverse-synced.
  • Idempotent activation — reverse-sync tracks a per-destination watermark in ops.reverse_sync_state; re-runs push only new rows.
  • DRY infra & CI — one platform module is instantiated per environment; CI job bodies live in workflow_call templates, not copied per component (the gap seen in many real monorepos).
  • Cost-aware: db-f1-micro Cloud SQL, scale-to-zero functions, partitioned/clustered facts; run terraform destroy per env when idle.
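
The "dedups to the latest version per key" step from the first note can be sketched in Python as below. This only illustrates the logic; the project does this in dbt, and the column names id, updated_at, and is_deleted are hypothetical stand-ins for whatever key, ordering, and delete-marker columns the replicated tables actually expose.

```python
def latest_per_key(
    changes: list[dict],
    key: str = "id",
    version: str = "updated_at",
    deleted: str = "is_deleted",
) -> list[dict]:
    """Collapse a CDC change log to the current state of each key.

    Keeps the row with the highest version per key (later rows win ties),
    then drops keys whose latest change is a delete.
    """
    latest: dict = {}
    for row in changes:
        current = latest.get(row[key])
        if current is None or row[version] >= current[version]:
            latest[row[key]] = row
    return [row for row in latest.values() if not row.get(deleted, False)]


changes = [
    {"id": 1, "name": "a", "updated_at": 1},
    {"id": 1, "name": "b", "updated_at": 2},
    {"id": 2, "name": "c", "updated_at": 1},
    {"id": 2, "name": "c", "updated_at": 3, "is_deleted": True},
]
current_state = latest_per_key(changes)
```

In SQL the same effect is commonly achieved with a ROW_NUMBER() window partitioned by the key and ordered by the version column descending, keeping only the first row.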

Key decisions & what I learned

  • CDC over polling for the OLTP source. Datastream streams row changes continuously (including deletes) and dbt dedups to the latest version per key. Lesson: CDC removes whole classes of "did I miss an update?" bugs that polling invites.
  • Validate before activate. The DVT job runs after dbt and fails the workflow on a bad check, so only validated data is ever reverse-synced. Lesson: put data-quality gates inside the orchestration, not beside it.
  • PII governance by default. prod.dim_customer stores SHA-256 pseudonyms and the name column carries a Data Catalog policy tag minted by the pii_policy module — raw PII never leaves staging. Learned: column-level access control + pseudonymization in BigQuery.
  • DRY infra and CI. One platform Terraform module is instantiated per environment, and CI job bodies live in workflow_call templates rather than copied per component. This is the exact gap I observed in a real monorepo and the biggest architectural lesson here.
  • Idempotent activation. Reverse-sync tracks a per-destination watermark in ops.reverse_sync_state, so re-runs push only new rows. Lesson: reverse-ETL needs the same idempotency discipline as ingestion.
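
The watermark pattern behind idempotent activation can be sketched as follows. This is a simplified in-memory illustration under stated assumptions, not the repository's function: sync_new_rows, the updated_at ordering column, and the send callback are hypothetical, and the real state is described as living in ops.reverse_sync_state.

```python
from typing import Callable


def sync_new_rows(
    rows: list[dict],
    watermark: int,
    send: Callable[[dict], None],
) -> int:
    """Send rows strictly newer than the watermark, oldest first.

    Returns the new watermark. If send() raises, the exception propagates
    and the caller keeps the old watermark, so the batch is retried on the
    next run (at-least-once delivery; the destination should tolerate
    repeats).
    """
    pending = sorted(
        (row for row in rows if row["updated_at"] > watermark),
        key=lambda row: row["updated_at"],
    )
    for row in pending:
        send(row)
    return pending[-1]["updated_at"] if pending else watermark


sent: list[dict] = []
kpis = [
    {"kpi": "a", "updated_at": 2},
    {"kpi": "b", "updated_at": 1},
    {"kpi": "c", "updated_at": 0},
]
first = sync_new_rows(kpis, 0, sent.append)       # pushes "b" then "a"
second = sync_new_rows(kpis, first, sent.append)  # pushes nothing new
```

The strict comparison means rows sharing the watermark's exact timestamp are skipped on later runs; a real implementation would need a tie-breaker or a unique, monotonically increasing column.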

Skills demonstrated

Change Data Capture (Datastream) · Cloud SQL · workflow orchestration (Cloud Workflows) · multi-source ELT with dbt · data governance (policy tags, pseudonymization) · data validation gating · reverse-sync / reverse-ETL · multi-environment Terraform with reusable modules · reusable CI (workflow_call) · Workload Identity Federation · observability & audit logging · end-to-end platform ownership.


Teardown

cd infra/terraform/envs/dev && terraform destroy   # repeat for envs/prod
