forked from spacecloud-io/space-cloud
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwebsocket.go
129 lines (114 loc) · 3.65 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package main
import (
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
"github.com/mitchellh/mapstructure"
"github.com/spaceuptech/space-cloud/model"
"github.com/spaceuptech/space-cloud/modules/auth"
"github.com/spaceuptech/space-cloud/modules/crud"
"github.com/spaceuptech/space-cloud/modules/realtime"
"github.com/spaceuptech/space-cloud/utils"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func handleWebsocket(realtime *realtime.Module, auth *auth.Module, crud *crud.Module) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("upgrade:", err)
return
}
client := utils.CreateWebsocketClient(c)
defer client.Close()
defer realtime.RemoveClient(client.ClientID())
client.RoutineWrite()
client.Read(func(req *model.Message) {
switch req.Type {
case utils.TypeRealtimeSubscribe:
// For realtime subscribe event
data := new(model.RealtimeRequest)
mapstructure.Decode(req.Data, data)
// Check if the user is authenticated
authObj, err := auth.IsAuthenticated(data.Token, data.DBType, data.Group, utils.Read)
if err != nil {
client.Write(model.Message{
ID: req.ID,
Type: req.Type,
Data: model.RealtimeResponse{Group: data.Group, ID: data.ID, Ack: false, Error: err.Error()},
})
return
}
// Create an args object
args := map[string]interface{}{
"args": map[string]interface{}{"find": data.Where, "op": utils.All, "auth": authObj},
"project": data.Project, // Don't forget to do this for every request
}
// Check if user is authorized to make this request
err = auth.IsAuthorized(data.DBType, data.Group, utils.Read, args)
if err != nil {
client.Write(model.Message{
ID: req.ID,
Type: req.Type,
Data: model.RealtimeResponse{Group: data.Group, ID: data.ID, Ack: false, Error: err.Error()},
})
return
}
readReq := model.ReadRequest{Find: data.Where, Operation: utils.All}
result, err := crud.Read(client.Context, data.DBType, data.Project, data.Group, &readReq)
if err != nil {
client.Write(model.Message{
ID: req.ID,
Type: req.Type,
Data: model.RealtimeResponse{Group: data.Group, ID: data.ID, Ack: false, Error: err.Error()},
})
return
}
feedData := []*model.FeedData{}
array, ok := result.([]interface{})
if ok {
timeStamp := time.Now().Unix()
for _, row := range array {
payload := row.(map[string]interface{})
idVar := "id"
if data.DBType == string(utils.Mongo) {
idVar = "_id"
}
if docID, ok := payload[idVar].(string); ok {
feedData = append(feedData, &model.FeedData{
Group: data.Group,
Type: utils.RealtimeWrite,
TimeStamp: timeStamp,
DocID: docID,
DBType: data.DBType,
Payload: payload,
QueryID: data.ID,
})
}
}
}
// Add the live query
realtime.AddLiveQuery(data.ID, data.Group, client, data.Where)
client.Write(model.Message{
ID: req.ID,
Type: req.Type,
Data: model.RealtimeResponse{Group: data.Group, ID: data.ID, Ack: true, Docs: feedData},
})
case utils.TypeRealtimeUnsubscribe:
// For realtime unsubscribe event
data := new(model.RealtimeRequest)
mapstructure.Decode(req.Data, data)
realtime.RemoveLiveQuery(data.Group, client.ClientID(), data.ID)
client.Write(model.Message{
ID: req.ID,
Type: req.Type,
Data: model.RealtimeResponse{Group: data.Group, ID: data.ID, Ack: true},
})
}
})
}
}