The goal of this project is to create a generalized Complex Event Processing (CEP) operator using the FlinkCEP library.
- Kontaxakis Antonios
- Kotidis Yannis
StreamGenerator.py generates a text file of input data based on the requested pattern and the conditions to be examined. The file contains one event per line in the format "stream_id, window_id, event". The last line of the file is "-1, -1, KILL", which signals that no more events follow and terminates the Flink job. The user needs to enter 7 command-line arguments as follows:
- Pattern (String)
- Stream length (int)
- Number of sub-streams (int)
- Window size (int)
- Number of matches (int)
- Strict contiguity (boolean)
- File name for writing data (String)
./StreamGenerator.py 'ab{1,3}(c|d)' 1000 8 100 150 True 'data.txt'
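A minimal sketch of how a consumer of the generated file could parse it, assuming the three fields are separated by a comma followed by optional spaces, as in "-1, -1, KILL". The class and field names (EventLineParser, StreamEvent) are hypothetical and not taken from the project, and the sample lines in main are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical parser for the StreamGenerator.py output format:
 * one event per line as "stream_id, window_id, event", ending with "-1, -1, KILL".
 */
public class EventLineParser {

    /** One parsed input line. Field names are illustrative only. */
    public static final class StreamEvent {
        public final int streamId;
        public final int windowId;
        public final String event;

        StreamEvent(int streamId, int windowId, String event) {
            this.streamId = streamId;
            this.windowId = windowId;
            this.event = event;
        }

        /** True for the end-of-stream marker "-1, -1, KILL". */
        public boolean isKill() {
            return streamId == -1 && windowId == -1 && "KILL".equals(event);
        }
    }

    /** Splits a line on commas and trims each of the three fields. */
    public static StreamEvent parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected 3 fields, got " + parts.length + ": " + line);
        }
        return new StreamEvent(
                Integer.parseInt(parts[0].trim()),
                Integer.parseInt(parts[1].trim()),
                parts[2].trim());
    }

    /** Parses lines up to the KILL marker; the marker and anything after it are not returned. */
    public static List<StreamEvent> parseUntilKill(List<String> lines) {
        List<StreamEvent> events = new ArrayList<>();
        for (String line : lines) {
            if (line.trim().isEmpty()) {
                continue; // tolerate blank lines
            }
            StreamEvent e = parse(line);
            if (e.isKill()) {
                break;
            }
            events.add(e);
        }
        return events;
    }

    public static void main(String[] args) {
        // Made-up sample input; the line after KILL is ignored.
        List<String> sample = List.of("0, 0, a", "0, 0, b", "1, 0, c", "-1, -1, KILL", "2, 0, d");
        System.out.println(parseUntilKill(sample).size()); // prints 3
    }
}
```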
CEPdata.java reads the data file and sends its events to a Kafka topic. The user needs to enter 3 command-line arguments as follows:
- File name for reading data (String)
- Name of the Kafka topic for sending data (String)
- Host IP (String)
java -jar data_kafka.jar 'data.txt' 'CEPdata' 'localhost'
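The file-to-topic step can be sketched with the standard library only. In this hypothetical FileForwarder, a Consumer<String> stands in for the Kafka producer; with the Kafka Java client, the sink would wrap a call such as KafkaProducer.send(new ProducerRecord<>(topic, line)). Forwarding the final "-1, -1, KILL" line as well is an assumption drawn from its role in terminating the Flink job; this is not the actual CEPdata.java code.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.function.Consumer;

public class FileForwarder {

    /**
     * Sends every non-blank line of the file to the sink, in file order,
     * and returns how many lines were sent. The "-1, -1, KILL" line is
     * forwarded too, so the downstream job can see the end of the stream.
     */
    public static int forwardLines(Path file, Consumer<String> sink) throws IOException {
        int sent = 0;
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // skip blank lines
                }
                sink.accept(line);
                sent++;
            }
        }
        return sent;
    }

    public static void main(String[] args) throws IOException {
        // First argument mirrors CEPdata.java: the data file name.
        Path file = Paths.get(args.length > 0 ? args[0] : "data.txt");
        // Stand-in sink: print each line instead of producing it to a Kafka topic.
        int sent = forwardLines(file, System.out::println);
        System.err.println("Forwarded " + sent + " lines");
    }
}
```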
CEPCase_Generate.java contains the core operations:
- Reads the data from a Kafka topic (or from a text file)
- Rewrites the requested regular expression as a FlinkCEP pattern, according to the selected conditions
- Finds the matches
- Writes the results to a Kafka topic (or to a text file)
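To illustrate the regular-expression rewriting step, the hypothetical RegexToCepSketch below handles only a small subset of the syntax used in the example pattern: single-character literals, alternation groups such as (c|d), and {m,n} or {m} quantifiers. It renders a FlinkCEP-style builder chain as text. Pattern.begin, next/followedBy/followedByAny and times are real FlinkCEP Pattern methods, but the where(event in [...]) conditions are informal placeholders and the element names e0, e1, ... are made up. It is not the project's translation logic.

```java
import java.util.ArrayList;
import java.util.List;

public class RegexToCepSketch {

    /** One pattern element: the accepted event symbols plus a {min,max} quantifier. */
    public static final class Element {
        public final List<String> symbols = new ArrayList<>();
        public int min = 1;
        public int max = 1;
    }

    /** Tokenizes literals, (x|y) groups and {m,n} / {m} quantifiers; anything else is rejected. */
    public static List<Element> tokenize(String regex) {
        List<Element> elements = new ArrayList<>();
        int i = 0;
        while (i < regex.length()) {
            char ch = regex.charAt(i);
            if (ch == '(') {
                int close = regex.indexOf(')', i);
                if (close < 0) throw new IllegalArgumentException("Unclosed group at " + i);
                Element e = new Element();
                for (String s : regex.substring(i + 1, close).split("\\|")) e.symbols.add(s);
                elements.add(e);
                i = close + 1;
            } else if (ch == '{') {
                int close = regex.indexOf('}', i);
                if (close < 0 || elements.isEmpty()) throw new IllegalArgumentException("Bad quantifier at " + i);
                // limit -1 keeps a trailing empty bound, so an open-ended {m,} fails to parse
                String[] bounds = regex.substring(i + 1, close).split(",", -1);
                Element last = elements.get(elements.size() - 1);
                last.min = Integer.parseInt(bounds[0].trim());
                last.max = bounds.length > 1 ? Integer.parseInt(bounds[1].trim()) : last.min;
                if (last.min > last.max) throw new IllegalArgumentException("min > max at " + i);
                i = close + 1;
            } else if (Character.isLetterOrDigit(ch)) {
                Element e = new Element();
                e.symbols.add(String.valueOf(ch));
                elements.add(e);
                i++;
            } else {
                throw new IllegalArgumentException("Unsupported character '" + ch + "' at " + i);
            }
        }
        return elements;
    }

    /** Renders the elements as a FlinkCEP-style chain (text only; nothing is executed). */
    public static String toCepChain(String regex, String joinMethod) {
        List<Element> elements = tokenize(regex);
        StringBuilder sb = new StringBuilder();
        for (int k = 0; k < elements.size(); k++) {
            Element e = elements.get(k);
            sb.append(k == 0 ? "Pattern.begin(\"e0\")" : "." + joinMethod + "(\"e" + k + "\")");
            sb.append(".where(event in ").append(e.symbols).append(")");
            if (e.min != 1 || e.max != 1) {
                sb.append(".times(").append(e.min).append(", ").append(e.max).append(")");
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toCepChain("ab{1,3}(c|d)", "next"));
    }
}
```

For the example pattern with strict contiguity, main prints Pattern.begin("e0").where(event in [a]).next("e1").where(event in [b]).times(1, 3).next("e2").where(event in [c, d]). One design point worth checking against the FlinkCEP documentation: looping patterns such as times() use relaxed contiguity between their own events unless consecutive() is called, so a faithful strict-contiguity translation would likely need that call as well.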
The user needs to enter 12 command-line arguments for CEPCase_Generate.java as follows:
- Type (String): "Kafka" to read from and write to Kafka topics; any other value reads from and writes to text files.
- File name for reading data (String) (used only when type is not "Kafka")
- File name for writing results (String) (used only when type is not "Kafka")
- Pattern (String)
- Parallelism (int)
- Contiguity condition (int): 1 = strict, 2 = relaxed, 3 = non-deterministic relaxed
- After match skip strategy (int): 1 = no skip, 2 = skip to next, 3 = skip past last event, 4 = skip to first, 5 = skip to last
- Pattern name (int) (used only when the skip strategy is 4 or 5)
- Flink job name (String)
- Name of the Kafka topic for reading data (String) (used only when type is "Kafka")
- Name of the Kafka topic for writing results (String) (used only when type is "Kafka")
- Host IP (String)
./bin/flink run ./examples/flink_job.jar 'Kafka' '-' '-' 'ab{1,3}(c|d)' 4 1 3 '-' 'Example' 'CEPdata' 'CEPout' 'localhost'
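The integer codes for the contiguity, skip strategy and pattern name arguments can be validated and mapped to FlinkCEP constructs as sketched below. The correspondence to the Pattern methods next(), followedBy() and followedByAny(), and to the AfterMatchSkipStrategy factories noSkip(), skipToNext(), skipPastLastEvent(), skipToFirst(name) and skipToLast(name), follows the FlinkCEP documentation. The CepOptions class itself is hypothetical, and treating '-' as "no pattern name" is an assumption based on the example command.

```java
public class CepOptions {

    /** Contiguity condition codes 1-3 and the FlinkCEP Pattern method expressing each. */
    public enum Contiguity {
        STRICT("next"),
        RELAXED("followedBy"),
        NON_DETERMINISTIC_RELAXED("followedByAny");

        public final String flinkMethod;

        Contiguity(String flinkMethod) {
            this.flinkMethod = flinkMethod;
        }

        /** Maps code 1, 2 or 3 to the constant declared in that position. */
        public static Contiguity fromCode(int code) {
            if (code < 1 || code > values().length) {
                throw new IllegalArgumentException("Contiguity must be 1-3, got " + code);
            }
            return values()[code - 1];
        }
    }

    /** Skip strategy codes 1-5 and the matching AfterMatchSkipStrategy factory method. */
    public enum SkipStrategy {
        NO_SKIP("noSkip", false),
        SKIP_TO_NEXT("skipToNext", false),
        SKIP_PAST_LAST_EVENT("skipPastLastEvent", false),
        SKIP_TO_FIRST("skipToFirst", true),
        SKIP_TO_LAST("skipToLast", true);

        public final String flinkMethod;
        public final boolean needsPatternName;

        SkipStrategy(String flinkMethod, boolean needsPatternName) {
            this.flinkMethod = flinkMethod;
            this.needsPatternName = needsPatternName;
        }

        /** Maps code 1-5 to the constant declared in that position. */
        public static SkipStrategy fromCode(int code) {
            if (code < 1 || code > values().length) {
                throw new IllegalArgumentException("Skip strategy must be 1-5, got " + code);
            }
            return values()[code - 1];
        }
    }

    /** Strategies 4 and 5 need a pattern name; "-" is assumed to mean "not provided". */
    public static void checkPatternName(SkipStrategy strategy, String patternName) {
        if (strategy.needsPatternName && (patternName == null || patternName.equals("-"))) {
            throw new IllegalArgumentException(strategy + " requires a pattern name");
        }
    }

    public static void main(String[] args) {
        // Values from the example command: contiguity 1, strategy 3, pattern name '-'
        Contiguity contiguity = Contiguity.fromCode(1);
        SkipStrategy strategy = SkipStrategy.fromCode(3);
        checkPatternName(strategy, "-");
        System.out.println(contiguity.flinkMethod + " / " + strategy.flinkMethod); // next / skipPastLastEvent
    }
}
```

Keeping the codes in enums declared in the same order as the argument list makes the mapping a single index lookup and rejects out-of-range values before the Flink job is built.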