From c22f0cb3a1d9c78c060b884cbb3ec4f2661765f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E6=9B=99=E5=85=89?= Date: Sun, 5 Jan 2025 09:50:12 +0800 Subject: [PATCH] fix: health ok after server start --- rest/engine.go | 7 +++-- rest/engine_test.go | 16 ++++++++-- rest/internal/starter.go | 19 +++++------- rest/internal/starter_test.go | 53 +++++++++++++++++++++++++++++++-- rest/server.go | 21 +++++++++---- rest/server_test.go | 28 +++++++++++++++++ zrpc/internal/rpcserver.go | 6 ++-- zrpc/internal/rpcserver_test.go | 15 ++++++++++ 8 files changed, 138 insertions(+), 27 deletions(-) diff --git a/rest/engine.go b/rest/engine.go index e57786caf205..ca3d3e8d29cc 100644 --- a/rest/engine.go +++ b/rest/engine.go @@ -11,6 +11,7 @@ import ( "github.com/zeromicro/go-zero/core/codec" "github.com/zeromicro/go-zero/core/load" "github.com/zeromicro/go-zero/core/stat" + "github.com/zeromicro/go-zero/internal/health" "github.com/zeromicro/go-zero/rest/chain" "github.com/zeromicro/go-zero/rest/handler" "github.com/zeromicro/go-zero/rest/httpx" @@ -305,7 +306,7 @@ func (ng *engine) signatureVerifier(signature signatureSetting) (func(chain.Chai }, nil } -func (ng *engine) start(router httpx.Router, opts ...StartOption) error { +func (ng *engine) start(router httpx.Router, probe health.Probe, opts ...StartOption) error { if err := ng.bindRoutes(router); err != nil { return err } @@ -314,7 +315,7 @@ func (ng *engine) start(router httpx.Router, opts ...StartOption) error { opts = append([]StartOption{ng.withTimeout()}, opts...) if len(ng.conf.CertFile) == 0 && len(ng.conf.KeyFile) == 0 { - return internal.StartHttp(ng.conf.Host, ng.conf.Port, router, opts...) + return internal.StartHttp(ng.conf.Host, ng.conf.Port, router, probe, opts...) } // make sure user defined options overwrite default options @@ -327,7 +328,7 @@ func (ng *engine) start(router httpx.Router, opts ...StartOption) error { }, opts...) return internal.StartHttps(ng.conf.Host, ng.conf.Port, ng.conf.CertFile, - ng.conf.KeyFile, router, opts...) + ng.conf.KeyFile, router, probe, opts...) } func (ng *engine) use(middleware Middleware) { diff --git a/rest/engine_test.go b/rest/engine_test.go index 4f86d2173efd..9a672ee8cb1e 100644 --- a/rest/engine_test.go +++ b/rest/engine_test.go @@ -223,7 +223,7 @@ Verbose: true } }) - assert.NotNil(t, ng.start(mockedRouter{}, func(svr *http.Server) { + assert.NotNil(t, ng.start(mockedRouter{}, mockProbe{}, func(svr *http.Server) { })) timeout := time.Second * 3 @@ -414,7 +414,7 @@ func TestEngine_start(t *testing.T) { Host: "localhost", Port: -1, }) - assert.Error(t, ng.start(router.NewRouter())) + assert.Error(t, ng.start(router.NewRouter(), mockProbe{})) }) t.Run("https", func(t *testing.T) { @@ -425,10 +425,20 @@ func TestEngine_start(t *testing.T) { KeyFile: "bar", }) ng.tlsConfig = &tls.Config{} - assert.Error(t, ng.start(router.NewRouter())) + assert.Error(t, ng.start(router.NewRouter(), mockProbe{})) }) } +type mockProbe struct{} + +func (m mockProbe) MarkReady() {} + +func (m mockProbe) MarkNotReady() {} + +func (m mockProbe) IsReady() bool { return false } + +func (m mockProbe) Name() string { return "" } + type mockedRouter struct { } diff --git a/rest/internal/starter.go b/rest/internal/starter.go index 174303342b7e..033d82b259ef 100644 --- a/rest/internal/starter.go +++ b/rest/internal/starter.go @@ -11,28 +11,26 @@ import ( "github.com/zeromicro/go-zero/internal/health" ) -const probeNamePrefix = "rest" - // StartOption defines the method to customize http.Server. type StartOption func(svr *http.Server) // StartHttp starts a http server. -func StartHttp(host string, port int, handler http.Handler, opts ...StartOption) error { - return start(host, port, handler, func(svr *http.Server) error { +func StartHttp(host string, port int, handler http.Handler, probe health.Probe, opts ...StartOption) error { + return start(host, port, handler, probe, func(svr *http.Server) error { return svr.ListenAndServe() }, opts...) } // StartHttps starts a https server. -func StartHttps(host string, port int, certFile, keyFile string, handler http.Handler, +func StartHttps(host string, port int, certFile, keyFile string, handler http.Handler, probe health.Probe, opts ...StartOption) error { - return start(host, port, handler, func(svr *http.Server) error { + return start(host, port, handler, probe, func(svr *http.Server) error { // certFile and keyFile are set in buildHttpsServer return svr.ListenAndServeTLS(certFile, keyFile) }, opts...) } -func start(host string, port int, handler http.Handler, run func(svr *http.Server) error, +func start(host string, port int, handler http.Handler, probe health.Probe, run func(svr *http.Server) error, opts ...StartOption) (err error) { server := &http.Server{ Addr: fmt.Sprintf("%s:%d", host, port), @@ -41,21 +39,20 @@ func start(host string, port int, handler http.Handler, run func(svr *http.Serve for _, opt := range opts { opt(server) } - healthManager := health.NewHealthManager(fmt.Sprintf("%s-%s:%d", probeNamePrefix, host, port)) waitForCalled := proc.AddShutdownListener(func() { - healthManager.MarkNotReady() if e := server.Shutdown(context.Background()); e != nil { logx.Error(e) } }) defer func() { if errors.Is(err, http.ErrServerClosed) { + probe.MarkNotReady() waitForCalled() } }() - healthManager.MarkReady() - health.AddProbe(healthManager) + probe.MarkReady() + return run(server) } diff --git a/rest/internal/starter_test.go b/rest/internal/starter_test.go index a54c215f9f56..0b96f8e7ceef 100644 --- a/rest/internal/starter_test.go +++ b/rest/internal/starter_test.go @@ -6,9 +6,11 @@ import ( "strconv" "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/zeromicro/go-zero/core/proc" + "github.com/zeromicro/go-zero/core/syncx" ) func TestStartHttp(t *testing.T) { @@ -16,7 +18,7 @@ func TestStartHttp(t *testing.T) { fields := strings.Split(svr.Listener.Addr().String(), ":") port, err := strconv.Atoi(fields[1]) assert.Nil(t, err) - err = StartHttp(fields[0], port, http.NotFoundHandler(), func(svr *http.Server) { + err = StartHttp(fields[0], port, http.NotFoundHandler(), &mockProbe{}, func(svr *http.Server) { svr.IdleTimeout = 0 }) assert.NotNil(t, err) @@ -28,9 +30,56 @@ func TestStartHttps(t *testing.T) { fields := strings.Split(svr.Listener.Addr().String(), ":") port, err := strconv.Atoi(fields[1]) assert.Nil(t, err) - err = StartHttps(fields[0], port, "", "", http.NotFoundHandler(), func(svr *http.Server) { + err = StartHttps(fields[0], port, "", "", http.NotFoundHandler(), &mockProbe{}, func(svr *http.Server) { svr.IdleTimeout = 0 }) assert.NotNil(t, err) proc.WrapUp() } +func TestStartWithShutdownListener(t *testing.T) { + probe := &mockProbe{} + shutdownCalled := make(chan struct{}) + serverStarted := make(chan struct{}) + serverClosed := make(chan struct{}) + + run := func(svr *http.Server) error { + close(serverStarted) + <-shutdownCalled + return http.ErrServerClosed + } + + go func() { + err := start("localhost", 8888, http.NotFoundHandler(), probe, run) + assert.Equal(t, http.ErrServerClosed, err) + close(serverClosed) + }() + + select { + case <-serverStarted: + assert.True(t, probe.IsReady(), "server should be marked as ready") + case <-time.After(time.Second): + t.Fatal("timeout waiting for server to start") + } + + proc.WrapUp() + time.Sleep(time.Millisecond * 50) + close(shutdownCalled) +} + +type mockProbe struct { + ready syncx.AtomicBool +} + +func (m *mockProbe) MarkReady() { + m.ready.Set(true) +} + +func (m *mockProbe) MarkNotReady() { + m.ready.Set(false) +} + +func (m *mockProbe) IsReady() bool { + return m.ready.True() +} + +func (m *mockProbe) Name() string { return "" } diff --git a/rest/server.go b/rest/server.go index b1e5487bd8a5..4ddf39a115bf 100644 --- a/rest/server.go +++ b/rest/server.go @@ -3,11 +3,13 @@ package rest import ( "crypto/tls" "errors" + "fmt" "net/http" "path" "time" "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/internal/health" "github.com/zeromicro/go-zero/rest/chain" "github.com/zeromicro/go-zero/rest/handler" "github.com/zeromicro/go-zero/rest/httpx" @@ -17,6 +19,8 @@ import ( "github.com/zeromicro/go-zero/rest/router" ) +const probeNamePrefix = "rest" + type ( // RunOption defines the method to customize a Server. RunOption func(*Server) @@ -26,8 +30,9 @@ type ( // A Server is a http server. Server struct { - ngin *engine - router httpx.Router + ngin *engine + router httpx.Router + healthManager health.Probe } ) @@ -50,9 +55,13 @@ func NewServer(c RestConf, opts ...RunOption) (*Server, error) { return nil, err } + healthManager := health.NewHealthManager(fmt.Sprintf("%s-%s:%d", probeNamePrefix, c.Host, c.Port)) + health.AddProbe(healthManager) + server := &Server{ - ngin: newEngine(c), - router: router.NewRouter(), + ngin: newEngine(c), + router: router.NewRouter(), + healthManager: healthManager, } opts = append([]RunOption{WithNotFoundHandler(nil)}, opts...) @@ -118,14 +127,14 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Graceful shutdown is enabled by default. // Use proc.SetTimeToForceQuit to customize the graceful shutdown period. func (s *Server) Start() { - handleError(s.ngin.start(s.router)) + handleError(s.ngin.start(s.router, s.healthManager)) } // StartWithOpts starts the Server. // Graceful shutdown is enabled by default. // Use proc.SetTimeToForceQuit to customize the graceful shutdown period. func (s *Server) StartWithOpts(opts ...StartOption) { - handleError(s.ngin.start(s.router, opts...)) + handleError(s.ngin.start(s.router, s.healthManager, opts...)) } // Stop stops the Server. diff --git a/rest/server_test.go b/rest/server_test.go index 9a92d58f8203..0b9cfa53cbac 100644 --- a/rest/server_test.go +++ b/rest/server_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/zeromicro/go-zero/core/conf" "github.com/zeromicro/go-zero/core/logx/logtest" + "github.com/zeromicro/go-zero/core/service" "github.com/zeromicro/go-zero/rest/chain" "github.com/zeromicro/go-zero/rest/httpx" "github.com/zeromicro/go-zero/rest/internal/cors" @@ -754,6 +755,33 @@ Port: 54321 } } +func TestServerProbe(t *testing.T) { + server := MustNewServer(RestConf{ + ServiceConf: service.ServiceConf{ + DevServer: service.DevServerConfig{ + Host: "localhost", + Port: 6061, + HealthPath: "/healthz", + Enabled: true, + }, + }, + Host: "localhost", + Port: 8888, + }) + assert.NotNil(t, server) + assert.False(t, server.healthManager.IsReady()) + resp, err := http.Get("http://localhost:6061/healthz") + assert.Nil(t, err) + assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) + + go server.Start() + defer server.Stop() + + assert.Eventually(t, func() bool { + return server.healthManager.IsReady() + }, time.Millisecond*100, time.Millisecond*10) +} + //go:embed testdata var content embed.FS diff --git a/zrpc/internal/rpcserver.go b/zrpc/internal/rpcserver.go index c1302f03ae52..ccb98d524686 100644 --- a/zrpc/internal/rpcserver.go +++ b/zrpc/internal/rpcserver.go @@ -34,9 +34,12 @@ func NewRpcServer(addr string, opts ...ServerOption) Server { opt(&options) } + healthManager := health.NewHealthManager(fmt.Sprintf("%s-%s", probeNamePrefix, addr)) + health.AddProbe(healthManager) + return &rpcServer{ baseRpcServer: newBaseRpcServer(addr, &options), - healthManager: health.NewHealthManager(fmt.Sprintf("%s-%s", probeNamePrefix, addr)), + healthManager: healthManager, } } @@ -63,7 +66,6 @@ func (s *rpcServer) Start(register RegisterFn) error { s.health.Resume() } s.healthManager.MarkReady() - health.AddProbe(s.healthManager) // we need to make sure all others are wrapped up, // so we do graceful stop at shutdown phase instead of wrap up phase diff --git a/zrpc/internal/rpcserver_test.go b/zrpc/internal/rpcserver_test.go index 696dae68713c..19d95d7bf5a7 100644 --- a/zrpc/internal/rpcserver_test.go +++ b/zrpc/internal/rpcserver_test.go @@ -52,3 +52,18 @@ func TestRpcServer_WithBadAddress(t *testing.T) { proc.WrapUp() } + +func TestServerProbe(t *testing.T) { + server := NewRpcServer("localhost:12345") + assert.NotNil(t, server) + svr, ok := server.(*rpcServer) + assert.True(t, ok) + assert.False(t, svr.healthManager.IsReady()) + go func() { + err := svr.Start(func(server *grpc.Server) {}) + assert.Nil(t, err) + }() + assert.Eventually(t, func() bool { + return svr.healthManager.IsReady() + }, time.Millisecond*100, time.Millisecond*10) +}