From ddd3aef3b322f26ffa3459564845f89d9f442302 Mon Sep 17 00:00:00 2001 From: Mark Mulder Date: Sun, 11 Oct 2015 19:17:53 +0100 Subject: [PATCH] email notification from database --- schema.sql | 10 ++++++++ single.rb | 2 +- stats.rb | 6 +++-- udp_listener.go | 62 ++++++++++++++++++++++++++++--------------------- 4 files changed, 51 insertions(+), 29 deletions(-) diff --git a/schema.sql b/schema.sql index cb445a7..d0f7e20 100644 --- a/schema.sql +++ b/schema.sql @@ -1,5 +1,15 @@ +-- psql notifier +-- drop table incoming; +-- drop table notifiers; + CREATE TABLE incoming( id serial primary key, received_at timestamp, data json ); + +CREATE table notifiers( + id serial primary key, + class character(256), + template text +) diff --git a/single.rb b/single.rb index 0a0b369..fd00153 100644 --- a/single.rb +++ b/single.rb @@ -28,5 +28,5 @@ def flush socket = UDPSocket.new s = Stats.new -message = {mark: 'is cool!'} +message = {mark: 'is cool!', number: rand(15)} s.track({'key' => 'mark', 'value' => message}) diff --git a/stats.rb b/stats.rb index 9e45362..721fa05 100644 --- a/stats.rb +++ b/stats.rb @@ -21,6 +21,7 @@ def track(message) def flush @backlog.each do |item| message = item.to_json + puts message socket.send(message, 0, "127.0.0.1", 8000) end @backlog.clear @@ -32,8 +33,9 @@ def flush 10.times do jobs << Thread.new do s = Stats.new - 100.times do |i| - s.track({'key' => 'mark', 'value' => rand(100).to_s}) + 10.times do |i| + message = {mark: 'is cool!', number: rand(10)} + s.track({'key' => 'mark', 'value' => message}) end end end diff --git a/udp_listener.go b/udp_listener.go index e8a1932..47ec059 100644 --- a/udp_listener.go +++ b/udp_listener.go @@ -1,11 +1,11 @@ package main import ( - "database/sql" + // "database/sql" + "bytes" "encoding/json" "fmt" - // _ "github.com/jmoiron/sqlx" - "bytes" + "github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx/types" _ "github.com/lib/pq" "log" @@ -18,7 +18,7 @@ import ( const maxPacketSize = 1024 * 1024 -var db *sql.DB +var db *sqlx.DB type Stat struct { Key string `json:"key"` @@ -30,6 +30,12 @@ type Notifier struct { Template string } +type dbNotifier struct { + Id int `db:"id"` + Class string `db:"class"` + Template string `db:"template"` +} + func (s *Stat) persist() { var incomingId int err := db.QueryRow(`INSERT INTO incoming(received_at, data) VALUES($1, $2) RETURNING id`, time.Now(), s.Value).Scan(&incomingId) @@ -40,36 +46,42 @@ func (s *Stat) persist() { } func (s *Stat) notify() { - sendEmail(s.Key, string(s.Value)) -} + // find (active) notifiers + // execute them all -func (s *Stat) specialNotify() { var err error - var doc bytes.Buffer - n := Notifier{ - "mark", - "{{.Number}}: is pretty awesome!, {{.Yeah}} nigga", + notifiers := []dbNotifier{} + err = db.Select(¬ifiers, "SELECT * FROM notifiers WHERE class=$1", s.Key) + if err != nil { + log.Fatal("db.Select ", err) + } + fmt.Printf("Found %d notifiers\n", len(notifiers)) + for i := 0; i < len(notifiers); i++ { + s.specialNotify(¬ifiers[i]) } +} + +func (s *Stat) specialNotify(notifier *dbNotifier) { + fmt.Printf("Notifying notifier id: %d\n", notifier.Id) + var err error + var doc bytes.Buffer t := template.New("notificationTemplate") - t, err = t.Parse(n.Template) + t, err = t.Parse(notifier.Template) if err != nil { log.Fatal("t.Parse of n.Template", err) } - jmap := map[string]string{ - "Number": "1000test", - "Yeah": "yeayeah", - } - err = t.Execute(&doc, jmap) + m := map[string]interface{}{} + s.Value.Unmarshal(&m) + + err = t.Execute(&doc, m) if err != nil { log.Fatal("t.Execute ", err) } - fmt.Println(doc) - sendEmail(s.Key, string(doc.Bytes())) - + sendEmail(s.Key, doc.Bytes()) } func countRows() int { @@ -102,8 +114,7 @@ func listenToUDP(conn *net.UDPConn) { } stat.persist() - // stat.notify() - stat.specialNotify() + stat.notify() } } @@ -144,7 +155,7 @@ type EmailData struct { Body string } -func sendEmail(class string, data string) { +func sendEmail(class string, data []byte) { var err error var doc bytes.Buffer @@ -153,12 +164,11 @@ func sendEmail(class string, data string) { if err != nil { log.Fatal("t.Parse ", err) } - // bodyString := fmt.Sprintf("

class: %s

\\n

data: %s

", class, data) context := &EmailData{ "Springest Dev ", "recipient@example.com", "Email subject line", - data, + string(data), } err = t.Execute(&doc, context) if err != nil { @@ -182,7 +192,7 @@ func main() { log.Fatal("ListenUDP", err) } - db, err = sql.Open("postgres", "user=markmulder dbname=notifier sslmode=disable") + db, err = sqlx.Connect("postgres", "user=markmulder dbname=notifier sslmode=disable") if err != nil { log.Fatal("DB Open()", err) }