diff --git a/app/src/main/resources/application.conf b/app/src/main/resources/application.conf
new file mode 100644
index 0000000..01a8058
--- /dev/null
+++ b/app/src/main/resources/application.conf
@@ -0,0 +1,62 @@
+# app {
+
+app-name = "app"
+spark.checkpoint.dir = "./tmp"
+
+akka {
+  loglevel = "INFO"
+  loggers = ["akka.event.slf4j.Slf4jLogger"]
+  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+  logger-startup-timeout = 60s
+  log-dead-letters = off
+  log-dead-letters-during-shutdown = off
+
+  actor {
+    provider = "akka.cluster.ClusterActorRefProvider"
+    default-dispatcher {
+      # Throughput for default Dispatcher, set to 1 for as fair as possible
+      throughput = 10
+    }
+    # serializers.kryo = "com.twitter.chill.akka.AkkaSerializer"
+    # serialization-bindings."scala.Product" = kryo
+  }
+  remote {
+    log-remote-lifecycle-events = off
+    netty.tcp {
+      hostname = "127.0.0.1"
+      port = 0
+    }
+  }
+
+  cluster {
+    seed-nodes = [
+      "akka.tcp://ClusterSystem@127.0.0.1:9001",
+      "akka.tcp://ClusterSystem@127.0.0.1:9002"]
+
+    auto-down-unreachable-after = 10s
+    log-info = on
+    gossip-interval = 5s
+    publish-stats-interval = 10s
+    metrics.gossip-interval = 10s
+    metrics.collect-interval = 10s
+  }
+}
+
+spark {
+  # The Spark master URL. Resolved from the environment first if set,
+  # then from system properties, then from this config. Options:
+  #   - "local" to run locally with one thread
+  #   - "local[4]" to run locally with 4 cores
+  #   - "spark://host:port" to run on a Spark standalone cluster
+  #   - if not set, defaults to "local[*]" to run with as many threads as cores
+  # More than one master may be listed for HA failover,
+  # e.g. spark://host1:port1,host2:port2
+  master = "local"
+  # cleaner.ttl = ${?SPARK_CLEANER_TTL}
+
+  # The batch interval must be set based on the latency requirements
+  # of your application and available cluster resources.
+  # streaming.batch.interval = ${?SPARK_STREAMING_BATCH_INTERVAL}
+}
+
+# }
diff --git a/app/src/main/scala/com/app/ActorManager.scala b/app/src/main/scala/com/app/ActorManager.scala
index 0b82e07..a127308 100644
--- a/app/src/main/scala/com/app/ActorManager.scala
+++ b/app/src/main/scala/com/app/ActorManager.scala
@@ -4,7 +4,10 @@ import akka.actor.{Actor, ActorLogging, Props, ActorRef}
 import akka.cluster.Member
 
 import com.common.cluster.ClusterSupervisor
-import com.common.Cluster._
+import com.common.cluster._
+import com.common.CommonSettings
+import com.common.Events._
+import com.spark.Events._
 
 class ActorManager(settings:CommonSettings) extends ClusterSupervisor {
 
@@ -16,6 +19,7 @@ class ActorManager(settings:CommonSettings) extends ClusterSupervisor {
 
   val ping = context.actorOf(Props(new PingActor), "ping-actor")
   val pong = context.actorOf(Props(new PongActor), "pong-actor")
+  val spark = context.actorOf(Props(new SparkApp("spark")), "spark-batch-linear-regression")
 
   override def preStart():Unit = {
     super.preStart()
@@ -27,7 +31,8 @@
     log.info("/////////////////////")
     log.info("// Starting nodes //")
     log.info("/////////////////////")
-    ping ! Start
+    // ping ! Start
+    spark ! SparkBatchStart
     context become actorReceive
   }
 
diff --git a/app/src/main/scala/com/app/ApplicationMain.scala b/app/src/main/scala/com/app/ApplicationMain.scala
index 2a8e452..dfca670 100644
--- a/app/src/main/scala/com/app/ApplicationMain.scala
+++ b/app/src/main/scala/com/app/ApplicationMain.scala
@@ -1,6 +1,7 @@
 package com.app
 
 import akka.actor.{ActorSystem, PoisonPill, Props}
+import com.common.CommonSettings
 
 object ApplicationMain extends App {
 
diff --git a/app/src/main/scala/com/app/Events.scala b/app/src/main/scala/com/app/Events.scala
index cb88e9a..4d923f6 100644
--- a/app/src/main/scala/com/app/Events.scala
+++ b/app/src/main/scala/com/app/Events.scala
@@ -1,9 +1,6 @@
 package com.app
 
-import com.common.Events._
-import com.spark.Events._
 
 object Events {
-  //
 }
 
diff --git a/app/src/main/scala/com/app/PingActor.scala b/app/src/main/scala/com/app/PingActor.scala
index 63679a9..897f346 100644
--- a/app/src/main/scala/com/app/PingActor.scala
+++ b/app/src/main/scala/com/app/PingActor.scala
@@ -1,7 +1,9 @@
-package com.common
+package com.app
 
 import akka.actor.{Actor, ActorLogging, Props, ActorRef}
 
+import com.common.Events._
+
 // class PingActor(settings:CommonSettings) extends Actor with ActorLogging {
 class PingActor extends Actor with ActorLogging {
 
diff --git a/app/src/main/scala/com/app/PongActor.scala b/app/src/main/scala/com/app/PongActor.scala
index 7539be8..db9b572 100644
--- a/app/src/main/scala/com/app/PongActor.scala
+++ b/app/src/main/scala/com/app/PongActor.scala
@@ -1,7 +1,9 @@
-package com.common
+package com.app
 
 import akka.actor.{Actor, ActorLogging, Props}
 
+import com.common.Events._
+
 // class PongActor(settings:CommonSettings) extends Actor with ActorLogging {
 class PongActor extends Actor with ActorLogging {
 
diff --git a/app/src/main/scala/com/app/Settings.scala b/app/src/main/scala/com/app/Settings.scala
index 2b249b4..c194096 100644
--- a/app/src/main/scala/com/app/Settings.scala
+++ b/app/src/main/scala/com/app/Settings.scala
@@ -1,6 +1,6 @@
 package com.app
 
-import com.common.Settings._
+import com.scalding.Settings._
 
 object Settings {
   //
diff --git a/app/src/main/scala/com/app/SparkApp.scala b/app/src/main/scala/com/app/SparkApp.scala
new file mode 100644
index 0000000..98f622a
--- /dev/null
+++ b/app/src/main/scala/com/app/SparkApp.scala
@@ -0,0 +1,33 @@
+package com.app
+
+import akka.actor.{Actor, ActorLogging, Props, ActorRef}
+import scala.util.Random
+import scala.collection.immutable.Vector
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.mllib.regression.LinearRegressionWithSGD
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.linalg.Vectors
+
+import com.spark._
+
+
+class SparkApp(name:String) extends SparkBatch(name) {
+
+  // Generate `len` random two-dimensional Gaussian points; the first
+  // coordinate doubles as the label for this toy regression data set.
+  def createData(seed:Int, len:Int):Vector[LabeledPoint] = {
+    val r = new Random(seed)
+    val d = Vector.fill(len, 2)(r.nextGaussian)
+    d.map { p => LabeledPoint(p(0), Vectors.dense(p.toArray)) }
+  }
+
+  override def batch(spk:SparkContext, sql:SQLContext):Unit = {
+    val dat = createData(Random.nextInt, 100)
+    val rdd = spk.parallelize(dat)
+    val model = LinearRegressionWithSGD.train(rdd, 100)
+
+    println(s"The model has intercept ${model.intercept} with weights ${model.weights}")
+  }
+}
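
Note for reviewers: `SparkBatch` and the `SparkBatchStart` message sent by `ActorManager` are referenced here but defined outside this changeset, in the `com.spark` module. For context, below is a minimal sketch of what that base class is assumed to look like, based on how `SparkApp` uses it; the `SparkConf` wiring and the `SparkBatchComplete` acknowledgement are illustrative assumptions, not part of this diff.

package com.spark

import akka.actor.{Actor, ActorLogging}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object Events {
  case object SparkBatchStart
  case object SparkBatchComplete  // assumed reply, not shown in this diff
}

// One-shot Spark batch job: on SparkBatchStart, build the contexts,
// run `batch`, stop the SparkContext, and acknowledge the sender.
abstract class SparkBatch(name:String) extends Actor with ActorLogging {

  // Implemented by subclasses such as SparkApp above.
  def batch(spk:SparkContext, sql:SQLContext):Unit

  def receive = {
    case Events.SparkBatchStart =>
      // "spark.master" comes from application.conf added in this diff.
      val conf = new SparkConf()
        .setAppName(name)
        .setMaster(context.system.settings.config.getString("spark.master"))
      val spk = new SparkContext(conf)
      try batch(spk, new SQLContext(spk)) finally spk.stop()
      sender() ! Events.SparkBatchComplete
  }
}

Running the blocking Spark job directly inside `receive` keeps the sketch short; a real implementation would likely run it on a dedicated dispatcher or in a Future so the actor stays responsive while the job executes.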