Tuesday 27 August 2019

Kafka Spark Streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object kafkar {
  def main(args: Array[String]) {
    // turn off the verbose Spark and Akka logging
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    println("program started")

    // local[4]: run locally with 4 threads; process the stream in 2-second batches
    val conf = new SparkConf().setMaster("local[4]").setAppName("kafkar")
    val ssc = new StreamingContext(conf, Seconds(2))

    // my Kafka topic name is 'mutest'; connect through ZooKeeper on localhost:2181
    // as consumer group 'spark-streaming-consumer-group' with 5 receiver threads
    val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("mutest" -> 5))

    kafkaStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
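The DStream returned by createStream carries (key, message) pairs, so kafkaStream.print() just dumps the raw tuples every two seconds. A common next step is to transform the stream before starting the context. The sketch below is my own illustrative addition (same topic and batch interval as above): it does a per-batch word count on the message values, and the lines would go just before ssc.start().

    // keep only the message value from each (key, message) tuple
    val lines = kafkaStream.map(_._2)

    // split each message into words and count occurrences within each 2-second batch
    val wordCounts = lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    wordCounts.print()

To try it out, publish a few lines to the 'mutest' topic with the Kafka console producer and the counts will appear in the driver console on every batch.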

