import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
public class StreamsStarterApp { | |
public static void main(String[] args) { | |
Properties config = new Properties(); | |
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app"); | |
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | |
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | |
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |
KStreamBuilder builder = new KStreamBuilder(); | |
KStream<String, String> kStream = builder.stream("input-topic-name"); | |
// do stuff | |
kStream.to("word-count-output"); | |
KafkaStreams streams = new KafkaStreams(builder, config); | |
streams.cleanUp(); // only do this in dev - not in prod | |
streams.start(); | |
// print the topology | |
System.out.println(streams.toString()); | |
// shutdown hook to correctly close the streams application | |
Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); | |
} | |
} |
// Non-code text that followed the listing:
// Wednesday, 11 September 2019
// StreamsStarterApp.java
// Subscribe to:
// Post Comments (Atom)
// No comments:
// Post a Comment