Wednesday 11 September 2019

kafka-streams-filter-tweets

A Kafka Streams application that reads tweets from the twitter_tweets topic and forwards only those whose author has more than 10,000 followers to important_tweets.

import com.google.gson.JsonParser;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class StreamsFilterTweets {

    public static void main(String[] args) {
        // create properties
        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "demo-kafka-streams");
        properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

        // create a topology
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // input topic
        KStream<String, String> inputTopic = streamsBuilder.stream("twitter_tweets");
        KStream<String, String> filteredStream = inputTopic.filter(
                // keep only tweets whose user has more than 10,000 followers
                (k, jsonTweet) -> extractUserFollowersInTweet(jsonTweet) > 10000
        );
        filteredStream.to("important_tweets");

        // build the topology
        KafkaStreams kafkaStreams = new KafkaStreams(
                streamsBuilder.build(),
                properties
        );

        // start our streams application
        kafkaStreams.start();
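
        // Not in the original listing: mirroring the shutdown hook used in the
        // producer below, the Streams app could also be closed cleanly on exit:
        // Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));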
    }

    private static final JsonParser jsonParser = new JsonParser();

    private static Integer extractUserFollowersInTweet(String tweetJson) {
        // use the gson library to pull user.followers_count out of the tweet JSON
        try {
            return jsonParser.parse(tweetJson)
                    .getAsJsonObject()
                    .get("user")
                    .getAsJsonObject()
                    .get("followers_count")
                    .getAsInt();
        } catch (NullPointerException e) {
            // tweet without the expected fields: treat it as 0 followers
            return 0;
        }
    }
}
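
Both listings assume the twitter_tweets and important_tweets topics already exist. As a hedged sketch (not part of the original code), they can be created programmatically with the AdminClient from kafka-clients; the class name, partition count, and replication factor below are placeholder choices for a single local broker:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(properties)) {
            // placeholder sizing: 3 partitions, replication factor 1 (fine for a local broker)
            admin.createTopics(Arrays.asList(
                    new NewTopic("twitter_tweets", 3, (short) 1),
                    new NewTopic("important_tweets", 3, (short) 1)
            )).all().get(); // block until the broker has created both topics
        }
    }
}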

kafka-producer-twitter

A producer that streams tweets from the Twitter Hosebird client (hbc) into the twitter_tweets topic, configured as a safe, high-throughput producer.

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TwitterProducer {

    private final Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());

    // use your own credentials - don't share them with anyone
    String consumerKey = "";
    String consumerSecret = "";
    String token = "";
    String secret = "";

    List<String> terms = Lists.newArrayList("bitcoin", "usa", "politics", "sport", "soccer");

    public TwitterProducer() {}

    public static void main(String[] args) {
        new TwitterProducer().run();
    }

    public void run() {
        logger.info("Setup");

        // set up your blocking queue; size it based on the expected TPS of your stream
        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);

        // create a twitter client and attempt to establish a connection
        Client client = createTwitterClient(msgQueue);
        client.connect();

        // create a kafka producer
        KafkaProducer<String, String> producer = createKafkaProducer();

        // add a shutdown hook so both clients are closed cleanly on exit
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("stopping application...");
            logger.info("shutting down client from twitter...");
            client.stop();
            logger.info("closing producer...");
            producer.close();
            logger.info("done!");
        }));

        // loop to send tweets to kafka
        // (in a real application this would run on one or more separate threads)
        while (!client.isDone()) {
            String msg = null;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                client.stop();
            }
            if (msg != null) {
                logger.info(msg);
                producer.send(new ProducerRecord<>("twitter_tweets", null, msg), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            logger.error("Something bad happened", e);
                        }
                    }
                });
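                // Not in the original: Callback has a single method, so the anonymous
                // class above could equally be written as a lambda:
                // (recordMetadata, e) -> { if (e != null) logger.error("Something bad happened", e); }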
            }
        }
        logger.info("End of application");
    }

    public Client createTwitterClient(BlockingQueue<String> msgQueue) {
        // declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth)
        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
        hosebirdEndpoint.trackTerms(terms);

        // these secrets should be read from a config file
        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);

        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01") // optional: mainly for the logs
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));

        return builder.build();
    }

    public KafkaProducer<String, String> createKafkaProducer() {
        String bootstrapServers = "127.0.0.1:9092";

        // create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create a safe Producer
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        // safe with idempotence on Kafka >= 1.1 (we run 2.0); use "1" on older brokers
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

        // high-throughput producer (at the expense of a bit of latency and CPU usage)
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32 KB batch size

        // create the producer
        return new KafkaProducer<String, String>(properties);
    }
}
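
To check that the pipeline works end to end, a throwaway consumer can tail the important_tweets topic. This is a minimal sketch using the plain KafkaConsumer API; the class name and group id are my own placeholder choices:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ImportantTweetsChecker {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "important-tweets-checker"); // placeholder group id
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("important_tweets"));
            while (true) {
                // print every filtered tweet that made it through the Streams app
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}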