Skip to content

Commit

Permalink
Allow apps to send heartbeats to server, alert if no connection
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewmarklloyd committed Dec 31, 2021
1 parent b437646 commit d39bcbe
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 8 deletions.
32 changes: 25 additions & 7 deletions scripts/door-light/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"os"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -13,16 +14,20 @@ import (
"github.com/gofrs/uuid"
"github.com/jaedle/golang-tplink-hs100/pkg/configuration"
"github.com/jaedle/golang-tplink-hs100/pkg/hs100"
"github.com/robfig/cron"
)

var (
logger = log.New(os.Stdout, "[Pi-Sensor Outlet] ", log.LstdFlags)
)

const (
sensorStatusChannel = "sensor/status"
OPEN = "OPEN"
CLOSED = "CLOSED"
sensorStatusChannel = "sensor/status"
sensorHeartbeatChannel = "sensor/heartbeat"
heartbeatIntervalSeconds = 60
appSource = "app_door-light"
OPEN = "OPEN"
CLOSED = "CLOSED"
)

// POC for turning on smart outlet when door is open
Expand All @@ -34,22 +39,22 @@ func main() {
logger.Fatalln("at least one broker is required")
}
if *deviceName == "" {
logger.Fatalln("outlet address is required")
logger.Fatalln("device name is required")
}
if *door == "" {
logger.Fatalln("door required")
}

devices, err := hs100.Discover("192.168.1.1/24", configuration.Default().WithTimeout(time.Second))
if err != nil {
logger.Println(fmt.Errorf("Error getting devices: %s", err))
logger.Fatalln(fmt.Errorf("Error getting devices: %s", err))
}

var outlet *hs100.Hs100
for _, d := range devices {
name, err := d.GetName()
if err != nil {
logger.Println(fmt.Errorf("Error getting device name: %s", err))
logger.Fatalln(fmt.Errorf("Error getting device name: %s", err))
}
if name == *deviceName {
outlet = d
Expand All @@ -58,10 +63,16 @@ func main() {
}

if outlet == nil {
logger.Println(fmt.Sprintf("None of discovered devices matches expected device name %s: ", *deviceName), err)
logger.Fatalln(fmt.Sprintf("None of discovered devices matches expected device name %s: ", *deviceName), err)
}

_mqttClient := newMQTTClient(*brokerurl)
cronLib := cron.New()
cronLib.AddFunc(fmt.Sprintf("@every %ds", heartbeatIntervalSeconds), func() {
_mqttClient.publishHeartbeat(appSource, time.Now().UTC().Unix())
})
cronLib.Start()

_mqttClient.Subscribe(sensorStatusChannel, func(messageString string) {
err := triggerOutlet(outlet, messageString, *door)
if err != nil {
Expand Down Expand Up @@ -140,3 +151,10 @@ func triggerOutlet(outlet *hs100.Hs100, messageString string, door string) error
}
return fmt.Errorf(fmt.Sprintf("Message did not contain %s or %s", OPEN, CLOSED))
}

func (c mqttClient) publishHeartbeat(sensorSource string, timestamp int64) {
ts := strconv.FormatInt(timestamp, 10)
text := fmt.Sprintf("%s|%s", sensorSource, ts)
token := c.client.Publish(sensorHeartbeatChannel, 0, false, text)
token.Wait()
}
30 changes: 29 additions & 1 deletion server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"os"
"strconv"
"strings"
"time"
)

Expand Down Expand Up @@ -244,13 +245,36 @@ func main() {
// TODO: investigate Reset instead of creating new timers
currentTimer.Stop()
}
timer := time.AfterFunc(heartbeatTimeout, newHeartbeatTimeoutFunc(heartbeat, messenger))

var timer *time.Timer
if isAppHeartbeat(heartbeat) {
timer = time.AfterFunc(heartbeatTimeout, newAppHeartbeatTimeoutFunc(heartbeat, messenger))
} else {
timer = time.AfterFunc(heartbeatTimeout, newHeartbeatTimeoutFunc(heartbeat, messenger))
}

heartbeatTimerMap[heartbeat.Source] = timer
})

_webServer.startServer()
}

func newAppHeartbeatTimeoutFunc(h Heartbeat, msgr Messenger) func() {
return func() {
handleAppHeartbeatTimeout(h, msgr)
}
}

func handleAppHeartbeatTimeout(h Heartbeat, msgr Messenger) {
logger.Println(fmt.Sprintf("Heartbeat timeout occurred for %s", h.Source))
if !mockMode {
_, err := msgr.SendMessage(fmt.Sprintf("%s has lost connection", h.Source))
if err != nil {
fmt.Println("Error sending app heartbeat timeout message:", err)
}
}
}

func newHeartbeatTimeoutFunc(h Heartbeat, msgr Messenger) func() {
return func() {
handleHeartbeatTimeout(h, msgr)
Expand Down Expand Up @@ -287,6 +311,10 @@ func handleHeartbeatTimeout(h Heartbeat, msgr Messenger) {
}
}

func isAppHeartbeat(h Heartbeat) bool {
return strings.Contains(h.Source, "app_")
}

func newOpenTimeoutFunc(m Message, msgr Messenger, armed bool) func() {
return func() {
handleOpenTimeout(m, msgr, armed)
Expand Down

0 comments on commit d39bcbe

Please sign in to comment.