-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
106 lines (98 loc) · 3.03 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package main
import (
"bytes"
"database/sql"
"encoding/binary"
"fmt"
_ "github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/dialers/postgres"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/golang/protobuf/proto"
"log"
"time"
)
// Names of the Kafka topics this service reads from and writes to.
const (
IngressTopic = "alert-topic" // Topic main subscribes to; each message value is decoded by createAlert.
AlertToNotifyTopic = "to-notify-topic" // Topic pingNotificationService publishes stored alert ids to.
)
// main consumes alerts from IngressTopic. Every message received is decoded,
// persisted, and the id of the stored alert is published so that it can be
// picked up for notification. The loop only ends when the consumer reports a
// Kafka error, which terminates the process.
func main() {
	consumer := getConsumer(IngressTopic)
	defer consumer.Close()

	// pollTimeoutMs bounds how long a single Poll call waits for an event.
	// A timeout of 0 makes Poll return immediately, which turns this loop
	// into a busy-wait that pins a CPU core whenever the topic is idle.
	const pollTimeoutMs = 100

	for {
		// Poll yields nil when the timeout elapses without an event; nil
		// matches none of the cases below, so the loop simply polls again.
		switch e := consumer.Poll(pollTimeoutMs).(type) {
		case *kafka.Message:
			alertID := persistAlert(createAlert(e.Value))
			pingNotificationService(alertID)
		case kafka.PartitionEOF:
			log.Printf("%% Reached %v\n", e)
		case kafka.Error:
			// log.Fatalf exits through os.Exit, which skips deferred calls,
			// so the consumer is closed explicitly before exiting.
			consumer.Close()
			log.Fatalf("%% Error: %v\n", e)
		}
	}
}
// pingNotificationService announces a stored alert to the notification
// service by publishing its id on AlertToNotifyTopic. The id travels as an
// 8-byte little-endian unsigned integer. An encoding failure is fatal.
func pingNotificationService(id int) {
	var payload bytes.Buffer
	if err := binary.Write(&payload, binary.LittleEndian, uint64(id)); err != nil {
		log.Fatalf("pingNotificationService failed: %v\n", err)
	}
	publish(payload.Bytes(), AlertToNotifyTopic)
}
// createAlert decodes a protobuf-encoded message payload into an Alert.
// A payload that cannot be unmarshalled is treated as fatal: the error is
// logged and the process exits.
func createAlert(msg []byte) *Alert {
	decoded := new(Alert)
	err := proto.Unmarshal(msg, decoded)
	if err != nil {
		log.Fatalf("failed to unmarshal kafka message into alert: %v\n", err)
	}
	return decoded
}
// persistAlert saves the alert's image and then the alert record itself to
// the PostgreSQL store, and returns the id of the alert row that was created.
//
// Both inserts run inside one transaction, so a failure while inserting the
// alert does not leave an orphaned image row behind. Every database error is
// fatal: it is logged and the process exits through log.Fatalf.
func persistAlert(alert *Alert) int {
	dsn := fmt.Sprintf("host=%s dbname=%s user=%s password=%s sslmode=disable",
		InstanceConnectionName,
		DatabaseName,
		DatabaseUser,
		Password)
	db, err := sql.Open("cloudsqlpostgres", dsn)
	if err != nil {
		log.Fatalf("failed to open DB: %v\n", err)
	}
	// A fresh pool is opened on every call, so it has to be released before
	// returning; otherwise each persisted alert leaks its connections.
	// NOTE(review): sharing one *sql.DB for the whole process would avoid the
	// per-message connection setup entirely; consider hoisting it.
	defer func() {
		if cerr := db.Close(); cerr != nil {
			log.Printf("failed to close DB: %v\n", cerr)
		}
	}()

	tx, err := db.Begin()
	if err != nil {
		log.Fatalf("failed to begin transaction: %v\n", err)
	}
	// abort rolls the transaction back before exiting. log.Fatalf ends the
	// process with os.Exit, so a deferred rollback would never run.
	abort := func(format string, cause error) {
		_ = tx.Rollback() // best effort: the process is about to exit anyway
		log.Fatalf(format, cause)
	}

	// Insert image. QueryRow + Scan surfaces query errors, row iteration
	// errors and a missing RETURNING row (sql.ErrNoRows) in a single check,
	// instead of silently continuing with a zero id.
	var imageID int
	//goland:noinspection SqlResolve
	err = tx.QueryRow("INSERT INTO image (format, width, height, data) VALUES ($1, $2, $3, $4) RETURNING id;",
		alert.Image.Format,
		alert.Image.Size.Width,
		alert.Image.Size.Height,
		alert.Image.Data,
	).Scan(&imageID)
	if err != nil {
		abort("failed to insert image: %v\n", err)
	}

	// Insert alert record, linked to the image stored above.
	receivedAt := time.Now()
	var alertID int
	//goland:noinspection ALL
	query := "INSERT INTO alert (event_time, received_at, device_id, face_model_id, mask_model_id, image_id) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id;"
	err = tx.QueryRow(query, alert.EventTime, receivedAt, alert.CreatedBy.Guid, alert.FaceDetectionModel.Guid, alert.MaskClassifierModel.Guid, imageID).Scan(&alertID)
	if err != nil {
		abort("failed to execute query: %v\n", err)
	}

	if err = tx.Commit(); err != nil {
		log.Fatalf("failed to commit alert: %v\n", err)
	}

	// Success is only reported once both rows are durably committed.
	log.Printf("Saved image of type %s (%dH x %dW) with id %d\n", alert.Image.Format, alert.Image.Size.Height, alert.Image.Size.Width, imageID)
	log.Printf("Saved alert with id #%d\n", alertID)
	return alertID
}