| import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.kafka.clients.consumer.ConsumerRecords; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.kafka.clients.consumer.KafkaConsumer; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.kafka.common.serialization.StringDeserializer; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.slf4j.Logger; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.slf4j.LoggerFactory; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.time.Duration; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.util.Arrays; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.util.Properties; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public class ConsumerDemo { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public static void main(String[] args) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| String bootstrapServers = "127.0.0.1:9092"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| String groupId = "my-fourth-application"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| String topic = "first_topic"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // create consumer configs | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Properties properties = new Properties(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // create consumer | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // subscribe consumer to our topic(s) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| consumer.subscribe(Arrays.asList(topic)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // poll for new data | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| while(true){ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ConsumerRecords<String, String> records = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| consumer.poll(Duration.ofMillis(100)); // new in Kafka 2.0.0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (ConsumerRecord<String, String> record : records){ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info("Key: " + record.key() + ", Value: " + record.value()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info("Partition: " + record.partition() + ", Offset:" + record.offset()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}
|
Wednesday, 11 September 2019
Kafka ConsumerDemo/producer.java
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment