Friday, 30 June 2017

MongoDB -Casbah

Casbah - Scala toolkit for MongoDB

 Casbah is a Scala toolkit for MongoDB  and it  integrates a layer on top of the official mongo-java-driver for 
better integration with Scala. 

The recommended way to get started is with a dependency management system.  
 ---------------------------------------------

You could get the source from :
https://github.com/alvinj/ScalaCasbahConnections

Then you could modify your
-----------------
manju@manju-VirtualBox:~/
mongoConnector/ScalaCasbahConnections$ cat build.sbt
organization := "com.alvinalexander"

name := "ScalatraCasbahMongo"

version := "0.1.0-SNAPSHOT"

scalaVersion := "2.11.8"

libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1"

libraryDependencies += "com.mongodb.casbah" % "casbah-gridfs_2.8.1" % "2.1.5-1"

libraryDependencies += "org.slf4j" % "slf4j-log4j12" % "1.7.24"

resolvers += "Sonatype OSS Snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/"
spb@spb-VirtualBox:~/mongoConnector/ScalaCasbahConnections$

manju@manju-VirtualBox:~/
mongoConnector/ScalaCasbahConnections$ sbt run
[info] Loading project definition from /home/spb/mongoConnector/ScalaCasbahConnections/project
[info] Set current project to ScalatraCasbahMongo (in build file:/home/spb/mongoConnector/Scala
CasbahConnections/)
[info] Compiling 1 Scala source to /home/spb/mongoConnector/ScalaCasbahConnections/target/
scala-2.11/classes...
[warn] there was one deprecation warning; re-run with -deprecation for details
[warn] one warning found
[info] Running casbahtests.MainDriver
debug: a
log4j:WARN No appenders could be found for logger (com.mongodb.casbah.commons.conversions.
scala.RegisterConversionHelpers$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
debug: b
debug: c
debug: d
debug: e
debug: f
debug: g
debug: h
debug: i
debug: j
debug: k
debug: l
debug: m
debug: n
debug: o
debug: p
debug: q
debug: r
debug: s
debug: t
debug: u
debug: v
debug: w
debug: x
debug: y
debug: z
sleeping at the end
  sleeping: 1
  sleeping: 2
  sleeping: 3
  sleeping: 4
  sleeping: 5
  sleeping: 6
  sleeping: 7
  sleeping: 8
  sleeping: 9
  sleeping: 10
  sleeping: 11
  sleeping: 12
  sleeping: 13
  sleeping: 14
  sleeping: 15
  sleeping: 16
  sleeping: 17
  sleeping: 18
  sleeping: 19
  sleeping: 20
  sleeping: 21
  sleeping: 22
  sleeping: 23
  sleeping: 24
  sleeping: 25
  sleeping: 26
  sleeping: 27
  sleeping: 28
  sleeping: 29
  sleeping: 30
game over
[success] Total time: 62 s, completed 13 Mar, 2017 5:37:31 PM
spb@spb-VirtualBox:~/mongoConnector/ScalaCasbahConnections$
spb@spb-VirtualBox:~/mongoConnector/ScalaCasbahConnections$ sbt package
[info] Loading project definition from /home/spb/mongoConnector/ScalaCasbahConnections/project
[info] Set current project to ScalatraCasbahMongo (in build file:/home/spb/mongoConnector/Scala
CasbahConnections/)
[info] Packaging /home/spb/mongoConnector/ScalaCasbahConnections/target/scala-2.11/scala
tracasbahmongo_2.11-0.1.0-SNAPSHOT.jar ...
[info] Done packaging.
[success] Total time: 1 s, completed 13 Mar, 2017 5:54:42 PM
spb@spb-VirtualBox:~/mongoConnector/ScalaCasbahConnections$
------------------------------------------------------------


spb@spb-VirtualBox:~/Scala_project$ mongo
MongoDB shell version: 3.2.12
connecting to: test
Server has startup warnings:
> show dbs
local  0.000GB
mydb   0.000GB
> show dbs
finance  0.000GB
local    0.000GB
mydb     0.000GB
> show collections
> use finance
switched to db finance
> show collections
stocks
> db.stocks.find()
{ "_id" : ObjectId("58cd184edffa1f1829bfbc94"), "name" : "a", "symbol" : "a" }
{ "_id" : ObjectId("58cd184fdffa1f1829bfbc95"), "name" : "b", "symbol" : "b" }
{ "_id" : ObjectId("58cd1850dffa1f1829bfbc96"), "name" : "c", "symbol" : "c" }
{ "_id" : ObjectId("58cd1851dffa1f1829bfbc97"), "name" : "d", "symbol" : "d" }
{ "_id" : ObjectId("58cd1852dffa1f1829bfbc98"), "name" : "e", "symbol" : "e" }
{ "_id" : ObjectId("58cd1853dffa1f1829bfbc99"), "name" : "f", "symbol" : "f" }
{ "_id" : ObjectId("58cd1854dffa1f1829bfbc9a"), "name" : "g", "symbol" : "g" }
{ "_id" : ObjectId("58cd1855dffa1f1829bfbc9b"), "name" : "h", "symbol" : "h" }
{ "_id" : ObjectId("58cd1856dffa1f1829bfbc9c"), "name" : "i", "symbol" : "i" }
{ "_id" : ObjectId("58cd1857dffa1f1829bfbc9d"), "name" : "j", "symbol" : "j" }
{ "_id" : ObjectId("58cd1858dffa1f1829bfbc9e"), "name" : "k", "symbol" : "k" }
{ "_id" : ObjectId("58cd1859dffa1f1829bfbc9f"), "name" : "l", "symbol" : "l" }
{ "_id" : ObjectId("58cd185adffa1f1829bfbca0"), "name" : "m", "symbol" : "m" }
{ "_id" : ObjectId("58cd185bdffa1f1829bfbca1"), "name" : "n", "symbol" : "n" }
{ "_id" : ObjectId("58cd185cdffa1f1829bfbca2"), "name" : "o", "symbol" : "o" }
{ "_id" : ObjectId("58cd185ddffa1f1829bfbca3"), "name" : "p", "symbol" : "p" }
{ "_id" : ObjectId("58cd185edffa1f1829bfbca4"), "name" : "q", "symbol" : "q" }
{ "_id" : ObjectId("58cd185fdffa1f1829bfbca5"), "name" : "r", "symbol" : "r" }
{ "_id" : ObjectId("58cd1860dffa1f1829bfbca6"), "name" : "s", "symbol" : "s" }
{ "_id" : ObjectId("58cd1861dffa1f1829bfbca7"), "name" : "t", "symbol" : "t" }
Type "it" for more
>
-------------------------

----------------

There are two ways of getting the data from MongoDB to Apache Spark.
Method 1: Using Casbah (Layer on MongDB Java Driver)
val uriRemote = MongoClientURI("mongodb://RemoteURL:27017/")
val mongoClientRemote =  MongoClient(uriRemote)
val dbRemote = mongoClientRemote("dbName")
val collectionRemote = dbRemote("collectionName")
val ipMongo = collectionRemote.find
val ipRDD = sc.makeRDD(ipMongo.toList)
ipRDD.saveAsTextFile("hdfs://path/to/hdfs")

Method 2: Spark Worker at our use
Better version of code: Using Spark worker and multiple core to use to get the data in short time.
val config = new Configuration()
config.set("mongo.job.input.format","com.mongodb.hadoop.MongoInputFormat")
config.set("mongo.input.uri", "mongodb://RemoteURL:27017/dbName.collection
Name")
val keyClassName = classOf[Object]
val valueClassName = classOf[BSONObject]
val inputFormatClassName = classOf[com.mongodb.hadoop.MongoInputFormat]
val ipRDD = sc.newAPIHadoopRDD(config,inputFormatClassName,keyClassName,
valueClassName)
ipRDD.saveAsTextFile("hdfs://path/to/hdfs")

---------------------------------------------------------------
Reference:

https://web.archive.org/web/20120402085626/http://api.mongodb.org/scala/casbah/current
/setting_up.html#setting-up-sbt


Sample Spark Application -wordcount with Simple Build Tool (SBT)


Developing and Running a Spark WordCount Application written in Scala :

Apache Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources including HDFS, Cassandra, HBase, and S3. You can run Spark using its standalone cluster mode, on EC2, on Hadoop YARN, or on Apache Mesos. Access data in HDFS, Cassandra, HBase, Hive, Tachyon, and any Hadoop data source.

WordCount example reads text files and counts how often words occur. The input is text files and the output is text files, each line of which contains a word and the count of how often it occured, separated by a space (" ").

Simple Build Tool (SBT) is an open source build tool for Scala and Java projects, similar to Java's Maven or Ant. Its main features are: native support for compiling Scala code and integrating with many Scala frameworks. sbt is the de facto build tool in the Scala community.

This tutorial describes how to write, compile and run a simple Spark word count application in scala language supported by Spark. 

   
                           
Pr-requisites: You could download and install listed packages:
1) sbt from: http://www.scala-sbt.org/    or  git clone https://github.com/sbt/sbt.git
2) scala from https://www.scala-lang.org/download/all.html
3) Apache spark from  http://spark.apache.org/downloads.html
 ---------------------------------------------------------------------------------

