import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
/**
 * Minimal Spark Streaming job that consumes a Kafka topic through the
 * receiver-based `KafkaUtils.createStream` API and prints every micro-batch
 * to standard output.
 *
 * NOTE(review): a second `kafkar` object with the same body is defined further
 * down in this file; only one of the two definitions can remain for the file
 * to compile.
 */
object kafkar {

  /**
   * Entry point: silences Spark/Akka logging, builds a local streaming context
   * with a 2-second batch interval, attaches a Kafka receiver stream and blocks
   * until the streaming context terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Turn off all log output from the "org" and "akka" logger hierarchies so
    // that only the printed batches show up on the console.
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    println("program started")

    // Run locally with four worker threads.
    val conf = new SparkConf().setMaster("local[4]").setAppName("kafkar")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Arguments: streaming context, ZooKeeper quorum, consumer group id, and a
    // map of topic name -> number of consumer threads for that topic.
    // NOTE(review): the original comment said the topic is 'test', but the code
    // subscribes to "mutest" — confirm which topic name is intended.
    val kafkaStream = KafkaUtils.createStream(
      ssc,
      "localhost:2181",
      "spark-streaming-consumer-group",
      Map("mutest" -> 5)
    )
    kafkaStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Minimal Spark Streaming job that consumes a Kafka topic through the
 * receiver-based `KafkaUtils.createStream` API and prints every micro-batch
 * to standard output.
 *
 * NOTE(review): this is a second definition of `kafkar`; an object with the
 * same name and body already appears earlier in this file. Keep only one of
 * them — two top-level objects with the same name will not compile together.
 */
object kafkar {

  /**
   * Entry point: silences Spark/Akka logging, builds a local streaming context
   * with a 2-second batch interval, attaches a Kafka receiver stream and blocks
   * until the streaming context terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Turn off all log output from the "org" and "akka" logger hierarchies so
    // that only the printed batches show up on the console.
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    println("program started")

    // Run locally with four worker threads.
    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("kafkar")
    val streamingContext = new StreamingContext(sparkConf, Seconds(2))

    // Arguments: streaming context, ZooKeeper quorum, consumer group id, and a
    // map of topic name -> number of consumer threads for that topic.
    // NOTE(review): the original comment said the topic is 'test', but the code
    // subscribes to "mutest" — confirm which topic name is intended.
    val topicThreads = Map("mutest" -> 5)
    val kafkaStream = KafkaUtils.createStream(
      streamingContext,
      "localhost:2181",
      "spark-streaming-consumer-group",
      topicThreads
    )
    kafkaStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
// No comments:
// Post a Comment