-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCluster.go
288 lines (225 loc) · 5.72 KB
/
Cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
package cluster
import (
"bytes"
"encoding/gob"
"encoding/json"
"fmt"
"io/ioutil"
"log"
//"strings"
)
import zmq "github.com/pebbe/zmq4"
const (
BROADCAST = -1
PRINT_START = 0
)
type Envelope struct {
// On the sender side, Pid identifies the receiving peer. If instead, Pid is
// set to cluster.BROADCAST, the message is sent to all peers. On the receiver side, the
// Id is always set to the original sender. If the Id is not found, the message is silently dropped
Pid int
// An id that globally and uniquely identifies the message, meant for duplicate detection at
// higher levels. It is opaque to this package.
MsgId int64
// the actual message.
Msg interface{}
}
type Server interface {
// Id of this server
Pid() int
// array of other servers' ids in the same cluster
Peers() []int
// the channel to use to send messages to other peers
// Note that there are no guarantees of message delivery, and messages
// are silently dropped
Outbox() chan<- *Envelope
// the channel to receive messages from other peers.
Inbox() <-chan *Envelope
// pause/unpause - drop messages recieved and to be sent
Pause() bool
Unpause() bool
}
//Inteface for messaging
type MsgHandler interface {
Sender() int
Receiver() int
}
type ServerBody struct {
// Id of this server
MyId int
// Address of this server
MyAdd string
// Number of servers in cluster
NumServers int
// Array of other servers' address
PeerAdds []string
// Array of other servers' id
PeerIds []int
// Array of sockets to peers
PeerSockets []*zmq.Socket
// Outbox channel
OutChan chan *Envelope
// Inbox channel
InChan chan *Envelope
// Waiting to hear from somebody
RecvChan chan int
// Waiting to complete sending data
SendChan chan int
// Drop incoming messages (simulate cutoff)
Cutoff *bool
}
//ServerBody implementation for Pid()
func (s ServerBody) Pid() int {
return s.MyId
}
//ServerBody implementation for Peers()
func (s ServerBody) Peers() []int {
return s.PeerIds
}
//ServerBody implementation for Outbox()
func (s ServerBody) Outbox() chan<- *Envelope {
return s.OutChan
}
//ServerBody implementation for Inbox()
func (s ServerBody) Inbox() <-chan *Envelope {
return s.InChan
}
//ServerBody implementation for Pause()
func (s ServerBody) Pause() bool {
*s.Cutoff = true
return true
}
//ServerBody implementation for Unpause()
func (s ServerBody) Unpause() bool {
*s.Cutoff = false
return false
}
//ServerBody implementation for Sender
func (s ServerBody) Sender() int {
for {
//Waiting for SendChannel to get free
<-s.SendChan
//Waiting for Outbox entry
e := <-s.OutChan
//Changing the Pid to Sender
var toId int
toId = e.Pid
e.Pid = s.Pid()
m := new(bytes.Buffer)
//m, err := json.Marshal(e) //Marshal encoding
enc := gob.NewEncoder(m)
err := enc.Encode(e)
if !(*s.Cutoff) {
for j, toPid := range s.Peers() {
if (toPid == toId || toId == -1) && toPid != s.MyId {
//fmt.Printf("Sending Message to %s ...", s.PeerAdds[j])
//context, err := zmq.NewContext()
//if(err != nil) { log.Fatal(err) }
// Creating and connecting a new socket for communication if that doesnt already exist
if s.PeerSockets[j] == nil {
s.PeerSockets[j], err = zmq.NewSocket(zmq.PUSH)
if err != nil {
log.Fatal(err)
}
defer s.PeerSockets[j].Close()
err = s.PeerSockets[j].Connect(s.PeerAdds[j])
if err != nil {
log.Fatal(err)
}
//println("Connected")
}
_, err = s.PeerSockets[j].SendBytes(m.Bytes(), 0)
if err != nil {
log.Fatal(err)
}
//println("Sent")
//fmt.Printf(" Done\n")
if toId != -1 {
break
}
}
}
} /*else {
println("Blocked @ Sender %d\n", s.MyId)
}*/
s.SendChan <- 1
}
return 0
}
//ServerBody implementation for Receiver
func (s ServerBody) Receiver() int {
socket, _ := zmq.NewSocket(zmq.PULL)
socket.Bind(s.MyAdd)
defer socket.Close()
//println("Bound to ",s.MyAdd)
for {
//Waiting on RecvChannel to get free
<-s.RecvChan
//println("Recieving")
msg, err := socket.RecvBytes(0)
if err != nil {
log.Fatal(err)
}
//println("Received!")
p := bytes.NewBuffer(msg)
dec := gob.NewDecoder(p)
//Decode the received message into an Envelope
var e Envelope
err = dec.Decode(&e)
if err != nil {
log.Fatal(err)
}
//println(msg)
//Sending on the Inbox channel if not Paused
if !(*s.Cutoff) {
s.InChan <- &e
} /*else {
println("Blocked @ Reciever %d\n", s.MyId)
}*/
//Enabling next Recieve action
s.RecvChan <- 1
}
return 0
}
func AddPeer(id int, config string) Server {
//Struct for handling ConfigData
type ConfigData struct {
Total int //Total number of servers
Ids []int //All the ids
Adds []string //All the addresses (correspondingly)
}
ConfigFile, err := ioutil.ReadFile(config)
if err != nil {
panic(err)
}
//Decoding into a ConfigData
var c ConfigData
err = json.Unmarshal(ConfigFile, &c)
if err != nil {
panic(err)
}
var Me Server
var MyStruct ServerBody
for i, pid := range c.Ids {
if pid == id {
//Initialising Server
MyStruct = ServerBody{pid, c.Adds[i], c.Total, c.Adds /*append(c.Adds[:i], c.Adds[i+1:]...)*/, c.Ids /*append(c.Ids[:i], c.Ids[i+1:]...)*/, make([]*zmq.Socket, c.Total), make(chan *Envelope), make(chan *Envelope), make(chan int, 1), make(chan int, 1), new(bool)}
*MyStruct.Cutoff = false
if PRINT_START == 1 {
fmt.Printf("Starting peer %d at %s ...", id, c.Adds[i])
}
//println(c.Adds[0])
//Enabling Sender and Receiver channels
MyStruct.RecvChan <- 1
MyStruct.SendChan <- 1
go MyStruct.Receiver()
go MyStruct.Sender()
if PRINT_START == 1 {
fmt.Printf(" Server deployed\n")
}
break
}
}
Me = Server(MyStruct)
return Me
}