import java.util.Calendar

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.parsing.json.JSON

// Load the config block named by the first CLI argument (e.g. "prod")
val conf = ConfigFactory.load()
val envProps: Config = conf.getConfig(args(0))
// Batch interval in seconds comes from the environment config
val sparkConf = new SparkConf().setMaster("yarn").setAppName("SiteTraffic")
val streamingContext = new StreamingContext(sparkConf, Seconds(envProps.getInt("window")))
// Broadcast the config so executor-side code can open HBase connections
val broadcastConfig = streamingContext.sparkContext.broadcast(envProps)
val topicsSet = Set("retail_logs")
// Job start time, broadcast to the executors (not referenced again below)
val now = Calendar.getInstance().getTime()
val timestamp = streamingContext.sparkContext.broadcast(now)
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> envProps.getString("bootstrap.server"),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "1",
  "auto.offset.reset" -> "latest",
  // Offsets are never auto-committed; see the manual-commit sketch below
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val logData: DStream[String] = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topicsSet, kafkaParams)
).map(record => record.value)
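With enable.auto.commit set to false and no manual commit anywhere, this job never records its progress in Kafka, so a restart falls back to auto.offset.reset and can skip or replay records. A minimal sketch of manual commits with the kafka010 API, assuming you keep a reference to the raw stream instead of mapping it to values inline (the rawStream name is illustrative, not from the original):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

val rawStream = KafkaUtils.createDirectStream[String, String](
  streamingContext, PreferConsistent, Subscribe[String, String](topicsSet, kafkaParams))
rawStream.foreachRDD { rdd =>
  // Capture the offset ranges for this batch before any transformation
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd.map(_.value) here ...
  // Commit the consumed ranges back to Kafka once processing succeeds
  rawStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}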
// Parse each log line as JSON and key it by (rounded timestamp, country name);
// JSON.parseFull returns None on malformed input, so .get throws on bad records
val countryList = logData.map(line => {
  val json: Option[Any] = JSON.parseFull(line)
  val map = json.get.asInstanceOf[Map[String, Any]]
  val geoIpMap = map.get("geoip").get.asInstanceOf[Map[String, Any]]
  val country = geoIpMap.get("country_name").getOrElse("ALIEN").asInstanceOf[String]
  val timestamp = map.get("rounded_timestamp").get.asInstanceOf[String]
  ((timestamp, country), 1)
}).reduceByKey(_ + _)
// Write each batch of counts to HBase; insertOrUpdateMetrics opens a fresh
// connection per record, so a per-partition variant is sketched below the main program
countryList.foreachRDD(countries => {
  countries.foreach(country => {
    insertOrUpdateMetrics(country._1._1, country._1._2, country._2, broadcastConfig.value)
  })
})
streamingContext.start()
streamingContext.awaitTermination()
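The sink above calls insertOrUpdateMetrics once per record, and every call opens and closes its own HBase connection. A sketch of a lighter alternative, assuming the same countryList stream and getHbaseConnection helper: one connection per partition, with HBase's atomic incrementColumnValue replacing the get-then-put sequence (note it stores counters as 8-byte longs, so it is not drop-in compatible with the string-encoded counts written by insertOrUpdateMetrics):

countryList.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    // One connection and table handle per partition, not per record
    val connection = getHbaseConnection(broadcastConfig.value)
    val table = connection.getTable(TableName.valueOf("/user/mapr/country_count_stream"))
    partition.foreach { case ((rowId, country), count) =>
      // Atomic server-side increment, no read-modify-write race
      table.incrementColumnValue(Bytes.toBytes(rowId), Bytes.toBytes("metrics"),
        Bytes.toBytes(country), count.toLong)
    }
    connection.close()
  })
})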
// HBase connection and update functions:
def getHbaseConnection(config: Config): Connection = {
  // Create HBase configuration object
  val hBaseConf: Configuration = HBaseConfiguration.create()
  hBaseConf.set("hbase.zookeeper.quorum", config.getString("zookeeper.quorum"))
  hBaseConf.set("hbase.zookeeper.property.clientPort", config.getString("zookeeper.port"))
  hBaseConf.set("zookeeper.znode.parent", "/hbase-unsecure")
  hBaseConf.set("hbase.cluster.distributed", "true")
  // Establish connection
  val connection = ConnectionFactory.createConnection(hBaseConf)
  connection
}
def insertOrUpdateMetrics(rowId: String, country: String, count: Int, envProps: Config): Unit = {
  // HBase metadata
  val columnFamily1 = "metrics"
  val connection = getHbaseConnection(envProps)
  val table = connection.getTable(TableName.valueOf("/user/mapr/country_count_stream"))
  // Read the current count for this (row, country) cell, if any
  val row_get = new Get(Bytes.toBytes(rowId))
  val result = table.get(row_get)
  val value = result.getValue(Bytes.toBytes(columnFamily1), Bytes.toBytes(country))
  // Insert a fresh count or add to the existing one; note this get-then-put
  // sequence is not atomic under concurrent writers
  val rowPut = new Put(Bytes.toBytes(rowId))
  if (value == null) {
    rowPut.addColumn(Bytes.toBytes(columnFamily1), Bytes.toBytes(country), Bytes.toBytes(count.toString))
  } else {
    val newCount = Bytes.toString(value).toInt + count
    rowPut.addColumn(Bytes.toBytes(columnFamily1), Bytes.toBytes(country), Bytes.toBytes(newCount.toString))
  }
  table.put(rowPut)
  connection.close()
}
//Build, Deploy and Run
/opt/mapr/spark/spark-2.2.1/bin/spark-submit \
--class CountryVisitCount \
--master yarn \
--conf spark.ui.port=4926 \
--jars $(echo /external_jars/*.jar | tr ' ' ',') \
kafkaworkshopmapr_2.11-0.1.jar prod
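The trailing prod argument becomes args(0) and selects the matching block of the application's Typesafe config, which must define the window, bootstrap.server, zookeeper.quorum, and zookeeper.port keys read above; how that application.conf is packaged into the jar depends on the project's build setup.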