Write an MLlib-based Spark batch job
ixaxaar committed Apr 6, 2015
1 parent 4d09b7e commit 054c133
Showing 8 changed files with 109 additions and 8 deletions.
63 changes: 63 additions & 0 deletions app/src/main/resources/application.conf
@@ -0,0 +1,63 @@
# app {

app-name = "app"
spark.checkpoint.dir = "./tmp"

akka {
  loglevel = "INFO"
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  logger-startup-timeout = 60s
  log-dead-letters = off
  log-dead-letters-during-shutdown = off

  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
    default-dispatcher {
      # Throughput for default Dispatcher, set to 1 for as fair as possible
      throughput = 10
    }
    # serializers.kryo = "com.twitter.chill.akka.AkkaSerializer"
    # serialization-bindings."scala.Product" = kryo
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://app@127.0.0.1:9001",
      "akka.tcp://app@127.0.0.1:9002"]

    auto-down-unreachable-after = 10s
    log-info = on
    gossip-interval = 5s
    publish-stats-interval = 10s
    metrics.gossip-interval = 10s
    metrics.collect-interval = 10s
  }
}

spark {
  # # The Spark master host. Set first by environment if exists. Then system, then config.
  # # Options: spark://host1:port1,host2:port2
  # # - "local" to run locally with one thread,
  # # - "local[4]" to run locally with 4 cores
  # # - the master IP/hostname to run on a Spark standalone cluster
  # # - if not set, defaults to "local[*]" to run with enough threads
  # # Supports optional HA failover for more than one: host1,host2..
  # # which is used to inform: spark://host1:port1,host2:port2
  master = "local"
  # # cleaner.ttl = ${?SPARK_CLEANER_TTL}

  # # The batch interval must be set based on the latency requirements
  # # of your application and available cluster resources.
  # # streaming.batch.interval = ${?SPARK_STREAMING_BATCH_INTERVAL}
}

# }
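The commit itself does not show how these keys are consumed (that presumably happens in CommonSettings, which is outside this diff). As a rough sketch only, reading them with Typesafe Config and handing them to a SparkContext could look like the following; the object name and the wiring are illustrative assumptions, while the key names come straight from the file above.

import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: load application.conf from the classpath and build a
// SparkContext from the keys defined above.
object ConfigSketch extends App {
  val config  = ConfigFactory.load()

  val appName = config.getString("app-name")             // "app"
  val master  = config.getString("spark.master")         // "local"
  val ckptDir = config.getString("spark.checkpoint.dir") // "./tmp"

  val sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName))
  sc.setCheckpointDir(ckptDir)

  sc.stop()
}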
9 changes: 7 additions & 2 deletions app/src/main/scala/com/app/ActorManager.scala
@@ -4,7 +4,10 @@ import akka.actor.{Actor, ActorLogging, Props, ActorRef}
import akka.cluster.Member
import com.common.cluster.ClusterSupervisor

-import com.common.Cluster._
import com.common.cluster._
import com.common.CommonSettings
import com.common.Events._
import com.spark.Events._


class ActorManager(settings:CommonSettings) extends ClusterSupervisor {
@@ -16,6 +19,7 @@ class ActorManager(settings:CommonSettings) extends ClusterSupervisor {

val ping = context.actorOf(Props(new PingActor), "ping-actor")
val pong = context.actorOf(Props(new PongActor), "pong-actor")
val spark = context.actorOf(Props(new SparkApp("spark")), "spark-batch-linear-regression")

override def preStart():Unit = {
super.preStart()
@@ -27,7 +31,8 @@ class ActorManager(settings:CommonSettings) extends ClusterSupervisor {
log.info("/////////////////////")
log.info("// Starting nodes //")
log.info("/////////////////////")
-ping ! Start
// ping ! Start
spark ! SparkBatchStart

context become actorReceive
}
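SparkBatchStart comes from com.spark.Events, which this commit does not touch. A minimal assumed definition, just enough to give the message above a concrete shape, would be:

package com.spark

// Assumed: the real com.spark.Events is not shown in this diff and may define
// more messages; only SparkBatchStart is needed by the code in this commit.
object Events {
  case object SparkBatchStart
}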
1 change: 1 addition & 0 deletions app/src/main/scala/com/app/ApplicationMain.scala
@@ -1,6 +1,7 @@
package com.app

import akka.actor.{ActorSystem, PoisonPill, Props}
import com.common.CommonSettings


object ApplicationMain extends App {
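The body of ApplicationMain is collapsed in this view. For orientation only, a conventional Akka bootstrap matching the imports above might look roughly like this; the settings constructor, system name and actor name are assumptions, not code from this repository.

package com.app

import akka.actor.{ActorSystem, PoisonPill, Props}
import com.common.CommonSettings

// Illustrative bootstrap, not the actual ApplicationMain body.
object ApplicationMainSketch extends App {
  val settings = new CommonSettings()   // assumed no-arg constructor; real signature not shown
  val system   = ActorSystem("app")     // assumed to match app-name in application.conf
  val manager  = system.actorOf(Props(new ActorManager(settings)), "actor-manager")

  sys.addShutdownHook {
    manager ! PoisonPill
    system.shutdown()                   // Akka 2.3-era API
  }
}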
3 changes: 0 additions & 3 deletions app/src/main/scala/com/app/Events.scala
@@ -1,9 +1,6 @@
package com.app

-import com.common.Events._
-import com.spark.Events._

object Events {
//
}

4 changes: 3 additions & 1 deletion app/src/main/scala/com/app/PingActor.scala
@@ -1,7 +1,9 @@
-package com.common
package com.app

import akka.actor.{Actor, ActorLogging, Props, ActorRef}

import com.common.Events._


// class PingActor(settings:CommonSettings) extends Actor with ActorLogging {
class PingActor extends Actor with ActorLogging {
4 changes: 3 additions & 1 deletion app/src/main/scala/com/app/PongActor.scala
@@ -1,7 +1,9 @@
-package com.common
package com.app

import akka.actor.{Actor, ActorLogging, Props}

import com.common.Events._


// class PongActor(settings:CommonSettings) extends Actor with ActorLogging {
class PongActor extends Actor with ActorLogging {
2 changes: 1 addition & 1 deletion app/src/main/scala/com/app/Settings.scala
@@ -1,6 +1,6 @@
package com.app

-import com.common.Settings._
import com.scalding.Settings._

object Settings {
//
31 changes: 31 additions & 0 deletions app/src/main/scala/com/app/SparkApp.scala
@@ -0,0 +1,31 @@
package com.app

import akka.actor.{Actor, ActorLogging, Props, ActorRef}
import scala.util.Random
import scala.collection.immutable.Vector

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

import com.spark._


class SparkApp(name:String) extends SparkBatch(name) {

  // Generate `len` random two-dimensional gaussian points and label each one
  // with its first coordinate.
  def createData(seed:Int, len:Int):Vector[LabeledPoint] = {
    val r = new Random(seed)
    val d = Vector.fill(len, 2)(r.nextGaussian)
    d.map { p => LabeledPoint(p(0), Vectors.dense(p.toArray)) }
  }

  override def batch(spk:SparkContext, sql:SQLContext):Unit = {
    val dat = createData(Random.nextInt, 100)
    val rdd = spk.parallelize(dat)
    val model = LinearRegressionWithSGD.train(rdd, 100)

    println(s"The model has intercept ${model.intercept} with weights ${model.weights}")
  }
}
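SparkApp extends SparkBatch from the com.spark package, which is also outside this diff. A plausible shape for that base actor, assumed here for illustration, is one that owns the SparkContext and SQLContext and calls the subclass's batch() when it receives SparkBatchStart; everything below except the batch() signature is a guess.

package com.spark

import akka.actor.{Actor, ActorLogging}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.spark.Events._

// Assumed base class, not part of this commit: creates the Spark contexts
// lazily and runs the subclass's batch() when told to start.
abstract class SparkBatch(name:String) extends Actor with ActorLogging {

  protected lazy val sc  = new SparkContext(new SparkConf().setMaster("local").setAppName(name))
  protected lazy val sql = new SQLContext(sc)

  def batch(spk:SparkContext, sql:SQLContext):Unit

  def receive = {
    case SparkBatchStart =>
      log.info(s"Starting Spark batch job '$name'")
      batch(sc, sql)
  }

  override def postStop():Unit = sc.stop()
}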
