-
Notifications
You must be signed in to change notification settings - Fork 15
/
Producer.scala
37 lines (27 loc) · 1.16 KB
/
Producer.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.kafka.demo.cakesolutions
import cakesolutions.kafka.KafkaProducer.Conf
import cakesolutions.kafka.{ KafkaProducer, KafkaProducerRecord }
import com.kafka.demo.KafkaHelper
import com.typesafe.scalalogging.Logger
import org.apache.kafka.common.serialization.StringSerializer
/*
* https://github.com/cakesolutions/scala-kafka-client/wiki/Scala-Kafka-Client#producer
*/
/**
 * Demo entry point that publishes string-keyed, string-valued records to a
 * Kafka topic through the cakesolutions scala-kafka-client wrapper.
 */
object Producer {

  private[this] val logger = Logger(getClass.getSimpleName)

  /** Broker address used when the demo is launched through [[main]]. */
  private[this] val BOOTSTRAP_SERVERS_VALUE = "localhost:9092"

  /** Topic that every record produced by [[main]] is sent to. */
  private[this] val TOPIC_NAME = "example.no-schema.cakesolutions"

  /**
   * Builds a producer whose keys and values are both serialized with
   * `StringSerializer`.
   *
   * @param bootstrapServers broker list handed to the producer configuration
   * @return a new producer; the caller is responsible for closing it
   */
  private[cakesolutions] def newProducer(bootstrapServers: String): KafkaProducer[String, String] =
    KafkaProducer(Conf(new StringSerializer(), new StringSerializer(), bootstrapServers))

  /**
   * Sends every message supplied by `KafkaHelper.produceMessages` to
   * `TOPIC_NAME`, using the stringified index as the record key, then closes
   * the producer.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    logger.info(s"Start to produce on $TOPIC_NAME")

    val producer = newProducer(BOOTSTRAP_SERVERS_VALUE)

    // Close in `finally` so the producer is released even when building or
    // sending a record throws; previously a failure skipped close() entirely.
    try {
      KafkaHelper
        .produceMessages { case (i, message) =>
          KafkaProducerRecord(TOPIC_NAME, s"$i", message)
        }
        // NOTE(review): the result of each send is discarded, so per-record
        // delivery failures are not surfaced here.
        .foreach(producer.send)
    } finally {
      producer.close()
    }

    logger.info(s"Finish to produce on $TOPIC_NAME")
  }
}