Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ A data pipeline with Kafka, Spark Streaming, dbt, Docker, Airflow, Terraform, GC

## Слой Качества Данных

Streamify нужен, чтобы построить потоковую аналитику музыкального сервиса: события из Kafka обрабатываются Spark Streaming, складываются в lake/warehouse, а dbt собирает витрины для dashboard по прослушиваниям, пользователям, песням, артистам, локациям и времени.
Streamify нужен, чтобы построить потоковую аналитику музыкального сервиса: события из Kafka обрабатываются Spark Streaming, складываются в lake/warehouse, а dbt собирает таблицы для dashboard по прослушиваниям, пользователям, песням, артистам, локациям и времени.

В этой итерации добавлен проверяемый data-quality слой для core marts:

Expand Down Expand Up @@ -100,9 +100,7 @@ A lot can still be done :).
- Confluent Cloud for Kafka
- Create your own VPC network
- Build dimensions and facts incrementally instead of full refresh
- Write data quality tests
- Create dimensional models for additional business processes
- Include CI/CD
- Add more visualizations

### Special Mentions
Expand Down
69 changes: 40 additions & 29 deletions setup/airflow.md
Original file line number Diff line number Diff line change
@@ -1,99 +1,110 @@
## Setup Airflow VM
## Настройка Airflow VM

![airflow](../images/airflow.jpg)

We will setup airflow on docker in a dedicated compute instance. dbt is setup inside airflow.
Airflow запускается в Docker на выделенной compute instance. dbt находится внутри Airflow runtime и запускается DAG-ом.

- Establish SSH connection
- Подключиться по SSH:

```bash
ssh streamify-airflow
```

- Clone git repo
- Склонировать repository:

```bash
git clone https://github.com/ankurchavda/streamify.git && \
cd streamify
```
- Install anaconda, docker & docker-compose.

- Установить anaconda, docker и docker-compose:

```bash
bash ~/streamify/scripts/vm_setup.sh && \
exec newgrp docker
```
- Move the service account json file from local to the VM machine in `~/.google/credentials/` directory. Make sure it is named as `google_credentials.json` else the dags will fail!