Step 1:  create the folder scala_project :
               mkdir Scala_wc_project
Step 2:   cd  scala_wc_project

Step 3 :  mkdir -p project src/{main,test}/{scala,java,resources} 

manju@manju-VirtualBox:~/Scala_wc_project$ ls
project  src
Step 4 : create wordcount.scala program at src/main/scala

manju@manju-VirtualBox:~/Scala_wc_project/src/main/scala$ vi wordcount.scala
-------------------------------------------------

 import org.apache.spark._
 import org.apache.spark.SparkContext._


 object WordCount {
    def main(args: Array[String]) {
      val inputFile = args(0)
      val outputFile = args(1)
      val conf = new SparkConf().setAppName("
wordCount")
      // Create a Scala Spark Context.
      val sc = new SparkContext(conf)
      // Load our input data.
      val input =  sc.textFile(inputFile)
      // Split up into words.
      val words = input.flatMap(line => line.split(" "))
      // Transform into word and count.
      val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}
      // Save the word count back out to a text file, causing evaluation.
      counts.saveAsTextFile(
outputFile)
    }
---------------------------------------------------------------------------
where

Step 5 : create  build.sbt at root of project directory
manju@manju-VirtualBox:~/Scala_wc_
project$ cat build.sbt
name := "manju_Word Count"

version := "1.0"

scalaVersion := "2.11.8"

sbtVersion := "0.13.13"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "2.1.0"
------------------------------------------------------------------------------------- 
 Step 6 :  Execute sbt commands : clean, update,compile, package
This will create the  jar file (package) at target directory
manju@manju-VirtualBox:~/Scala_wc_project$ sbt clean
[info] Set current project to manju_Word Count (in build file:/home/spb/Scala_wc_project/)
[success] Total time: 1 s, completed 5 Mar, 2017 7:29:04 PM
manju@manju-VirtualBox:~/Scala_wc_project$ sbt update
[info] Set current project to manju_Word Count (in build file:/home/spb/Scala_wc_project/)
[info] Updating {file:/home/manju/Scala_wc_project/}scala_wc_project...
[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[success] Total time: 18 s, completed 29 Jun , 2017 7:29:33 PM
manju@manju-VirtualBox:~/Scala_wc_project$ sbt compile
[info] Set current project to SPB_Word Count (in build file:/home/spb/Scala_wc_prject/)
[info] Compiling 1 Scala source to /home/manju/Scala_wc_project/target/scala-2.11/classes...
[success] Total time: 10 s, completed 29 Jun, 2017 7:29:54 PM
manju@manju-VirtualBox:~/Scala_wc_project$
manju@manju-VirtualBox:~/Scala_wc_project$ sbt package
[info] Set current project to SPB_Word Count (in build file:/home/manju/Scala_wc_project/)
[info] Packaging /home/manju/Scala_wc_project/target/scala-2.11/manju_word-count_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 2 s, completed 29 Jun, 2017 7:30:47 PM
manju@manju-VirtualBox:~/Scala_wc_project$ 

manju@manju-VirtualBox:~/Scala_wc_project/target/scala-2.11$ ls
classes  manju_word-count_2.11-1.0.jar
manju@manju-VirtualBox:~/Scala_wc_project/target/scala-2.11$ 
------------------------------------------------------------------------------------------
Step 7: Project directory structure :

Step 8 :  Run  "spb_word-count_2.11-1.0.jar" file  on spark framework  as shown here :
 
spark-submit --class WordCount --master local spb_word-count_2.11-1.0.jar /home/spb/data/input.txt /home/spb/data/output8

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/06/29 19:36:32 INFO SparkContext: Running Spark version 2.1.0
17/06/29 19:36:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable17/06/29  19:36:34 WARN Utils: Your hostname, spb-VirtualBox resolves to a loopback address: 127.0.1.1; using 192.168.1.123 instead (on interface enp0s3)17/06/29  19:36:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address17/06/29  19:36:34 INFO SecurityManager: Changing view acls to: spb17/06/29  19:36:34 INFO SecurityManager: Changing modify acls to: spb17/06/29 19:36:34 INFO SecurityManager: Changing view acls groups to: 17/06/29  19:36:34 INFO SecurityManager: Changing modify acls groups to: 17/06/29  19:36:34 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(spb); groups with view permissions: Set(); users  with modify permissions: Set(spb); groups with modify permissions: Set()17/06/29  19:36:34 INFO Utils: Successfully started service 'sparkDriver' on port 39733.17/06/29 19:36:34 INFO SparkEnv: Registering MapOutputTracker17/06/29  19:36:34 INFO SparkEnv: Registering BlockManagerMaster17/06/29  19:36:34 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information17/06/29 19:36:34 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up17/06/29  19:36:35 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8b32665a-9d41-4c8d-bee4-42b6d5891c1517/06/29 19:36:35 INFO MemoryStore: MemoryStore started with capacity 366.3 MB17/06/29  19:36:35 INFO SparkEnv: Registering OutputCommitCoordinator17/06/29 19:36:35 INFO Utils: Successfully started service 'SparkUI' on port 4040.17/06/29  19:36:35 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.123:404017/06/29  19:36:36 INFO SparkContext: Added JAR file:/home/spb/Scala_wc_project/target/scala-2.11/spb_word-count_2.11-1.0.jar at spark://192.168.1.123:39733/jars/spb_word-count_2.11-1.0.jar with timestamp 148872279608017/06/29  19:36:36 INFO Executor: Starting executor ID driver on host localhost17/06/29  19:36:36 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37957.17/06/29  19:36:36 INFO NettyBlockTransferService: Server created on 192.168.1.123:3795717/06/29  19:36:36 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy17/06/29  19:36:36 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.123, 37957, None)17/06/29  19:36:36 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.123:37957 with 366.3 MB RAM, BlockManagerId(driver, 192.168.1.123, 37957, None)17/06/29 19:36:36 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.123, 37957, None)17/06/29  19:36:36 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.123, 37957, None)17/06/29  19:36:37 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 236.5 KB, free 366.1 MB)17/06/29  19:36:38 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.9 KB, free 366.0 MB)17/06/29  19:36:38 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.123:37957 (size: 22.9 KB, free: 366.3 MB)17/06/29 19:36:38 INFO SparkContext: Created broadcast 0 from textFile at wordcount.scala:1217/06/29  19:36:38 INFO FileInputFormat: Total input paths to process : 117/06/29  19:36:38 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id17/06/29  19:36:38 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id17/06/29  19:36:38 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap17/06/29  19:36:38 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition17/06/29  19:36:38 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id17/06/29  19:36:38 INFO FileOutputCommitter: File Output Committer Algorithm version is 117/06/29 19:36:38 INFO SparkContext: Starting job: saveAsTextFile at wordcount.scala:1817/06/29  19:36:39 INFO DAGScheduler: Registering RDD 3 (map at wordcount.scala:16)17/06/29  19:36:39 INFO DAGScheduler: Got job 0 (saveAsTextFile at wordcount.scala:18) with 2 output partitions17/06/29  19:36:39 INFO DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at wordcount.scala:18)17/06/29  19:36:39 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)17/06/29   19:36:39 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)17/06/29 19:36:39 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at wordcount.scala:16), which has no missing parents17/06/29  19:36:39 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.7 KB, free 366.0 MB)17/06/29  19:36:39 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.7 KB, free 366.0 MB)17/06/29  19:36:39 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.123:37957 (size: 2.7 KB, free: 366.3 MB)17/06/29  19:36:39 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:99617/06/29  19:36:39 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at wordcount.scala:16)17/06/29  19:36:39 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks17/06/29 19:36:39 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 6043 bytes)
17/03/05 19:36:39 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 6043 bytes)
17/06/29 19:36:39 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)17/06/29  19:36:39 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)17/06/29  19:36:39 INFO Executor: Fetching spark://192.168.1.123:39733/jars/spb_word-count_2.11-1.0.jar with timestamp 148872279608017/06/29 19:36:40 INFO TransportClientFactory: Successfully created connection to /192.168.1.123:39733 after 110 ms (0 ms spent in bootstraps)
17/03/05 19:36:40 INFO Utils: Fetching spark://192.168.1.123:39733/jars/spb_word-count_2.11-1.0.jar to /tmp/spark-58cba0fb-b20f-46f4-
bb17-3201f9dec45b/userFiles-69c444db-7e90-48cb-805b-12225146686f/fetchFileTemp6212103198181174966.tmp
17/03/05 19:36:40 INFO Executor: Adding file:/tmp/spark-58cba0fb-b20f-
46f4-bb17-3201f9dec45b/userFiles-69c444db-7e90-48cb-805b-12225146686f/spb_word-count_2.11-1.0.jar to class loader
17/03/05 19:36:40 INFO HadoopRDD: Input split: file:/home/spb/data/input.txt:
0+3917/06/29  19:36:40 INFO HadoopRDD: Input split: file:/home/spb/data/input.txt:39+4017/06/29  19:36:41 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1819 bytes result sent to driver17/06/29  19:36:41 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1819 bytes result sent to driver17/06/29  19:36:41 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1221 ms on localhost (executor driver) (1/2)17/06/29 19:36:41 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 1201 ms on localhost (executor driver) (2/2)17/06/29  19:36:41 INFO DAGScheduler: ShuffleMapStage 0 (map at wordcount.scala:16) finished in 1.355 s17/06/29  19:36:41 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 17/06/29  19:36:41 INFO DAGScheduler: looking for newly runnable stages17/06/29  19:36:41 INFO DAGScheduler: running: Set()17/06/29  19:36:41 INFO DAGScheduler: waiting: Set(ResultStage 1)17/06/29 19:36:41 INFO DAGScheduler: failed: Set()17/06/29 19:36:41 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at wordcount.scala:18), which has no missing parents17/06/29  19:36:41 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 72.8 KB, free 366.0 MB)17/06/29  19:36:41 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 26.4 KB, free 365.9 MB)17/06/29  19:36:41 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.123:37957 (size: 26.4 KB, free: 366.2 MB)17/06/29  19:36:41 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:99617/06/29  19:36:41 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at wordcount.scala:18)17/06/29 19:36:41 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks17/06/29  19:36:41 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, executor driver, partition 0, ANY, 5827 bytes)17/06/29  19:36:41 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, ANY, 5827 bytes)17/06/29  19:36:41 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)17/06/29  19:36:41 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)17/06/29  19:36:41 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks17/06/29 19:36:41 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks17/06/29  19:36:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 22 ms17/06/29  19:36:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 38 ms17/06/29  19:36:41 INFO FileOutputCommitter: File Output Committer Algorithm version is 117/06/29  19:36:41 INFO FileOutputCommitter: File Output Committer Algorithm version is 117/06/29  19:36:41 INFO FileOutputCommitter: Saved output of task 'attempt_20170305193638_0001_m_000000_2' to file:/home/spb/data/output8/_temporary/0/task_20170305193638_0001_m_00000017/06/29  19:36:41 INFO SparkHadoopMapRedUtil: attempt_20170305193638_0001_m_000000_2: Committed17/06/29 19:36:41 INFO FileOutputCommitter: Saved output of task 'attempt_20170305193638_0001_m_000001_3' to file:/home/spb/data/output8/_temporary/0/task_20170305193638_0001_m_000001
17/03/05 19:36:41 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1977 bytes result sent to driver
17/06/29  19:36:41 INFO SparkHadoopMapRedUtil: attempt_20170305193638_0001_m_000001_3: Committed17/06/29  19:36:41 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1890 bytes result sent to driver
1
17/06/29  19:36:41 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 519 ms on localhost (executor driver) (1/2)17/06/29  19:36:41 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 519 ms on localhost (executor driver) (2/2)17/06/29  19:36:41 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at wordcount.scala:18) finished in 0.532 s17/06/29  19:36:41 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 17/06/29 19:36:41 INFO DAGScheduler: Job 0 finished: saveAsTextFile at wordcount.scala:18, took 2.968627 s17/06/29  19:36:42 INFO SparkContext: Invoking stop() from shutdown hook17/06/29  19:36:42 INFO SparkUI: Stopped Spark web UI at http://192.168.1.123:404017/06/29  19:36:42 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!17/06/29  19:36:42 INFO MemoryStore: MemoryStore cleared17/06/29  19:36:42 INFO BlockManager: BlockManager stopped17/06/29  19:36:42 INFO BlockManagerMaster: BlockManagerMaster stopped17/06/29  19:36:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!17/06/29 19:36:42 INFO SparkContext: Successfully stopped SparkContext17/06/29  19:36:42 INFO ShutdownHookManager: Shutdown hook called17/06/29  19:36:42 INFO ShutdownHookManager: Deleting directory /tmp/spark-58cba0fb-b20f-46f4-bb17-3201f9dec45b

manju@manju-VirtualBox:~/Scala_wc_project/target/scala-2.11$
NOTE:
             --class: The entry point for your wordcount application 
             --master: The master URL for the cluster/local (e.g. spark://23.195.26.187:7077)
---------------------------------------------
 Step 9:  Verify  the wordcount output file  as mentioned in previous step.

manju@manju-virtualbox:~/scala_wc_project/target/scala-2.11$ cat /home/manju/data/input.txt

Hello world
Welcome to spark world
Bye world
manju@manju-virtualbox:~/scala_wc_project/target/scala-2.11$ cat /home/manju/data/output3/part-0000*
(Scala,1)
(Bye,1)
(Hello,1)
(Welcome,1)
(World,4)
(Spark,1)
(to,2)
(the,2)
manju@manju-virtualbox:~/scala_wc_project/target/scala-2.11$

HOW TO RUN SBT (Simple Build Tool)




1) Create directory
manju@ubuntu:$mkdir sparksample

2) Change the directory
manju@ubuntu:$cd sparksample/

3) Create one more directory
 manju@ubuntu:$mkdir src

4) In src create main(in same directory  or  same folder)
manju@ubuntu:$mkdir src/main

5) In src create on more directory
 manju@ubuntu:$mkdir src/main/scala

6) By using vi editor  create one sbt doc
   manju@ubuntu:$vi sparksample.sbt

