Tuesday 22 May 2018

TwitterPopularTags – Kafka, Spark Streaming

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._

/** Streams tweets and prints the most popular hashtags seen over sliding
  * 60-second and 10-second windows, recomputed every 2-second batch.
  *
  * Usage:
  * {{{
  * TwitterPopularTags <consumer key> <consumer secret> <access token> <access token secret> [<filters>]
  * }}}
  *
  * The four OAuth credentials are handed to twitter4j through system
  * properties; any remaining arguments are forwarded to
  * `TwitterUtils.createStream` as filters.
  */
object TwitterPopularTags {

  /** Window lengths (seconds) reported on, in print order. */
  private val WindowLengthsSeconds: Seq[Int] = Seq(60, 10)

  /** Number of top hashtags printed per window per batch. */
  private val TopN: Int = 10

  /** Entry point.
    *
    * @param args OAuth consumer key, consumer secret, access token and
    *             access token secret, followed by optional filter terms.
    *             Exits with status 1 if fewer than four arguments are given.
    */
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println(
        "Usage: TwitterPopularTags <consumer key> <consumer secret> " +
          "<access token> <access token secret> [<filters>]")
      System.exit(1)
    }

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    // Everything after the four credentials is a filter term (possibly none).
    val filters: Seq[String] = args.drop(4).toSeq

    // twitter4j picks its OAuth credentials up from these system properties;
    // the key names must match exactly or authentication silently fails.
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    // local[2]: at least two threads are needed so the receiver does not
    // starve batch processing.
    val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.sparkContext.setLogLevel("WARN")

    val stream = TwitterUtils.createStream(ssc, None, filters)

    // Split on runs of whitespace (splitting on "" would yield single
    // characters, so no token could ever be a hashtag).
    val hashTags = stream.flatMap(status => status.getText.split("\\s+").filter(_.startsWith("#")))

    // One pipeline per window length; output operations are registered in
    // the order of WindowLengthsSeconds, so the 60 s report prints first.
    for (windowSecs <- WindowLengthsSeconds) {
      // (count, tag) pairs over the window, most frequent first.
      val topCounts = hashTags
        .map((_, 1))
        .reduceByKeyAndWindow(_ + _, Seconds(windowSecs))
        .map { case (tag, count) => (count, tag) }
        .transform(_.sortByKey(ascending = false))

      // Print popular hashtags
      topCounts.foreachRDD { rdd =>
        val topList = rdd.take(TopN)
        println(s"\n popular topics in last $windowSecs Seconds (${rdd.count()} total):")
        topList.foreach { case (count, tag) => println(s"$tag($count tweets)") }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

No comments:

Post a Comment