Skip to content

Commit

Permalink
Fix gracefull shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
helder-junior committed Nov 21, 2024
1 parent aad63d8 commit 9bb965c
Showing 1 changed file with 26 additions and 14 deletions.
40 changes: 26 additions & 14 deletions server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"context"
"fmt"
"net"
"os"
"os/signal"
"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -75,7 +77,7 @@ func NewApp(host string, port int, log logger.Logger, config *viper.Viper) (*App
return a, err
}

func (a *App) loadConfigurationDefaults() {
func (a *App) configure() error {
a.config.SetDefault("otlp.enabled", false)
a.config.SetDefault("kafka.producer.net.maxOpenRequests", 10)
a.config.SetDefault("kafka.producer.net.dialTimeout", "500ms")
Expand All @@ -98,10 +100,6 @@ func (a *App) loadConfigurationDefaults() {
a.config.SetDefault("server.Timeout", "500ms")
a.config.SetDefault("prometheus.enabled", "true") // always true on the API side
a.config.SetDefault("prometheus.port", ":9091")
}

func (a *App) configure() error {
a.loadConfigurationDefaults()

if a.config.GetBool("otlp.enabled") {
if err := a.configureOTel(); err != nil {
Expand Down Expand Up @@ -222,12 +220,12 @@ func (a *App) metricsReporterInterceptor(

// Run runs the app
func (a *App) Run() {
log := a.log

listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", a.host, a.port))
if err != nil {
log.Panic(err.Error())
a.log.Panic(err.Error())
}
log.Infof("events gateway listening on %s:%d", a.host, a.port)
a.log.Infof("events gateway listening on %s:%d", a.host, a.port)

metrics.StartServer(a.config)
var opts []grpc.ServerOption
Expand All @@ -250,11 +248,25 @@ func (a *App) Run() {
a.grpcServer = grpc.NewServer(opts...)

pb.RegisterGRPCForwarderServer(a.grpcServer, a.Server)
if err := a.grpcServer.Serve(listener); err != nil {
log.Panic(err.Error())
}
}
var stopChan = make(chan os.Signal, 2)

signal.Notify(stopChan)
var errChan = make(chan error)

func (a *App) Stop() {
a.grpcServer.GracefulStop()
go func() {
if err := a.grpcServer.Serve(listener); err != nil {
errChan <- err
}
}()

defer func() {
a.log.Info("Calling GRPC Gracefull stop...")
a.grpcServer.GracefulStop()
}()
select {
case err := <-errChan:
a.log.Panicf("Server failed with error: %s", err.Error())
case sig := <-stopChan:
a.log.Infof("Got signal %s from OS. Stopping...", sig)
}
}

0 comments on commit 9bb965c

Please sign in to comment.