Wednesday 11 September 2019

FavouriteColourAppScala.scala/Java

package com.manju.learning.kafka.streams
import java.lang
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
object FavouriteColourAppScala {
def main(args: Array[String]): Unit = {
val config: Properties = new Properties
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "favourite-colour-scala")
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)
// we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")
val builder: KStreamBuilder = new KStreamBuilder
// Step 1: We create the topic of users keys to colours
val textLines: KStream[String, String] = builder.stream[String, String]("favourite-colour-input")
val usersAndColours: KStream[String, String] = textLines
// 1 - we ensure that a comma is here as we will split on it
.filter((key: String, value: String) => value.contains(","))
// 2 - we select a key that will be the user id (lowercase for safety)
.selectKey[String]((key: String, value: String) => value.split(",")(0).toLowerCase)
// 3 - we get the colour from the value (lowercase for safety)
.mapValues[String]((value: String) => value.split(",")(1).toLowerCase)
// 4 - we filter undesired colours (could be a data sanitization step)
.filter((user: String, colour: String) => List("green", "blue", "red").contains(colour))
val intermediaryTopic = "user-keys-and-colours-scala"
usersAndColours.to(intermediaryTopic)
// step 2 - we read that topic as a KTable so that updates are read correctly
val usersAndColoursTable: KTable[String, String] = builder.table(intermediaryTopic)
// step 3 - we count the occurrences of colours
val favouriteColours: KTable[String, lang.Long] = usersAndColoursTable
// 5 - we group by colour within the KTable
.groupBy((user: String, colour: String) => new KeyValue[String, String](colour, colour))
.count("CountsByColours")
// 6 - we output the results to a Kafka Topic - don't forget the serializers
favouriteColours.to(Serdes.String, Serdes.Long, "favourite-colour-output-scala")
val streams: KafkaStreams = new KafkaStreams(builder, config)
streams.cleanUp()
streams.start()
// print the topology
System.out.println(streams.toString)
// shutdown hook to correctly close the streams application
Runtime.getRuntime.addShutdownHook(new Thread {
override def run(): Unit = {
streams.close()
}
})
}
}
package com.manju.learning.kafka.streams;
import java.util.Properties;
import java.util.Arrays;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
public class FavouriteColourApp {
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "favourite-colour-java");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
KStreamBuilder builder = new KStreamBuilder();
// Step 1: We create the topic of users keys to colours
KStream<String, String> textLines = builder.stream("favourite-colour-input");
KStream<String, String> usersAndColours = textLines
// 1 - we ensure that a comma is here as we will split on it
.filter((key, value) -> value.contains(","))
// 2 - we select a key that will be the user id (lowercase for safety)
.selectKey((key, value) -> value.split(",")[0].toLowerCase())
// 3 - we get the colour from the value (lowercase for safety)
.mapValues(value -> value.split(",")[1].toLowerCase())
// 4 - we filter undesired colours (could be a data sanitization step
.filter((user, colour) -> Arrays.asList("green", "blue", "red").contains(colour));
usersAndColours.to("user-keys-and-colours");
// step 2 - we read that topic as a KTable so that updates are read correctly
KTable<String, String> usersAndColoursTable = builder.table("user-keys-and-colours");
// step 3 - we count the occurrences of colours
KTable<String, Long> favouriteColours = usersAndColoursTable
// 5 - we group by colour within the KTable
.groupBy((user, colour) -> new KeyValue<>(colour, colour))
.count("CountsByColours");
// 6 - we output the results to a Kafka Topic - don't forget the serializers
favouriteColours.to(Serdes.String(), Serdes.Long(),"favourite-colour-output");
KafkaStreams streams = new KafkaStreams(builder, config);
// only do this in dev - not in prod
streams.cleanUp();
streams.start();
// print the topology
System.out.println(streams.toString());
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}

No comments:

Post a Comment