Monday, 10 September 2018

Kafka and Spark Structured Streaming

The following Spark Structured Streaming application subscribes to the retail Kafka topic, extracts the department name from each web log message, and prints department-level traffic counts over 20-second windows to the console.

package retail
import java.sql.Timestamp
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
/**
 * Created by itversity on 19/05/18.
 */
object GetStreamingDepartmentTrafficKafka {
  def main(args: Array[String]): Unit = {
    // The first program argument selects the environment block (e.g. "dev") in application.conf
    val conf = ConfigFactory.load.getConfig(args(0))
    val spark = SparkSession.
      builder().
      master(conf.getString("execution.mode")).
      appName("Get Streaming Department Traffic").
      getOrCreate()

    import spark.implicits._
    spark.sparkContext.setLogLevel("ERROR")
    // Keep shuffle partitions low for a low-volume demo (the default is 200)
    spark.conf.set("spark.sql.shuffle.partitions", "2")
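    // The Kafka source provides key, value, topic, partition, offset and
    // timestamp columns; cast the binary value to a string and keep the
    // timestamp for windowing.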
    val lines = spark.readStream.
      format("kafka").
      option("kafka.bootstrap.servers", conf.getString("bootstrap.server")).
      option("subscribe", "retail").
      load().
      selectExpr("CAST(value AS STRING)", "timestamp").
      as[(String, Timestamp)]
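
    // Each message value is expected to be a web-server access log line in
    // Apache common/combined format, e.g. (hypothetical sample):
    //   66.249.66.1 - - [19/May/2018:10:33:59 -0400] "GET /department/fitness/categories HTTP/1.1" 200 ...
    // Splitting on spaces, token 6 is the request path; splitting the path
    // on "/" gives "department" at index 1 and the department name at index 2.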
    val departmentTraffic = lines.
      where(split(split($"value", " ")(6), "/")(1) === "department").
      select(split(split($"value", " ")(6), "/")(2).alias("department_name"), $"timestamp").
      groupBy(
        // Tumbling 20-second windows (the slide interval equals the window length)
        window($"timestamp", "20 seconds", "20 seconds"), $"department_name"
      ).
      count()
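
    // "update" output mode emits only the window counts that changed during
    // the last trigger interval instead of the whole result table.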
    val query = departmentTraffic.
      writeStream.
      outputMode("update").
      format("console").
      trigger(Trigger.ProcessingTime("20 seconds")).
      start()

    query.awaitTermination()
  }
}
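
The configuration file is not shown here, but the two lookups above (execution.mode and bootstrap.server under the environment key passed as args(0)) imply an application.conf along these lines; the environment name and the values are assumptions:

dev {
  execution.mode = "local[2]"
  bootstrap.server = "localhost:9092"
}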
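To run it, package the application and submit it with the Kafka connector on the classpath, passing the environment name as the first argument. The jar name, Spark and Scala versions, and broker address below are assumptions; adjust them to your setup:

spark-submit \
  --class retail.GetStreamingDepartmentTrafficKafka \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 \
  retail_2.11-0.1.jar dev

Test messages can then be pushed to the retail topic, for example with kafka-console-producer.sh --broker-list localhost:9092 --topic retail, and the windowed counts will appear on the console every 20 seconds.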