7) After create .sbt file give sbt
   manju@ubuntu:$sbt
   manju@ubuntu:$compile
   manju@ubuntu:$package
   manju@ubuntu:$exit

8) Give ls command
 manju@ubuntu:$sparksample $ls


project sparksample.sbt src target word count.spark

  manju@ubuntu:$cd target/
  manju@ubuntu:l$s
  manju@ubuntu:$cd scala 2.11/
  manju@ubuntu:$ls
  manju@ubuntu:$ spark-submit -- class Helloword sparksample-2.11-1.0 jar



object name-hello word
jar name-helloword sparksample-2.11-1.0.jar








Monday, 19 June 2017

Spark on windows & Mac installation

Spark on Windows - Installation Steps
=====================================

Step 1: Download and  install JDK

http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

Note:

1. Download the appropriate version based on the Operating System installed on your machine
2. Install the JDK, by following the wizard

Step 2: Download Spark

http://spark.apache.org/downloads.html

Choose the following options for the download
1. Choose a Spark release
2. Pre-built for Hadoop 2.4 or later
3. Direct Download
4. Click on the spark-1.4.1-bin-hadoop2.4.tgz

Note:

This is a tar file, to unzip this file, you would need a utility like winrar.

Download and install winrar

http://www.rarlab.com/download.htm

Step 3: Extract the tar file

Step 4: Copy the contents of the tar files into

C:\spark\ folder

Step 5: Update the log4j.properties to set the messages to WARN

C:\spark\conf\log4j.properties.template

Set the property - log4j.rootCategory=WARN

Save the file as log4j.properties

Step 6: Download winutils.exe from the course resources folder

1. Create a folder C:\winutils\bin
2. Copy the winutils.exe file into C:\bin\winutils.exe

Step 7: Set the environment variables (Inform Windows where is Spark)


SPARK_HOME =  C:\spark
JAVA_HOME = C:\Program Files\Java\jdk1.8.0_71
HADOOP_HOME = C:\winutils
PATH = %SPARK_HOME%\bin;%JAVA_HOME%\bin (append to the existing path)

In Windows
==========

1) Right click on Start Menu
2) Click on Control Panel
3) Click on System and Security
4) Click on System
5) Click on Advanced System Settings
6) Click on Environment Variable button
1. Add New Environment Variable
2. Variable Name: SPARK_HOME, Variable Value: C:\spark, Save the variable
3. Variable Name: JAVA_HOME, Variable Value: C:\Program Files\Java\jdk1.8.0_71
4. Variable Name: PATH, Variable Value (append to the end of the string) %SPARK_HOME%\bin;%JAVA_HOME%\bin
5. Variable Name: HADOOP_HOME, Variable Value: C:\winutils

In Mac
======

http://hathaway.cc/post/69201163472/how-to-edit-your-path-environment-variables-on-mac
http://osxdaily.com/2014/08/14/add-new-path-to-path-command-line/


Step 8: Open a terminal and start the spark shell

1. Command Prompt App
2. cd C:\spark\bin
3. spark-shell --master "local [4]" (or)
        4. spark-shell