31 changes: 31 additions & 0 deletions .github/workflows/data-quality.yml
@@ -0,0 +1,31 @@
+name: Data quality
+
+on:
+  pull_request:
+  push:
+    branches:
+      - main
+
+jobs:
+  validate-dbt-quality-contract:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v6
+
+      - name: Set up Python
+        uses: actions/setup-python@v6
+        with:
+          python-version: "3.12"
+
+      - name: Validate dbt quality contract
+        run: python3 scripts/validate_dbt_quality.py
+
+      - name: Compile Python files
+        run: python3 -m compileall -q airflow/dags spark_streaming scripts
+
+      - name: Validate Airflow Compose config
+        run: cd airflow && GCP_PROJECT_ID=dummy GCP_GCS_BUCKET=dummy docker compose config --quiet
+
+      - name: Validate Kafka Compose config
+        run: cd kafka && docker compose config --quiet
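Reviewer note: the "Compile Python files" step only checks that the sources byte-compile, which catches syntax errors but not import or runtime errors. A minimal sketch of the same check done from Python, assuming the standard-library `compileall` module; the helper name `compile_trees` and the temporary directories are hypothetical and not part of this PR:

```python
import compileall
import tempfile
from pathlib import Path


def compile_trees(paths):
    """Byte-compile every .py file under each path; True only if all succeed."""
    ok = True
    for path in paths:
        # quiet=1 hides the per-file listing but still prints errors,
        # which corresponds to the single -q flag used in the workflow.
        ok = bool(compileall.compile_dir(str(path), quiet=1)) and ok
    return ok


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        good = Path(tmp) / "good"
        bad = Path(tmp) / "bad"
        good.mkdir()
        bad.mkdir()
        (good / "dag.py").write_text("x = 1\n")
        (bad / "dag.py").write_text("def broken(:\n")
        print(compile_trees([good]))
        print(compile_trees([good, bad]))
```

The first call should report success and the second should report failure because of the syntax error in the `bad` tree.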
23 changes: 22 additions & 1 deletion README.md
@@ -2,6 +2,27 @@

 A data pipeline with Kafka, Spark Streaming, dbt, Docker, Airflow, Terraform, GCP and much more!

+## Data Quality Layer
+
+Streamify exists to build streaming analytics for a music service: events from Kafka are processed by Spark Streaming and landed in the lake/warehouse, and dbt builds the marts behind the dashboard for listens, users, songs, artists, locations, and time.
+
+This iteration adds a verifiable data-quality layer for the core marts:
+
+- added a dbt data-quality layer for the core marts: `not_null`, `unique`, `relationships`, `accepted_values`;
+- added singular tests for the SCD2 user dimension and for orphan dimension keys in `fact_streams`;
+- the Airflow DAG now runs `dbt build --select core --profiles-dir . --target prod` instead of only `dbt compile`;
+- added Russian-language documentation: [docs/data_quality_checks.md](docs/data_quality_checks.md);
+- added a CI/static validator, `scripts/validate_dbt_quality.py`, that checks the dbt quality contract without GCP credentials.
+
+Local check:
+
+```bash
+python3 scripts/validate_dbt_quality.py
+python3 -m compileall -q airflow/dags spark_streaming scripts
+cd airflow && GCP_PROJECT_ID=dummy GCP_GCS_BUCKET=dummy docker compose config --quiet
+cd ../kafka && docker compose config --quiet
+```
+
 ## Description

 ### Objective
@@ -75,4 +96,4 @@ A lot can still be done :).
 - Add more visualizations

 ### Special Mentions
-I'd like to thank the [DataTalks.Club](https://datatalks.club) for offering this Data Engineering course for completely free. All the things I learnt there, enabled me to come up with this project. If you want to upskill on Data Engineering technologies, please check out the [course](https://github.com/DataTalksClub/data-engineering-zoomcamp). :)
+I'd like to thank the [DataTalks.Club](https://datatalks.club) for offering this Data Engineering course for completely free. All the things I learnt there, enabled me to come up with this project. If you want to upskill on Data Engineering technologies, please check out the [course](https://github.com/DataTalksClub/data-engineering-zoomcamp). :)
32 changes: 21 additions & 11 deletions airflow/dags/dbt_test_dag.py
@@ -1,25 +1,35 @@
-from datetime import datetime
+from datetime import datetime, timedelta

 from airflow import DAG
 from airflow.operators.bash import BashOperator

 default_args = {
-    'owner' : 'airflow'
+    'owner': 'airflow'
 }

 with DAG(
-    dag_id = 'dbt_test',
-    default_args = default_args,
-    description = 'Test dbt',
-    schedule_interval="@once", #At the 5th minute of every hour
+    dag_id='dbt_quality_gate',
+    default_args=default_args,
+    description='Run dbt dependencies and core data quality checks',
+    schedule_interval="@once",
     start_date=datetime(2022,3,20),
-    catchup=True,
+    catchup=False,
     tags=['streamify', 'dbt']
 ) as dag:

-    dbt_test_task = BashOperator(
-        task_id = "dbt_test",
-        bash_command = "cd /dbt && dbt deps && dbt compile --profiles-dir ."
+    dbt_deps = BashOperator(
+        task_id="dbt_deps",
+        bash_command="cd /dbt && dbt deps",
+        execution_timeout=timedelta(minutes=10),
     )

-    dbt_test_task
+    dbt_build_core = BashOperator(
+        task_id="dbt_build_core",
+        bash_command=(
+            "cd /dbt && "
+            "dbt build --select core --profiles-dir . --target prod"
+        ),
+        execution_timeout=timedelta(minutes=30),
+    )
+
+    dbt_deps >> dbt_build_core
4 changes: 2 additions & 2 deletions dbt/models/core/dim_artists.sql
@@ -1,6 +1,6 @@
 {{ config(materialized = 'table') }}

-SELECT {{ dbt_utils.surrogate_key(['artistId']) }} AS artistKey,
+SELECT {{ dbt_utils.generate_surrogate_key(['artistId']) }} AS artistKey,
 *
 FROM (
 SELECT
@@ -20,4 +20,4 @@ FROM (
 'NA',
 'NA'

-)
+)
4 changes: 2 additions & 2 deletions dbt/models/core/dim_location.sql
@@ -1,6 +1,6 @@
 {{ config(materialized = 'table') }}

-SELECT {{ dbt_utils.surrogate_key(['latitude', 'longitude', 'city', 'stateName']) }} as locationKey,
+SELECT {{ dbt_utils.generate_surrogate_key(['latitude', 'longitude', 'city', 'stateName']) }} as locationKey,
 *
 FROM
 (
@@ -21,4 +21,4 @@ FROM
 'NA',
 0.0,
 0.0
-)
+)
4 changes: 2 additions & 2 deletions dbt/models/core/dim_songs.sql
@@ -1,6 +1,6 @@
 {{ config(materialized = 'table') }}

-SELECT {{ dbt_utils.surrogate_key(['songId']) }} AS songKey,
+SELECT {{ dbt_utils.generate_surrogate_key(['songId']) }} AS songKey,
 *
 FROM (

@@ -32,4 +32,4 @@ FROM (
 'NA',
 0
 )
-)
+)
4 changes: 2 additions & 2 deletions dbt/models/core/dim_users.sql
@@ -4,7 +4,7 @@
 -- The below query is constructed to accommodate changing levels from free to paid and maintaining the latest state of the user along with
 -- historical record of the user's level

-SELECT {{ dbt_utils.surrogate_key(['userId', 'rowActivationDate', 'level']) }} as userKey, *
+SELECT {{ dbt_utils.generate_surrogate_key(['userId', 'rowActivationDate', 'level']) }} as userKey, *
 FROM
 (
 SELECT CAST(userId AS BIGINT) as userId, firstName, lastName, gender, level, CAST(registration as BIGINT) as registration, minDate as rowActivationDate,
@@ -46,4 +46,4 @@ FROM {{ source('staging', 'listen_events') }}
 WHERE userId = 0 or userId = 1
 GROUP BY userId, firstName, lastName, gender, level, registration

-)
+)
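Reviewer note: the README mentions singular tests for the SCD2 user dimension, but their SQL is not part of the hunks shown here. The sketch below illustrates, in Python, the kind of invariants such a test might assert. The column names come from `schema.yml`; the invariants themselves and the helper name `scd2_violations` are assumptions, not the PR's actual test.

```python
from collections import defaultdict
from datetime import date


def scd2_violations(rows):
    """Return readable violations of basic SCD2 invariants (assumed, see note)."""
    by_user = defaultdict(list)
    for row in rows:
        by_user[row["userId"]].append(row)

    problems = []
    for user_id, versions in by_user.items():
        versions.sort(key=lambda r: r["rowActivationDate"])
        # Exactly one row per user should be flagged as current.
        current = sum(1 for r in versions if r["currentRow"] == 1)
        if current != 1:
            problems.append(f"user {user_id}: {current} current rows, expected 1")
        # Every validity interval must have positive length.
        for r in versions:
            if r["rowActivationDate"] >= r["rowExpirationDate"]:
                problems.append(f"user {user_id}: empty or inverted interval")
        # The expiration date is the first day a row is no longer active,
        # so the next row may start on that day but not earlier.
        for prev, nxt in zip(versions, versions[1:]):
            if nxt["rowActivationDate"] < prev["rowExpirationDate"]:
                problems.append(f"user {user_id}: overlapping intervals")
    return problems


if __name__ == "__main__":
    rows = [
        {"userId": 7, "rowActivationDate": date(2022, 1, 1),
         "rowExpirationDate": date(2022, 3, 1), "currentRow": 0},
        {"userId": 7, "rowActivationDate": date(2022, 2, 1),
         "rowExpirationDate": date(9999, 12, 31), "currentRow": 1},
    ]
    print(scd2_violations(rows))
```

In dbt, the equivalent singular test would be a query that returns the offending rows, so the test passes when the query returns nothing.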
191 changes: 182 additions & 9 deletions dbt/models/core/schema.yml
@@ -1,12 +1,185 @@
 version: 2

 sources:
-    - name: staging
-      database: "{{ env_var('GCP_PROJECT_ID') }}"
-      schema: streamify_stg
-      tables:
-          - name: listen_events
-          - name: page_view_events
-          - name: auth_events
-          - name: songs
-          - name: state_codes
+  - name: staging
+    database: "{{ env_var('GCP_PROJECT_ID') }}"
+    schema: streamify_stg
+    tables:
+      - name: listen_events
+      - name: page_view_events
+      - name: auth_events
+      - name: songs
+      - name: state_codes

+models:
+  - name: fact_streams
+    description: Stream listening fact table joined to user, song, artist, location and datetime dimensions.
+    columns:
+      - name: userKey
+        description: Surrogate key for the SCD2 user dimension row active at event time.
+        tests:
+          - not_null
+          - relationships:
+              to: ref('dim_users')
+              field: userKey
+      - name: artistKey
+        description: Surrogate key for the listened artist.
+        tests:
+          - not_null
+          - relationships:
+              to: ref('dim_artists')
+              field: artistKey
+      - name: songKey
+        description: Surrogate key for the listened song.
+        tests:
+          - not_null
+          - relationships:
+              to: ref('dim_songs')
+              field: songKey
+      - name: dateKey
+        description: Hour-level datetime key.
+        tests:
+          - not_null
+          - relationships:
+              to: ref('dim_datetime')
+              field: dateKey
+      - name: locationKey
+        description: Surrogate key for event location.
+        tests:
+          - not_null
+          - relationships:
+              to: ref('dim_location')
+              field: locationKey
+      - name: ts
+        description: Original listening event timestamp.
+        tests:
+          - not_null
+
+  - name: dim_users
+    description: User dimension with SCD2 rows for listening subscription level changes.
+    columns:
+      - name: userKey
+        description: Surrogate key for a user-level validity interval.
+        tests:
+          - not_null
+          - unique
+      - name: userId
+        description: Source user identifier from listen events.
+        tests:
+          - not_null
+      - name: gender
+        description: User gender as produced by Eventsim.
+        tests:
+          - accepted_values:
+              values: ['M', 'F']
+      - name: level
+        description: Subscription level tracked as SCD2 attribute.
+        tests:
+          - not_null
+          - accepted_values:
+              values: ['free', 'paid']
+      - name: rowActivationDate
+        description: First date when this SCD2 row is active.
+        tests:
+          - not_null
+      - name: rowExpirationDate
+        description: First date when this SCD2 row stops being active.
+        tests:
+          - not_null
+      - name: currentRow
+        description: Flag for the latest SCD2 row per natural user key.
+        tests:
+          - not_null
+          - accepted_values:
+              values: [0, 1]
+
+  - name: dim_songs
+    description: Song dimension from Million Song Dataset seed/source data.
+    columns:
+      - name: songKey
+        description: Surrogate key for a song.
+        tests:
+          - not_null
+          - unique
+      - name: songId
+        description: Source song identifier.
+        tests:
+          - not_null
+      - name: artistName
+        description: Normalized artist name used for fact joins.
+        tests:
+          - not_null
+      - name: title
+        description: Song title used for fact joins.
+        tests:
+          - not_null
+
+  - name: dim_artists
+    description: Artist dimension from song metadata.
+    columns:
+      - name: artistKey
+        description: Surrogate key for an artist.
+        tests:
+          - not_null
+          - unique
+      - name: name
+        description: Normalized artist name.
+        tests:
+          - not_null
+
+  - name: dim_location
+    description: Location dimension built from listen event geography and state codes.
+    columns:
+      - name: locationKey
+        description: Surrogate key for city/state/coordinate combination.
+        tests:
+          - not_null
+          - unique
+      - name: city
+        description: Event city, with NA fallback for unknown location.
+        tests:
+          - not_null
+      - name: stateCode
+        description: Two-letter state code or NA fallback.
+        tests:
+          - not_null
+      - name: stateName
+        description: Full state name or NA fallback.
+        tests:
+          - not_null
+
+  - name: dim_datetime
+    description: Hourly datetime dimension for dashboard time slicing.
+    columns:
+      - name: dateKey
+        description: UNIX seconds for the hour timestamp.
+        tests:
+          - not_null
+          - unique
+      - name: date
+        description: Hour timestamp.
+        tests:
+          - not_null
+          - unique
+      - name: weekendFlag
+        description: Weekend flag derived from day of week.
+        tests:
+          - not_null
+          - accepted_values:
+              values: [true, false]
+
+  - name: wide_streams
+    description: Dashboard-friendly denormalized view over fact_streams and core dimensions.
+    columns:
+      - name: userKey
+        description: User dimension key inherited from fact_streams.
+        tests:
+          - not_null
+      - name: songKey
+        description: Song dimension key inherited from fact_streams.
+        tests:
+          - not_null
+      - name: timestamp
+        description: Listening event timestamp.
+        tests:
+          - not_null
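Reviewer note: `scripts/validate_dbt_quality.py` is referenced in the README and the workflow, but its source is not among the hunks shown here. The sketch below shows one way a static "quality contract" check over this `schema.yml` could work without warehouse credentials. It operates on an already-parsed mapping (the shape a YAML loader would return), so it stays standard-library only. The function names and the `required` contract are hypothetical.

```python
def declared_tests(column):
    """Collect test names from a column entry; tests are strings or one-key dicts."""
    names = set()
    for test in column.get("tests", []):
        names.update(test.keys() if isinstance(test, dict) else [test])
    return names


def missing_tests(schema, required):
    """Return sorted (model, column, test) triples required but not declared."""
    declared = {
        (model["name"], column["name"]): declared_tests(column)
        for model in schema.get("models", [])
        for column in model.get("columns", [])
    }
    gaps = []
    for (model, column), tests in required.items():
        for test in tests - declared.get((model, column), set()):
            gaps.append((model, column, test))
    return sorted(gaps)


if __name__ == "__main__":
    schema = {
        "models": [
            {"name": "dim_users", "columns": [
                {"name": "userKey", "tests": ["not_null", "unique"]},
            ]},
            {"name": "fact_streams", "columns": [
                {"name": "userKey", "tests": [
                    "not_null",
                    {"relationships": {"to": "ref('dim_users')", "field": "userKey"}},
                ]},
            ]},
        ]
    }
    required = {
        ("dim_users", "userKey"): {"not_null", "unique"},
        ("fact_streams", "userKey"): {"not_null", "relationships"},
        ("dim_songs", "songKey"): {"unique"},
    }
    print(missing_tests(schema, required))
```

A CI script built this way would exit non-zero when the returned list is non-empty, which keeps the check independent of BigQuery access.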
5 changes: 5 additions & 0 deletions dbt/package-lock.yml
@@ -0,0 +1,5 @@
+packages:
+  - name: dbt_utils
+    package: dbt-labs/dbt_utils
+    version: 1.3.3
+sha1_hash: 5ffdb7983bbd653b524c5344daf6cb5fd9eaf293
2 changes: 1 addition & 1 deletion dbt/packages.yml
@@ -1,3 +1,3 @@
 packages:
   - package: dbt-labs/dbt_utils
-    version: 0.8.0
+    version: 1.3.3
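Reviewer note: this version bump goes together with the `surrogate_key` to `generate_surrogate_key` rename in the dimension models, since dbt_utils 1.x no longer supports the old macro name. As far as I understand the package, the new macro also changes null handling by default (a placeholder string instead of an empty string), so keys for rows with null inputs can differ from those produced under 0.8.0. The Python sketch below emulates that behavior for illustration only; the warehouse's string casting may not match Python's `str()`, and the function name is hypothetical.

```python
import hashlib

# Placeholder that dbt_utils 1.x substitutes for NULL inputs by default
# (to the best of my knowledge of the macro source).
NULL_PLACEHOLDER = "_dbt_utils_surrogate_key_null_"


def surrogate_key(values, null_value=NULL_PLACEHOLDER):
    """MD5 hex digest of the values joined with '-', with NULLs replaced."""
    parts = [null_value if v is None else str(v) for v in values]
    return hashlib.md5("-".join(parts).encode("utf-8")).hexdigest()


if __name__ == "__main__":
    # Same inputs, different null handling: the resulting keys differ.
    print(surrogate_key(["Boston", None]))
    print(surrogate_key(["Boston", None], null_value=""))
```

If any existing downstream tables store keys generated by the old macro, a full rebuild of the core models is probably the simplest way to keep fact and dimension keys consistent after the upgrade.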