Tuesday 22 May 2018

Word count program with Spark Streaming


package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 *
 * Usage: NetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
 */
object NetworkWordCount {

  /**
   * Entry point: connects to a TCP text source and prints per-batch word counts.
   *
   * @param args `args(0)` is the hostname and `args(1)` the port of the TCP server to read
   *             from. The program prints a usage message and exits with status 1 when fewer
   *             than two arguments are supplied.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on the target host:port and count the words in the
    // '\n'-delimited text it delivers (e.g. text generated by 'nc').
    // The storage level used here is not replicated, which is only appropriate when
    // running locally; a distributed deployment needs replication for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    // Nothing is processed until the context is started; then block until it stops.
    ssc.start()
    ssc.awaitTermination()
  }
}



Or, as a self-contained application with a hard-coded master and socket address:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

/**
 * Streaming word count over a TCP text socket, with the master, application name,
 * batch interval and socket address all hard-coded.
 *
 * Every 10 seconds the text received from the socket is split on single spaces and
 * the number of occurrences of each word in that batch is printed.
 */
object NetworkSparkStream {

  /**
   * Entry point.
   *
   * @param args command-line arguments; not used, since all settings are hard-coded below.
   */
  def main(args: Array[String]): Unit = {
    // "local[2]" runs Spark locally with two threads: a socket receiver occupies one,
    // so at least one more is needed to process the received batches.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("My first Streaming App")
    val ssc = new StreamingContext(conf, Seconds(10))

    // NOTE(review): port 999 is kept from the original listing; it may be a typo for
    // 9999 (the usual `nc -lk 9999` example port) -- confirm against the actual server.
    val lines =
      ssc.socketTextStream("192.168.56.101", 999, StorageLevel.MEMORY_AND_DISK_SER_2)

    // Split each line into words on single spaces, pair every word with a count of 1,
    // and sum the counts per word within each batch.
    lines
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()

    // Nothing is processed until the context is started; then block until it stops.
    ssc.start()
    ssc.awaitTermination()
  }
}

No comments:

Post a Comment