From e80a1a4f9e65d2a6f44b5e374c81a8feb725d243 Mon Sep 17 00:00:00 2001 From: Abhishek Patro Date: Wed, 15 Jan 2025 21:31:41 -0800 Subject: [PATCH] Perform health check of MQTT on startup --- config/config.go | 7 ++++++- datastore/mqtt/mqtt.go | 18 +++++++++++++++++- docker-compose.yml | 1 - 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/config/config.go b/config/config.go index af9bedb5..54a82170 100644 --- a/config/config.go +++ b/config/config.go @@ -322,7 +322,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l if _, ok := requiredDispatchers[telemetry.MQTT]; ok { if c.MQTT == nil { - return nil, nil, errors.New("Expected MQTT to be configured") + return nil, nil, errors.New("expected MQTT to be configured") } mqttProducer, err := mqtt.NewProducer(context.Background(), c.MQTT, c.MetricCollector, c.Namespace, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.MQTT], logger) if err != nil { @@ -352,6 +352,11 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l return nil, nil, err } } + if !test && producers[telemetry.MQTT] != nil { + if err := producers[telemetry.MQTT].(*mqtt.Producer).Connect(); err != nil { + return nil, nil, err + } + } return producers, dispatchProducerRules, nil } diff --git a/datastore/mqtt/mqtt.go b/datastore/mqtt/mqtt.go index 9bdf83e1..2a529e5d 100644 --- a/datastore/mqtt/mqtt.go +++ b/datastore/mqtt/mqtt.go @@ -2,6 +2,7 @@ package mqtt import ( "context" + "fmt" "sync" "time" @@ -14,6 +15,10 @@ import ( "github.com/teslamotors/fleet-telemetry/telemetry" ) +var ( + defaultTimeout = 5 * time.Second +) + // Producer is a telemetry.Producer that sends records to an MQTT broker. type Producer struct { client pahomqtt.Client @@ -105,7 +110,6 @@ func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricColl SetKeepAlive(time.Duration(config.KeepAlive) * time.Second) client := PahoNewClient(opts) - client.Connect() return &Producer{ client: client, @@ -119,6 +123,18 @@ func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricColl }, nil } +// Connect performs health check and returns error if connection is not established +func (p *Producer) Connect() error { + token := p.client.Connect() + if !token.WaitTimeout(defaultTimeout) { + return fmt.Errorf("connection attempt timed out after %v", defaultTimeout) + } + if err := token.Error(); err != nil { + return err + } + return nil +} + // Produce sends a record to the MQTT broker. func (p *Producer) Produce(rec *telemetry.Record) { if p.ctx.Err() != nil { diff --git a/docker-compose.yml b/docker-compose.yml index ec7b0c79..cfacec5a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3' services: app: build: