object LearningScala2 {
// Flow control
// If / else syntax
if (1 > 3) println("Impossible!") else println("The world makes sense.")
//> The world makes sense.
if (1 > 3) {
println("Impossible!")
} else {
println("The world makes sense.")
} //> The world makes sense.
// Matching - like switch in other languages:
val number = 3 //> number : Int = 3
number match {
case 1 => println("One")
case 2 => println("Two")
case 3 => println("Three")
case _ => println("Something else")
} //> Three
// For loops
for (x <- 1 to 4) {
val squared = x * x
println(squared)
} //> 1
//| 4
//| 9
//| 16
// While loops
var x = 10 //> x : Int = 10
while (x >= 0) {
println(x)
x -= 1
} //> 10
//| 9
//| 8
//| 7
//| 6
//| 5
//| 4
//| 3
//| 2
//| 1
//| 0
x = 0
do { println(x); x+=1 } while (x <= 10) //> 0
//| 1
//| 2
//| 3
//| 4
//| 5
//| 6
//| 7
//| 8
//| 9
//| 10
// Expressions
// "Returns" the final value in a block automatically
{val x = 10; x + 20} //> res0: Int = 30
println({val x = 10; x + 20}) //> 30
// EXERCISE
// Write some code that prints out the first 10 values of the Fibonacci sequence.
// This is the sequence where every number is the sum of the two numbers before it.
// So, the result should be 0, 1, 1, 2, 3, 5, 8, 13, 21, 34
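// (One possible solution sketch, not part of the original worksheet:
// keep the two previous values in vars and roll them forward ten times.)
var fibA = 0
var fibB = 1
for (i <- 1 to 10) {
  println(fibA)
  val next = fibA + fibB
  fibA = fibB
  fibB = next
} // prints 0 1 1 2 3 5 8 13 21 34, one value per line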
}
object LearningScala3 {
// Functions
// Format is def <function name>(parameter name: type...) : return type = { expression }
// Don't forget the = before the expression!
def squareIt(x: Int) : Int = {
x * x
} //> squareIt: (x: Int)Int
def cubeIt(x: Int): Int = {x * x * x} //> cubeIt: (x: Int)Int
println(squareIt(2)) //> 4
println(cubeIt(2)) //> 8
// Functions can take other functions as parameters
def transformInt(x: Int, f: Int => Int) : Int = {
f(x)
} //> transformInt: (x: Int, f: Int => Int)Int
val result = transformInt(2, cubeIt) //> result : Int = 8
println (result) //> 8
// "Lambda functions", "anonymous functions", "function literals"
// You can declare functions inline without even giving them a name
// This happens a lot in Spark.
transformInt(3, x => x * x * x) //> res0: Int = 27
transformInt(10, x => x / 2)
transformInt(2, x => {val y = x * 2; y * y})
// This is really important!
// EXERCISE
// Strings have a built-in .toUpperCase method. For example, "foo".toUpperCase gives you back FOO.
// Write a function that converts a string to upper-case, and use that function on a few test strings.
// Then, do the same thing using a function literal instead of a separate, named function.
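// (One possible solution sketch, not part of the original worksheet.
// transformString is a made-up helper in the spirit of transformInt above.)
def upperCaseIt(s: String) : String = {
  s.toUpperCase
}
println(upperCaseIt("foo"))        // FOO
println(upperCaseIt("enterprise")) // ENTERPRISE
// Same thing with a function literal instead of a separate, named function:
def transformString(s: String, f: String => String) : String = f(s)
println(transformString("picard", s => s.toUpperCase)) // PICARD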
}
object LearningScala4 {
// Data structures
// Tuples (Also really common with Spark!!)
// Immutable lists
// Often thought of as database fields, or columns.
// Useful for passing around entire rows of data.
val captainStuff = ("Picard", "Enterprise-D", "NCC-1701-D")
//> captainStuff : (String, String, String) = (Picard,Enterprise-D,NCC-1701-D)
println(captainStuff) //> (Picard,Enterprise-D,NCC-1701-D)
// You refer to individual fields with their ONE-BASED index:
println(captainStuff._1) //> Picard
println(captainStuff._2) //> Enterprise-D
println(captainStuff._3) //> NCC-1701-D
// You can create a key/value pair with ->
val picardsShip = "Picard" -> "Enterprise-D" //> picardsShip : (String, String) = (Picard,Enterprise-D)
println(picardsShip._2) //> Enterprise-D
// You can mix different types in a tuple
val aBunchOfStuff = ("Kirk", 1964, true) //> aBunchOfStuff : (String, Int, Boolean) = (Kirk,1964,true)
// Lists
// Like a tuple, but it's an actual Collection object that has more functionality.
// Also, all of its items should be the same type (mix types and you just end up with a List[Any]).
// It's a singly-linked list under the hood.
val shipList = List("Enterprise", "Defiant", "Voyager", "Deep Space Nine")
//> shipList : List[String] = List(Enterprise, Defiant, Voyager, Deep Space Nin
//| e)
// Access individual members using () with ZERO-BASED index (confused yet?)
println(shipList(1)) //> Defiant
// head and tail give you the first item, and the remaining ones.
println(shipList.head) //> Enterprise
println(shipList.tail) //> List(Defiant, Voyager, Deep Space Nine)
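// (Not from the original: because it's a singly-linked list, prepending with :: is cheap,
// and is the usual way lists get built up.)
val moreShips = "Enterprise-D" :: shipList
println(moreShips) //> List(Enterprise-D, Enterprise, Defiant, Voyager, Deep Space Nine)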
// Iterating through a list
for (ship <- shipList) {println(ship)} //> Enterprise
//| Defiant
//| Voyager
//| Deep Space Nine
// Let's apply a function literal to a list! map() can be used to apply any function to every item in a collection.
val backwardShips = shipList.map( (ship: String) => {ship.reverse})
//> backwardShips : List[String] = List(esirpretnE, tnaifeD, regayoV, eniN eca
//| pS peeD)
for (ship <- backwardShips) {println(ship)} //> esirpretnE
//| tnaifeD
//| regayoV
//| eniN ecapS peeD
// reduce() can be used to combine together all the items in a collection using some function.
val numberList = List(1, 2, 3, 4, 5) //> numberList : List[Int] = List(1, 2, 3, 4, 5)
val sum = numberList.reduce( (x: Int, y: Int) => x + y)
//> sum : Int = 15
println(sum) //> 15
// filter() can remove stuff you don't want. Here we'll introduce wildcard syntax while we're at it.
val iHateFives = numberList.filter( (x: Int) => x != 5)
//> iHateFives : List[Int] = List(1, 2, 3, 4)
val iHateThrees = numberList.filter(_ != 3) //> iHateThrees : List[Int] = List(1, 2, 4, 5)
// Note that Spark has its own map, reduce, and filter functions that can distribute these operations. But they work the same way!
// Also, you understand MapReduce now :)
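// (Not from the original: a quick illustration of chaining the two, MapReduce-style,
// using the numberList defined above.)
val sumOfSquares = numberList.map(x => x * x).reduce( (x, y) => x + y )
println(sumOfSquares) // 55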
// Concatenating lists
val moreNumbers = List(6, 7, 8) //> moreNumbers : List[Int] = List(6, 7, 8)
val lotsOfNumbers = numberList ++ moreNumbers //> lotsOfNumbers : List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8)
// More list fun
val reversed = numberList.reverse //> reversed : List[Int] = List(5, 4, 3, 2, 1)
val sorted = reversed.sorted //> sorted : List[Int] = List(1, 2, 3, 4, 5)
val lotsOfDuplicates = numberList ++ numberList //> lotsOfDuplicates : List[Int] = List(1, 2, 3, 4, 5, 1, 2, 3, 4, 5)
val distinctValues = lotsOfDuplicates.distinct //> distinctValues : List[Int] = List(1, 2, 3, 4, 5)
val maxValue = numberList.max //> maxValue : Int = 5
val total = numberList.sum //> total : Int = 15
val hasThree = iHateThrees.contains(3) //> hasThree : Boolean = false
// Maps
// Useful for key/value lookups on distinct keys
// Like dictionaries in other languages
val shipMap = Map("Kirk" -> "Enterprise", "Picard" -> "Enterprise-D", "Sisko" -> "Deep Space Nine", "Janeway" -> "Voyager")
//> shipMap : scala.collection.immutable.Map[String,String] = Map(Kirk -> Ente
//| rprise, Picard -> Enterprise-D, Sisko -> Deep Space Nine, Janeway -> Voyage
//| r)
println(shipMap("Janeway")) //> Voyager
// Dealing with missing keys
println(shipMap.contains("Archer")) //> false
val archersShip = util.Try(shipMap("Archer")) getOrElse "Unknown"
//> archersShip : String = Unknown
println(archersShip) //> Unknown
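// (Not from the original: Scala's Map also has a built-in getOrElse for exactly this case.)
println(shipMap.getOrElse("Archer", "Unknown")) // Unknown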
// EXERCISE
// Create a list of the numbers 1-20; your job is to print out numbers that are evenly divisible by three. (Scala's
// modulo operator, like in other languages, is %, which gives you the remainder after division. For example, 9 % 3 = 0
// because 9 is evenly divisible by 3.) Do this first by iterating through all the items in the list and testing each
// one as you go. Then, do it again by using a filter function on the list instead.
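// (One possible solution sketch, not part of the original worksheet.)
val candidates = (1 to 20).toList
// First, iterate through the list and test each item as you go:
for (n <- candidates) {
  if (n % 3 == 0) println(n)
}
// Then do the same thing with a filter function:
val divisibleByThree = candidates.filter(n => n % 3 == 0)
divisibleByThree.foreach(println) // 3, 6, 9, 12, 15, 18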
// That's enough for now!
// There is MUCH more to learn about Scala. We didn't cover many other collection types, including mutable collections.
// And we didn't even touch on object-oriented Scala. The book "Learning Scala" from O'Reilly is great if you want to
// go into more depth - but you've got enough to get through this course for now.
}
/** Count up how many of each star rating exists in the MovieLens 100K data set. */
/** Compute the average number of friends by age in a social network. */
/** A function that splits a line of input into (age, numFriends) tuples. */
// Create a tuple that is our result.
// Lots going on here...
// We are starting with an RDD of form (age, numFriends) where age is the KEY and numFriends is the VALUE
// We use mapValues to convert each numFriends value to a tuple of (numFriends, 1)
// Then we use reduceByKey to sum up the total numFriends and total instances for each age, by
// adding together all the numFriends values and 1's respectively.
val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2))
// To compute the average we divide totalFriends / totalInstances for each age.
// Collect the results from the RDD (This kicks off computing the DAG and actually executes the job)
// Sort and print the final results.
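// (Sketch only, not part of the original listing: assuming rdd is the (age, numFriends) RDD
// described above, the remaining steps outlined in these comments could look like this.)
val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2) // integer division; convert to Double first for fractional averages
val results = averagesByAge.collect()
results.sorted.foreach(println)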
MinTemperatures output:
EZE00100082 minimum temperature: 7.70 F
ITE00100554 minimum temperature: 5.36 F
MaxTemperatures
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import scala.math.max
/** Find the maximum temperature by weather station for a year */
object MaxTemperatures {
def parseLine(line:String)= {
val fields = line.split(",")
val stationID = fields(0)
val entryType = fields(2)
val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f // convert tenths of a degree Celsius to degrees Fahrenheit
(stationID, entryType, temperature)
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "MaxTemperatures")
val lines = sc.textFile("../1800.csv")
val parsedLines = lines.map(parseLine)
val maxTemps = parsedLines.filter(x => x._2 == "TMAX")
val stationTemps = maxTemps.map(x => (x._1, x._3.toFloat))
val maxTempsByStation = stationTemps.reduceByKey( (x,y) => max(x,y))
val results = maxTempsByStation.collect()
for (result <- results.sorted) {
val station = result._1
val temp = result._2
val formattedTemp = f"$temp%.2f F"
println(s"$station max temperature: $formattedTemp")
}
}
}
Output:
EZE00100082 max temperature: 90.14 F
ITE00100554 max temperature: 90.14 F
WordCount
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
/** Count up how many of each word appears in a book as simply as possible. */
object WordCount {
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "WordCount")
// Read each line of my book into an RDD
val input = sc.textFile("../book.txt")
// Split into words separated by a space character
val words = input.flatMap(x => x.split(" "))
// Count up the occurrences of each word
val wordCounts = words.countByValue()
// Print the results.
wordCounts.foreach(println)
}
}
Output:
(foolproof,1)
(precious,1)
(inflammatory,1)
(referrer,,1)
(hourly,3)
(embedded,1)
(way).,1)
(touch,,1)
(of.,3)
(salesperson,5)
WordCountBetter
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
/** Count up how many of each word occurs in a book, using regular expressions. */
object WordCountBetter {
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "WordCountBetter")
// Load each line of my book into an RDD
val input = sc.textFile("../book.txt")
// Split using a regular expression that extracts words
val words = input.flatMap(x => x.split("\\W+"))
// Normalize everything to lowercase
val lowercaseWords = words.map(x => x.toLowerCase())
// Count up the occurrences of each word
val wordCounts = lowercaseWords.countByValue()
// Print the results
wordCounts.foreach(println)
}
}
Output:
(silicon,6)
(brookings,1)
(driving,4)
(vision,6)
(government,9)
(complete,2)
(clock,1)
(notice,8)
(check,15)
(click,21)
(how,163)
(afford,4)
(barely,1)
(planned,3)
(whom,2)
(my,215)
(sierra,1)
(space,5)
(b,1)
(gain,4)
WordCountBetterSorted
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
/** Count up how many of each word occurs in a book, using regular expressions and sorting the final results */
object WordCountBetterSorted {
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using the local machine
val sc = new SparkContext("local", "WordCountBetterSorted")
// Load each line of my book into an RDD
val input = sc.textFile("../book.txt")
// Split using a regular expression that extracts words
val words = input.flatMap(x => x.split("\\W+"))
// Normalize everything to lowercase
val lowercaseWords = words.map(x => x.toLowerCase())
// Count up the occurrences of each word
val wordCounts = lowercaseWords.map(x => (x, 1)).reduceByKey( (x,y) => x + y )
// Flip (word, count) tuples to (count, word) and then sort by key (the counts)
val wordCountsSorted = wordCounts.map( x => (x._2, x._1) ).sortByKey()
// Print the results, flipping the (count, word) results to word: count as we go.
for (result <- wordCountsSorted) {
val count = result._1
val word = result._2
println(s"$word: $count")
}
}
}
Output:
unceremoniously: 1
memorable: 1
pirate: 1
susceptible: 1
absolutely: 1
passionate: 1
convince: 2
raises: 2
envelope: 2
federal: 2
gap: 2
ignore: 2
copyrights: 2
automobile: 2
recently: 2
incur: 2
unnecessary: 2
park: 2
TotalSpentByCustomer
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
/** Compute the total amount spent per customer in some fake e-commerce data. */
object TotalSpentByCustomer {
/** Convert input data to (customerID, amountSpent) tuples */
def extractCustomerPricePairs(line: String) = {
val fields = line.split(",")
(fields(0).toInt, fields(2).toFloat)
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "TotalSpentByCustomer")
val input = sc.textFile("../customer-orders.csv")
val mappedInput = input.map(extractCustomerPricePairs)
val totalByCustomer = mappedInput.reduceByKey( (x,y) => x + y )
val results = totalByCustomer.collect()
// Print the results.
results.foreach(println)
}
}
Output:
(34,5330.7993)
(52,5245.0605)
(96,3924.23)
(4,4815.05)
(16,4979.0605)
TotalSpentByCustomerSorted
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
/** Compute the total amount spent per customer in some fake e-commerce data. */
object TotalSpentByCustomerSorted {
/** Convert input data to (customerID, amountSpent) tuples */
def extractCustomerPricePairs(line: String) = {
var fields = line.split(",")
(fields(0).toInt, fields(2).toFloat)
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "TotalSpentByCustomerSorted")
val input = sc.textFile("../customer-orders.csv")
val mappedInput = input.map(extractCustomerPricePairs)
val totalByCustomer = mappedInput.reduceByKey( (x,y) => x + y )
val flipped = totalByCustomer.map( x => (x._2, x._1) )
val totalByCustomerSorted = flipped.sortByKey()
val results = totalByCustomerSorted.collect()
// Print the results.
results.foreach(println)
}
}
Output:
(3309.3804,45)
(3790.5698,79)
(3924.23,96)
(4042.65,23)
(4172.29,99)
PopularMovies
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
/** Find the movies with the most ratings. */
object PopularMovies {
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "PopularMovies")
// Read in each rating line
val lines = sc.textFile("../ml-100k/u.data")
// Map to (movieID, 1) tuples
val movies = lines.map(x => (x.split("\t")(1).toInt, 1))
// Count up all the 1's for each movie
val movieCounts = movies.reduceByKey( (x, y) => x + y )
// Flip (movieID, count) to (count, movieID)
val flipped = movieCounts.map( x => (x._2, x._1) )
// Sort
val sortedMovies = flipped.sortByKey()
// Collect and print results
val results = sortedMovies.collect()
results.foreach(println)
}
}
Output:
(1,1543)
(1,1667)
(1,1675)
(1,1613)
(1,1309)
(1,1571)
(1,1647)
(1,1633)
(1,1593)
(1,1655)
(1,1557)
(2,1304)
(2,1500)
(2,1390)
(2,1550)
PopularMoviesNicer
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import scala.io.Source
import java.nio.charset.CodingErrorAction
import scala.io.Codec
/** Find the movies with the most ratings. */
object PopularMoviesNicer {
/** Load up a Map of movie IDs to movie names. */
def loadMovieNames() : Map[Int, String] = {
// Handle character encoding issues:
implicit val codec = Codec("UTF-8")
codec.onMalformedInput(CodingErrorAction.REPLACE)
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
// Create a Map of Ints to Strings, and populate it from u.item.
var movieNames:Map[Int, String] = Map()
val lines = Source.fromFile("../ml-100k/u.item").getLines()
for (line <- lines) {
var fields = line.split('|')
if (fields.length > 1) {
movieNames += (fields(0).toInt -> fields(1))
}
}
return movieNames
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "PopularMoviesNicer")
// Create a broadcast variable of our ID -> movie name map
var nameDict = sc.broadcast(loadMovieNames)
// Read in each rating line
val lines = sc.textFile("../ml-100k/u.data")
// Map to (movieID, 1) tuples
val movies = lines.map(x => (x.split("\t")(1).toInt, 1))
// Count up all the 1's for each movie
val movieCounts = movies.reduceByKey( (x, y) => x + y )
// Flip (movieID, count) to (count, movieID)
val flipped = movieCounts.map( x => (x._2, x._1) )
// Sort
val sortedMovies = flipped.sortByKey()
// Fold in the movie names from the broadcast variable
val sortedMoviesWithNames = sortedMovies.map( x => (nameDict.value(x._2), x._1) )
// Collect and print results
val results = sortedMoviesWithNames.collect()
results.foreach(println)
}
}
Output:
(Johns (1996),1)
(Next Step, The (1995),1)
(Sunchaser, The (1996),1)
(Tokyo Fist (1995),1)
(Very Natural Thing, A (1974),1)
(Touki Bouki (Journey of the Hyena) (1973),1)
(Hana-bi (1997),1)
(? k?ldum klaka (Cold Fever) (1994),1)
(Death in Brunswick (1991),1)
(Favor, The (1994),1)
(Yankee Zulu (1994),1)
(New York Cop (1996),2)
(Santa with Muscles (1996),2)
(Innocent Sleep, The (1995),2)
(Destiny Turns on the Radio (1995),2)
(Babyfever (1994),2)
(Beyond Bedlam (1993),2)
(Condition Red (1995),2)
DegreesOfSeparation
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.util.LongAccumulator
import org.apache.log4j._
import scala.collection.mutable.ArrayBuffer
/** Finds the degrees of separation between two Marvel comic book characters, based
* on co-appearances in a comic.
*/
object DegreesOfSeparation {
// The characters we want to find the separation between.
val startCharacterID = 5306 //SpiderMan
val targetCharacterID = 14 //ADAM 3,031 (who?)
// We make our accumulator a "global" Option so we can reference it in a mapper later.
var hitCounter:Option[LongAccumulator] = None
// Some custom data types
// BFSData contains an array of hero ID connections, the distance, and color.
type BFSData = (Array[Int], Int, String)
// A BFSNode has a heroID and the BFSData associated with it.
type BFSNode = (Int, BFSData)
/** Converts a line of raw input into a BFSNode */
def convertToBFS(line: String): BFSNode = {
// Split up the line into fields
val fields = line.split("\\s+")
// Extract this hero ID from the first field
val heroID = fields(0).toInt
// Extract subsequent hero ID's into the connections array
var connections: ArrayBuffer[Int] = ArrayBuffer()
for ( connection <- 1 to (fields.length - 1)) {
connections += fields(connection).toInt
}
// Default distance and color is 9999 and white
var color:String = "WHITE"
var distance:Int = 9999
// Unless this is the character we're starting from
if (heroID == startCharacterID) {
color = "GRAY"
distance = 0
}
return (heroID, (connections.toArray, distance, color))
}
/** Create "iteration 0" of our RDD of BFSNodes */
def createStartingRdd(sc:SparkContext): RDD[BFSNode] = {
val inputFile = sc.textFile("../marvel-graph.txt")
return inputFile.map(convertToBFS)
}
/** Expands a BFSNode into this node and its children */
def bfsMap(node:BFSNode): Array[BFSNode] = {
// Extract data from the BFSNode
val characterID:Int = node._1
val data:BFSData = node._2
val connections:Array[Int] = data._1
val distance:Int = data._2
var color:String = data._3
// This is called from flatMap, so we return an array
// of potentially many BFSNodes to add to our new RDD
var results:ArrayBuffer[BFSNode] = ArrayBuffer()
// Gray nodes are flagged for expansion, and create new
// gray nodes for each connection
if (color == "GRAY") {
for (connection <- connections) {
val newCharacterID = connection
val newDistance = distance + 1
val newColor = "GRAY"
// Have we stumbled across the character we're looking for?
// If so increment our accumulator so the driver script knows.
if (targetCharacterID == connection) {
if (hitCounter.isDefined) {
hitCounter.get.add(1)
}
}
// Create our new Gray node for this connection and add it to the results
val newEntry:BFSNode = (newCharacterID, (Array(), newDistance, newColor))
results += newEntry
}
// Color this node as black, indicating it has been processed already.
color = "BLACK"
}
// Add the original node back in, so its connections can get merged with
// the gray nodes in the reducer.
val thisEntry:BFSNode = (characterID, (connections, distance, color))
results += thisEntry
return results.toArray
}
/** Combine nodes for the same heroID, preserving the shortest length and darkest color. */
def bfsReduce(data1:BFSData, data2:BFSData): BFSData = {
// Extract data that we are combining
val edges1:Array[Int] = data1._1
val edges2:Array[Int] = data2._1
val distance1:Int = data1._2
val distance2:Int = data2._2
val color1:String = data1._3
val color2:String = data2._3
// Default node values
var distance:Int = 9999
var color:String = "WHITE"
var edges:ArrayBuffer[Int] = ArrayBuffer()
// See if one is the original node with its connections.
// If so preserve them.
if (edges1.length > 0) {
edges ++= edges1
}
if (edges2.length > 0) {
edges ++= edges2
}
// Preserve minimum distance
if (distance1 < distance) {
distance = distance1
}
if (distance2 < distance) {
distance = distance2
}
// Preserve darkest color
if (color1 == "WHITE" && (color2 == "GRAY" || color2 == "BLACK")) {
color = color2
}
if (color1 == "GRAY" && color2 == "BLACK") {
color = color2
}
if (color2 == "WHITE" && (color1 == "GRAY" || color1 == "BLACK")) {
color = color1
}
if (color2 == "GRAY" && color1 == "BLACK") {
color = color1
}
return (edges.toArray, distance, color)
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "DegreesOfSeparation")
// Our accumulator, used to signal when we find the target
// character in our BFS traversal.
hitCounter = Some(sc.longAccumulator("Hit Counter"))
var iterationRdd = createStartingRdd(sc)
var iteration:Int = 0
for (iteration <- 1 to 10) {
println("Running BFS Iteration# " + iteration)
// Create new vertices as needed to darken or reduce distances in the
// reduce stage. If we encounter the node we're looking for as a GRAY
// node, increment our accumulator to signal that we're done.
val mapped = iterationRdd.flatMap(bfsMap)
// Note that the mapped.count() action here forces the RDD to be evaluated, and
// that's the only reason our accumulator is actually updated.
println("Processing " + mapped.count() + " values.")
if (hitCounter.isDefined) {
val hitCount = hitCounter.get.value
if (hitCount > 0) {
println("Hit the target character! From " + hitCount +
" different direction(s).")
return
}
}
// Reducer combines data for each character ID, preserving the darkest
// color and shortest path.
iterationRdd = mapped.reduceByKey(bfsReduce)
}
}
}
Output:
Running BFS Iteration# 1
Processing 8330 values.
Running BFS Iteration# 2
Processing 220615 values.
Hit the target character! From 1 different direction(s).
MostPopularSuperhero
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
/** Find the superhero with the most co-appearances. */
object MostPopularSuperhero {
// Function to extract the hero ID and number of connections from each line
def countCoOccurences(line: String) = {
var elements = line.split("\\s+")
( elements(0).toInt, elements.length - 1 )
}
// Function to extract hero ID -> hero name tuples (or None in case of failure)
def parseNames(line: String) : Option[(Int, String)] = {
var fields = line.split('\"')
if (fields.length > 1) {
return Some((fields(0).trim().toInt, fields(1)))
} else {
return None // flatmap will just discard None results, and extract data from Some results.
}
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Create a SparkContext using every core of the local machine
val sc = new SparkContext("local[*]", "MostPopularSuperhero")
// Build up a hero ID -> name RDD
val names = sc.textFile("../marvel-names.txt")
val namesRdd = names.flatMap(parseNames)
// Load up the superhero co-appearance data
val lines = sc.textFile("../marvel-graph.txt")
// Convert to (heroID, number of connections) RDD
val pairings = lines.map(countCoOccurences)
// Combine entries that span more than one line
val totalFriendsByCharacter = pairings.reduceByKey( (x,y) => x + y )
// Flip it to # of connections, hero ID
val flipped = totalFriendsByCharacter.map( x => (x._2, x._1) )
// Find the max # of connections
val mostPopular = flipped.max()
// Look up the name (lookup returns an array of results, so we need to access the first result with (0)).
val mostPopularName = namesRdd.lookup(mostPopular._2)(0)
// Print out our answer!
println(s"$mostPopularName is the most popular superhero with ${mostPopular._1} co-appearances.")
}
}
Output:
CAPTAIN AMERICA is the most popular superhero with 1933 co-appearances.
SparkSQL
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.log4j._
object SparkSQL {
case class Person(ID:Int, name:String, age:Int, numFriends:Int)
def mapper(line:String): Person = {
val fields = line.split(',')
val person:Person = Person(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt)
return person
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Use new SparkSession interface in Spark 2.0
val spark = SparkSession
.builder
.appName("SparkSQL")
.master("local[*]")
.config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
.getOrCreate()
val lines = spark.sparkContext.textFile("../fakefriends.csv")
val people = lines.map(mapper)
// Infer the schema, and register the DataSet as a table.
import spark.implicits._
val schemaPeople = people.toDS
schemaPeople.printSchema()
schemaPeople.createOrReplaceTempView("people")
// SQL can be run over DataFrames that have been registered as a table
val teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
val results = teenagers.collect()
results.foreach(println)
spark.stop()
}
}
Output:
root
|-- ID: integer (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
|-- numFriends: integer (nullable = false)
[21,Miles,19,268]
[52,Beverly,19,269]
[54,Brunt,19,5]
[106,Beverly,18,499]
[115,Dukat,18,397]
[133,Quark,19,265]
[136,Will,19,335]
[225,Elim,19,106]
[304,Will,19,404]
[341,Data,18,326]
[366,Keiko,19,119]
[373,Quark,19,272]
[377,Beverly,18,418]
[404,Kasidy,18,24]
PopularMoviesDataSets
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.log4j._
import scala.io.Source
import java.nio.charset.CodingErrorAction
import scala.io.Codec
import org.apache.spark.sql.functions._
/** Find the movies with the most ratings. */
object PopularMoviesDataSets {
/** Load up a Map of movie IDs to movie names. */
def loadMovieNames() : Map[Int, String] = {
// Handle character encoding issues:
implicit val codec = Codec("UTF-8")
codec.onMalformedInput(CodingErrorAction.REPLACE)
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
// Create a Map of Ints to Strings, and populate it from u.item.
var movieNames:Map[Int, String] = Map()
val lines = Source.fromFile("../ml-100k/u.item").getLines()
for (line <- lines) {
var fields = line.split('|')
if (fields.length > 1) {
movieNames += (fields(0).toInt -> fields(1))
}
}
return movieNames
}
// Case class so we can get a column name for our movie ID
final case class Movie(movieID: Int)
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Use new SparkSession interface in Spark 2.0
val spark = SparkSession
.builder
.appName("PopularMovies")
.master("local[*]")
.config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
.getOrCreate()
// Read in each rating line and extract the movie ID; construct an RDD of Movie objects.
val lines = spark.sparkContext.textFile("../ml-100k/u.data").map(x => Movie(x.split("\t")(1).toInt))
// Convert to a DataSet
import spark.implicits._
val moviesDS = lines.toDS()
// Some SQL-style magic to sort all movies by popularity in one line!
val topMovieIDs = moviesDS.groupBy("movieID").count().orderBy(desc("count")).cache()
// Show the results at this point:
/*
|movieID|count|
+-------+-----+
| 50| 584|
| 258| 509|
| 100| 508|
*/
topMovieIDs.show()
// Grab the top 10
val top10 = topMovieIDs.take(10)
// Load up the movie ID -> name map
val names = loadMovieNames()
// Print the results
println
for (result <- top10) {
// result is just a Row at this point; we need to cast it back.
// Each row has movieID, count as above.
println (names(result(0).asInstanceOf[Int]) + ": " + result(1))
}
// Stop the session
spark.stop()
}
}
Output:
+-------+-----+
|movieID|count|
+-------+-----+
| 50| 583|
| 258| 509|
| 100| 508|
| 181| 507|
| 294| 485|
| 286| 481|
| 288| 478|
| 1| 452|
| 300| 431|
| 121| 429|
| 174| 420|
| 127| 413|
| 56| 394|
| 7| 392|
| 98| 390|
| 237| 384|
| 117| 378|
| 172| 367|
| 222| 365|
| 204| 350|
+-------+-----+
only showing top 20 rows
Star Wars (1977): 583
Contact (1997): 509
Fargo (1996): 508
Return of the Jedi (1983): 507
Liar Liar (1997): 485
English Patient, The (1996): 481
Scream (1996): 478
Toy Story (1995): 452
Air Force One (1997): 431
Independence Day (ID4) (1996): 429
DataFrames
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.log4j._
object DataFrames {
case class Person(ID:Int, name:String, age:Int, numFriends:Int)
def mapper(line:String): Person = {
val fields = line.split(',')
val person:Person = Person(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt)
return person
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Use new SparkSession interface in Spark 2.0
val spark = SparkSession
.builder
.appName("SparkSQL")
.master("local[*]")
.config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
.getOrCreate()
// Convert our csv file to a DataSet, using our Person case
// class to infer the schema.
import spark.implicits._
val lines = spark.sparkContext.textFile("../fakefriends.csv")
val people = lines.map(mapper).toDS().cache()
// There are lots of other ways to make a DataFrame.
// For example, spark.read.json("json file path")
// or sqlContext.table("Hive table name")
println("Here is our inferred schema:")
people.printSchema()
println("Let's select the name column:")
people.select("name").show()
println("Filter out anyone over 21:")
people.filter(people("age") < 21).show()
println("Group by age:")
people.groupBy("age").count().show()
println("Make everyone 10 years older:")
people.select(people("name"), people("age") + 10).show()
spark.stop()
}
}
Output:
Here is our inferred schema:
root
|-- ID: integer (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
|-- numFriends: integer (nullable = false)
Let's select the name column:
+--------+
| name|
+--------+
| Will|
|Jean-Luc|
| Hugh|
| Deanna|
| Quark|
| Weyoun|
| Gowron|
| Will|
| Jadzia|
| Hugh|
| Odo|
| Ben|
| Keiko|
|Jean-Luc|
| Hugh|
| Rom|
| Weyoun|
| Odo|
|Jean-Luc|
| Geordi|
+--------+
only showing top 20 rows
Filter out anyone over 21:
+---+-------+---+----------+
| ID| name|age|numFriends|
+---+-------+---+----------+
| 21| Miles| 19| 268|
| 48| Nog| 20| 1|
| 52|Beverly| 19| 269|
| 54| Brunt| 19| 5|
| 60| Geordi| 20| 100|
| 73| Brunt| 20| 384|
|106|Beverly| 18| 499|
|115| Dukat| 18| 397|
|133| Quark| 19| 265|
|136| Will| 19| 335|
|225| Elim| 19| 106|
|304| Will| 19| 404|
|327| Julian| 20| 63|
|341| Data| 18| 326|
|349| Kasidy| 20| 277|
|366| Keiko| 19| 119|
|373| Quark| 19| 272|
|377|Beverly| 18| 418|
|404| Kasidy| 18| 24|
|409| Nog| 19| 267|
+---+-------+---+----------+
only showing top 20 rows
Group by age:
+---+-----+
|age|count|
+---+-----+
| 31| 8|
| 65| 5|
| 53| 7|
| 34| 6|
| 28| 10|
| 26| 17|
| 27| 8|
| 44| 12|
| 22| 7|
| 47| 9|
| 52| 11|
| 40| 17|
| 20| 5|
| 57| 12|
| 54| 13|
| 48| 10|
| 19| 11|
| 64| 12|
| 41| 9|
| 43| 7|
+---+-----+
only showing top 20 rows
Make everyone 10 years older:
+--------+----------+
| name|(age + 10)|
+--------+----------+
| Will| 43|
|Jean-Luc| 36|
| Hugh| 65|
| Deanna| 50|
| Quark| 78|
| Weyoun| 69|
| Gowron| 47|
| Will| 64|
| Jadzia| 48|
| Hugh| 37|
| Odo| 63|
| Ben| 67|
| Keiko| 64|
|Jean-Luc| 66|
| Hugh| 53|
| Rom| 46|
| Weyoun| 32|
| Odo| 45|
|Jean-Luc| 55|
| Geordi| 70|
+--------+----------+
only showing top 20 rows
PopularHashtags
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
/** Listens to a stream of Tweets and keeps track of the most popular
* hashtags over a 5 minute window.
*/
object PopularHashtags {
/** Makes sure only ERROR messages get logged to avoid log spam. */
def setupLogging() = {
import org.apache.log4j.{Level, Logger}
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
}
/** Configures Twitter service credentials using twitter.txt in the main workspace directory */
def setupTwitter() = {
import scala.io.Source
for (line <- Source.fromFile("../twitter.txt").getLines) {
val fields = line.split(" ")
if (fields.length == 2) {
System.setProperty("twitter4j.oauth." + fields(0), fields(1))
}
}
}
/** Our main function where the action happens */
def main(args: Array[String]) {
// Configure Twitter credentials using twitter.txt
setupTwitter()
// Set up a Spark streaming context named "PopularHashtags" that runs locally using
// all CPU cores and one-second batches of data
val ssc = new StreamingContext("local[*]", "PopularHashtags", Seconds(1))
// Get rid of log spam (should be called after the context is set up)
setupLogging()
// Create a DStream from Twitter using our streaming context
val tweets = TwitterUtils.createStream(ssc, None)
// Now extract the text of each status update into DStreams using map()
val statuses = tweets.map(status => status.getText())
// Blow out each word into a new DStream
val tweetwords = statuses.flatMap(tweetText => tweetText.split(" "))
// Now eliminate anything that's not a hashtag
val hashtags = tweetwords.filter(word => word.startsWith("#"))
// Map each hashtag to a key/value pair of (hashtag, 1) so we can count them up by adding up the values
val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))
// Now count them up over a 5 minute window sliding every one second
val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow( (x,y) => x + y, (x,y) => x - y, Seconds(300), Seconds(1))
// You will often see this written in the following shorthand:
//val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow( _ + _, _ - _, Seconds(300), Seconds(1))
// Sort the results by the count values
val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(x => x._2, false))
// Print the top 10
sortedResults.print
// Set a checkpoint directory, and kick it all off
// I could watch this all day!
ssc.checkpoint("C:/checkpoint/")
ssc.start()
ssc.awaitTermination()
}
}