package com.manju.learning.kafka.streams; | |
import java.util.Properties; | |
import java.util.Arrays; | |
import org.apache.kafka.clients.consumer.ConsumerConfig; | |
import org.apache.kafka.common.serialization.Serdes; | |
import org.apache.kafka.streams.KafkaStreams; | |
import org.apache.kafka.streams.StreamsConfig; | |
import org.apache.kafka.streams.kstream.KStream; | |
import org.apache.kafka.streams.kstream.KStreamBuilder; | |
import org.apache.kafka.streams.kstream.KTable; | |
public class WordCountApp { | |
public static void main(String[] args) { | |
Properties config = new Properties(); | |
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); | |
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); | |
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | |
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |
KStreamBuilder builder = new KStreamBuilder(); | |
// 1 - stream from Kafka | |
KStream<String, String> textLines = builder.stream("word-count-input"); | |
KTable<String, Long> wordCounts = textLines | |
// 2 - map values to lowercase | |
.mapValues(textLine -> textLine.toLowerCase()) | |
// can be alternatively written as: | |
// .mapValues(String::toLowerCase) | |
// 3 - flatmap values split by space | |
.flatMapValues(textLine -> Arrays.asList(textLine.split("\\W+"))) | |
// 4 - select key to apply a key (we discard the old key) | |
.selectKey((key, word) -> word) | |
// 5 - group by key before aggregation | |
.groupByKey() | |
// 6 - count occurrences | |
.count("Counts"); | |
// 7 - to in order to write the results back to kafka | |
wordCounts.to(Serdes.String(), Serdes.Long(), "word-count-output"); | |
KafkaStreams streams = new KafkaStreams(builder, config); | |
streams.start(); | |
// shutdown hook to correctly close the streams application | |
Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); | |
// Update: | |
// print the topology every 10 seconds for learning purposes | |
while(true){ | |
System.out.println(streams.toString()); | |
try { | |
Thread.sleep(5000); | |
} catch (InterruptedException e) { | |
break; | |
} | |
} | |
} | |
} |
Wednesday, 11 September 2019
Word Count Kafka using Java
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment