A proof of concept implementing a near real-time streaming architecture with PostgreSQL, dbt and Materialize. The stack includes:
- dbt-core (hosted version: dbt Cloud)
- Materialize
- PostgreSQL
- Kafka (clients: Kowl / Kafka Magic)
- diagrams
To get the project, clone the repository:
git clone https://github.com/gabrielecorni/cdc_streaming.git
or, if you want to reproduce the initial setup steps from scratch:
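mkdir cdc_streaming
cd cdc_streaming
poetry init
# < interactively define dependencies by adding dbt-materialize >
poetry install --no-root # install the declared dependencies (dbt must be available before dbt init)
poetry shell # activate the virtual environment
dbt init cdc_streaming
# < move the generated project folders back to the repository root >
# < delete unnecessary folders >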
Note: remember to update your ~/.dbt/profiles.yml file as well, so that dbt can connect to Materialize.
docker compose up -d # start the system
dbt run # execute dbt model
< ... > # explore results
docker compose down # stop the system
Open 3 different shells in the cdc_streaming folder, named dbt, postgres and materialize:
- (shell1) dbt -> docker compose up -d
- (shell2) postgres -> docker exec -it cdc_streaming-db-1 bash
- (shell3) materialize -> docker run -it --rm --network=cdc_streaming_default materialize/cli
Interact with the following CLI commands:
In the dbt shell, run the dbt model (once) and generate the documentation:
dbt run # build the model
dbt docs generate # build documentation
dbt docs serve # serve documentation
In the postgres shell, log in to the datalake database:
psql -U postgres -d datalake
and execute the following SQL statements:
datalake=$ INSERT INTO SOURCE.USERS(ID, NAME, SURNAME) VALUES
datalake-$ (4, 'CAIO', 'MARIO');
INSERT 0 1
datalake=$ INSERT INTO SOURCE.POLICIES(ID, POLICY_DETAILS, USER_ID) VALUES
datalake-$ (5672, 'ANTI THEFT', 4),
datalake-$ (5559, 'MOTO INSURANCE', 4);
INSERT 0 2
datalake=$ UPDATE SOURCE.POLICIES SET POLICY_DETAILS='HOME INSURANCE' WHERE ID=5559;
UPDATE 1
datalake=$ DELETE FROM SOURCE.POLICIES WHERE ID=5559;
DELETE 1
datalake=$ DELETE FROM SOURCE.USERS WHERE ID=4;
DELETE 1
While executing the queries above on Postgres (the source database), watch how the corresponding structures change on Materialize (the target database) by running the following commands in the materialize shell:
-- DISCOVERY operations
-- all views in the database
\dv
show sources; -- all sources
show views; -- all views (dbt models)
show materialized views; -- all materialized views (dbt models)
-- QUERY operations
SELECT * FROM CUSTOMER_DATA_RAW; -- mixed source (Postgres publication)
SELECT * FROM USERS; -- individual source view
SELECT * FROM POLICIES; -- individual source view
SELECT * FROM USER_POLICIES; -- staging model
SELECT * FROM USERNAMES_POLICIES; -- data mart
SELECT * FROM USERNAMES_GROUPING; -- data mart
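To reason about what USERNAMES_POLICIES should emit for each statement of the psql session above, the following Python sketch replays that session against an in-memory model and diffs the view before and after each statement. The join logic is an assumption inferred from the sample Kafka messages (codsog = USERS.ID, username = NAME concatenated with SURNAME, codpol = POLICIES.ID, inner join on POLICIES.USER_ID); the actual dbt models may differ, only user 4 is modelled because the initial table contents are not shown, and all function and type names are hypothetical. Materialize maintains views incrementally rather than by recomputing and diffing snapshots; diffing is used here only as the simplest way to obtain the same before/after events.

```python
from typing import Callable, Dict, List, Optional, Set, Tuple

# Hypothetical in-memory model of the two Postgres source tables.
Users = Dict[int, Tuple[str, str]]  # ID -> (NAME, SURNAME)
Policies = Dict[int, Tuple[str, int]]  # ID -> (POLICY_DETAILS, USER_ID)
Row = Tuple[int, str, int, str]  # (codsog, username, codpol, policy_details)
Event = Dict[str, Optional[Row]]


def usernames_policies(users: Users, policies: Policies) -> Set[Row]:
    """Assumed USERNAMES_POLICIES logic: inner join of POLICIES and USERS."""
    return {
        (user_id, users[user_id][0] + users[user_id][1], policy_id, details)
        for policy_id, (details, user_id) in policies.items()
        if user_id in users  # policies of a deleted user drop out of the join
    }


def diff(old: Set[Row], new: Set[Row]) -> List[Event]:
    """Express the change between two snapshots as before/after events."""
    retractions = [{"before": row, "after": None} for row in sorted(old - new)]
    insertions = [{"before": None, "after": row} for row in sorted(new - old)]
    return retractions + insertions


def replay() -> List[Event]:
    """Replay the psql session above (user 4 only) and collect the emitted events."""
    users: Users = {}
    policies: Policies = {}
    events: List[Event] = []

    def apply(statement: Callable[[], object]) -> None:
        old = usernames_policies(users, policies)
        statement()
        events.extend(diff(old, usernames_policies(users, policies)))

    apply(lambda: users.update({4: ("CAIO", "MARIO")}))  # INSERT INTO SOURCE.USERS
    apply(lambda: policies.update({5672: ("ANTI THEFT", 4), 5559: ("MOTO INSURANCE", 4)}))
    apply(lambda: policies.update({5559: ("HOME INSURANCE", 4)}))  # UPDATE
    apply(lambda: policies.pop(5559))  # DELETE FROM SOURCE.POLICIES
    apply(lambda: users.pop(4))  # DELETE FROM SOURCE.USERS
    return events


if __name__ == "__main__":
    for event in replay():
        print(event)
```

Running it prints six events for user 4: the UPDATE shows up as a retraction followed by an insertion, and deleting the user retracts the remaining policy from the join. Apart from the order of the two initial insertions (the sketch sorts rows), they correspond to the last six usernames_policies messages listed below.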
Within the project, two different Kafka clients (Kowl and Kafka Magic) have been made available to inspect the topics.
The corresponding Kafka messages on each topic are:
- usernames_grouping
[
{
"before": null,
"after": {
"row": {
"codsog": 1,
"nb_policies": 2
}
}
},
{
"before": null,
"after": {
"row": {
"codsog": 3,
"nb_policies": 1
}
}
},
{
"before": null,
"after": {
"row": {
"codsog": 4,
"nb_policies": 2
}
}
},
{
"before": {
"row": {
"codsog": 4,
"nb_policies": 2
}
},
"after": {
"row": {
"codsog": 4,
"nb_policies": 1
}
}
},
{
"before": {
"row": {
"codsog": 4,
"nb_policies": 1
}
},
"after": null
}
]
- usernames_policies
[
{
"before": null,
"after": {
"row": {
"codsog": 3,
"username": "CARLOVERDI",
"codpol": 5555,
"policy_details": "CAR INSURANCE"
}
}
},
{
"before": null,
"after": {
"row": {
"codsog": 1,
"username": "MARIOROSSI",
"codpol": 1234,
"policy_details": "HOME INSURANCE"
}
}
},
{
"before": null,
"after": {
"row": {
"codsog": 1,
"username": "MARIOROSSI",
"codpol": 5678,
"policy_details": "ANTI THEFT"
}
}
},
{
"before": null,
"after": {
"row": {
"codsog": 4,
"username": "CAIOMARIO",
"codpol": 5672,
"policy_details": "ANTI THEFT"
}
}
},
{
"before": null,
"after": {
"row": {
"codsog": 4,
"username": "CAIOMARIO",
"codpol": 5559,
"policy_details": "MOTO INSURANCE"
}
}
},
{
"before": {
"row": {
"codsog": 4,
"username": "CAIOMARIO",
"codpol": 5559,
"policy_details": "MOTO INSURANCE"
}
},
"after": null
},
{
"before": null,
"after": {
"row": {
"codsog": 4,
"username": "CAIOMARIO",
"codpol": 5559,
"policy_details": "HOME INSURANCE"
}
}
},
{
"before": {
"row": {
"codsog": 4,
"username": "CAIOMARIO",
"codpol": 5559,
"policy_details": "HOME INSURANCE"
}
},
"after": null
},
{
"before": {
"row": {
"codsog": 4,
"username": "CAIOMARIO",
"codpol": 5672,
"policy_details": "ANTI THEFT"
}
},
"after": null
}
]
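Because each message carries the previous (before) and the new (after) version of a row, a consumer can rebuild the current content of a view, or maintain an aggregate on top of it, without querying Materialize. The Python sketch below illustrates both ideas under a few assumptions: each Kafka message value has already been decoded into the before/after structure shown above (the actual wire format depends on how the Materialize sinks are configured, e.g. Avro or JSON, and decoding is not shown); codsog identifies a single row of usernames_grouping; and usernames_grouping counts the usernames_policies rows per codsog, which the sample messages are consistent with but which the dbt models shown here do not confirm. materialize_state and count_policies are hypothetical helper names.

```python
import json
from collections import Counter
from typing import Any, Dict, Iterable, Optional

# A change event as shown above: {"before": {"row": {...}} or None, "after": {"row": {...}} or None}
Event = Dict[str, Optional[Dict[str, Any]]]


def materialize_state(events: Iterable[Event], key: str = "codsog") -> Dict[Any, Dict[str, Any]]:
    """Fold before/after events into the current rows, assuming `key` is unique per row."""
    state: Dict[Any, Dict[str, Any]] = {}
    for event in events:
        before, after = event.get("before"), event.get("after")
        if before is not None:
            # Retraction: drop the old version of the row.
            state.pop(before["row"][key], None)
        if after is not None:
            # Insertion: store the new version of the row.
            state[after["row"][key]] = after["row"]
    return state


def count_policies(events: Iterable[Event]) -> Dict[int, int]:
    """Incrementally maintain a per-codsog row count from usernames_policies events."""
    counts: Counter = Counter()
    for event in events:
        if event.get("before") is not None:
            counts[event["before"]["row"]["codsog"]] -= 1  # retraction
        if event.get("after") is not None:
            counts[event["after"]["row"]["codsog"]] += 1  # insertion
    # A group whose count drops to zero disappears, like a deleted row.
    return {codsog: n for codsog, n in counts.items() if n > 0}


if __name__ == "__main__":
    # Two of the usernames_grouping messages listed above.
    grouping = json.loads("""[
      {"before": null, "after": {"row": {"codsog": 4, "nb_policies": 2}}},
      {"before": {"row": {"codsog": 4, "nb_policies": 2}},
       "after": {"row": {"codsog": 4, "nb_policies": 1}}}
    ]""")
    print(materialize_state(grouping))  # {4: {'codsog': 4, 'nb_policies': 1}}
```

Since count_policies applies events one at a time, it passes through intermediate counts (codsog 4 drops to 1 and returns to 2 during the UPDATE) that the usernames_grouping topic does not show: that topic has no message for the UPDATE, presumably because Materialize applies the retraction and the insertion of the updated row at the same timestamp. The final counts, however, agree with the final usernames_grouping rows.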
The auto-generated, interactive dbt documentation (served by dbt docs serve) can be found at localhost:8080.
These links helped me during the implementation of this project: