Skip to content

Commit

Permalink
Extract ES code to separate package
Browse files Browse the repository at this point in the history
  • Loading branch information
bittersweet committed Oct 11, 2015
1 parent 2444083 commit ea1dc9f
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 35 deletions.
39 changes: 39 additions & 0 deletions elasticsearch/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package elasticsearch

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"time"
)

type ESPayload struct {
Key string `json:"key"`
ReceivedAt time.Time `json:"received_at"`
Data map[string]interface{} `json:"data"`
}

func Persist(key string, data map[string]interface{}) {
fmt.Println("Printing from ES Package: ", key)
fmt.Println("Printing from ES Package: ", data)

payload := ESPayload{
Key: key,
ReceivedAt: time.Now(),
Data: data,
}
body, _ := json.Marshal(payload)
resp, err := http.Post("http://localhost:9200/notifilter/event/?pretty", "application/json", bytes.NewReader(body))
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
log.Print(string(body))
}
2 changes: 1 addition & 1 deletion notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,6 @@ func (n *Notifier) notify(s *Event, mn MessageNotifier) {
}

message := n.renderTemplate(s)
mn.sendMessage(s.Key, message)
mn.sendMessage(s.Identifier, message)
fmt.Printf("Notifying notifier id: done\n", n.Id)
}
2 changes: 1 addition & 1 deletion rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Rule struct {

func (r *Rule) Met(e *Event) bool {
var parsed map[string]interface{}
err := json.Unmarshal([]byte(e.Value), &parsed)
err := json.Unmarshal([]byte(e.Data), &parsed)
if err != nil {
log.Fatal("json.Unmarshal", err)
}
Expand Down
45 changes: 12 additions & 33 deletions udp_listener.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"runtime"
"time"

"github.com/bittersweet/notifilter/elasticsearch"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/types"
_ "github.com/lib/pq"
Expand All @@ -20,24 +19,20 @@ const maxPacketSize = 1024 * 1024

var db *sqlx.DB

// Struct that will keep incoming data
type Event struct {
Key string `json:"key"`
Value types.JsonText `json:"value"`
Identifier string `json:"identifier"`
Data types.JsonText `json:"data"`
}

// Data received and persisted to the DB
type Incoming struct {
Id int `db:"id"`
Class string `db:"class"`
ReceivedAt time.Time `db:"received_at"`
Data string `db:"data"`
}

type ESPayload struct {
Key string `json:"key"`
ReceivedAt time.Time `json:"received_at"`
Data map[string]interface{} `json:"data"`
}

func (i *Incoming) FormattedData() string {
return string(i.Data)
}
Expand All @@ -54,48 +49,32 @@ func (i *Incoming) toMap() map[string]interface{} {

func (e *Event) toMap() map[string]interface{} {
m := map[string]interface{}{}
e.Value.Unmarshal(&m)
e.Data.Unmarshal(&m)
return m
}

func (e *Event) persist() {
var incomingID int
query := `INSERT INTO incoming(received_at, class, data) VALUES($1, $2, $3) RETURNING id`
err := db.QueryRow(query, time.Now(), e.Key, e.Value).Scan(&incomingID)
err := db.QueryRow(query, time.Now(), e.Identifier, e.Data).Scan(&incomingID)
if err != nil {
log.Fatal("persist()", err)
}
fmt.Printf("class: %s id: %d\n", e.Key, incomingID)

elasticsearch.Persist(e.Identifier, e.toMap())

fmt.Printf("class: %s id: %d\n", e.Identifier, incomingID)
}

func (e *Event) notify() {
notifiers := []Notifier{}
err := db.Select(&notifiers, "SELECT * FROM notifiers WHERE class=$1", e.Key)
err := db.Select(&notifiers, "SELECT * FROM notifiers WHERE class=$1", e.Identifier)
if err != nil {
log.Fatal("db.Select ", err)
}
fmt.Printf("Incoming data: %v\n", e.toMap())
fmt.Printf("Found %d notifiers\n", len(notifiers))

go func() {
payload := ESPayload{
Key: e.Key,
ReceivedAt: time.Now(),
Data: e.toMap(),
}
body, _ := json.Marshal(payload)
resp, err := http.Post("http://localhost:9200/notifilter/event/?pretty", "application/json", bytes.NewReader(body))
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
log.Print(string(body))
}()

for i := 0; i < len(notifiers); i++ {
notifier := notifiers[i]
notifier.notify(e, notifier.newNotifier())
Expand Down

0 comments on commit ea1dc9f

Please sign in to comment.