forked from spirom/LearningSpark
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCustomReceiver.scala
130 lines (110 loc) · 3.87 KB
/
CustomReceiver.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package streaming
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Success
import scala.language.postfixOps
//
// This is almost the simplest custom receiver possible. It emits the same
// string every 100ms or so. Almost all the code is involved with making
// it a 'good citizen' in terms of stopping in an orderly fashion when asked
// to do so by the streaming context.
//
class CustomReceiver
  extends Receiver[String](StorageLevel.MEMORY_ONLY)
  with Serializable
{

  //
  // Two way communication with the receiver thread: one promise/future
  // pair to ask it to stop and another to find out when it has stopped.
  // This is initialized in onStart() to work around the fact that a
  // Receiver must be serializable -- see
  // https://issues.apache.org/jira/browse/SPARK-1785
  //
  private class Synchronization {
    // this is completed to request that the loop stop
    val promiseToTerminate = Promise[Unit]()
    val futureTermination = promiseToTerminate.future

    // this is completed when the loop has stopped
    val promiseToStop = Promise[Unit]()
    val futureStopping = promiseToStop.future
  }

  // Written by onStart() and read by onStop(); marked volatile because those
  // callbacks are presumably invoked on different threads (confirm against
  // the Spark receiver supervisor). Stays null until onStart() has run.
  @volatile private var sync: Synchronization = null

  // Starting is easy: start a receiver thread, as onStart() must not block.
  // Each thread is handed its own Synchronization instance rather than
  // re-reading the mutable 'sync' field, so a thread left over from an
  // earlier start can never observe (or complete) the promises that belong
  // to a later one.
  def onStart(): Unit = {
    println("*** starting custom receiver")
    val current = new Synchronization
    sync = current
    new Thread("Custom Receiver") {
      override def run(): Unit = receive(current)
    }.start()
  }

  // Stopping is a little harder: once onStop() exits this receiver will be
  // assumed to have quiesced -- so signal the thread, and then wait for it
  // to stop. The wait is bounded: if the loop has not exited within one
  // second, Await.result throws a TimeoutException to the caller.
  def onStop(): Unit = {
    println("*** stopping custom receiver")
    // Option(...) guards against onStop() being called before onStart().
    Option(sync).foreach { current =>
      // tryComplete (unlike complete) is a no-op when the promise is already
      // completed, so calling onStop() more than once does not throw.
      current.promiseToTerminate.tryComplete(Success(()))
      Await.result(current.futureStopping, 1.second)
    }
    println("*** stopped custom receiver")
  }

  // This receive loop is a bit tricky in terms of termination. Certainly it
  // should stop if 'isStopped' is true, as the streaming context thinks the
  // receiver has stopped. But that's not enough -- the loop also needs to
  // exit if the streaming context has asked it to by calling onStop() above.
  //
  // 'current' is the Synchronization created by the onStart() call that
  // launched this thread.
  private def receive(current: Synchronization): Unit = {
    try {
      while (!isStopped && !current.futureTermination.isCompleted) {
        try {
          // make it a bit slow to stop
          Thread.sleep(100)
          store("hello")
        } catch {
          case e: Exception =>
            println("*** exception caught in receiver calling store()")
            e.printStackTrace()
        }
      }
    } finally {
      // Always report that the loop is finished -- even if a throwable that
      // is not an Exception escaped above -- so onStop() is not left waiting
      // for its full timeout.
      current.promiseToStop.tryComplete(Success(()))
      println("*** custom receiver loop exited")
    }
  }
}
//
// Driver program: runs a local streaming job fed by a CustomReceiver,
// prints the size of each batch, and shuts the job down from a helper
// thread after 15 seconds.
//
object CustomStreaming {
  def main(args: Array[String]): Unit = {
    val conf =
      new SparkConf().setAppName("CustomSeqStreaming").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // streams will produce data every second
    val ssc = new StreamingContext(sc, Seconds(1))

    // create the stream
    val stream = ssc.receiverStream(new CustomReceiver)

    // register for data
    stream.foreachRDD(r => {
      println(s"Items: ${r.count()} Partitions: ${r.partitions.size}")
    })

    println("*** starting streaming")
    ssc.start()

    // stop the streaming context from a separate thread once 15 seconds have
    // passed, which releases the awaitTermination() call below
    new Thread("Delayed Termination") {
      override def run(): Unit = {
        Thread.sleep(15000)
        println("*** stopping streaming")
        ssc.stop()
      }
    }.start()

    try {
      ssc.awaitTermination()
      println("*** streaming terminated")
    } catch {
      case e: Exception =>
        println("*** streaming exception caught in monitor thread")
        // report the cause instead of discarding it, matching the handling
        // in the receiver loop
        e.printStackTrace()
    }

    // pause a little longer before leaving main -- presumably to give any
    // remaining background shutdown work time to finish (TODO confirm)
    Thread.sleep(5000)
  }
}