From e1e91ad64425c5cbe96be14e732232bf0c458915 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=94=D0=B5=D0=BD=D0=B8=D1=81=20=D0=98=D1=80=D0=B8=D0=BD?= =?UTF-8?q?=D1=8F=D0=BA=D0=BE=D0=B2?= <109888488+karnaksp@users.noreply.github.com> Date: Mon, 15 Jun 2026 06:38:41 +0300 Subject: [PATCH] Add dbt quality checks for Streamify core marts --- .github/workflows/data-quality.yml | 31 +++ README.md | 23 ++- airflow/dags/dbt_test_dag.py | 32 ++- dbt/models/core/dim_artists.sql | 4 +- dbt/models/core/dim_location.sql | 4 +- dbt/models/core/dim_songs.sql | 4 +- dbt/models/core/dim_users.sql | 4 +- dbt/models/core/schema.yml | 191 +++++++++++++++++- dbt/package-lock.yml | 5 + dbt/packages.yml | 2 +- .../assert_dim_users_scd2_no_overlap.sql | 54 +++++ .../assert_fact_streams_no_orphan_keys.sql | 18 ++ docs/data_quality_checks.md | 35 ++++ scripts/validate_dbt_quality.py | 128 ++++++++++++ 14 files changed, 505 insertions(+), 30 deletions(-) create mode 100644 .github/workflows/data-quality.yml create mode 100644 dbt/package-lock.yml create mode 100644 dbt/tests/assert_dim_users_scd2_no_overlap.sql create mode 100644 dbt/tests/assert_fact_streams_no_orphan_keys.sql create mode 100644 docs/data_quality_checks.md create mode 100644 scripts/validate_dbt_quality.py diff --git a/.github/workflows/data-quality.yml b/.github/workflows/data-quality.yml new file mode 100644 index 0000000..bd846fe --- /dev/null +++ b/.github/workflows/data-quality.yml @@ -0,0 +1,31 @@ +name: Data quality + +on: + pull_request: + push: + branches: + - main + +jobs: + validate-dbt-quality-contract: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v6 + + - name: Set up Python + uses: actions/setup-python@v6 + with: + python-version: "3.12" + + - name: Validate dbt quality contract + run: python3 scripts/validate_dbt_quality.py + + - name: Compile Python files + run: python3 -m compileall -q airflow/dags spark_streaming scripts + + - name: Validate Airflow Compose config + run: cd airflow && GCP_PROJECT_ID=dummy GCP_GCS_BUCKET=dummy docker compose config --quiet + + - name: Validate Kafka Compose config + run: cd kafka && docker compose config --quiet diff --git a/README.md b/README.md index 9a5df29..e6139e0 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,27 @@ A data pipeline with Kafka, Spark Streaming, dbt, Docker, Airflow, Terraform, GCP and much more! +## Слой Качества Данных + +Streamify нужен, чтобы построить потоковую аналитику музыкального сервиса: события из Kafka обрабатываются Spark Streaming, складываются в lake/warehouse, а dbt собирает витрины для dashboard по прослушиваниям, пользователям, песням, артистам, локациям и времени. + +В этой итерации добавлен проверяемый data-quality слой для core marts: + +- добавлен dbt data-quality слой для core marts: `not_null`, `unique`, `relationships`, `accepted_values`; +- добавлены singular tests для SCD2 user dimension и orphan dimension keys в `fact_streams`; +- Airflow DAG теперь запускает `dbt build --select core --profiles-dir . --target prod`, а не только `dbt compile`; +- добавлена русская документация [docs/data_quality_checks.md](docs/data_quality_checks.md); +- добавлен CI/static validator `scripts/validate_dbt_quality.py` для проверки dbt quality contract без GCP credentials. + +Локальная проверка: + +```bash +python3 scripts/validate_dbt_quality.py +python3 -m compileall -q airflow/dags spark_streaming scripts +cd airflow && GCP_PROJECT_ID=dummy GCP_GCS_BUCKET=dummy docker compose config --quiet +cd ../kafka && docker compose config --quiet +``` + ## Description ### Objective @@ -75,4 +96,4 @@ A lot can still be done :). - Add more visualizations ### Special Mentions -I'd like to thank the [DataTalks.Club](https://datatalks.club) for offering this Data Engineering course for completely free. All the things I learnt there, enabled me to come up with this project. If you want to upskill on Data Engineering technologies, please check out the [course](https://github.com/DataTalksClub/data-engineering-zoomcamp). :) \ No newline at end of file +I'd like to thank the [DataTalks.Club](https://datatalks.club) for offering this Data Engineering course for completely free. All the things I learnt there, enabled me to come up with this project. If you want to upskill on Data Engineering technologies, please check out the [course](https://github.com/DataTalksClub/data-engineering-zoomcamp). :) diff --git a/airflow/dags/dbt_test_dag.py b/airflow/dags/dbt_test_dag.py index 173b259..c8ed7db 100644 --- a/airflow/dags/dbt_test_dag.py +++ b/airflow/dags/dbt_test_dag.py @@ -1,25 +1,35 @@ -from datetime import datetime +from datetime import datetime, timedelta from airflow import DAG from airflow.operators.bash import BashOperator default_args = { - 'owner' : 'airflow' + 'owner': 'airflow' } with DAG( - dag_id = 'dbt_test', - default_args = default_args, - description = 'Test dbt', - schedule_interval="@once", #At the 5th minute of every hour + dag_id='dbt_quality_gate', + default_args=default_args, + description='Run dbt dependencies and core data quality checks', + schedule_interval="@once", start_date=datetime(2022,3,20), - catchup=True, + catchup=False, tags=['streamify', 'dbt'] ) as dag: - dbt_test_task = BashOperator( - task_id = "dbt_test", - bash_command = "cd /dbt && dbt deps && dbt compile --profiles-dir ." + dbt_deps = BashOperator( + task_id="dbt_deps", + bash_command="cd /dbt && dbt deps", + execution_timeout=timedelta(minutes=10), ) - dbt_test_task + dbt_build_core = BashOperator( + task_id="dbt_build_core", + bash_command=( + "cd /dbt && " + "dbt build --select core --profiles-dir . --target prod" + ), + execution_timeout=timedelta(minutes=30), + ) + + dbt_deps >> dbt_build_core diff --git a/dbt/models/core/dim_artists.sql b/dbt/models/core/dim_artists.sql index 6bb4ddf..472861e 100644 --- a/dbt/models/core/dim_artists.sql +++ b/dbt/models/core/dim_artists.sql @@ -1,6 +1,6 @@ {{ config(materialized = 'table') }} -SELECT {{ dbt_utils.surrogate_key(['artistId']) }} AS artistKey, +SELECT {{ dbt_utils.generate_surrogate_key(['artistId']) }} AS artistKey, * FROM ( SELECT @@ -20,4 +20,4 @@ FROM ( 'NA', 'NA' - ) \ No newline at end of file + ) diff --git a/dbt/models/core/dim_location.sql b/dbt/models/core/dim_location.sql index 6a7e63b..216577f 100644 --- a/dbt/models/core/dim_location.sql +++ b/dbt/models/core/dim_location.sql @@ -1,6 +1,6 @@ {{ config(materialized = 'table') }} -SELECT {{ dbt_utils.surrogate_key(['latitude', 'longitude', 'city', 'stateName']) }} as locationKey, +SELECT {{ dbt_utils.generate_surrogate_key(['latitude', 'longitude', 'city', 'stateName']) }} as locationKey, * FROM ( @@ -21,4 +21,4 @@ FROM 'NA', 0.0, 0.0 - ) \ No newline at end of file + ) diff --git a/dbt/models/core/dim_songs.sql b/dbt/models/core/dim_songs.sql index 2701910..26afb9b 100644 --- a/dbt/models/core/dim_songs.sql +++ b/dbt/models/core/dim_songs.sql @@ -1,6 +1,6 @@ {{ config(materialized = 'table') }} -SELECT {{ dbt_utils.surrogate_key(['songId']) }} AS songKey, +SELECT {{ dbt_utils.generate_surrogate_key(['songId']) }} AS songKey, * FROM ( @@ -32,4 +32,4 @@ FROM ( 'NA', 0 ) - ) \ No newline at end of file + ) diff --git a/dbt/models/core/dim_users.sql b/dbt/models/core/dim_users.sql index f4d9c37..d485318 100644 --- a/dbt/models/core/dim_users.sql +++ b/dbt/models/core/dim_users.sql @@ -4,7 +4,7 @@ -- The below query is constructed to accommodate changing levels from free to paid and maintaining the latest state of the user along with -- historical record of the user's level -SELECT {{ dbt_utils.surrogate_key(['userId', 'rowActivationDate', 'level']) }} as userKey, * +SELECT {{ dbt_utils.generate_surrogate_key(['userId', 'rowActivationDate', 'level']) }} as userKey, * FROM ( SELECT CAST(userId AS BIGINT) as userId, firstName, lastName, gender, level, CAST(registration as BIGINT) as registration, minDate as rowActivationDate, @@ -46,4 +46,4 @@ FROM {{ source('staging', 'listen_events') }} WHERE userId = 0 or userId = 1 GROUP BY userId, firstName, lastName, gender, level, registration -) \ No newline at end of file +) diff --git a/dbt/models/core/schema.yml b/dbt/models/core/schema.yml index 01648ca..972e6f2 100644 --- a/dbt/models/core/schema.yml +++ b/dbt/models/core/schema.yml @@ -1,12 +1,185 @@ version: 2 sources: - - name: staging - database: "{{ env_var('GCP_PROJECT_ID') }}" - schema: streamify_stg - tables: - - name: listen_events - - name: page_view_events - - name: auth_events - - name: songs - - name: state_codes \ No newline at end of file + - name: staging + database: "{{ env_var('GCP_PROJECT_ID') }}" + schema: streamify_stg + tables: + - name: listen_events + - name: page_view_events + - name: auth_events + - name: songs + - name: state_codes + +models: + - name: fact_streams + description: Stream listening fact table joined to user, song, artist, location and datetime dimensions. + columns: + - name: userKey + description: Surrogate key for the SCD2 user dimension row active at event time. + tests: + - not_null + - relationships: + to: ref('dim_users') + field: userKey + - name: artistKey + description: Surrogate key for the listened artist. + tests: + - not_null + - relationships: + to: ref('dim_artists') + field: artistKey + - name: songKey + description: Surrogate key for the listened song. + tests: + - not_null + - relationships: + to: ref('dim_songs') + field: songKey + - name: dateKey + description: Hour-level datetime key. + tests: + - not_null + - relationships: + to: ref('dim_datetime') + field: dateKey + - name: locationKey + description: Surrogate key for event location. + tests: + - not_null + - relationships: + to: ref('dim_location') + field: locationKey + - name: ts + description: Original listening event timestamp. + tests: + - not_null + + - name: dim_users + description: User dimension with SCD2 rows for listening subscription level changes. + columns: + - name: userKey + description: Surrogate key for a user-level validity interval. + tests: + - not_null + - unique + - name: userId + description: Source user identifier from listen events. + tests: + - not_null + - name: gender + description: User gender as produced by Eventsim. + tests: + - accepted_values: + values: ['M', 'F'] + - name: level + description: Subscription level tracked as SCD2 attribute. + tests: + - not_null + - accepted_values: + values: ['free', 'paid'] + - name: rowActivationDate + description: First date when this SCD2 row is active. + tests: + - not_null + - name: rowExpirationDate + description: First date when this SCD2 row stops being active. + tests: + - not_null + - name: currentRow + description: Flag for the latest SCD2 row per natural user key. + tests: + - not_null + - accepted_values: + values: [0, 1] + + - name: dim_songs + description: Song dimension from Million Song Dataset seed/source data. + columns: + - name: songKey + description: Surrogate key for a song. + tests: + - not_null + - unique + - name: songId + description: Source song identifier. + tests: + - not_null + - name: artistName + description: Normalized artist name used for fact joins. + tests: + - not_null + - name: title + description: Song title used for fact joins. + tests: + - not_null + + - name: dim_artists + description: Artist dimension from song metadata. + columns: + - name: artistKey + description: Surrogate key for an artist. + tests: + - not_null + - unique + - name: name + description: Normalized artist name. + tests: + - not_null + + - name: dim_location + description: Location dimension built from listen event geography and state codes. + columns: + - name: locationKey + description: Surrogate key for city/state/coordinate combination. + tests: + - not_null + - unique + - name: city + description: Event city, with NA fallback for unknown location. + tests: + - not_null + - name: stateCode + description: Two-letter state code or NA fallback. + tests: + - not_null + - name: stateName + description: Full state name or NA fallback. + tests: + - not_null + + - name: dim_datetime + description: Hourly datetime dimension for dashboard time slicing. + columns: + - name: dateKey + description: UNIX seconds for the hour timestamp. + tests: + - not_null + - unique + - name: date + description: Hour timestamp. + tests: + - not_null + - unique + - name: weekendFlag + description: Weekend flag derived from day of week. + tests: + - not_null + - accepted_values: + values: [true, false] + + - name: wide_streams + description: Dashboard-friendly denormalized view over fact_streams and core dimensions. + columns: + - name: userKey + description: User dimension key inherited from fact_streams. + tests: + - not_null + - name: songKey + description: Song dimension key inherited from fact_streams. + tests: + - not_null + - name: timestamp + description: Listening event timestamp. + tests: + - not_null diff --git a/dbt/package-lock.yml b/dbt/package-lock.yml new file mode 100644 index 0000000..b530d66 --- /dev/null +++ b/dbt/package-lock.yml @@ -0,0 +1,5 @@ +packages: + - name: dbt_utils + package: dbt-labs/dbt_utils + version: 1.3.3 +sha1_hash: 5ffdb7983bbd653b524c5344daf6cb5fd9eaf293 diff --git a/dbt/packages.yml b/dbt/packages.yml index 9e74ff0..1600223 100755 --- a/dbt/packages.yml +++ b/dbt/packages.yml @@ -1,3 +1,3 @@ packages: - package: dbt-labs/dbt_utils - version: 0.8.0 \ No newline at end of file + version: 1.3.3 diff --git a/dbt/tests/assert_dim_users_scd2_no_overlap.sql b/dbt/tests/assert_dim_users_scd2_no_overlap.sql new file mode 100644 index 0000000..3b73413 --- /dev/null +++ b/dbt/tests/assert_dim_users_scd2_no_overlap.sql @@ -0,0 +1,54 @@ +WITH user_windows AS ( + SELECT + userId, + firstName, + lastName, + gender, + rowActivationDate, + rowExpirationDate, + LEAD(rowActivationDate) OVER ( + PARTITION BY userId + ORDER BY rowActivationDate, rowExpirationDate + ) AS next_row_activation_date + FROM {{ ref('dim_users') }} +), + +overlapping_windows AS ( + SELECT * + FROM user_windows + WHERE next_row_activation_date IS NOT NULL + AND rowExpirationDate > next_row_activation_date +), + +invalid_current_rows AS ( + SELECT + userId, + COUNTIF(currentRow = 1) AS current_row_count + FROM {{ ref('dim_users') }} + GROUP BY userId + HAVING current_row_count != 1 +) + +SELECT + 'overlapping_window' AS issue_type, + userId, + firstName, + lastName, + gender, + rowActivationDate, + rowExpirationDate, + next_row_activation_date +FROM overlapping_windows + +UNION ALL + +SELECT + 'invalid_current_row_count' AS issue_type, + userId, + NULL AS firstName, + NULL AS lastName, + NULL AS gender, + NULL AS rowActivationDate, + NULL AS rowExpirationDate, + NULL AS next_row_activation_date +FROM invalid_current_rows diff --git a/dbt/tests/assert_fact_streams_no_orphan_keys.sql b/dbt/tests/assert_fact_streams_no_orphan_keys.sql new file mode 100644 index 0000000..bf3244f --- /dev/null +++ b/dbt/tests/assert_fact_streams_no_orphan_keys.sql @@ -0,0 +1,18 @@ +SELECT + fact_streams.* +FROM {{ ref('fact_streams') }} AS fact_streams +LEFT JOIN {{ ref('dim_users') }} AS dim_users + ON fact_streams.userKey = dim_users.userKey +LEFT JOIN {{ ref('dim_artists') }} AS dim_artists + ON fact_streams.artistKey = dim_artists.artistKey +LEFT JOIN {{ ref('dim_songs') }} AS dim_songs + ON fact_streams.songKey = dim_songs.songKey +LEFT JOIN {{ ref('dim_datetime') }} AS dim_datetime + ON fact_streams.dateKey = dim_datetime.dateKey +LEFT JOIN {{ ref('dim_location') }} AS dim_location + ON fact_streams.locationKey = dim_location.locationKey +WHERE dim_users.userKey IS NULL + OR dim_artists.artistKey IS NULL + OR dim_songs.songKey IS NULL + OR dim_datetime.dateKey IS NULL + OR dim_location.locationKey IS NULL diff --git a/docs/data_quality_checks.md b/docs/data_quality_checks.md new file mode 100644 index 0000000..030dba2 --- /dev/null +++ b/docs/data_quality_checks.md @@ -0,0 +1,35 @@ +# Streamify Data Quality Checks + +Streamify строит потоковую аналитику музыкального сервиса: Kafka принимает события, Spark Streaming пишет их в lake, Airflow запускает batch-слой, а dbt собирает core marts для dashboard. Этот документ фиксирует data-quality слой для dbt core marts и Airflow gate, который проверяет, что факты прослушиваний не теряют связи с измерениями. + +## Что Проверяется + +| Layer | Проверка | Зачем | +| --- | --- | --- | +| `fact_streams` | `not_null` и `relationships` для `userKey`, `artistKey`, `songKey`, `dateKey`, `locationKey` | Факт прослушивания не должен ссылаться на отсутствующие dimension rows. | +| `dim_users` | `unique`/`not_null` для `userKey`, `accepted_values` для `gender`, `level`, `currentRow` | SCD2 dimension должна сохранять валидные статусы пользователя. | +| `dim_songs`, `dim_artists`, `dim_location`, `dim_datetime` | `unique`/`not_null` для surrogate keys и обязательных атрибутов | Dashboard joins должны быть стабильными и воспроизводимыми. | +| `assert_dim_users_scd2_no_overlap.sql` | Нет пересекающихся SCD2 intervals и ровно одна current row на natural user key | История `free`/`paid` статусов не должна давать двойной матч факта. | +| `assert_fact_streams_no_orphan_keys.sql` | Нет orphan dimension keys после joins | Факт не должен терять семантику при построении wide view. | + +## Airflow Flow + +`airflow/dags/dbt_test_dag.py` теперь описывает DAG `dbt_quality_gate`: + +1. `dbt_deps` устанавливает dbt packages. +2. `dbt_build_core` выполняет `dbt build --select core --profiles-dir . --target prod`. + +`dbt build` важнее compile-only проверки: он запускает models и tests одним gate для production target. + +## Локальная Проверка Без GCP Credentials + +Полный `dbt build` требует BigQuery credentials. Для PR/CI без доступа к GCP добавлен static quality validator: + +```bash +python3 scripts/validate_dbt_quality.py +python3 -m compileall -q airflow/dags spark_streaming +cd airflow && docker compose config +cd ../kafka && docker compose config +``` + +Эта проверка не заменяет runtime `dbt build`, но защищает инженерный контракт: модельные tests, singular tests, Airflow `dbt build` DAG и документация должны оставаться синхронизированными. diff --git a/scripts/validate_dbt_quality.py b/scripts/validate_dbt_quality.py new file mode 100644 index 0000000..c3b86d8 --- /dev/null +++ b/scripts/validate_dbt_quality.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import sys +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[1] + + +def read(path: str) -> str: + file_path = ROOT / path + if not file_path.exists(): + raise AssertionError(f"Missing required file: {path}") + return file_path.read_text(encoding="utf-8") + + +def require_markers(path: str, markers: list[str]) -> None: + text = read(path) + for marker in markers: + if marker not in text: + raise AssertionError(f"{path} must contain {marker!r}") + + +def validate_schema_yml() -> None: + schema = read("dbt/models/core/schema.yml") + for model_name in [ + "fact_streams", + "dim_users", + "dim_songs", + "dim_artists", + "dim_location", + "dim_datetime", + "wide_streams", + ]: + if f"name: {model_name}" not in schema: + raise AssertionError(f"schema.yml must document model {model_name}") + + for marker in [ + "relationships:", + "accepted_values:", + "field: userKey", + "field: artistKey", + "field: songKey", + "field: dateKey", + "field: locationKey", + "values: ['free', 'paid']", + "values: [0, 1]", + "values: [true, false]", + ]: + if marker not in schema: + raise AssertionError(f"schema.yml must contain {marker!r}") + + +def validate_singular_tests() -> None: + require_markers( + "dbt/tests/assert_dim_users_scd2_no_overlap.sql", + [ + "overlapping_windows", + "invalid_current_rows", + "COUNTIF(currentRow = 1)", + "HAVING current_row_count != 1", + ], + ) + require_markers( + "dbt/tests/assert_fact_streams_no_orphan_keys.sql", + [ + "LEFT JOIN {{ ref('dim_users') }}", + "LEFT JOIN {{ ref('dim_artists') }}", + "LEFT JOIN {{ ref('dim_songs') }}", + "LEFT JOIN {{ ref('dim_datetime') }}", + "LEFT JOIN {{ ref('dim_location') }}", + "WHERE dim_users.userKey IS NULL", + ], + ) + + +def validate_airflow_dag() -> None: + require_markers( + "airflow/dags/dbt_test_dag.py", + [ + "dag_id='dbt_quality_gate'", + "task_id=\"dbt_deps\"", + "task_id=\"dbt_build_core\"", + "dbt build --select core --profiles-dir . --target prod", + "dbt_deps >> dbt_build_core", + ], + ) + + +def validate_docs() -> None: + require_markers( + "README.md", + [ + "## Слой Качества Данных", + "Streamify нужен, чтобы построить потоковую аналитику музыкального сервиса", + "docs/data_quality_checks.md", + "scripts/validate_dbt_quality.py", + ], + ) + require_markers( + "docs/data_quality_checks.md", + [ + "потоковую аналитику музыкального сервиса", + "fact_streams", + "assert_dim_users_scd2_no_overlap.sql", + "assert_fact_streams_no_orphan_keys.sql", + "dbt_quality_gate", + "python3 scripts/validate_dbt_quality.py", + ], + ) + + +def main() -> int: + validate_schema_yml() + validate_singular_tests() + validate_airflow_dag() + validate_docs() + print("OK: dbt quality checks, Airflow DAG and docs are aligned.") + return 0 + + +if __name__ == "__main__": + try: + raise SystemExit(main()) + except AssertionError as error: + print(f"ERROR: {error}", file=sys.stderr) + raise SystemExit(1)