diff --git a/elasticsearch/client.go b/elasticsearch/client.go new file mode 100644 index 0000000..abbf846 --- /dev/null +++ b/elasticsearch/client.go @@ -0,0 +1,39 @@ +package elasticsearch + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "log" + "net/http" + "time" +) + +type ESPayload struct { + Key string `json:"key"` + ReceivedAt time.Time `json:"received_at"` + Data map[string]interface{} `json:"data"` +} + +func Persist(key string, data map[string]interface{}) { + fmt.Println("Printing from ES Package: ", key) + fmt.Println("Printing from ES Package: ", data) + + payload := ESPayload{ + Key: key, + ReceivedAt: time.Now(), + Data: data, + } + body, _ := json.Marshal(payload) + resp, err := http.Post("http://localhost:9200/notifilter/event/?pretty", "application/json", bytes.NewReader(body)) + if err != nil { + log.Fatal(err) + } + defer resp.Body.Close() + body, err = ioutil.ReadAll(resp.Body) + if err != nil { + log.Fatal(err) + } + log.Print(string(body)) +} diff --git a/notifier.go b/notifier.go index 4ad3259..1b33040 100644 --- a/notifier.go +++ b/notifier.go @@ -88,6 +88,6 @@ func (n *Notifier) notify(s *Event, mn MessageNotifier) { } message := n.renderTemplate(s) - mn.sendMessage(s.Key, message) + mn.sendMessage(s.Identifier, message) fmt.Printf("Notifying notifier id: done\n", n.Id) } diff --git a/rules.go b/rules.go index 64383d2..ee6dc46 100644 --- a/rules.go +++ b/rules.go @@ -15,7 +15,7 @@ type Rule struct { func (r *Rule) Met(e *Event) bool { var parsed map[string]interface{} - err := json.Unmarshal([]byte(e.Value), &parsed) + err := json.Unmarshal([]byte(e.Data), &parsed) if err != nil { log.Fatal("json.Unmarshal", err) } diff --git a/udp_listener.go b/udp_listener.go index 7f00371..06336c6 100644 --- a/udp_listener.go +++ b/udp_listener.go @@ -1,16 +1,15 @@ package main import ( - "bytes" "encoding/json" "fmt" - "io/ioutil" "log" "net" "net/http" "runtime" "time" + "github.com/bittersweet/notifilter/elasticsearch" "github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx/types" _ "github.com/lib/pq" @@ -20,11 +19,13 @@ const maxPacketSize = 1024 * 1024 var db *sqlx.DB +// Struct that will keep incoming data type Event struct { - Key string `json:"key"` - Value types.JsonText `json:"value"` + Identifier string `json:"identifier"` + Data types.JsonText `json:"data"` } +// Data received and persisted to the DB type Incoming struct { Id int `db:"id"` Class string `db:"class"` @@ -32,12 +33,6 @@ type Incoming struct { Data string `db:"data"` } -type ESPayload struct { - Key string `json:"key"` - ReceivedAt time.Time `json:"received_at"` - Data map[string]interface{} `json:"data"` -} - func (i *Incoming) FormattedData() string { return string(i.Data) } @@ -54,48 +49,32 @@ func (i *Incoming) toMap() map[string]interface{} { func (e *Event) toMap() map[string]interface{} { m := map[string]interface{}{} - e.Value.Unmarshal(&m) + e.Data.Unmarshal(&m) return m } func (e *Event) persist() { var incomingID int query := `INSERT INTO incoming(received_at, class, data) VALUES($1, $2, $3) RETURNING id` - err := db.QueryRow(query, time.Now(), e.Key, e.Value).Scan(&incomingID) + err := db.QueryRow(query, time.Now(), e.Identifier, e.Data).Scan(&incomingID) if err != nil { log.Fatal("persist()", err) } - fmt.Printf("class: %s id: %d\n", e.Key, incomingID) + + elasticsearch.Persist(e.Identifier, e.toMap()) + + fmt.Printf("class: %s id: %d\n", e.Identifier, incomingID) } func (e *Event) notify() { notifiers := []Notifier{} - err := db.Select(¬ifiers, "SELECT * FROM notifiers WHERE class=$1", e.Key) + err := db.Select(¬ifiers, "SELECT * FROM notifiers WHERE class=$1", e.Identifier) if err != nil { log.Fatal("db.Select ", err) } fmt.Printf("Incoming data: %v\n", e.toMap()) fmt.Printf("Found %d notifiers\n", len(notifiers)) - go func() { - payload := ESPayload{ - Key: e.Key, - ReceivedAt: time.Now(), - Data: e.toMap(), - } - body, _ := json.Marshal(payload) - resp, err := http.Post("http://localhost:9200/notifilter/event/?pretty", "application/json", bytes.NewReader(body)) - if err != nil { - log.Fatal(err) - } - defer resp.Body.Close() - body, err = ioutil.ReadAll(resp.Body) - if err != nil { - log.Fatal(err) - } - log.Print(string(body)) - }() - for i := 0; i < len(notifiers); i++ { notifier := notifiers[i] notifier.notify(e, notifier.newNotifier())