import java.util.{Collections, Properties}

import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

import scala.collection.JavaConverters._

/** Minimal Kafka consumer: subscribes to the "Kafka-Testing" topic and prints
  * every record it receives to standard output.
  *
  * The broker address is read from the Typesafe Config section named by the
  * first command-line argument, under the key `bootstrap.server`.
  */
object KafkaConsumerExample {

  /** Entry point.
    *
    * @param args `args(0)` is the path of the config section to read
    *             (presumably an environment name such as "dev" - confirm
    *             against the application's config file).
    */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    if (args.isEmpty) {
      Console.err.println("Usage: KafkaConsumerExample <config-path>")
      sys.exit(1)
    }

    val conf = ConfigFactory.load
    val envProps = conf.getConfig(args(0))

    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getString("bootstrap.server"))
    props.put(
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "1")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      consumer.subscribe(Collections.singletonList("Kafka-Testing"))

      // Poll forever; the loop only ends when poll (or printing) throws.
      while (true) {
        // NOTE(review): poll(long) is deprecated in newer Kafka clients in favour of
        // poll(java.time.Duration) - switch once the client version is confirmed.
        val records = consumer.poll(500)
        // Explicit .asScala instead of the deprecated implicit JavaConversions.
        records.asScala.foreach { record =>
          println(
            s"Received message: (${record.key()}, ${record.value()}) at offset ${record.offset()}")
        }
      }
    } finally {
      // Release the consumer's network resources even when the loop exits via an exception.
      consumer.close()
    }
  }
}
Thursday, 23 August 2018
Consume Messages – Consumer API in Scala
Produce Messages – Producer API in Scala
import java.util.Properties

import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

/** Minimal Kafka producer: sends a single ("Key", "Value") record to the
  * "Kafka-Testing" topic and then shuts down.
  *
  * The broker address is read from the Typesafe Config section named by the
  * first command-line argument, under the key `bootstrap.server`.
  */
object KafkaProducerExample {

  /** Entry point.
    *
    * @param args `args(0)` is the path of the config section to read
    *             (presumably an environment name such as "dev" - confirm
    *             against the application's config file).
    */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    if (args.isEmpty) {
      Console.err.println("Usage: KafkaProducerExample <config-path>")
      sys.exit(1)
    }

    val conf = ConfigFactory.load
    val envProps = conf.getConfig(args(0))

    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getString("bootstrap.server"))
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerExample")
    props.put(
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      val data = new ProducerRecord[String, String]("Kafka-Testing", "Key", "Value")
      // The returned Future is not inspected, matching the original fire-and-forget send.
      producer.send(data)
    } finally {
      // Close in finally so the producer is released even if send throws.
      producer.close()
    }
  }
}
Subscribe to:
Posts (Atom)