import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class TwitterProducer {

    Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());

    // use your own credentials - don't share them with anyone
    String consumerKey = "";
    String consumerSecret = "";
    String token = "";
    String secret = "";
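
    // terms to track: statuses/filter delivers a tweet if it matches ANY of these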
    List<String> terms = Lists.newArrayList("bitcoin", "usa", "politics", "sport", "soccer");

    public TwitterProducer() {}

    public static void main(String[] args) {
        new TwitterProducer().run();
    }

    public void run() {
        logger.info("Setup");

        // set up the blocking queue: size it according to the expected TPS of your stream
        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);

        // create a twitter client
        Client client = createTwitterClient(msgQueue);
        // attempt to establish a connection
        client.connect();
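        // once connected, the hbc client's StringDelimitedProcessor puts each raw tweet (a JSON string) onto msgQueue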
// create a kafka producer | |
KafkaProducer<String, String> producer = createKafkaProducer(); | |
// add a shutdown hook | |
Runtime.getRuntime().addShutdownHook(new Thread(() -> { | |
logger.info("stopping application..."); | |
logger.info("shutting down client from twitter..."); | |
client.stop(); | |
logger.info("closing producer..."); | |
            producer.close();
            logger.info("done!");
        }));

        // loop to send tweets to kafka
        // on a different thread, or multiple different threads....
        while (!client.isDone()) {
            String msg = null;
            try {
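                // poll with a timeout so the loop can periodically re-check client.isDone()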
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                client.stop();
            }
            if (msg != null) {
                logger.info(msg);
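                // the record has a null key, so it is not tied to any particular partition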
                producer.send(new ProducerRecord<>("twitter_tweets", null, msg), (recordMetadata, e) -> {
                    if (e != null) {
                        logger.error("Something bad happened", e);
                    }
                });
            }
        }
        logger.info("End of application");
    }

    public Client createTwitterClient(BlockingQueue<String> msgQueue) {
        // declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth)
        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
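        // the statuses/filter endpoint streams only tweets that match the tracked terms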
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
        hosebirdEndpoint.trackTerms(terms);

        // these secrets should be read from a config file
        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);

        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01") // optional: mainly for the logs
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));

        return builder.build();
    }

    public KafkaProducer<String, String> createKafkaProducer() {
        String bootstrapServers = "127.0.0.1:9092";

        // create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create a safe producer
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
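        // idempotence prevents duplicate writes on retries; it requires acks=all and retries > 0, set explicitly below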
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // safe with the idempotent producer on Kafka >= 1.1 (we're on 2.0); use 1 on older brokers

        // high throughput producer (at the expense of a bit of latency and CPU usage)
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32 KB batch size

        // create the producer
        return new KafkaProducer<String, String>(properties);
    }
}
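
The producer sends to a topic named twitter_tweets, so that topic must already exist (or broker-side auto-creation must be enabled). As a minimal sketch, here is one way to create it up front with Kafka's AdminClient; the partition count and replication factor are illustrative placeholders, not values from the original post:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTwitterTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions / replication factor 1 are placeholders; size them for your cluster
            NewTopic topic = new NewTopic("twitter_tweets", 6, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get(); // blocks until the broker confirms
        }
    }
}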