From d39bcbe32578f50928f93d5f5599e108ba92d5d7 Mon Sep 17 00:00:00 2001 From: Andrew Lloyd Date: Fri, 31 Dec 2021 15:57:56 -0800 Subject: [PATCH] Allow apps to send heartbeats to server, alert if no connection --- scripts/door-light/main.go | 32 +++++++++++++++++++++++++------- server/main.go | 30 +++++++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 8 deletions(-) diff --git a/scripts/door-light/main.go b/scripts/door-light/main.go index 8b5e1c0..3cb81d7 100644 --- a/scripts/door-light/main.go +++ b/scripts/door-light/main.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "os" + "strconv" "strings" "sync" "time" @@ -13,6 +14,7 @@ import ( "github.com/gofrs/uuid" "github.com/jaedle/golang-tplink-hs100/pkg/configuration" "github.com/jaedle/golang-tplink-hs100/pkg/hs100" + "github.com/robfig/cron" ) var ( @@ -20,9 +22,12 @@ var ( ) const ( - sensorStatusChannel = "sensor/status" - OPEN = "OPEN" - CLOSED = "CLOSED" + sensorStatusChannel = "sensor/status" + sensorHeartbeatChannel = "sensor/heartbeat" + heartbeatIntervalSeconds = 60 + appSource = "app_door-light" + OPEN = "OPEN" + CLOSED = "CLOSED" ) // POC for turning on smart outlet when door is open @@ -34,7 +39,7 @@ func main() { logger.Fatalln("at least one broker is required") } if *deviceName == "" { - logger.Fatalln("outlet address is required") + logger.Fatalln("device name is required") } if *door == "" { logger.Fatalln("door required") @@ -42,14 +47,14 @@ func main() { devices, err := hs100.Discover("192.168.1.1/24", configuration.Default().WithTimeout(time.Second)) if err != nil { - logger.Println(fmt.Errorf("Error getting devices: %s", err)) + logger.Fatalln(fmt.Errorf("Error getting devices: %s", err)) } var outlet *hs100.Hs100 for _, d := range devices { name, err := d.GetName() if err != nil { - logger.Println(fmt.Errorf("Error getting device name: %s", err)) + logger.Fatalln(fmt.Errorf("Error getting device name: %s", err)) } if name == *deviceName { outlet = d @@ -58,10 +63,16 @@ func main() { } if outlet == nil { - logger.Println(fmt.Sprintf("None of discovered devices matches expected device name %s: ", *deviceName), err) + logger.Fatalln(fmt.Sprintf("None of discovered devices matches expected device name %s: ", *deviceName), err) } _mqttClient := newMQTTClient(*brokerurl) + cronLib := cron.New() + cronLib.AddFunc(fmt.Sprintf("@every %ds", heartbeatIntervalSeconds), func() { + _mqttClient.publishHeartbeat(appSource, time.Now().UTC().Unix()) + }) + cronLib.Start() + _mqttClient.Subscribe(sensorStatusChannel, func(messageString string) { err := triggerOutlet(outlet, messageString, *door) if err != nil { @@ -140,3 +151,10 @@ func triggerOutlet(outlet *hs100.Hs100, messageString string, door string) error } return fmt.Errorf(fmt.Sprintf("Message did not contain %s or %s", OPEN, CLOSED)) } + +func (c mqttClient) publishHeartbeat(sensorSource string, timestamp int64) { + ts := strconv.FormatInt(timestamp, 10) + text := fmt.Sprintf("%s|%s", sensorSource, ts) + token := c.client.Publish(sensorHeartbeatChannel, 0, false, text) + token.Wait() +} diff --git a/server/main.go b/server/main.go index 0c587fa..44c6b7d 100644 --- a/server/main.go +++ b/server/main.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "strconv" + "strings" "time" ) @@ -244,13 +245,36 @@ func main() { // TODO: investigate Reset instead of creating new timers currentTimer.Stop() } - timer := time.AfterFunc(heartbeatTimeout, newHeartbeatTimeoutFunc(heartbeat, messenger)) + + var timer *time.Timer + if isAppHeartbeat(heartbeat) { + timer = time.AfterFunc(heartbeatTimeout, newAppHeartbeatTimeoutFunc(heartbeat, messenger)) + } else { + timer = time.AfterFunc(heartbeatTimeout, newHeartbeatTimeoutFunc(heartbeat, messenger)) + } + heartbeatTimerMap[heartbeat.Source] = timer }) _webServer.startServer() } +func newAppHeartbeatTimeoutFunc(h Heartbeat, msgr Messenger) func() { + return func() { + handleAppHeartbeatTimeout(h, msgr) + } +} + +func handleAppHeartbeatTimeout(h Heartbeat, msgr Messenger) { + logger.Println(fmt.Sprintf("Heartbeat timeout occurred for %s", h.Source)) + if !mockMode { + _, err := msgr.SendMessage(fmt.Sprintf("%s has lost connection", h.Source)) + if err != nil { + fmt.Println("Error sending app heartbeat timeout message:", err) + } + } +} + func newHeartbeatTimeoutFunc(h Heartbeat, msgr Messenger) func() { return func() { handleHeartbeatTimeout(h, msgr) @@ -287,6 +311,10 @@ func handleHeartbeatTimeout(h Heartbeat, msgr Messenger) { } } +func isAppHeartbeat(h Heartbeat) bool { + return strings.Contains(h.Source, "app_") +} + func newOpenTimeoutFunc(m Message, msgr Messenger, armed bool) func() { return func() { handleOpenTimeout(m, msgr, armed)