import java.util.{Collections, Properties}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import scala.collection.JavaConversions._
object KafkaConsumerExample {
def main(args: Array[String]): Unit = {
val conf = ConfigFactory.load
val envProps = conf.getConfig(args(0))
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getString("bootstrap.server"))
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "1")
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("Kafka-Testing"))
while(true){
val records = consumer.poll(500)
for (record <- records.iterator()) {
println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset())
}
}
}
}
Thursday, 23 August 2018
Consume Messages – Consumer API In scala
Subscribe to:
Post Comments (Atom)
Really very useful and helpful us ...this is a nice post....http://bit.ly/2IJn1G0
ReplyDeleteGreat Article
ReplyDeleteIEEE Projects for CSE in Big Data
Java Training in Chennai
Final Year Project Centers in Chennai
Java Training in Chennai