From 63303ae22836b2057ac6bf30ef46069efb8309e8 Mon Sep 17 00:00:00 2001 From: eizyc Date: Sat, 29 Jun 2024 23:40:43 -0500 Subject: [PATCH] chore(shutdown): ft: add graceful shutdown servers and workers --- main.go | 119 +++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 96 insertions(+), 23 deletions(-) diff --git a/main.go b/main.go index a8ce4a8..b0c58b4 100644 --- a/main.go +++ b/main.go @@ -3,14 +3,18 @@ package main import ( "context" "embed" + "errors" "io/fs" "net" "net/http" "os" + "os/signal" + "syscall" "github.com/hibiken/asynq" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "golang.org/x/sync/errgroup" "github.com/eizyc/simplebank/api" db "github.com/eizyc/simplebank/db/sqlc" @@ -32,6 +36,12 @@ import ( //go:embed doc/swagger/* var staticFiles embed.FS +var interruptSignals = []os.Signal{ + os.Interrupt, + syscall.SIGTERM, + syscall.SIGINT, +} + func main() { config, err := util.LoadConfig(".") if err != nil { @@ -42,7 +52,10 @@ func main() { log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) } - connPool, err := pgxpool.New(context.Background(), config.DBSource) + ctx, stop := signal.NotifyContext(context.Background(), interruptSignals...) + defer stop() + + connPool, err := pgxpool.New(ctx, config.DBSource) if err != nil { log.Fatal().Err(err).Msg("cannot connect to db:") } @@ -57,10 +70,16 @@ func main() { taskDistributor := worker.NewRedisTaskDistributor(redisOpt) - go runTaskProcessor(config, redisOpt, store) - go runGatewayServer(config, store, taskDistributor) - runGrpcServer(config, store, taskDistributor) + waitGroup, ctx := errgroup.WithContext(ctx) + + runTaskProcessor(ctx, waitGroup, config, redisOpt, store) + runGatewayServer(ctx, waitGroup, config, store, taskDistributor) + runGrpcServer(ctx, waitGroup, config, store, taskDistributor) + err = waitGroup.Wait() + if err != nil { + log.Fatal().Err(err).Msg("error from wait group") + } } func runDBMigration(migrationURL string, dbSource string) { @@ -77,6 +96,8 @@ func runDBMigration(migrationURL string, dbSource string) { } func runTaskProcessor( + ctx context.Context, + waitGroup *errgroup.Group, config util.Config, redisOpt asynq.RedisClientOpt, store db.Store, @@ -89,9 +110,22 @@ func runTaskProcessor( if err != nil { log.Fatal().Err(err).Msg("failed to start task processor") } + + waitGroup.Go(func() error { + <-ctx.Done() + log.Info().Msg("graceful shutdown task processor") + + taskProcessor.Shutdown() + log.Info().Msg("task processor is stopped") + + return nil + }) } -func runGrpcServer(config util.Config, store db.Store, taskDistributor worker.TaskDistributor) { +func runGrpcServer( + ctx context.Context, + waitGroup *errgroup.Group, + config util.Config, store db.Store, taskDistributor worker.TaskDistributor) { server, err := gapi.NewServer(config, store, taskDistributor) if err != nil { log.Fatal().Err(err).Msg("cannot create server") @@ -107,15 +141,36 @@ func runGrpcServer(config util.Config, store db.Store, taskDistributor worker.Ta log.Fatal().Err(err).Msg("cannot create listener") } - log.Info().Msgf("start gRPC server on %s", listener.Addr().String()) + waitGroup.Go(func() error { + log.Info().Msgf("start gRPC server at %s", listener.Addr().String()) - err = grpcServer.Serve(listener) - if err != nil { - log.Fatal().Err(err).Msg("cannot start grpc server") - } + err = grpcServer.Serve(listener) + if err != nil { + if errors.Is(err, grpc.ErrServerStopped) { + return nil + } + log.Error().Err(err).Msg("gRPC server failed to serve") + return err + } + + return nil + }) + + waitGroup.Go(func() error { + <-ctx.Done() + log.Info().Msg("graceful shutdown gRPC server") + + grpcServer.GracefulStop() + log.Info().Msg("gRPC server is stopped") + + return nil + }) } -func runGatewayServer(config util.Config, store db.Store, taskDistributor worker.TaskDistributor) { +func runGatewayServer( + ctx context.Context, + waitGroup *errgroup.Group, + config util.Config, store db.Store, taskDistributor worker.TaskDistributor) { server, err := gapi.NewServer(config, store, taskDistributor) if err != nil { log.Fatal().Err(err).Msg("cannot create server") @@ -131,8 +186,6 @@ func runGatewayServer(config util.Config, store db.Store, taskDistributor worker }) grpcMux := runtime.NewServeMux(jsonOption) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() err = pb.RegisterSimpleBankHandlerServer(ctx, grpcMux, server) if err != nil { log.Fatal().Err(err).Msg("cannot register handler server") @@ -148,17 +201,37 @@ func runGatewayServer(config util.Config, store db.Store, taskDistributor worker fs := http.FileServer(http.FS(staticFS)) mux.Handle("/swagger/", http.StripPrefix("/swagger/", fs)) - listener, err := net.Listen("tcp", config.HTTPServerAddress) - if err != nil { - log.Fatal().Err(err).Msg("cannot create listener") - } + httpServer := &http.Server{ + Handler: gapi.HttpLogger(mux), + Addr: config.HTTPServerAddress, + } + + waitGroup.Go(func() error { + log.Info().Msgf("start HTTP gateway server at %s", httpServer.Addr) + err = httpServer.ListenAndServe() + if err != nil { + if errors.Is(err, http.ErrServerClosed) { + return nil + } + log.Error().Err(err).Msg("HTTP gateway server failed to serve") + return err + } + return nil + }) - log.Info().Msgf("start HTTP gateway server on %s", listener.Addr().String()) - handler := gapi.HttpLogger(mux) - err = http.Serve(listener, handler) - if err != nil { - log.Fatal().Err(err).Msg("cannot start HTTP gateway server") - } + waitGroup.Go(func() error { + <-ctx.Done() + log.Info().Msg("graceful shutdown HTTP gateway server") + + err := httpServer.Shutdown(context.Background()) + if err != nil { + log.Error().Err(err).Msg("failed to shutdown HTTP gateway server") + return err + } + + log.Info().Msg("HTTP gateway server is stopped") + return nil + }) } func runGinServer(config util.Config, store db.Store) {