import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf

object TwitterPopularTags {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " +
        "<access token> <access token secret> [<filters>]")
      System.exit(1)
    }

    // Helper from the Spark examples project that sets a sensible log level for streaming apps
    StreamingExamples.setStreamingLogLevels()

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.takeRight(args.length - 4)

    // Set the system properties so that the Twitter4j library used by the Twitter stream
    // can generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters)

    // Split each tweet into words and keep only the hashtags
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // Count hashtag occurrences over sliding windows of 60 and 10 seconds,
    // then sort by count in descending order
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print the most popular hashtags in each window
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })
    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
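
Note: StreamingExamples is a helper object from the Spark examples project, not part of the core Spark API. If you build this code outside that project, a minimal stand-in like the sketch below (assuming the Log4j 1.x API that ships with Spark 1.x) keeps Spark's INFO logging from drowning out the hashtag counts:

import org.apache.log4j.{Level, Logger}

object StreamingExamples {
  // Quiet the root logger so the printed hashtag counts stay readable;
  // a minimal stand-in for the helper bundled with the Spark examples project
  def setStreamingLogLevels() {
    Logger.getRootLogger.setLevel(Level.WARN)
  }
}

To compile and run, the spark-streaming-twitter artifact (which pulls in twitter4j) must also be on the classpath, for example via the sbt dependency "org.apache.spark" %% "spark-streaming-twitter" % &lt;your Spark version&gt;.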