Skip to content

Commit

Permalink
chore(shutdown): ft: add graceful shutdown servers and workers
Browse files Browse the repository at this point in the history
  • Loading branch information
eizyc committed Jun 30, 2024
1 parent b1c6470 commit 63303ae
Showing 1 changed file with 96 additions and 23 deletions.
119 changes: 96 additions & 23 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ package main
import (
"context"
"embed"
"errors"
"io/fs"
"net"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/hibiken/asynq"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"

"github.com/eizyc/simplebank/api"
db "github.com/eizyc/simplebank/db/sqlc"
Expand All @@ -32,6 +36,12 @@ import (
//go:embed doc/swagger/*
var staticFiles embed.FS

var interruptSignals = []os.Signal{
os.Interrupt,
syscall.SIGTERM,
syscall.SIGINT,
}

func main() {
config, err := util.LoadConfig(".")
if err != nil {
Expand All @@ -42,7 +52,10 @@ func main() {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
}

connPool, err := pgxpool.New(context.Background(), config.DBSource)
ctx, stop := signal.NotifyContext(context.Background(), interruptSignals...)
defer stop()

connPool, err := pgxpool.New(ctx, config.DBSource)
if err != nil {
log.Fatal().Err(err).Msg("cannot connect to db:")
}
Expand All @@ -57,10 +70,16 @@ func main() {

taskDistributor := worker.NewRedisTaskDistributor(redisOpt)

go runTaskProcessor(config, redisOpt, store)
go runGatewayServer(config, store, taskDistributor)
runGrpcServer(config, store, taskDistributor)
waitGroup, ctx := errgroup.WithContext(ctx)

runTaskProcessor(ctx, waitGroup, config, redisOpt, store)
runGatewayServer(ctx, waitGroup, config, store, taskDistributor)
runGrpcServer(ctx, waitGroup, config, store, taskDistributor)

err = waitGroup.Wait()
if err != nil {
log.Fatal().Err(err).Msg("error from wait group")
}
}

func runDBMigration(migrationURL string, dbSource string) {
Expand All @@ -77,6 +96,8 @@ func runDBMigration(migrationURL string, dbSource string) {
}

func runTaskProcessor(
ctx context.Context,
waitGroup *errgroup.Group,
config util.Config,
redisOpt asynq.RedisClientOpt,
store db.Store,
Expand All @@ -89,9 +110,22 @@ func runTaskProcessor(
if err != nil {
log.Fatal().Err(err).Msg("failed to start task processor")
}

waitGroup.Go(func() error {
<-ctx.Done()
log.Info().Msg("graceful shutdown task processor")

taskProcessor.Shutdown()
log.Info().Msg("task processor is stopped")

return nil
})
}

func runGrpcServer(config util.Config, store db.Store, taskDistributor worker.TaskDistributor) {
func runGrpcServer(
ctx context.Context,
waitGroup *errgroup.Group,
config util.Config, store db.Store, taskDistributor worker.TaskDistributor) {
server, err := gapi.NewServer(config, store, taskDistributor)
if err != nil {
log.Fatal().Err(err).Msg("cannot create server")
Expand All @@ -107,15 +141,36 @@ func runGrpcServer(config util.Config, store db.Store, taskDistributor worker.Ta
log.Fatal().Err(err).Msg("cannot create listener")
}

log.Info().Msgf("start gRPC server on %s", listener.Addr().String())
waitGroup.Go(func() error {
log.Info().Msgf("start gRPC server at %s", listener.Addr().String())

err = grpcServer.Serve(listener)
if err != nil {
log.Fatal().Err(err).Msg("cannot start grpc server")
}
err = grpcServer.Serve(listener)
if err != nil {
if errors.Is(err, grpc.ErrServerStopped) {
return nil
}
log.Error().Err(err).Msg("gRPC server failed to serve")
return err
}

return nil
})

waitGroup.Go(func() error {
<-ctx.Done()
log.Info().Msg("graceful shutdown gRPC server")

grpcServer.GracefulStop()
log.Info().Msg("gRPC server is stopped")

return nil
})
}

func runGatewayServer(config util.Config, store db.Store, taskDistributor worker.TaskDistributor) {
func runGatewayServer(
ctx context.Context,
waitGroup *errgroup.Group,
config util.Config, store db.Store, taskDistributor worker.TaskDistributor) {
server, err := gapi.NewServer(config, store, taskDistributor)
if err != nil {
log.Fatal().Err(err).Msg("cannot create server")
Expand All @@ -131,8 +186,6 @@ func runGatewayServer(config util.Config, store db.Store, taskDistributor worker
})

grpcMux := runtime.NewServeMux(jsonOption)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = pb.RegisterSimpleBankHandlerServer(ctx, grpcMux, server)
if err != nil {
log.Fatal().Err(err).Msg("cannot register handler server")
Expand All @@ -148,17 +201,37 @@ func runGatewayServer(config util.Config, store db.Store, taskDistributor worker
fs := http.FileServer(http.FS(staticFS))
mux.Handle("/swagger/", http.StripPrefix("/swagger/", fs))

listener, err := net.Listen("tcp", config.HTTPServerAddress)
if err != nil {
log.Fatal().Err(err).Msg("cannot create listener")
}
httpServer := &http.Server{
Handler: gapi.HttpLogger(mux),
Addr: config.HTTPServerAddress,
}

waitGroup.Go(func() error {
log.Info().Msgf("start HTTP gateway server at %s", httpServer.Addr)
err = httpServer.ListenAndServe()
if err != nil {
if errors.Is(err, http.ErrServerClosed) {
return nil
}
log.Error().Err(err).Msg("HTTP gateway server failed to serve")
return err
}
return nil
})

log.Info().Msgf("start HTTP gateway server on %s", listener.Addr().String())
handler := gapi.HttpLogger(mux)
err = http.Serve(listener, handler)
if err != nil {
log.Fatal().Err(err).Msg("cannot start HTTP gateway server")
}
waitGroup.Go(func() error {
<-ctx.Done()
log.Info().Msg("graceful shutdown HTTP gateway server")

err := httpServer.Shutdown(context.Background())
if err != nil {
log.Error().Err(err).Msg("failed to shutdown HTTP gateway server")
return err
}

log.Info().Msg("HTTP gateway server is stopped")
return nil
})
}

func runGinServer(config util.Config, store db.Store) {
Expand Down

0 comments on commit 63303ae

Please sign in to comment.