OmniPipe is a miniature but complete data platform for a fictional city-mobility & weather analytics product. It replicates an operational database to BigQuery via Datastream CDC, enriches it with a public weather API, orchestrates ingest→transform→validate→activate with Cloud Workflows, enforces PII governance, and reverse-syncs a curated KPI table to an external destination — all deployed across dev/prod with reusable Terraform modules and reusable CI. The capstone of a four-project GCP data-engineering portfolio.
Standalone learning project. Public weather API + a self-seeded mock OLTP database; no proprietary data, no secrets in the repo.

```text
                  ┌──────────────────── Cloud Workflows (daily) ────────────────────┐
Cloud Scheduler ─►│ init → load_weather(CF) → dbt(job) → validate(DVT job)          │
                  │   → reverse_sync(CF) → finish (run summary)                     │
                  └─────────────────────────────────────────────────────────────────┘

Cloud SQL (Postgres, seeded) ──Datastream CDC──► BigQuery raw.customers / raw.trips
Open-Meteo API ──► Weather Cloud Function ─────► BigQuery raw.weather
dbt (Cloud Run): staging → prod (PII: SHA-256 + policy tags) → analysis.kpis
Validation (DVT Cloud Run job): config-driven checks → ops.validation_results (fails on error)
analysis.kpis ──► Reverse-sync Cloud Function ──► external webhook (watermarked, idempotent)

Governance: Data Catalog taxonomy + policy tags · Observability: per-service alerts · Audit: ops.*
```

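The validation gate in the diagram amounts to running a list of declarative checks, recording every result, and failing the job if any blocking check fails. The Python sketch below illustrates that idea under stated assumptions: the check types (`not_null`, `unique`, `min_rows`), the config shape, the `severity` field, and the `ValidationError` class are all hypothetical, and tables are plain lists of dicts rather than BigQuery tables. It is not the DVT job's actual config format.

```python
"""Minimal sketch of config-driven data checks with a fail-on-error gate.

Assumptions (hypothetical, not the repo's real config schema): checks are plain
dicts, rows are in-memory dicts instead of BigQuery tables, and "severity"
decides whether a failure blocks the pipeline.
"""
from dataclasses import dataclass
from typing import Any, Callable

Row = dict[str, Any]


class ValidationError(RuntimeError):
    """Raised when any error-severity check fails; a job would then exit non-zero."""


@dataclass
class CheckResult:
    check: str
    table: str
    passed: bool
    severity: str
    detail: str


def _not_null(rows: list[Row], column: str) -> tuple[bool, str]:
    nulls = sum(1 for r in rows if r.get(column) is None)
    return nulls == 0, f"{nulls} null value(s) in {column}"


def _unique(rows: list[Row], column: str) -> tuple[bool, str]:
    values = [r.get(column) for r in rows]
    dupes = len(values) - len(set(values))
    return dupes == 0, f"{dupes} duplicate value(s) in {column}"


def _min_rows(rows: list[Row], threshold: int) -> tuple[bool, str]:
    return len(rows) >= threshold, f"{len(rows)} row(s), expected >= {threshold}"


# Registry mapping a config "type" to the function that implements it.
CHECKS: dict[str, Callable[..., tuple[bool, str]]] = {
    "not_null": _not_null,
    "unique": _unique,
    "min_rows": _min_rows,
}


def run_checks(config: list[dict], tables: dict[str, list[Row]]) -> list[CheckResult]:
    """Run every configured check, then raise if any error-severity check failed."""
    results = []
    for spec in config:
        fn = CHECKS[spec["type"]]
        passed, detail = fn(tables[spec["table"]], **spec.get("args", {}))
        results.append(CheckResult(spec["type"], spec["table"], passed,
                                   spec.get("severity", "error"), detail))
    # A real job would persist `results` to an audit table before applying the gate.
    failed = [r for r in results if not r.passed and r.severity == "error"]
    if failed:
        raise ValidationError("; ".join(f"{r.table}.{r.check}: {r.detail}" for r in failed))
    return results
```

A `warn`-severity failure is recorded but does not raise, so only `error` checks can stop the run before `reverse_sync`. An uncaught exception makes a Python process exit non-zero, which is how a Cloud Run job execution would be marked failed and the workflow halted.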
| Capability | How |
|---|---|
| Change Data Capture | Datastream: Cloud SQL Postgres → BigQuery raw |
| Multi-source ingestion | CDC + scheduled weather API Cloud Function |
| Orchestration | Cloud Workflows (init→load→transform→validate→activate) + Scheduler |
| ELT + layering | dbt staging → prod → analysis, partitioned/clustered facts |
| PII governance | SHA-256 pseudonymization + Data Catalog policy tags on sensitive columns |
| Data validation | config-driven DVT job; failing checks halt the workflow |
| Reverse-sync / activation | curated KPIs → external webhook, watermarked & idempotent |
| Reusable IaC | Terraform modules (platform, pii_policy, monitoring, workload_identity) across envs/dev + envs/prod |
| Reusable CI | workflow_call templates (_python-ci, _terraform-ci) consumed per component |
| Observability | per-service log-based error metric + alert; audit tables in ops |
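
The PII governance row boils down to replacing sensitive values with a deterministic digest, so joins on the pseudonym still work while the clear value stays in `staging`. The Python sketch below is an analogue of hashing a column in SQL (in BigQuery, `TO_HEX(SHA256(col))` yields the same lowercase hex digest for the same input bytes). The normalization step, the `to_dim_customer` helper, and the `email`/`phone` column names are hypothetical. Because an unsalted SHA-256 of a low-entropy value such as an email can be reversed by dictionary lookup, the sketch also shows an optional keyed variant (HMAC-SHA-256); whether the project uses a key is not stated.

```python
"""Sketch of column pseudonymization with SHA-256.

Assumptions (hypothetical): values are strings, they are trimmed and
lower-cased before hashing, and the PII column names are illustrative only.
"""
import hashlib
import hmac
from typing import Optional


def pseudonymize(value: Optional[str], key: Optional[bytes] = None) -> Optional[str]:
    """Return the hex SHA-256 digest of the normalized value (None stays None).

    With a key, HMAC-SHA-256 is used instead, which resists dictionary attacks
    on low-entropy inputs such as emails or phone numbers.
    """
    if value is None:
        return None
    normalized = value.strip().lower().encode("utf-8")
    if key is not None:
        return hmac.new(key, normalized, hashlib.sha256).hexdigest()
    return hashlib.sha256(normalized).hexdigest()


def to_dim_customer(row: dict, pii_columns: tuple[str, ...] = ("email", "phone")) -> dict:
    """Copy a staging row, replacing the (hypothetical) PII columns with pseudonyms."""
    return {k: pseudonymize(v) if k in pii_columns else v for k, v in row.items()}
```

Normalizing before hashing means `" A@x.io"` and `"a@x.io"` map to the same pseudonym; without it, cosmetic differences in the source would split one customer into several keys.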

```text
omnipipe/
├── ingestion/weather/          # A1 Cloud Function: Open-Meteo -> raw.weather
├── reverse_sync/kpi_export/    # A2 Cloud Function: analysis.kpis -> webhook (watermarked)
├── validation/dvt_job/         # Cloud Run job: config-driven data checks -> ops.validation_results
├── dbt/                        # staging -> prod (PII) -> analysis, macros, runner.sh
├── orchestration/              # Cloud Workflows definition (workflow.yaml.tftpl)
├── seed/postgres/              # mock OLTP schema + seed (the CDC source)
├── infra/terraform/
│   ├── modules/{platform,pii_policy,monitoring,workload_identity}/
│   └── envs/{dev,prod}/        # thin roots composing the platform module
└── .github/workflows/          # reusable templates + ci/deploy callers
```

```bash
for svc in ingestion/weather reverse_sync/kpi_export validation/dvt_job; do
  (cd "$svc" && pip install -e . pytest ruff && ruff check . && pytest -q)
done
(cd dbt && dbt deps && GCP_PROJECT=dummy dbt parse --profiles-dir . --target dev)
(cd infra/terraform/envs/dev && terraform init -backend=false && terraform validate)
```

- Bootstrap once: a deployer SA plus the `workload_identity` module (a WIF pool bound to this repo). Create the Artifact Registry repo `omnipipe`, a function-source bucket, and a Terraform-state bucket per env.
- Set the repo Actions variables `GCP_PROJECT_ID`, `WIF_PROVIDER`, `DEPLOYER_SA`, `FUNCTION_SOURCE_BUCKET`, and `TFSTATE_BUCKET`.
- Push to `main`: `deploy.yaml` builds the `dvt`/`dbt` images, uploads the function zips, and runs `terraform plan`/`apply` for `envs/dev`.
- Seed the OLTP source (see `seed/README.md`):

  ```bash
  psql "$POSTGRES_URL" -f seed/postgres/schema.sql && \
    psql "$POSTGRES_URL" -f seed/postgres/seed.sql
  ```

- CDC over polling: Datastream streams row changes continuously; dbt dedups to the latest version per key.
- Governance by default: PII never leaves `staging` in the clear. `prod.dim_customer` stores SHA-256 hashes, and the name column carries a Data Catalog policy tag minted by the `pii_policy` module.
- Validate before activate: the DVT job runs after dbt and fails the workflow on a bad check, so only validated data is reverse-synced.
- Idempotent activation: reverse-sync tracks a per-destination watermark in `ops.reverse_sync_state`; re-runs push only new rows.
- DRY infra & CI: one `platform` module is instantiated per environment; CI job bodies live in `workflow_call` templates, not copied per component (the gap seen in many real monorepos).
- Cost-aware: `db-f1-micro` Cloud SQL, scale-to-zero functions, partitioned/clustered facts; `terraform destroy` per env when idle.
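
"Dedup to the latest version per key" is commonly written in dbt as a `QUALIFY ROW_NUMBER() OVER (PARTITION BY key ORDER BY ts DESC) = 1` filter. The Python sketch below shows the same logic over an append-only change log. It assumes hypothetical `_source_ts` and `_is_deleted` fields (Datastream's actual metadata columns may be named and shaped differently) and drops any key whose newest change is a delete.

```python
"""Sketch of 'latest version per key' deduplication over an append-only change log.

Assumptions (hypothetical field names, not Datastream's real metadata columns):
each change row carries `_source_ts` for ordering and an optional `_is_deleted` flag.
"""
from typing import Any, Iterable

Row = dict[str, Any]


def latest_per_key(changes: Iterable[Row], key: str = "id") -> list[Row]:
    """Keep the newest change per key, then drop keys whose newest change is a delete."""
    latest: dict[Any, Row] = {}
    for row in changes:
        current = latest.get(row[key])
        # Strictly newer wins, so out-of-order arrivals are handled; on a timestamp
        # tie the first-seen row is kept (a real model would add a sequence tie-breaker).
        if current is None or row["_source_ts"] > current["_source_ts"]:
            latest[row[key]] = row
    return sorted(
        (r for r in latest.values() if not r.get("_is_deleted", False)),
        key=lambda r: r[key],
    )
```

Ordering by the source timestamp rather than arrival order is what makes the result independent of how late a change lands in `raw`.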
- CDC over polling for the OLTP source. Datastream streams row changes continuously (including deletes) and dbt dedups to the latest version per key. Lesson: CDC removes whole classes of "did I miss an update?" bugs that polling invites.
- Validate before activate. The DVT job runs after dbt and fails the workflow on a bad check, so only validated data is ever reverse-synced. Lesson: put data-quality gates inside the orchestration, not beside it.
- PII governance by default. `prod.dim_customer` stores SHA-256 pseudonyms and the name column carries a Data Catalog policy tag minted by the `pii_policy` module, so raw PII never leaves staging. Lesson: column-level access control plus pseudonymization in BigQuery.
- DRY infra and CI. One `platform` Terraform module is instantiated per environment, and CI job bodies live in `workflow_call` templates rather than being copied per component. This is the exact gap I observed in a real monorepo and the biggest architectural lesson here.
- Idempotent activation. Reverse-sync tracks a per-destination watermark in `ops.reverse_sync_state`, so re-runs push only new rows. Lesson: reverse-ETL needs the same idempotency discipline as ingestion.
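
The watermark idea can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not the Cloud Function's code: `sync`, `send`, and the in-memory `state` dict are hypothetical stand-ins for the function, the webhook POST, and a per-destination row in `ops.reverse_sync_state`, and KPI rows are assumed to carry an `updated_at` value and a unique `kpi_id`.

```python
"""Sketch of watermarked reverse-sync.

Assumptions (hypothetical): KPI rows carry `updated_at` and a unique `kpi_id`;
`state` stands in for ops.reverse_sync_state; `send` stands in for the webhook call.
"""
from typing import Any, Callable

Row = dict[str, Any]


def sync(destination: str, rows: list[Row], state: dict[str, tuple],
         send: Callable[[list[Row]], None]) -> int:
    """Push rows newer than the destination's watermark; return how many were sent."""
    watermark = state.get(destination)  # None on the very first run
    # (updated_at, kpi_id) gives a total order, so ties on updated_at are deterministic.
    pending = sorted(
        (r for r in rows
         if watermark is None or (r["updated_at"], r["kpi_id"]) > watermark),
        key=lambda r: (r["updated_at"], r["kpi_id"]),
    )
    if not pending:
        return 0
    send(pending)  # if this raises, the watermark is not advanced and a retry resends
    last = pending[-1]
    state[destination] = (last["updated_at"], last["kpi_id"])
    return len(pending)
```

Advancing the watermark only after a successful send gives at-least-once delivery: if the send succeeds but the state write fails, the next run resends the same batch, so the destination should upsert by `kpi_id` for the end-to-end result to stay idempotent.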
Change Data Capture (Datastream) · Cloud SQL · workflow orchestration (Cloud Workflows) ·
multi-source ELT with dbt · data governance (policy tags, pseudonymization) · data
validation gating · reverse-sync / reverse-ETL · multi-environment Terraform with reusable
modules · reusable CI (workflow_call) · Workload Identity Federation · observability &
audit logging · end-to-end platform ownership.
```bash
cd infra/terraform/envs/dev && terraform destroy   # repeat for envs/prod
```