Skip to content

Commit

Permalink
Handle heartbeat timeout by sending unknown status to front end
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewmarklloyd committed Jun 1, 2021
1 parent 451e7e2 commit 115300d
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 20 deletions.
2 changes: 1 addition & 1 deletion client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func main() {

configureHeartbeat(mqttClient, *sensorSource)

lastStatus := "CLOSED"
lastStatus := UNKNOWN
var currentStatus string
for true {
currentStatus = pinClient.CurrentStatus()
Expand Down
19 changes: 15 additions & 4 deletions server/frontend/src/DataModel.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
function translateStatus(status) {
return {
color: status === "OPEN" ? "red" : "green",
icon: status === "OPEN" ? "unlock" : "lock"
}
var icon, color
if (status === "OPEN") {
icon = "unlock"
color = "red"
} else if (status === "CLOSED") {
icon = "lock"
color = "green"
} else {
icon = "zap-off"
color = "grey"
}
return {
color,
icon
}
}

function timeSince(unixTimestamp) {
Expand Down
5 changes: 4 additions & 1 deletion server/frontend/src/Sensor.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ class Sensor extends Component {
this.props.socket.on("sensor/status", function(data) {
if (data.source === source) {
var updated = translateStatus(data.status)
if (data.source === "front-door") {
console.log(updated)
}
component.setState({
color: updated.color,
source: data.source,
Expand Down Expand Up @@ -50,7 +53,7 @@ class Sensor extends Component {
return (
<StampCard
color={this.state.color !== "" ? this.state.color : this.props.color}
icon={this.state.icon !== "zap-off" ? this.state.icon : this.props.icon}
icon={this.state.icon !== "" ? this.state.icon : this.props.icon}
header={
<Link
to={{
Expand Down
45 changes: 35 additions & 10 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,12 @@ func main() {
alertIfOpen(lastMessage, message, messenger)
if message.Status == "OPEN" {
timer := time.AfterFunc(openTimeout, func() {
messenger.SendMessage(fmt.Sprintf("%s opened longer than %s", message.Source, openTimeout))
message := fmt.Sprintf("%s opened longer than %s", message.Source, openTimeout)
if mockMode {
logger.Println(message)
} else {
messenger.SendMessage(message)
}
})
delayTimerMap[message.Source] = timer
} else if message.Status == "CLOSED" {
Expand All @@ -140,7 +145,6 @@ func main() {
logger.Println(fmt.Sprintf("Message status '%s' not recognized", message.Status))
}

_webServer.sendMessage(sensorStatusChannel, message)
err := _redisClient.WriteState(message.Source, messageString)
if err == nil {
_webServer.sendMessage(sensorStatusChannel, message)
Expand All @@ -149,24 +153,45 @@ func main() {
}
})

var heartbeatMap map[string]*time.Timer = make(map[string]*time.Timer)
var heartbeatTimerMap map[string]*time.Timer = make(map[string]*time.Timer)
mqttClient.Subscribe(sensorHeartbeatChannel, func(messageString string) {
heartbeat := toHeartbeat(messageString)
currentTimer := heartbeatMap[heartbeat.Source]
currentTimer := heartbeatTimerMap[heartbeat.Source]
if currentTimer != nil {
currentTimer.Stop()
}
timer := time.AfterFunc(heartbeatTimeout, func() {
logger.Println(fmt.Sprintf("Heartbeat timeout occurred for %s", heartbeat.Source))
})
heartbeatMap[heartbeat.Source] = timer
timer := time.AfterFunc(heartbeatTimeout, newHeartbeatTimeoutFunc(heartbeat))
heartbeatTimerMap[heartbeat.Source] = timer
})

_webServer.startServer()
}

func newHeartbeatTimeoutFunc(h Heartbeat) func() {
return func() {
handleHeartbeatTimeout(h)
}
}

func handleHeartbeatTimeout(h Heartbeat) {
logger.Println(fmt.Sprintf("Heartbeat timeout occurred for %s", h.Source))
messageString, err := _redisClient.ReadState(h.Source)
if err == nil {
message := toStruct(messageString)
message.Status = UNKNOWN
err := _redisClient.WriteState(message.Source, toString(message))
if err != nil {
logger.Println(fmt.Sprintf("Error writing message state after heartbeat timeout. Message: %s", messageString))
} else {
_webServer.sendMessage(sensorStatusChannel, message)
}
} else {
logger.Println(err)
}
}

func alertIfOpen(lastMessage Message, currentMessage Message, messenger Messenger) {
if lastMessage.Status == "CLOSED" && currentMessage.Status == "OPEN" {
if (lastMessage.Status == CLOSED && currentMessage.Status == OPEN) || (lastMessage.Status == UNKNOWN && currentMessage.Status == OPEN) {
if mockMode {
logger.Println(fmt.Sprintf("%s was just opened", currentMessage.Source))
} else {
Expand All @@ -178,6 +203,6 @@ func alertIfOpen(lastMessage Message, currentMessage Message, messenger Messenge
} else if lastMessage.Status == "OPEN" && currentMessage.Status == "CLOSED" {
// intentionally do nothing
} else {
logger.Println(fmt.Sprintf("Door status was not changed from open to closed OR from closed to open. Last status: %s, current status: %s", lastMessage.Status, currentMessage.Status))
logger.Println(fmt.Sprintf("Door status change was not recognized. Last status: %s, current status: %s", lastMessage.Status, currentMessage.Status))
}
}
3 changes: 3 additions & 0 deletions server/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (

const (
delimiter = "|"
OPEN = "OPEN"
CLOSED = "CLOSED"
UNKNOWN = "UNKNOWN"
)

type Message struct {
Expand Down
8 changes: 4 additions & 4 deletions server/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

const (
sensorPrefix = "sensor/"
statePrefix = "state/"
heartbeatPrefix = "heartbeat/"
)

Expand All @@ -31,7 +31,7 @@ func newRedisClient(redisURL string) (redisClient, error) {

func (r *redisClient) ReadAllState() (map[string]string, error) {
state := make(map[string]string)
keys := r.client.Keys(ctx, fmt.Sprintf("%s*", sensorPrefix)).Val()
keys := r.client.Keys(ctx, fmt.Sprintf("%s*", statePrefix)).Val()
for _, k := range keys {
val, err := r.client.Get(ctx, k).Result()
if err != nil {
Expand All @@ -43,7 +43,7 @@ func (r *redisClient) ReadAllState() (map[string]string, error) {
}

func (r *redisClient) ReadState(key string) (string, error) {
val, err := r.client.Get(ctx, fmt.Sprintf("%s%s", sensorPrefix, key)).Result()
val, err := r.client.Get(ctx, fmt.Sprintf("%s%s", statePrefix, key)).Result()
if err != nil {
return "", err
}
Expand All @@ -52,7 +52,7 @@ func (r *redisClient) ReadState(key string) (string, error) {
}

func (r *redisClient) WriteState(key string, value string) error {
d := r.client.Set(ctx, fmt.Sprintf("%s%s", sensorPrefix, key), value, 0)
d := r.client.Set(ctx, fmt.Sprintf("%s%s", statePrefix, key), value, 0)
err := d.Err()
if err != nil {
return err
Expand Down

0 comments on commit 115300d

Please sign in to comment.