This repository consists of a Kafka event generator and a Flink processor. The code is adapted from the Yahoo streaming benchmark (see the blog post referenced below).
In this advertising use case, ad events are generated by a Kafka producer in JSON format. The events are parsed, filtered for ad "view" events, stripped of unneeded fields, and enriched with new fields by joining each event with campaign data stored in Redis. Views are then aggregated by campaign and by time window and stored back into Redis, along with a timestamp indicating when each window was last updated.
This experiment is based on the Yahoo Streaming Benchmark ("Benchmarking Streaming Computation Engines at Yahoo!", https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at). It implements a simple streaming advertisement job where there are a number of advertising campaigns and a number of advertisements for each campaign.
The authors of the benchmark created a Kafka producer application that generates events containing, among other things, an event_time, an event_type, and an ad_id. As per the authors' recommendations, in order to generate the appropriate load and not let the producers fall behind, 30 Kafka producers were instantiated on as many nodes, each producing 15,000 msg/sec, for a total processing load of 450,000 msg/sec.
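For reference, each event is a JSON object carrying at least the fields named above; the example below uses hypothetical placeholder values, and real events include additional fields not shown:
{"ad_id": "d9a7f3c2-5b1e-4e8a-9c6f-0b2d4e6a8c1f", "event_type": "view", "event_time": "1577836800000"}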
The job of the benchmark is to read various JSON events from Kafka, identify relevant events, and store a windowed count of these events per campaign in Redis. The job consists of the following operations: read an event from Kafka; deserialize the JSON string; filter out irrelevant events (based on the event_type field); take a projection of the relevant fields (ad_id and event_time); join each event by ad_id with its associated campaign_id stored in Redis; take a 10 second windowed count of events per campaign; and store each window in Redis along with a timestamp of when the window was last updated. For the purposes of our experiments, we modified the Flink benchmark by enabling checkpointing and replacing the handwritten windowing functionality with the default Flink implementation. Although doing so decreases the update frequency to the length of each window, the results remain accurate and are more interesting for our experiments because of the windowing operator state accumulated at each node.
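The sketch below shows how these operations might map onto the Flink 1.10 DataStream API. It is not the repository's actual code: the helper classes (DeserializeEvent, FilterViews, ProjectFields, JoinWithRedisCampaign, CountPerCampaign, RedisWindowSink), the topic name, and the broker address are hypothetical stand-ins, and the usual Flink and Kafka connector imports are assumed.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(checkpointIntervalMs);  // checkpointing enabled; interval taken from the CLI argument

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");  // assumed broker address

env.addSource(new FlinkKafkaConsumer<>("ad-events", new SimpleStringSchema(), kafkaProps))
   .map(new DeserializeEvent())          // deserialize the JSON string into an event object
   .filter(new FilterViews())            // keep only events with event_type == "view"
   .map(new ProjectFields())             // project out ad_id and event_time
   .map(new JoinWithRedisCampaign())     // look up the campaign_id for each ad_id in Redis
   .keyBy("campaignId")                  // key the stream by campaign
   .timeWindow(Time.seconds(10))         // default Flink 10 second windowing
   .aggregate(new CountPerCampaign())    // windowed count of views per campaign
   .addSink(new RedisWindowSink());      // write each window and its update timestamp to Redis

env.execute("advertising");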
Maven and Leiningen must be installed.
Tests require Zookeeper, Kafka, Redis, HDFS, and Flink (1.10.0).
Kafka and Redis configurations need to be specified for both the generator and Flink processor.
Generator: setup/resources/benchmarkConf.yaml
Processor: processor/src/main/resources/advertising.properties
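As a rough illustration, the configuration needs to identify the Kafka brokers, topic, and partition count as well as the Redis host; the property names below are hypothetical placeholders and may not match the actual files:
kafka.brokers=localhost:9092
kafka.topic=ad-events
kafka.partitions=1
redis.host=localhost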
Leiningen can be used to run various commands within the setup directory.
Before events can be generated, campaign ids need to be written to Redis:
lein run -n
After campaign ids are written, a configurable number of events can be emitted each second from a Kafka producer with:
lein run -r -t [number of events to emit per second]
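For example, to match the 15,000 events per second used by each producer in the original benchmark:
lein run -r -t 15000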
To execute the generator in the background use:
lein run -r -t [number of events to emit per second] > /dev/null 2>&1 &
The generator will continue to produce events until the process is terminated.
To create a jar for the Flink processor, run the following from the root directory:
mvn clean && mvn package
The newly created "processor-1.0-SNAPSHOT.jar" can then be submitted to the Flink cluster via the Flink dashboard or by running:
flink run processor/target/processor-1.0-SNAPSHOT.jar [checkpoint interval (ms)]
The processor expects the checkpoint interval in milliseconds as an argument.
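For example, to checkpoint every 10 seconds (the interval here is illustrative):
flink run processor/target/processor-1.0-SNAPSHOT.jar 10000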
Note: The number of task managers must be greater than or equal to the number of Kafka partitions specified in the configuration file.
Results are written directly to Redis.
To collect results, run the following in the setup directory:
lein run -g
This will create two text files, seen.txt and updated.txt. seen.txt contains the counts of events for each campaign and time window. updated.txt contains the latency in ms between when the last event for a given campaign window was emitted to Kafka and when that window was written to Redis.