Skip to content

Commit

Permalink
load kafka config from application.conf (KafkaEventLoad/LoadKafkaEven…
Browse files Browse the repository at this point in the history
…tExample.scala
  • Loading branch information
yennanliu committed May 31, 2020
1 parent 0cababc commit b2aa1b3
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/main/scala/KafkaEventLoad/LoadKafkaEventExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.functions.{get_json_object, json_tuple}

import com.typesafe.config.ConfigFactory

/*
* modify from
* https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
Expand All @@ -26,6 +28,8 @@ object LoadKafkaEventExample {

def main(args: Array[String]): Unit = {

// spark config

val sc = new SparkContext("local[*]", "LoadKafkaEventExample")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val spark = SparkSession
Expand All @@ -42,6 +46,12 @@ object LoadKafkaEventExample {

import spark.implicits._

// kafka config

val kafkaconfig = ConfigFactory.load().getConfig("kafka")
val bootstrapservers:String = kafkaconfig.getString("BOOTSTRAP_SERVERS")
val topic:String = kafkaconfig.getString("TOPIC")

// Subscribe to 1 topic

val df = spark
Expand Down

0 comments on commit b2aa1b3

Please sign in to comment.