package com.manju.learning.kafka.streams |
|
|
| import java.lang |
| import java.util.Properties |
|
|
| import org.apache.kafka.clients.consumer.ConsumerConfig |
| import org.apache.kafka.common.serialization.Serdes |
| import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable} |
| import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig} |
|
|
/** Kafka Streams demo that counts, per colour, how many users currently have it as their favourite.
  *
  * Records on `favourite-colour-input` are expected to carry values of the form `user,colour`.
  * Valid records are re-keyed by user and written to an intermediary topic. That topic is read back
  * as a KTable so that a user changing colour updates (rather than adds to) the counts. The
  * per-colour counts are written to `favourite-colour-output-scala`.
  */
object FavouriteColourAppScala {

  /** Colours accepted from the input; anything else is dropped as a sanitisation step. */
  private val AllowedColours: Set[String] = Set("green", "blue", "red")

  /** Topic holding the latest colour per user; written by the stream, then read back as a table. */
  private val IntermediaryTopic: String = "user-keys-and-colours-scala"

  /** Returns true when `value` is non-null and splits on "," into at least a user id and a colour.
    *
    * A bare `contains(",")` check is not enough: `String.split` drops trailing empty strings, so
    * "user," yields one field and "," yields none, which would make the index lookups below throw.
    */
  private def hasUserAndColour(value: String): Boolean =
    value != null && value.split(",").length >= 2

  def main(args: Array[String]): Unit = {
    val config: Properties = new Properties
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "favourite-colour-scala")
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)

    // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
    config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")

    val builder: KStreamBuilder = new KStreamBuilder

    // Step 1: We create the topic of users keys to colours
    val textLines: KStream[String, String] = builder.stream[String, String]("favourite-colour-input")

    val usersAndColours: KStream[String, String] = textLines
      // 1 - keep only values that really contain a user id AND a colour (also rejects null values),
      //     so the split lookups below cannot throw and kill the stream thread
      .filter((key: String, value: String) => hasUserAndColour(value))
      // 2 - we select a key that will be the user id (lowercase for safety)
      .selectKey[String]((key: String, value: String) => value.split(",")(0).toLowerCase)
      // 3 - we get the colour from the value (lowercase for safety)
      .mapValues[String]((value: String) => value.split(",")(1).toLowerCase)
      // 4 - we filter undesired colours (could be a data sanitization step)
      .filter((user: String, colour: String) => AllowedColours.contains(colour))

    usersAndColours.to(IntermediaryTopic)

    // step 2 - we read that topic as a KTable so that updates are read correctly
    val usersAndColoursTable: KTable[String, String] = builder.table(IntermediaryTopic)

    // step 3 - we count the occurrences of colours
    val favouriteColours: KTable[String, lang.Long] = usersAndColoursTable
      // 5 - we group by colour within the KTable
      .groupBy((user: String, colour: String) => new KeyValue[String, String](colour, colour))
      .count("CountsByColours")

    // 6 - we output the results to a Kafka Topic - don't forget the serializers
    favouriteColours.to(Serdes.String, Serdes.Long, "favourite-colour-output-scala")

    val streams: KafkaStreams = new KafkaStreams(builder, config)
    // only do this in dev - not in prod
    streams.cleanUp()
    streams.start()

    // print the topology
    println(streams.toString)

    // shutdown hook to correctly close the streams application
    sys.addShutdownHook(streams.close())
  }
}
package com.manju.learning.kafka.streams; |
|
|
| import java.util.Properties; |
| import java.util.Arrays; |
| import java.util.stream.Collectors; |
|
|
| import org.apache.kafka.clients.consumer.ConsumerConfig; |
| import org.apache.kafka.common.serialization.Serdes; |
| import org.apache.kafka.streams.KafkaStreams; |
| import org.apache.kafka.streams.KeyValue; |
| import org.apache.kafka.streams.StreamsConfig; |
| import org.apache.kafka.streams.kstream.KStream; |
| import org.apache.kafka.streams.kstream.KStreamBuilder; |
| import org.apache.kafka.streams.kstream.KTable; |
|
|
| public class FavouriteColourApp { |
|
|
| public static void main(String[] args) { |
| Properties config = new Properties(); |
| config.put(StreamsConfig.APPLICATION_ID_CONFIG, "favourite-colour-java"); |
| config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); |
| config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); |
| config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
| config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
|
|
| // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod |
| config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0"); |
|
|
| KStreamBuilder builder = new KStreamBuilder(); |
|
|
| // Step 1: We create the topic of users keys to colours |
| KStream<String, String> textLines = builder.stream("favourite-colour-input"); |
|
|
| KStream<String, String> usersAndColours = textLines |
| // 1 - we ensure that a comma is here as we will split on it |
| .filter((key, value) -> value.contains(",")) |
| // 2 - we select a key that will be the user id (lowercase for safety) |
| .selectKey((key, value) -> value.split(",")[0].toLowerCase()) |
| // 3 - we get the colour from the value (lowercase for safety) |
| .mapValues(value -> value.split(",")[1].toLowerCase()) |
| // 4 - we filter undesired colours (could be a data sanitization step |
| .filter((user, colour) -> Arrays.asList("green", "blue", "red").contains(colour)); |
|
|
| usersAndColours.to("user-keys-and-colours"); |
|
|
| // step 2 - we read that topic as a KTable so that updates are read correctly |
| KTable<String, String> usersAndColoursTable = builder.table("user-keys-and-colours"); |
|
|
| // step 3 - we count the occurrences of colours |
| KTable<String, Long> favouriteColours = usersAndColoursTable |
| // 5 - we group by colour within the KTable |
| .groupBy((user, colour) -> new KeyValue<>(colour, colour)) |
| .count("CountsByColours"); |
|
|
| // 6 - we output the results to a Kafka Topic - don't forget the serializers |
| favouriteColours.to(Serdes.String(), Serdes.Long(),"favourite-colour-output"); |
|
|
| KafkaStreams streams = new KafkaStreams(builder, config); |
| // only do this in dev - not in prod |
| streams.cleanUp(); |
| streams.start(); |
|
|
| // print the topology |
| System.out.println(streams.toString()); |
|
|
| // shutdown hook to correctly close the streams application |
| Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); |
| } |
| } |
|
No comments:
Post a Comment