- You can use [sftp](https://youtu.be/ae-CV2KfoN0?t=2442) to transfer the file.
- Перенести service account json file с локальной машины на VM в directory `~/.google/credentials/`.

Файл должен называться `google_credentials.json`, иначе DAGs не смогут использовать credentials.

- Set the evironment variables (same as Terraform values)-
- Для передачи файла можно использовать [sftp](https://youtu.be/ae-CV2KfoN0?t=2442).

- GCP Project ID
- Установить environment variables, совпадающие со значениями Terraform:

- Cloud Storage Bucket Name
- GCP Project ID;
- Cloud Storage Bucket Name.

```bash
export GCP_PROJECT_ID=project-id
export GCP_GCS_BUCKET=bucket-name
```

**Note**: You will have to setup these env vars every time you create a new shell session.
**Note:** эти env vars нужно задавать в каждой новой shell session.

- Start Airflow. (This shall take a few good minutes, grab a coffee!)
- Запустить Airflow. Это может занять несколько минут:

```bash
bash ~/streamify/scripts/airflow_startup.sh && cd ~/streamify/airflow
```

- Airflow should be available on port `8080` a couple of minutes after the above setup is complete. Login with default username & password as **airflow**.
- Через пару минут Airflow должен быть доступен на port `8080`. Default username и password: **airflow**.

- Airflow will be running in detached mode. To see the logs from docker run the below command
- Airflow работает в detached mode. Чтобы смотреть Docker logs:

```bash
docker-compose --follow
docker-compose logs --follow
```

- To stop airflow
- Остановить Airflow:

```bash
docker-compose down
```

### DAGs

The setup has two dags
В setup есть два DAGs:

- `load_songs_dag`
- Trigger first and only once to load a onetime song file into BigQuery
- Запустить первым и только один раз, чтобы загрузить one-time song file в BigQuery.

![songs_dag](../images/songs_dag.png)

- `streamify_dag`
- Trigger after `load_songs_dag` to make sure the songs table table is available for the transformations
- This dag will run hourly at the 5th minute and perform transformations to create the dimensions and fact.
- Запускать после `load_songs_dag`, чтобы songs table была доступна для transformations.
- DAG запускается каждый час на пятой минуте и создает dimensions и fact.

![streamify_dag](../images/streamify_dag.png)

- DAG Flow -
- We first create an external table for the data that was received in the past hour.
- We then create an empty table to which our hourly data will be appended. Usually, this will only ever run in the first run.
- Then we insert or append the hourly data, into the table.
- And then, delete the external table.
- Finally, run the dbt transformation, to create our dimensions and facts.
DAG flow:

- создать external table для данных, полученных за последний час;
- создать empty table, куда будет append-иться hourly data; обычно это требуется только на первом run;
- insert/append hourly data в table;
- удалить external table;
- запустить dbt transformation для создания dimensions и facts.

### dbt

The transformations happen using dbt which is triggered by Airflow. The dbt lineage should look something like this -
Transformations выполняются через dbt, который запускается Airflow. dbt lineage должен выглядеть примерно так:

![img](../images/dbt.png)

Dimensions:

- `dim_artists`
- `dim_songs`
- `dim_datetime`
- `dim_location`
- `dim_users`

Facts:

- `fact_streams`
- Partitioning:
- Data is partitioned on the timestamp column by hour to provide faster data updates for a dashboard that shows data for the last few hours.
- Data partitioned by timestamp column by hour, чтобы dashboard быстрее обновлял данные за последние часы.

Итоговая view для dashboarding:

Finally, we create `wide_stream` view to aid dashboarding.
- `wide_stream`
34 changes: 20 additions & 14 deletions setup/debug.md
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
## Debug Guide
## Debug guide

### General Guidelines
### General guidelines

- Always make sure the ENV variables are set
- Start the processes in this order - Kafka -> Eventsim -> Spark Streaming -> Airflow
- Monitor the CPU utilization for your VMs to see if something's wrong
- Всегда проверяйте, что ENV variables установлены.
- Запускайте процессы в порядке: Kafka -> Eventsim -> Spark Streaming -> Airflow.
- Следите за CPU utilization на VM, если pipeline ведет себя нестабильно.

### Kafka
- Sometimes the `broker` & `schema-registry` containers die during startup, so the control center might not be available over 9021. You should just stop all the containers with `docker-compose down` or `ctrl+C` and then rerun `docker-compose up`.
- Did not set the `KAFKA_ADDRESS` env var. Kafka will then write to localhost, which will not allow Spark to read messages.

- Иногда containers `broker` и `schema-registry` падают при startup, поэтому Control Center может быть недоступен на `9021`. Остановите containers через `docker-compose down` или `ctrl+C`, затем повторите `docker-compose up`.
- Если `KAFKA_ADDRESS` не установлен, Kafka будет писать в localhost, и Spark не сможет читать messages.

### Eventsim
- If you start with a high number of users - 2-3 Million+, then eventsim sometimes might not startup and get stuck at generating events. Lower the number of users. Start two parallel processes with users divided.

- Если запускать Eventsim с большим числом users, например 2-3 million+, он может зависнуть на generating events. Уменьшите number of users или запустите два parallel processes с разделенными users.

### Spark

- > Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

Make sure that the `KAFKA_ADDRESS` env variable is set with the external IP Address of the Kafka VM. If it's set and things still don't seem to work, restart the cluster :/.
Ошибка:

> Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

Проверьте, что `KAFKA_ADDRESS` установлен в External IP Address Kafka VM. Если переменная установлена, но чтение все равно не работает, перезапустите cluster.

### Airflow

- Permission denied to dbt for writing logs
- The `airflow_startup.sh` handles changing permission for the dbt folder, so you should be good. In case you happen to delete and recreate the folder, or not run the `airflow_startup.sh` script in the first place, then change the dbt folder permissions manually -
- Permission denied для dbt logs.
- `airflow_startup.sh` меняет permissions для dbt folder. Если folder был удален/создан заново или `airflow_startup.sh` не запускался, измените permissions вручную:

```bash
sudo chmod -R 777 dbt/
```
```
64 changes: 35 additions & 29 deletions setup/gcp.md
Original file line number Diff line number Diff line change
@@ -1,40 +1,46 @@
## GCP

### Initial Setup

Checkout this [video](https://www.youtube.com/watch?v=Hajwnmj0xfQ&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=11&t=3s)

1. Create an account with your Google email ID
2. Setup your first [project](https://console.cloud.google.com/) if you haven't already
* eg. "Streamify", and note down the "Project ID" (we'll use this later when deploying infra with TF)
3. Setup [service account & authentication](https://cloud.google.com/docs/authentication/getting-started) for this project
* Grant `Viewer` role to begin with.
* Download service-account-keys (`.json`) for auth. (Please do not share this key file publicly. Keep it secure!)
* Rename the `.json` key file to `google_credentials.json`
4. Download [SDK](https://cloud.google.com/sdk/docs/quickstart) for local setup
5. Set environment variable to point to your downloaded GCP keys:
### Initial setup

Для общего контекста можно посмотреть [видео](https://www.youtube.com/watch?v=Hajwnmj0xfQ&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=11&t=3s).

1. Создайте Google Cloud account через Google email ID.
2. Создайте первый [project](https://console.cloud.google.com/), если он еще не создан.
- Например: `Streamify`.
- Сохраните `Project ID`: он понадобится при деплое infrastructure через Terraform.
3. Настройте [service account & authentication](https://cloud.google.com/docs/authentication/getting-started) для проекта.
- Для начала выдайте role `Viewer`.
- Скачайте service-account keys (`.json`) для auth.
- Не публикуйте key file и не коммитьте его в git.
- Переименуйте `.json` key file в `google_credentials.json`.
4. Установите [Google Cloud SDK](https://cloud.google.com/sdk/docs/quickstart) локально.
5. Укажите environment variable с path до downloaded GCP keys:

```shell
export GOOGLE_APPLICATION_CREDENTIALS="<path/to/your/google_credentials.json>"

# Refresh token/session, and verify authentication
gcloud auth application-default login
```

### Setup for Access

1. [IAM Roles](https://cloud.google.com/storage/docs/access-control/iam-roles) for Service account:
* Go to the *IAM* section of *IAM & Admin* https://console.cloud.google.com/iam-admin/iam
* Click the *Edit principal* icon for your service account.
* Add these roles in addition to *Viewer* : **Storage Admin** + **Storage Object Admin** + **BigQuery Admin**

2. Enable these APIs for your project:
* https://console.cloud.google.com/apis/library/iam.googleapis.com
* https://console.cloud.google.com/apis/library/iamcredentials.googleapis.com
* **Note:** You might have to enable a few APIs here and there like DataProc etc.

3. Please ensure `GOOGLE_APPLICATION_CREDENTIALS` environment variable is set.

### Setup for access

1. IAM roles для service account:
- Откройте раздел *IAM* в *IAM & Admin*: https://console.cloud.google.com/iam-admin/iam.
- Нажмите *Edit principal* для service account.
- Добавьте к `Viewer` roles: **Storage Admin**, **Storage Object Admin**, **BigQuery Admin**.

2. Включите APIs для проекта:
- https://console.cloud.google.com/apis/library/iam.googleapis.com
- https://console.cloud.google.com/apis/library/iamcredentials.googleapis.com
- Note: по ходу запуска могут потребоваться дополнительные APIs, например Dataproc.

3. Проверьте, что `GOOGLE_APPLICATION_CREDENTIALS` установлен:

```shell
export GOOGLE_APPLICATION_CREDENTIALS="<path/to/your/service-account-authkeys>.json"
```

#### [Installation Reference - DataTalks Club](https://github.com/DataTalksClub/data-engineering-zoomcamp/blob/main/week_1_basics_n_setup/1_terraform_gcp/2_gcp_overview.md#initial-setup)
#### Installation reference

- [DataTalks Club GCP overview](https://github.com/DataTalksClub/data-engineering-zoomcamp/blob/main/week_1_basics_n_setup/1_terraform_gcp/2_gcp_overview.md#initial-setup)
60 changes: 31 additions & 29 deletions setup/kafka.md
Original file line number Diff line number Diff line change
@@ -1,80 +1,82 @@
## Setup Kafka VM
## Настройка Kafka VM

![kafka](../images/kafka.jpg)

We will setup Kafka and eventsim in two separate docker processes in a dedicated compute instance. Eventsim will communicate with port `9092` of the `broker` container of Kafka to send events.
Kafka и Eventsim запускаются как два отдельных Docker process на выделенной compute instance. Eventsim отправляет события в `broker` container Kafka на port `9092`.

- Establish SSH connection
- Подключиться по SSH:

```bash
ssh streamify-kafka
```

- Clone git repo and cd into Kafka folder
- Склонировать repository и перейти в Kafka folder:

```bash
git clone https://github.com/ankurchavda/streamify.git && \
cd streamify/kafka
```

- Install anaconda, docker & docker-compose.
- Установить anaconda, docker и docker-compose:

```bash
bash ~/streamify/scripts/vm_setup.sh && \
exec newgrp docker
```

- Set the evironment variables -
- Установить environment variable:

- External IP of the Kafka VM
- External IP Kafka VM:

```bash
export KAFKA_ADDRESS=IP.ADD.RE.SS
```

**Note**: You will have to setup these env vars every time you create a new shell session. Or if you stop/start your VM
**Note:** эти env vars нужно задавать в каждой новой shell session или после stop/start VM.

- Start Kafka
- Запустить Kafka:

```bash
cd ~/streamify/kafka && \
docker-compose build && \
docker-compose up
docker-compose up
```

**Note**: Sometimes the `broker` & `schema-registry` containers die during startup. You should just stop all the containers with `docker-compose down` and then rerun `docker-compose up`.
**Note:** иногда containers `broker` и `schema-registry` падают при startup. Остановите все containers через `docker-compose down`, затем повторите `docker-compose up`.

- The Kafka Control Center should be available on port `9021`. Open and check if everything is working fine.
- Kafka Control Center должен быть доступен на port `9021`. Откройте UI и проверьте, что сервисы работают.

- Open another terminal session for the Kafka VM and start sending messages to your Kafka broker with Eventsim
- Откройте еще одну terminal session для Kafka VM и запустите Eventsim:

```bash
bash ~/streamify/scripts/eventsim_startup.sh
```

This will start creating events for 1 Million users spread out from the current time to the next 24 hours.
The container will run in detached mode. Follow the logs to see the progress.
Скрипт начнет создавать events для 1 million users на интервале от текущего времени до следующих 24 часов. Container работает в detached mode.

- To follow the logs
- Смотреть logs:

```bash
docker logs --follow million_events
```

- The messages should start flowing-in in a few minutes.

- You should see four topics -
- Через несколько минут messages должны начать поступать в Kafka.

- Ожидаемые topics:

- `listen_events`
- `page_view_events`
- `auth_events`
- `status_change_events`

- listen_events
- page_view_events
- auth_events
- status_change_events
![topics](../images/topics.png)

- **Note:** If you happen to re-rerun the evenstim container and face the following error -

>docker: Error response from daemon: Conflict. The container name "/million_events" is already in use by container

then run the below command
- **Note:** если при повторном запуске Eventsim появляется ошибка:

> docker: Error response from daemon: Conflict. The container name "/million_events" is already in use by container

выполните:

```bash
docker system prune
```
```
Loading