In this section, the consumer is enhanced to store the messages in a PostgreSQL database. The goal is to demonstrate the client's scalability and resilience.
- Stop the consumer and the producer, and restore the normal producer behavior:

  oc scale deployment kafka-consumer --replicas=0
  oc scale deployment kafka-producer --replicas=0
  oc edit configmap kafka-producer-config

  Update the environment variable to its original value:

  PRODUCER_PARTED: "false"
- Deploy the database (basic ephemeral deployment):

  oc create configmap event-db-init-data --from-file=./kafka-producer/src/main/resources/import.sql
  oc apply -f k8s/20-postgres.yaml
- Edit the consumer configuration to enable persistence:

  oc edit configmap/kafka-consumer-config

  Change this environment variable:

  TRACKING_DB: "true"
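The actual consumer is a Java application, but the effect of `TRACKING_DB` can be pictured with a short Python sketch (the topic, table, and column names below are assumptions, not the project's real schema): each consumed record is written to PostgreSQL, and the Kafka offset is committed only after the row has been stored.

```python
# Illustrative sketch only; the real consumer is Java and all names are assumed.
import psycopg2
from kafka import KafkaConsumer

conn = psycopg2.connect(host="event-db", dbname="event", user="event", password="event")

consumer = KafkaConsumer(
    "event",                                   # assumed topic name
    bootstrap_servers="my-cluster-kafka-bootstrap:9092",
    group_id="kafka-consumer",
    enable_auto_commit=False,
)

for record in consumer:
    seq = int(record.value.decode())           # assume the payload is the sequence number
    with conn.cursor() as cur:
        cur.execute("INSERT INTO event (seq) VALUES (%s)", (seq,))
    conn.commit()
    consumer.commit()                          # commit the offset only after the DB write
```

Committing the offset only after the database write gives at-least-once behavior: a crash between the two steps may replay a message, but it cannot silently drop one.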
- Reset the producer and the consumer:

  oc scale deployment kafka-consumer --replicas=1
  oc scale deployment kafka-producer --replicas=1
- Deploy a simple Python application to poll the DB and detect duplicate or missing messages:

  oc new-build --strategy docker --binary --name=db-watcher
  oc start-build db-watcher --from-dir python-db-watcher/ --follow
  oc new-app -l app.kubernetes.io/part-of=event-application -e POSTGRES_SVC=event-db db-watcher
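The db-watcher shipped in `python-db-watcher/` does the real work; the sketch below only illustrates the kind of check it performs, with assumed table and column names: read the stored sequence numbers, then report values that appear more than once (duplicates) and holes in the sequence (missing messages).

```python
# Illustrative gap/duplicate check; table and column names are assumptions.
import os
import time
import psycopg2

conn = psycopg2.connect(host=os.environ.get("POSTGRES_SVC", "event-db"),
                        dbname="event", user="event", password="event")
conn.autocommit = True

while True:
    with conn.cursor() as cur:
        cur.execute("SELECT seq, COUNT(*) FROM event GROUP BY seq ORDER BY seq")
        rows = cur.fetchall()

    previous = None
    for seq, count in rows:
        if count > 1:
            print(f"duplicate message: {seq} seen {count} times")
        if previous is not None and seq > previous + 1:
            print(f"missing messages: {previous + 1}..{seq - 1}")
        previous = seq

    time.sleep(5)
```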
- Open the db-watcher logs:

  oc logs --tail 10 -f --selector="deployment=db-watcher"

  Note: At this point, there should be no missing or duplicate messages, so the log should be empty.
- Scale up the consumer:

  oc scale deployment kafka-consumer --replicas=2

  Note: When the new consumer pod becomes active, the existing consumer hands over half of its partitions to the new one. For a while you may spot some missing messages in the db-watcher log. However, it is a transient condition.
- Test the consumer's resilience and consistency by abruptly shutting down one of the two pods.

  Tip: Use the following command:

  oc delete --force pod <pod-name>
Up to now, the producer has been using an in-memory counter to generate an ordered sequence of messages.
By changing the producer configuration, it is possible to use a PostgreSQL sequence as a shared counter. As a result, you can scale up the producer and probe the producer failover.
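The producer itself is a Java application; the following Python sketch only illustrates the idea, and the sequence name `event_seq` and the topic name are assumptions: every producer instance draws its numbers from the same PostgreSQL sequence, and `nextval()` is atomic, so two pods never obtain the same value.

```python
# Illustrative only; the real producer is Java, and the sequence/topic names are assumed.
import time
import psycopg2
from kafka import KafkaProducer

conn = psycopg2.connect(host="event-db", dbname="event", user="event", password="event")
conn.autocommit = True
producer = KafkaProducer(bootstrap_servers="my-cluster-kafka-bootstrap:9092")

def next_sequence():
    # The shared counter lives in the database, so any number of producer
    # pods can call this concurrently without generating overlapping numbers.
    with conn.cursor() as cur:
        cur.execute("SELECT nextval('event_seq')")
        return cur.fetchone()[0]

while True:
    producer.send("event", str(next_sequence()).encode())
    producer.flush()
    time.sleep(1)
```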
- Open the producer configuration and set `SEQUENCE_DB` to `true`:

  oc edit configmap kafka-producer-config
- Restart the producer by scaling it down and back up to two instances:

  oc scale deployment kafka-producer --replicas=0
  oc scale deployment kafka-producer --replicas=2
- After a few seconds, you can start dropping the producer pods:

  oc delete --force pod kafka-producer-<id>
Analyzing the db-watcher log, you are unlikely to find a missing message: that would require stopping the producer at the exact moment after it has obtained the sequence number from the DB and before it sends it to Kafka. For educational purposes, the producer exposes an endpoint which throws a runtime exception at exactly that point:
- Open a shell into one of the producer pods:

  oc rsh kafka-producer-5d8856fb9f-cmt7r
- Issue an HTTP PUT request to the local endpoint:

  curl -X PUT localhost:8080
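In terms of the earlier producer sketch, the injected failure sits in the narrow window between drawing the number from the sequence and handing the record to Kafka. The fragment below is a hedged illustration of that window, not the Java code behind the endpoint; `conn` and `producer` are the connection and producer objects from the sketch above.

```python
# Sketch of the failure window; not the project's real endpoint logic.
fail_next_send = True            # stands in for the flag flipped by the PUT request

def send_one(conn, producer, topic="event"):
    with conn.cursor() as cur:
        cur.execute("SELECT nextval('event_seq')")   # the shared counter has already advanced
        seq = cur.fetchone()[0]
    if fail_next_send:
        # The sequence number was consumed but the record never reaches Kafka,
        # leaving a permanent gap that the db-watcher reports as missing.
        raise RuntimeError("injected failure between nextval() and send()")
    producer.send(topic, str(seq).encode())
```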
Now, looking at the db-watcher log, you should see a missing message.