Skip to content

Commit

Permalink
Fix mqtt reconnection issue #176 (#199)
Browse files Browse the repository at this point in the history

Co-authored-by: Leland Sindt <[email protected]>
  • Loading branch information
virusbrain and LelandSindt authored Oct 11, 2022
1 parent 559527f commit 02826ed
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 18 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ Basically the same environment variables for the database, mqqt and timezone nee
| **MQTT_USERNAME** | string | |
| **MQTT_PASSWORD** | string | |
| **MQTT_NAMESPACE** | string | |
| **MQTT_CLIENTID** | string | *4 char random string* |
**Commands** environment variables
Expand Down
79 changes: 61 additions & 18 deletions src/v1_TeslaMateAPICarsStatus.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package main

import (
"crypto/tls"
"errors"
"fmt"
"log"
"net/http"
"net/url"
"sync"
"time"

Expand Down Expand Up @@ -72,14 +74,24 @@ type statusInfo struct {
}

type statusCache struct {
mqttDisabled bool
mqttDisabled bool
mqttConnected bool

topicScan string // scan parameter (expect it to generate car ID then relevant parameter)

cache map[int]*statusInfo
mu sync.Mutex
}

func getMQTTNameSpace() (MQTTNameSpace string) {
// adding MQTTNameSpace info
MQTTNameSpace = getEnv("MQTT_NAMESPACE", "")
if len(MQTTNameSpace) > 0 {
MQTTNameSpace = ("/" + MQTTNameSpace)
}
return MQTTNameSpace
}

func startMQTT() (*statusCache, error) {
s := statusCache{
cache: make(map[int]*statusInfo),
Expand All @@ -106,6 +118,7 @@ func startMQTT() (*statusCache, error) {
MQTTHost := getEnv("MQTT_HOST", "mosquitto")
MQTTUser := getEnv("MQTT_USERNAME", "")
MQTTPass := getEnv("MQTT_PASSWORD", "")
MQTTClientId := getEnv("MQTT_CLIENTID", randstr.String(4))
// MQTTInvCert := getEnvAsBool("MQTT_TLS_ACCEPT_INVALID_CERTS", false)

// creating mqttURL to connect with
Expand All @@ -125,13 +138,17 @@ func startMQTT() (*statusCache, error) {
// create options for the MQTT client connection
opts := mqtt.NewClientOptions().AddBroker(mqttURL)
// setting generic MQTT settings in opts
opts.SetKeepAlive(2 * time.Second) // setting keepalive for client
opts.SetDefaultPublishHandler(s.newMessage) // using f mqtt.MessageHandler function
opts.SetPingTimeout(1 * time.Second) // setting pingtimeout for client
opts.SetClientID("teslamateapi-" + randstr.String(4)) // setting mqtt client id for TeslaMateApi
opts.SetCleanSession(true) // removal of all subscriptions on disconnect
opts.SetOrderMatters(false) // don't care about order (removes need for callbacks to return immediately)
opts.SetAutoReconnect(true) // if connection drops automatically re-establish it
opts.SetKeepAlive(2 * time.Second) // setting keepalive for client
opts.SetDefaultPublishHandler(s.newMessage) // using f mqtt.MessageHandler function
opts.SetConnectionLostHandler(s.connectionLost) // Logs ConnectionLost events
opts.SetReconnectingHandler(reconnectingHandler) // Logs reconnect events
opts.SetConnectionAttemptHandler(connectingHandler)
opts.SetOnConnectHandler(s.connectedHandler)
opts.SetPingTimeout(1 * time.Second) // setting pingtimeout for client
opts.SetClientID("teslamateapi-" + MQTTClientId) // setting mqtt client id for TeslaMateApi
opts.SetCleanSession(true) // removal of all subscriptions on disconnect
opts.SetOrderMatters(false) // don't care about order (removes need for callbacks to return immediately)
opts.SetAutoReconnect(true) // if connection drops automatically re-establish it
opts.AutoReconnect = true

// creating MQTT connection with options
Expand All @@ -146,25 +163,44 @@ func startMQTT() (*statusCache, error) {
log.Println("[debug] TeslaMateAPICarsStatusV1 successfully connected to mqtt.")
}

// adding MQTTNameSpace info
MQTTNameSpace := getEnv("MQTT_NAMESPACE", "")
if len(MQTTNameSpace) > 0 {
MQTTNameSpace = ("/" + MQTTNameSpace)
}
s.topicScan = fmt.Sprintf("teslamate%s/cars/%%d/%%s", getMQTTNameSpace())

// Thats all - newMessage will be called when something new arrives
return &s, nil
}

func reconnectingHandler(c mqtt.Client, options *mqtt.ClientOptions) {
log.Println("[info] mqtt reconnecting...")

}

func connectingHandler(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
log.Println("[info] mqtt connecting...")
return tlsCfg
}

func (s *statusCache) connectedHandler(c mqtt.Client) {
log.Println("[info] mqtt connected...")
s.mqttConnected = true

// Subscribe - we will accept info on any car...
topic := fmt.Sprintf("teslamate%s/cars/#", MQTTNameSpace)
if token := m.Subscribe(topic, 0, s.newMessage); token.Wait() && token.Error() != nil {
topic := fmt.Sprintf("teslamate%s/cars/#", getMQTTNameSpace())
if token := c.Subscribe(topic, 0, s.newMessage); token.Wait() && token.Error() != nil {
log.Panic(token.Error()) // Note : May want to use opts.ConnectRetry which will keep trying the connection
}
s.topicScan = fmt.Sprintf("teslamate%s/cars/%%d/%%s", MQTTNameSpace)
log.Println("[info] subscribed to: " + topic)

// Thats all - newMessage will be called when something new arrives
return &s, nil
}

// connectionLost - called by mqtt package when the connection get lost
func (s *statusCache) connectionLost(c mqtt.Client, err error) {
log.Println("[error] MQTT connection lost: " + err.Error())
s.mqttConnected = false
}

// newMessage - called by mqtt package when new message received
func (s *statusCache) newMessage(c mqtt.Client, msg mqtt.Message) {
//log.Println("[info] mqtt - received: " + string(msg.Topic()) + " with value: " + string(msg.Payload()))
// topic is in the format teslamateMQTT_NAMESPACE/cars/carID/display_name
var (
carID int
Expand All @@ -185,6 +221,7 @@ func (s *statusCache) newMessage(c mqtt.Client, msg mqtt.Message) {
s.cache[carID] = stat
}

//log.Printf(MqttTopic + " set to: " + string(msg.Payload()))
// running if-else statements to collect data and put into overall vars..
switch MqttTopic {
case "display_name":
Expand Down Expand Up @@ -306,6 +343,12 @@ func (s *statusCache) TeslaMateAPICarsStatusV1(c *gin.Context) {
return
}

if !s.mqttConnected {
log.Println("[notice] TeslaMateAPICarsStatusV1 mqtt is disconnected.. can not return status for car without mqtt!")
TeslaMateAPIHandleOtherResponse(c, http.StatusInternalServerError, "TeslaMateAPICarsStatusV1", gin.H{"error": "mqtt disconnected.. status not accessible!"})
return
}

// getting CarID param from URL
carID := convertStringToInteger(c.Param("CarID"))

Expand Down

0 comments on commit 02826ed

Please sign in to comment.