package spark
import org.apache.spark.SparkConf
import _root_.kafka.serializer.DefaultDecoder
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    // Two local threads: one for the Kafka receiver, one for processing
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    // Streaming context with a 20-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(20))
    // Kafka consumer settings: ZooKeeper address, consumer group, and connection timeout
    val kafkaConf = Map(
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "test-consumer-group",
      "zookeeper.connection.timeout.ms" -> "5000")
    // Subscribe to "testtopic" with one receiver thread; keys are raw bytes, values are strings
    val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
      ssc, kafkaConf, Map("testtopic" -> 1),
      StorageLevel.MEMORY_ONLY)
    // Each stream element is a (key, message) pair; split the message into words
    val words = lines.flatMap { case (_, message) => message.split(" ") }
    words.print()
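    // Note: despite the object name, this job only prints the raw words. A
    // per-batch word count (a minimal sketch, not part of the original code)
    // could be added with:
    //   val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    //   wordCounts.print()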
    ssc.start()
    // Block until the streaming computation terminates
    ssc.awaitTermination()
  }
}
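
// To try this out (assuming Spark 1.x with the spark-streaming-kafka artifact on
// the classpath and a local ZooKeeper/Kafka broker), one might submit the job with:
//   spark-submit --class spark.KafkaWordCount --master local[2] <application-jar>
// and publish test messages via the console producer:
//   kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic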