Wednesday 30 May 2018

Spark Streaming with Kafka in Scala

package spark

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
import _root_.kafka.serializer.DefaultDecoder
import _root_.kafka.serializer.StringDecoder

object WordCount {
  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(20))

    // Kafka consumer settings; the receiver-based API connects through ZooKeeper
    val kafkaConf = Map(
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "test-consumer-group",
      "zookeeper.connection.timeout.ms" -> "5000")

    // Subscribe to topic "testtopic" with one receiver thread
    val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
      ssc, kafkaConf, Map("testtopic" -> 1), StorageLevel.MEMORY_ONLY)

    // Each record is a (key, message) pair; split the message value into words
    val words = lines.flatMap { case (x, y) => y.split(" ") }
    words.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
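To build and run this against a local Kafka and ZooKeeper, the spark-streaming-kafka integration module has to be on the classpath. A minimal build.sbt sketch, assuming Spark 1.6.x with the Kafka 0.8 receiver API (the version numbers are assumptions, match them to your cluster):

// build.sbt (sketch; versions are assumptions)
name := "kafka-wordcount"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.3",
  "org.apache.spark" %% "spark-streaming"       % "1.6.3",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
)

With the application running, messages written to testtopic (for example with kafka-console-producer) show up in the words.print() output at every 20-second batch.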

Tuesday 22 May 2018

Word count program with Spark Streaming


package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 *
 * Usage: NetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
 */
object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
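The counts printed above are per one-second batch. To keep a running count over a longer period, the same pair DStream can be aggregated with reduceByKeyAndWindow, the window operation the Twitter example further down also uses. A minimal sketch (the 30-second window and 10-second slide are arbitrary choices; both must be multiples of the batch interval):

    // Count words over a sliding 30-second window, recomputed every 10 seconds
    val windowedCounts = words.map(x => (x, 1))
      .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
    windowedCounts.print()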



Or the same word count as a standalone application, reading from a hard-coded host and port:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkSparkStream {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]")
      .setAppName("My first Streaming App")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Read '\n'-delimited text from the given host and port
    val lines = ssc.socketTextStream("192.168.56.101", 999, StorageLevel.MEMORY_AND_DISK_SER_2)

    // Word count on each 10-second batch
    lines.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

TwitterPopularTags with Spark Streaming

package org.apache.spark.examples.streaming

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf

object TwitterPopularTags {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " +
        "<access token> <access token secret> [<filters>]")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.takeRight(args.length - 4)

    // Pass the Twitter OAuth credentials to twitter4j through system properties
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters)

    // Extract hashtags from the text of each tweet
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print the most popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
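TwitterUtils is not part of Spark core: it lives in the external spark-streaming-twitter module, which also pulls in the twitter4j client behind the OAuth system properties above. A sketch of the extra sbt dependency, again assuming Spark 1.6.x (the version is an assumption, match it to your installation):

// extra dependency for TwitterUtils (version is an assumption)
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"

The four OAuth values passed on the command line come from an application registered in the Twitter developer console; any remaining arguments are used as track filters for the stream.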