Skip to content

Commit

Permalink
email notification from database
Browse files Browse the repository at this point in the history
  • Loading branch information
bittersweet committed Oct 11, 2015
1 parent fcfb380 commit ddd3aef
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 29 deletions.
10 changes: 10 additions & 0 deletions schema.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
-- psql notifier
-- drop table incoming;
-- drop table notifiers;

CREATE TABLE incoming(
id serial primary key,
received_at timestamp,
data json
);

CREATE table notifiers(
id serial primary key,
class character(256),
template text
)
2 changes: 1 addition & 1 deletion single.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ def flush

socket = UDPSocket.new
s = Stats.new
message = {mark: 'is cool!'}
message = {mark: 'is cool!', number: rand(15)}
s.track({'key' => 'mark', 'value' => message})
6 changes: 4 additions & 2 deletions stats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def track(message)
def flush
@backlog.each do |item|
message = item.to_json
puts message
socket.send(message, 0, "127.0.0.1", 8000)
end
@backlog.clear
Expand All @@ -32,8 +33,9 @@ def flush
10.times do
jobs << Thread.new do
s = Stats.new
100.times do |i|
s.track({'key' => 'mark', 'value' => rand(100).to_s})
10.times do |i|
message = {mark: 'is cool!', number: rand(10)}
s.track({'key' => 'mark', 'value' => message})
end
end
end
Expand Down
62 changes: 36 additions & 26 deletions udp_listener.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package main

import (
"database/sql"
// "database/sql"
"bytes"
"encoding/json"
"fmt"
// _ "github.com/jmoiron/sqlx"
"bytes"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/types"
_ "github.com/lib/pq"
"log"
Expand All @@ -18,7 +18,7 @@ import (

const maxPacketSize = 1024 * 1024

var db *sql.DB
var db *sqlx.DB

type Stat struct {
Key string `json:"key"`
Expand All @@ -30,6 +30,12 @@ type Notifier struct {
Template string
}

type dbNotifier struct {
Id int `db:"id"`
Class string `db:"class"`
Template string `db:"template"`
}

func (s *Stat) persist() {
var incomingId int
err := db.QueryRow(`INSERT INTO incoming(received_at, data) VALUES($1, $2) RETURNING id`, time.Now(), s.Value).Scan(&incomingId)
Expand All @@ -40,36 +46,42 @@ func (s *Stat) persist() {
}

func (s *Stat) notify() {
sendEmail(s.Key, string(s.Value))
}
// find (active) notifiers
// execute them all

func (s *Stat) specialNotify() {
var err error
var doc bytes.Buffer

n := Notifier{
"mark",
"{{.Number}}: is pretty awesome!, {{.Yeah}} nigga",
notifiers := []dbNotifier{}
err = db.Select(&notifiers, "SELECT * FROM notifiers WHERE class=$1", s.Key)
if err != nil {
log.Fatal("db.Select ", err)
}
fmt.Printf("Found %d notifiers\n", len(notifiers))
for i := 0; i < len(notifiers); i++ {
s.specialNotify(&notifiers[i])
}
}

func (s *Stat) specialNotify(notifier *dbNotifier) {
fmt.Printf("Notifying notifier id: %d\n", notifier.Id)
var err error
var doc bytes.Buffer

t := template.New("notificationTemplate")
t, err = t.Parse(n.Template)
t, err = t.Parse(notifier.Template)
if err != nil {
log.Fatal("t.Parse of n.Template", err)
}

jmap := map[string]string{
"Number": "1000test",
"Yeah": "yeayeah",
}
err = t.Execute(&doc, jmap)
m := map[string]interface{}{}
s.Value.Unmarshal(&m)

err = t.Execute(&doc, m)
if err != nil {
log.Fatal("t.Execute ", err)
}

fmt.Println(doc)
sendEmail(s.Key, string(doc.Bytes()))

sendEmail(s.Key, doc.Bytes())
}

func countRows() int {
Expand Down Expand Up @@ -102,8 +114,7 @@ func listenToUDP(conn *net.UDPConn) {
}

stat.persist()
// stat.notify()
stat.specialNotify()
stat.notify()
}
}

Expand Down Expand Up @@ -144,7 +155,7 @@ type EmailData struct {
Body string
}

func sendEmail(class string, data string) {
func sendEmail(class string, data []byte) {
var err error
var doc bytes.Buffer

Expand All @@ -153,12 +164,11 @@ func sendEmail(class string, data string) {
if err != nil {
log.Fatal("t.Parse ", err)
}
// bodyString := fmt.Sprintf("<h1>class: %s</h1>\\n<p>data: %s</p>", class, data)
context := &EmailData{
"Springest Dev <[email protected]>",
"[email protected]",
"Email subject line",
data,
string(data),
}
err = t.Execute(&doc, context)
if err != nil {
Expand All @@ -182,7 +192,7 @@ func main() {
log.Fatal("ListenUDP", err)
}

db, err = sql.Open("postgres", "user=markmulder dbname=notifier sslmode=disable")
db, err = sqlx.Connect("postgres", "user=markmulder dbname=notifier sslmode=disable")
if err != nil {
log.Fatal("DB Open()", err)
}
Expand Down

0 comments on commit ddd3aef

Please sign in to comment.