forked from bittersweet/notifilter-receive
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnotifilter.go
129 lines (109 loc) · 2.85 KB
/
notifilter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package main
import (
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"runtime"
"github.com/bittersweet/notifilter/elasticsearch"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/types"
_ "github.com/lib/pq"
)
const maxPacketSize = 1024 * 1024
var db *sqlx.DB
// Struct that will keep incoming data
type Event struct {
Identifier string `json:"identifier"`
Data types.JsonText `json:"data"`
}
// toMap transforms the raw JSON data into a map
func (e *Event) toMap() map[string]interface{} {
m := map[string]interface{}{}
e.Data.Unmarshal(&m)
return m
}
// persist saves the incoming event to Elasticsearch
func (e *Event) persist() {
err := elasticsearch.Persist(e.Identifier, e.toMap())
if err != nil {
log.Fatal("Error persisting to ElasticSearch:", err)
}
}
// notify checks to see if we have notifiers set up for this event and if the
// rules for those notifications have been satisfied
func (e *Event) notify() {
notifiers := []Notifier{}
err := db.Select(¬ifiers, "SELECT * FROM notifiers WHERE event_name=$1", e.Identifier)
if err != nil {
log.Fatal("db.Select ", err)
}
fmt.Printf("Found %d notifiers\n", len(notifiers))
for i := 0; i < len(notifiers); i++ {
notifier := notifiers[i]
notifier.notify(e, notifier.newNotifier())
}
}
// incomingItems creates a channel that we can place events on so the main loop
// can keep listening to incoming events
func incomingItems() chan<- []byte {
incomingChan := make(chan []byte)
go func() {
for {
select {
case b := <-incomingChan:
var Event Event
err := json.Unmarshal(b, &Event)
if err != nil {
log.Println(err)
log.Printf("%+v\n", Event)
}
Event.persist()
Event.notify()
}
}
}()
fmt.Println("incomingItems launched")
return incomingChan
}
// listenToUDP opens a UDP connection that we will listen on
func listenToUDP(conn *net.UDPConn) {
incomingChan := incomingItems()
buffer := make([]byte, maxPacketSize)
for {
bytes, err := conn.Read(buffer)
if err != nil {
log.Println("UDP read error: ", err.Error())
continue
}
msg := make([]byte, bytes)
copy(msg, buffer)
incomingChan <- msg
}
}
// main handles setting up connections for UDP/TCP and connecting to Postgres
// Note, sqlx uses a connection pool internally.
func main() {
runtime.GOMAXPROCS(4)
addr, err := net.ResolveUDPAddr("udp", ":8000")
if err != nil {
log.Fatal("ResolveUDPAddr", err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatal("ListenUDP", err)
}
go listenToUDP(conn)
db, err = sqlx.Connect("postgres", "user=markmulder dbname=notifilter_development sslmode=disable")
if err != nil {
log.Fatal("DB Open()", err)
}
defer db.Close()
http.HandleFunc("/v1/count", handleCount)
fmt.Println("Will start listening on port 8000")
http.ListenAndServe(":8000", nil)
if err != nil {
log.Fatal("ListenAndServe ", err)
}
}