| import org.apache.kafka.clients.consumer.ConsumerConfig; | |
| import org.apache.kafka.common.serialization.Serdes; | |
| import org.apache.kafka.streams.KafkaStreams; | |
| import org.apache.kafka.streams.StreamsConfig; | |
| import org.apache.kafka.streams.kstream.KStream; | |
| import org.apache.kafka.streams.kstream.KStreamBuilder; | |
| import java.util.Properties; | |
| public class StreamsStarterApp { | |
| public static void main(String[] args) { | |
| Properties config = new Properties(); | |
| config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app"); | |
| config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | |
| config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | |
| config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |
| config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |
| KStreamBuilder builder = new KStreamBuilder(); | |
| KStream<String, String> kStream = builder.stream("input-topic-name"); | |
| // do stuff | |
| kStream.to("word-count-output"); | |
| KafkaStreams streams = new KafkaStreams(builder, config); | |
| streams.cleanUp(); // only do this in dev - not in prod | |
| streams.start(); | |
| // print the topology | |
| System.out.println(streams.toString()); | |
| // shutdown hook to correctly close the streams application | |
| Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); | |
| } | |
| } |
Wednesday, 11 September 2019
StreamsStarterApp.java
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment