The pipeline collects data from external sources in the form of CSV tables or JSON dictionaries, so that in the end it:
- Collects data from external sources into Luigi targets.
- Cleanses the data.
- Lands the data to the DWH.
The application code is written in Python and therefore depends on it.
Python 3.6 [Python Software Foundation License; Zero-Clause BSD license for Python versions after 3.8.6]:
Luigi [Apache License 2.0]:
Used to build the conveyor of Luigi tasks.
Pandas [BSD-3-Clause license]:
Used to work with tabular data.
NumPy [BSD-3-Clause license]:
Used to bring table cells to the desired values and types.
PyArrow [Apache-2.0 license]:
Used to save data in parquet format.
pip install luigi
pip install pandas
pip install numpy
pip install pyarrow
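To illustrate the task conveyor mentioned above, here is a minimal, self-contained Luigi sketch; the task names and file paths are illustrative, not part of this pipeline:

import luigi

class HelloTask(luigi.Task):
    # Writes a greeting to a local target.
    def output(self):
        return luigi.LocalTarget('hello.txt')

    def run(self):
        with self.output().open('w') as f:
            f.write('hello')

class WorldTask(luigi.Task):
    # Requires HelloTask, so Luigi runs the dependency first.
    def requires(self):
        return HelloTask()

    def output(self):
        return luigi.LocalTarget('hello_world.txt')

    def run(self):
        with self.input().open('r') as src, self.output().open('w') as dst:
            dst.write(src.read() + ' world')

if __name__ == '__main__':
    luigi.build([WorldTask()], local_scheduler=True)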
Wrappers for data from external sources.
- Reads datasets in the directory received from the parameter 'external_data_path'.
⚠️ All paths to partitions inside the root directory passed to ExternalData must be in the format 'Dataset_Name/YYYY/MM/DD/'.
- For every partition where a '_Validate' flag file is found, creates a new '_Validate_Success' flag as a Luigi.LocalTarget.
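A minimal sketch of how such a wrapper could be implemented; the 'external_data_path' parameter and the flag names come from this README, everything else is an assumption, not the actual task code:

import os
import luigi

class ExternalData(luigi.Task):
    # Root directory with partitions laid out as 'Dataset_Name/YYYY/MM/DD/'.
    external_data_path = luigi.Parameter()

    def _validated_partitions(self):
        # Yield every partition directory that contains a '_Validate' flag.
        for root, dirs, files in os.walk(str(self.external_data_path)):
            if '_Validate' in files:
                yield root

    def output(self):
        return [luigi.LocalTarget(os.path.join(p, '_Validate_Success'))
                for p in self._validated_partitions()]

    def run(self):
        for target in self.output():
            with target.open('w'):
                pass  # an empty flag file marks the partition as validated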
- Reads data from ExternalData by dates.
- Merges them into one array.
- If the 'drop_list' parameter is not 'None' ('None' by default), the task drops all columns named in this Luigi.ListParameter.
Example of 'drop_list' Luigi.ListParameter:
["drop_name", "Delete"]
- 'extract_file_mask' Luigi.Parameter sets the output file format and 'external_data_file_mask' the input one.
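A hedged sketch of this reading-and-merging step; the function, the CSV reader, and the glob layout are illustrative assumptions built on the 'Dataset_Name/YYYY/MM/DD/' convention above:

import glob
import os
import pandas as pd

def extract(external_data_path, date, external_data_file_mask='csv', drop_list=None):
    # Collect every dataset partition for the given date.
    pattern = os.path.join(external_data_path, '*',
                           date.strftime('%Y/%m/%d'),
                           '*.' + external_data_file_mask)
    # Assumes a CSV input mask; at least one matching file must exist.
    frames = [pd.read_csv(path) for path in glob.glob(pattern)]
    data = pd.concat(frames, ignore_index=True)
    if drop_list is not None:
        # Drop all columns named in the 'drop_list' parameter.
        data = data.drop(columns=list(drop_list), errors='ignore')
    return data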
- Removes all rows matching the transform_parsing_rules_drop parameter.
Example of 'transform_parsing_rules_drop' Luigi.DictParameter:
{"column_to_drop": ["False", "NaN", 0]}
- Rows will be discarded if at least one value matches in ALL keys of transform_parsing_rules_filter.
Example of 'transform_parsing_rules_filter' Luigi.DictParameter:
{"column_to_filter": ["drop_if_not_in_vip", "drop_too"], "filter_too": ["0"]}
- And provided that the row does not contain values from the transform_parsing_rules_vip keys.
Example of 'transform_parsing_rules_vip' Luigi.DictParameter:
{"data_to_save_like_vip": ["vip_value_1", "vip_value_2"], "save_too": ["vip_value_3"]}
- Has a 'date_parameter' Luigi.DateParameter (today by default).
- 'transform_file_mask' Luigi.Parameter sets the output file format and 'extract_file_mask' the input one.
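Taken together, the three rule dictionaries above could be applied as in this pandas sketch; it is a plain function under assumed semantics, not the actual TransformTask code:

import pandas as pd

def apply_parsing_rules(data, rules_drop=None, rules_filter=None, rules_vip=None):
    if rules_drop:
        for column, bad_values in rules_drop.items():
            # Drop every row whose value in 'column' matches a listed value.
            data = data[~data[column].isin(bad_values)]
    if rules_filter:
        # A row is a removal candidate only if it matches in ALL filter keys...
        to_drop = pd.Series(True, index=data.index)
        for column, values in rules_filter.items():
            to_drop &= data[column].isin(values)
        if rules_vip:
            # ...and only if no VIP key protects it.
            is_vip = pd.Series(False, index=data.index)
            for column, values in rules_vip.items():
                is_vip |= data[column].isin(values)
            to_drop &= ~is_vip
        data = data[~to_drop]
    return data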
- Lands the result data in the directory received from the 'load_data_path' Luigi.Parameter.
- Has a 'date_parameter' Luigi.DateParameter (today by default).
- 'load_file_mask' Luigi.Parameter sets the output file format and 'transform_file_mask' the input one.
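A minimal sketch of the landing step, assuming a pandas frame and a date-partitioned output layout (the function itself is illustrative):

import os
import pandas as pd

def load(data, load_data_path, date, load_file_mask='parquet'):
    # Land the transformed frame, partitioned by date, via the pyarrow engine.
    partition = os.path.join(load_data_path, date.strftime('%Y/%m/%d'))
    os.makedirs(partition, exist_ok=True)
    out_path = os.path.join(partition, 'data.' + load_file_mask)
    data.to_parquet(out_path, engine='pyarrow')
    return out_path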
If you want to use a simple launch by passing Luigi parameters through a configuration file:
- Fill the 'luigi_config.cfg' file with correct data.
- Then run the script 'luigi_pipeline.py'.
Files location:
./📂Luigi_ETL
└── 📁Pipeline
    ├── 📄luigi_pipeline.py
    └── 📁My_Beautiful_Tasks
        └── 📁Configuration
            └── 📄luigi_config.cfg
Please note that lines with optional parameters can be removed from 'luigi_config.cfg' if you do not need them.
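A sketch of what 'luigi_config.cfg' could contain; the section and parameter names are inferred from the terminal example further below, and the values are illustrative:

[ExternalData.ExternalData]
external_data_path = ~/luigi_tasks/ExternalData

[Extract.ExtractTask]
extract_data_path = ~/luigi_tasks/ExtractTask
extract_file_mask = csv
external_data_file_mask = csv
drop_list = ["column_drop_name", "column_to_delete"]

[Transform.TransformTask]
file_to_transform_path = ~/luigi_tasks/TransformTask
transform_file_mask = json
transform_parsing_rules_drop = {"column_to_drop": ["False", "NaN", 0]}

[Load.LoadTask]
load_data_path = ~/luigi_tasks/LoadTask
load_file_mask = parquet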
Example of run script:
python3 -B -m luigi_pipeline
First, replace the call to 'build' with 'run' in the 'Pipeline_launcher.py' script,
removing all the parameters passed to it.
Then clear all parameters from the Luigi task instances that are called in the 'luigi_pipeline.py' script.
After that, you can start Luigi by passing parameters through the terminal, or by using the 'start_luigi_etl_pipeline.sh' script, as sketched below.
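For reference, a hedged sketch of the two launch styles inside 'Pipeline_launcher.py' (the function names are illustrative, not the script's real code):

import luigi

def launch_with_parameters(task):
    # 'build' style: parameters are passed in code or via the config file.
    luigi.build([task], local_scheduler=True)

def launch_from_terminal():
    # 'run' style: Luigi parses the task name and parameters from the
    # command line, so no task instances or parameters are passed in code.
    luigi.run()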
Files location:
./📂Luigi_ETL
└── 📁Pipeline
    ├── 📄luigi_pipeline.py
    ├── 📄start_luigi_etl_pipeline.sh
    └── 📁My_Beautiful_Tasks
        └── 📄Pipeline_launcher.py
If your OS has a Bash shell, the ETL pipeline can be started using the bash script:
./start_luigi_etl_pipeline.sh
The script contains an example of all the necessary arguments to run.
To launch the pipeline through this script, do not forget to make it executable:
chmod +x ./start_luigi_etl_pipeline.sh
The script can also be run directly with python.
Example of run script:
python3 -B -m luigi_pipeline Load.LoadTask --local-scheduler \
--ExternalData.ExternalData-external-data-path "~/luigi_tasks/ExternalData" \
\
--Extract.ExtractTask-extract-data-path "~/luigi_tasks/ExtractTask" \
--Extract.ExtractTask-extract-file-mask "csv" \
--Extract.ExtractTask-external-data-file-mask "csv" \
--Extract.ExtractTask-drop-list "['column_drop_name', 'column_to_delete']" \
\
--Transform.TransformTask-file-to-transform-path "~/luigi_tasks/TransformTask" \
--Transform.TransformTask-transform-file-mask "json" \
--Transform.TransformTask-transform-parsing-rules-drop "{'column_to_drop': [False, 'NaN', 0]}" \
--Transform.TransformTask-transform-parsing-rules-filter "{'column_to_filter': ['drop_if_not_in_vip', 'drop_too'], 'filter_too': ['0']}" \
--Transform.TransformTask-transform-parsing-rules-vip "{'data_to_save_like_vip': ['vip_value_1', 'vip_value_2'], 'save_too': ['vip_value_3']}" \
--Transform.TransformTask-date-path-part $(date +%F --date "2022-12-01") \
\
--Load.LoadTask-load-data-path "~/luigi_tasks/LoadTask" \
--Load.LoadTask-load-file-mask "parquet"
The example above shows the launch of all tasks.
Tests are embedded inside the pipeline.
Thank you for your interest in my work.