In this project, we build a data pipeline based on the Lambda architecture to handle massive quantities of data by taking advantage of both batch and stream processing methods. We also analyze the data, create a real-time dashboard, and apply some Machine Learning models to make predictions on the Real Estate Sales 2001-2020 records.
```
.
├── dags
│   ├── custom_functions
│   │   ├── __init__.py
│   │   └── functions.py
│   ├── batch_layer_dag.py
│   ├── fetch_data_dag.py
│   ├── kafka_jobs_dag.py
│   └── speed_layer_dag.py
├── deploy
│   ├── docker
│   │   └── volumes
│   │       ├── airflow
│   │       ├── kafka
│   │       ├── postgres
│   │       └── zookeeper
│   ├── .env
│   ├── .env-template
│   ├── apache-airflow.yml
│   └── docker-compose.yml
├── great_expectations
│   ├── checkpoints
│   │   ├── total_customer_by_property_type_checkpoint.yml
│   │   ├── total_customer_by_town_checkpoint.yml
│   │   └── total_sale_amount_ratio_checkpoint.yml
│   ├── expectations
│   │   ├── total_customer_by_property_type_suite.json
│   │   ├── total_customer_by_town_suite.json
│   │   └── total_sale_amount_ratio_suite.json
│   ├── plugins
│   ├── profilers
│   └── great_expectations.yml
├── images
│   └── architecture.png
├── scripts
│   └── setup.sh
├── src
│   ├── batch_layer
│   │   ├── total_customer_by_property_type_batch.py
│   │   ├── total_customer_by_town_batch.py
│   │   └── total_sale_amount_ratio_batch.py
│   ├── checkpoint
│   ├── dashboard
│   │   └── dashboard.py
│   ├── data
│   ├── data_source
│   ├── helper
│   │   ├── __init__.py
│   │   └── helper.py
│   ├── init
│   │   ├── init.sql
│   │   └── user_roles.sql
│   ├── kafka_jobs
│   │   ├── __init__.py
│   │   ├── consumer.py
│   │   └── producer.py
│   ├── logger
│   │   ├── __init__.py
│   │   └── logger.py
│   ├── logs
│   ├── model
│   │   └── create_model.py
│   ├── speed_layer
│   │   ├── total_customer_by_property_type_speed.py
│   │   ├── total_customer_by_town_speed.py
│   │   └── total_sale_amount_ratio_speed.py
│   ├── test
│   │   ├── batch_validations
│   │   │   ├── total_customer_by_property_type_gx.py
│   │   │   ├── total_customer_by_town_gx.py
│   │   │   └── total_sale_amount_ratio_gx.py
│   │   └── utils
│   │       ├── __init__.py
│   │       └── utils.py
│   ├── tmp
│   ├── .dockerignore
│   ├── Dockerfile
│   └── requirements.txt
├── .gitignore
├── Makefile
└── README.md
```
- Python 3.8.*
- Apache Spark 3.2.*
- Scala 2.12.*
- Docker and Docker Compose
- Snowflake account
- Linux OS
- Environment variables file
  - Navigate to the `deploy` folder
  - Rename `.env-template` to `.env`
  - Adjust the basic values and credentials in `.env`
- Virtual environment
  - Set up the environment: `make setup`
- Create Docker network
  - Create the network: `docker network create kafka-airflow`
- Snowflake credentials for Great Expectations (a connection-string check sketch is shown after this list)
  - Go to the `great_expectations` folder: `cd great_expectations`
  - Modify the `great_expectations.yml` file: `datasources > snowflake_db > execution_engine > connection_string`
  - Snowflake connection strings:
    - With password: `snowflake://<USER_NAME>:<PASSWORD>@<ACCOUNT>.<REGION>/<DATABASE>/<SCHEMA>?warehouse=<WAREHOUSE>&role=<ROLE>`
    - Without password: `snowflake://<USER_NAME>@<ACCOUNT>.<REGION>/<DATABASE>/<SCHEMA>?authenticator=externalbrowser&warehouse=<WAREHOUSE>&role=<ROLE>`
- Build Docker image
  - Build the image: `make build-image`
- Run the Kafka container
  - Run the command: `make start-kafka`
- Run the Airflow container
  - Run the command: `make start-airflow`
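Before editing `great_expectations.yml`, it can be handy to sanity-check the Snowflake connection string locally. The following is a minimal sketch, assuming the `snowflake-sqlalchemy` package is installed; every credential value below is a hypothetical placeholder:

```python
# Sketch: verify a Snowflake connection string locally before putting it into
# great_expectations.yml. Every value below is a hypothetical placeholder.
from sqlalchemy import create_engine, text

connection_string = (
    "snowflake://MY_USER:MY_PASSWORD@ab12345.us-east-1"
    "/MY_DATABASE/MY_SCHEMA?warehouse=MY_WAREHOUSE&role=MY_ROLE"
)

engine = create_engine(connection_string)
with engine.connect() as conn:
    # A trivial query that only succeeds if the credentials are valid.
    print(conn.execute(text("SELECT CURRENT_VERSION()")).fetchone())
```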
The SQL scripts are located in the `src/init/` folder. You need to run these scripts in a Snowflake worksheet to create the initial database for the project. There are two scripts, one for creating the database and one for the user roles:

- `init.sql`: Sets up the data warehouse, database, schema, and tables.
- `user_roles.sql`: Creates the Database Admin user role, which has the ability to query and perform CRUD operations on the tables.
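If you prefer to run the scripts from your machine instead of a Snowflake worksheet, a rough sketch using the `snowflake-connector-python` package (assumed to be installed; credentials are placeholders) could look like this:

```python
# Sketch: execute the init scripts against Snowflake from the command line.
# Credentials are placeholders; adjust the account identifier to your setup.
import snowflake.connector

conn = snowflake.connector.connect(
    user="MY_USER",
    password="MY_PASSWORD",
    account="ab12345.us-east-1",
)

for script in ("src/init/init.sql", "src/init/user_roles.sql"):
    with open(script) as f:
        # execute_string runs every statement in the file, one after another.
        conn.execute_string(f.read())

conn.close()
```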
If you run Airflow and connect to it on the same machine on port 8080, the Airflow webserver address is https://localhost:8080.

If you run Airflow on one machine and connect to the webserver from another machine, follow the steps below:

- Connect through a terminal to the server that hosts Airflow and bind port 8080 (or any other ports you want; you can use the `-L` option multiple times to bind more ports) to a local port on the remote machine (for better security):
  `ssh -o ServerAliveInterval=120 -o ServerAliveCountMax=2 -L 8080:localhost:8080 <username>@<ip-address> -p <port>`
- Set up the firewall:
  - Enable the ufw firewall: `sudo ufw enable`
  - Open port 8080: `sudo ufw allow 8080/tcp`
  - View the list of allowed ports: `sudo ufw status verbose`
- Access the webserver UI: https://localhost:8080
All DAGs are located in the `dags/` folder.
The batch layer DAG (`batch_layer_dag.py`) consists of the following tasks (a simplified job sketch follows the list):

- `total_customer_by_property_type_batch`: Gets the data from `sale_lake.data_lake` and calculates the total number of customers grouped by property type, then stores the result in `sale_batch.total_customer_by_property_type`.
- `total_customer_by_property_type_validation`: Runs a checkpoint to validate the data that has just landed in `sale_batch.total_customer_by_property_type`.
- `total_customer_by_town_batch`: Gets the data from `sale_lake.data_lake` and calculates the total number of customers grouped by town, then stores the result in `sale_batch.total_customer_by_town`.
- `total_customer_by_town_validation`: After the previous task stores its result in Snowflake, this task runs another checkpoint to validate the data in `sale_batch.total_customer_by_town`.
- `total_sale_amount_ratio_batch`: Gets the data from `sale_lake.data_lake` and calculates the total sale amount, total number of customers, and total sales ratio, then stores the result in `sale_batch.total_sale_amount_ratio`.
- `total_sale_amount_ratio_validation`: After the previous task runs successfully and stores its result in the table, this task runs a checkpoint to validate the data in `sale_batch.total_sale_amount_ratio`.
- `done`: Triggered only when all of the previous tasks run successfully.
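To make the batch jobs above more concrete, here is a minimal sketch, not the project's actual code, of what a job such as `total_customer_by_property_type_batch.py` could look like using PySpark with the Snowflake Spark connector. The connection option values and the `property_type` column name are assumptions:

```python
# Minimal sketch (not the project's actual job) in the spirit of
# src/batch_layer/total_customer_by_property_type_batch.py.
# All Snowflake option values and the "property_type" column are placeholders.
from pyspark.sql import SparkSession, functions as F

SNOWFLAKE_SOURCE = "net.snowflake.spark.snowflake"
sf_options = {
    "sfURL": "<ACCOUNT>.<REGION>.snowflakecomputing.com",
    "sfUser": "<USER_NAME>",
    "sfPassword": "<PASSWORD>",
    "sfDatabase": "<DATABASE>",
    "sfWarehouse": "<WAREHOUSE>",
    "sfRole": "<ROLE>",
}

spark = SparkSession.builder.appName("total_customer_by_property_type_batch").getOrCreate()

# Read the raw records from the data lake table.
data_lake = (
    spark.read.format(SNOWFLAKE_SOURCE)
    .options(**sf_options)
    .option("sfSchema", "sale_lake")
    .option("dbtable", "data_lake")
    .load()
)

# Count customers (one row per sale record) per property type.
result = data_lake.groupBy("property_type").agg(F.count("*").alias("total_customer"))

# Overwrite the batch view with the fresh aggregation.
(
    result.write.format(SNOWFLAKE_SOURCE)
    .options(**sf_options)
    .option("sfSchema", "sale_batch")
    .option("dbtable", "total_customer_by_property_type")
    .mode("overwrite")
    .save()
)
```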
The speed layer DAG (`speed_layer_dag.py`) consists of the following tasks (a simplified streaming sketch follows the list):

- `total_customer_by_property_type_speed`: Runs a Spark Streaming job that reads the data stored in the Kafka topic `real_estate_sales`, transforms it, and calculates the total number of customers grouped by property type, then stores the result in `sale_speed.total_customer_by_property_type_speed`.
- `total_customer_by_town_speed`: Similar to the previous task, except that it stores the total number of customers per town in `sale_speed.total_customer_by_town_speed`.
- `total_sale_amount_ratio_speed`: Executes another Spark Streaming job that calculates the total sale amount, total sales ratio, and total number of customers, then stores them in `sale_speed.total_sale_amount_ratio_speed`.
- `done`: Triggered only if one of the previous tasks fails; otherwise, it is not executed.
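As a rough illustration of the speed layer, the sketch below reads the `real_estate_sales` topic with Spark Structured Streaming, aggregates customers by property type, and writes each micro-batch to Snowflake through `foreachBatch`. The broker address, message schema, column names, and connection options are assumptions, not the project's actual values:

```python
# Minimal sketch in the spirit of
# src/speed_layer/total_customer_by_property_type_speed.py.
# Broker address, schema, column names, and Snowflake options are placeholders.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

SNOWFLAKE_SOURCE = "net.snowflake.spark.snowflake"
sf_options = {
    "sfURL": "<ACCOUNT>.<REGION>.snowflakecomputing.com",
    "sfUser": "<USER_NAME>",
    "sfPassword": "<PASSWORD>",
    "sfDatabase": "<DATABASE>",
    "sfWarehouse": "<WAREHOUSE>",
}

spark = SparkSession.builder.appName("total_customer_by_property_type_speed").getOrCreate()

# Assumed subset of the JSON messages published by the producer.
schema = StructType([
    StructField("property_type", StringType()),
    StructField("sale_amount", DoubleType()),
])

stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")  # assumed broker address
    .option("subscribe", "real_estate_sales")
    .load()
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)

# Running total of customers per property type.
counts = stream.groupBy("property_type").agg(F.count("*").alias("total_customer"))

def write_to_snowflake(batch_df, batch_id):
    # Each micro-batch overwrites the speed view with the latest counts.
    (
        batch_df.write.format(SNOWFLAKE_SOURCE)
        .options(**sf_options)
        .option("sfSchema", "sale_speed")
        .option("dbtable", "total_customer_by_property_type_speed")
        .mode("overwrite")
        .save()
    )

query = counts.writeStream.outputMode("complete").foreachBatch(write_to_snowflake).start()
query.awaitTermination()
```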
The Kafka jobs DAG (`kafka_jobs_dag.py`) consists of the following tasks (a simplified producer sketch follows the list):

- `producer`: Uses Spark Streaming to ingest real-time data. Whenever a new parquet file is moved into the `data/` folder, it reads the file and stores the data in the Kafka topic `real_estate_sales`.
- `consumer`: Whenever new data arrives in the Kafka topic, this task reads it immediately, adjusts the data types of some columns, fills empty columns with null values, and stores the records in the data lake: `sale_lake.data_lake`.
- `done`: Triggered only if one of the previous tasks fails; otherwise, it is not executed.
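For reference, a stripped-down version of what the `producer` task could do, watching the `data/` folder for new parquet files and publishing each row as JSON to the `real_estate_sales` topic, might look like the following sketch. The schema subset, paths, and broker address are assumptions:

```python
# Minimal sketch in the spirit of src/kafka_jobs/producer.py.
# The schema subset, folder paths, and broker address are placeholders.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.appName("producer").getOrCreate()

# Assumed subset of the dataset's columns; the real job defines the full schema.
schema = StructType([
    StructField("town", StringType()),
    StructField("property_type", StringType()),
    StructField("sale_amount", DoubleType()),
    StructField("sales_ratio", DoubleType()),
])

# Watch the data/ folder for newly arrived parquet files.
stream = spark.readStream.schema(schema).parquet("src/data")

query = (
    stream.select(F.to_json(F.struct(*stream.columns)).alias("value"))  # one JSON message per row
    .writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")          # assumed broker address
    .option("topic", "real_estate_sales")
    .option("checkpointLocation", "src/checkpoint/producer")  # assumed checkpoint dir
    .start()
)
query.awaitTermination()
```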
The fetch data DAG (`fetch_data_dag.py`) consists of the following tasks (a simplified branching sketch follows the list):

- `folder_is_empty`: Uses a BranchPythonOperator to check whether or not the `tmp/` folder is empty. There are two separate branches:
  - Folder is empty: go to the `download_file` branch (first branch).
  - Folder is not empty: go to the `move_file_to_folder_data` branch (second branch).
- `move_file_to_folder_data`: Moves one parquet file from the `tmp/` folder to the `data/` folder on each scheduled run.
- `point_to_next_file`: Each parquet file in the `tmp/` folder follows a naming convention with a numeric suffix: `Real_Estate_Sales-{x}.parquet` (x: 00, 01, 02, ...). Based on the current number x, this task points to the next file (increments x by 1: x_next = x_current + 1) to be moved on the next scheduled run.
- `download_file`: Downloads the `Real Estate Sales 2001-2020 GL` dataset from the website (download link) in CSV format and stores it in the `data_source/` folder.
- `convert_to_parquet`: Converts the CSV file to a parquet file.
- `adjust_dataframe`: After the conversion to parquet, this task changes the data types of all columns, renames the columns, and fills empty cells with null values, then overwrites the old parquet file.
- `split_file`: Once the parquet file has the correct format and data types, it is divided into smaller parquet files based on the number of partitions and stored in the `tmp/` folder. In this project, we set the number of partitions to 4000. You can modify it by opening the Airflow webserver, going to the `Admin > Variables` section, and changing the `PARTITION` variable.
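The branching logic described above can be illustrated with a minimal Airflow sketch. This is not the project's actual DAG; the schedule, folder path, and downstream task bodies are placeholders:

```python
# Minimal sketch of the branching step, in the spirit of dags/fetch_data_dag.py.
# The schedule, folder path, and downstream task bodies are placeholders.
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator

TMP_DIR = "/opt/airflow/src/tmp"  # assumed mount point of the tmp/ folder


def choose_branch():
    # Empty tmp/ folder -> download the dataset; otherwise move the next file.
    return "download_file" if not os.listdir(TMP_DIR) else "move_file_to_folder_data"


with DAG(
    dag_id="fetch_data_sketch",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    folder_is_empty = BranchPythonOperator(
        task_id="folder_is_empty",
        python_callable=choose_branch,
    )
    # Placeholder bodies; the real tasks download, convert, and move files.
    download_file = PythonOperator(task_id="download_file", python_callable=lambda: None)
    move_file_to_folder_data = PythonOperator(
        task_id="move_file_to_folder_data", python_callable=lambda: None
    )

    folder_is_empty >> [download_file, move_file_to_folder_data]
```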
Note: The configuration in this section can be skipped, since it has already been done and committed to the repository.
In this project, we use the Great Expectations version 3 API. Except for the very first step of creating a Data Context (the `great_expectations` folder and associated files), all of the configuration for connecting to Datasources, creating Expectation Suites, creating Checkpoints, and validating is accomplished using the Python scripts in the `src/test/` folder.

This folder contains two sub-folders:

- `batch_validations`: expectations, checkpoints, and validations for the batch layer.
- `utils`: helpers for configuring and creating the datasource, batch requests, and checkpoints, and for running them.
We have already initialized the Data Context, validations, and checkpoints. But if you wish to start from scratch, keep a copy of the `config_variables.yml` and `great_expectations.yml` files and delete the `great_expectations` folder.
- Initialize the Data Context by running: `great_expectations init`
- Run the Python scripts in the `src/test/batch_validations/` folder, where batches of data can be sampled, Expectation Suites created, and Checkpoints configured (a simplified checkpoint-run sketch is shown after this list).

  Note: Before running any of the Python scripts, run the Airflow DAGs at least once, so that all the data files and tables are moved to their respective locations. Only then will you be able to run the scripts and test the Datasource connections locally.
- All checkpoints and expectation suites are located in the `great_expectations/checkpoints/` and `great_expectations/expectations/` folders, respectively.
folders respectively. -
Build data docs:
great_expectations docs build
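Once the checkpoints exist, a validation can also be triggered from Python. Below is a minimal sketch using the version 3 API, assuming a Great Expectations 0.13+ installation, that it is run from the repository root, and that the checkpoint name matches its YAML file name:

```python
# Minimal sketch: run an existing checkpoint with the Great Expectations v3 API
# (assumes the checkpoint is named after its YAML file).
import great_expectations as ge

context = ge.data_context.DataContext(context_root_dir="great_expectations")

result = context.run_checkpoint(checkpoint_name="total_customer_by_town_checkpoint")

if not result["success"]:
    raise ValueError("Validation failed for total_customer_by_town")
print("Validation passed")
```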