From 0ac496c58a333c350b893c887c78f7be12832ab3 Mon Sep 17 00:00:00 2001 From: Lakshay Kalbhor <lakshay.kalbhor@zerodha.com> Date: Mon, 29 Jul 2024 16:25:35 +0530 Subject: [PATCH 1/2] chore: upgrade franz-go version with compression optimisations --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index cee54e9..68174b2 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/knadh/koanf/providers/posflag v0.1.0 github.com/knadh/koanf/v2 v2.0.1 github.com/spf13/pflag v1.0.5 - github.com/twmb/franz-go v1.17.0 + github.com/twmb/franz-go v1.17.1 github.com/twmb/franz-go/pkg/kmsg v1.8.0 ) diff --git a/go.sum b/go.sum index ec6a6ad..1aab83a 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,8 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/twmb/franz-go v1.17.0 h1:hawgCx5ejDHkLe6IwAtFWwxi3OU4OztSTl7ZV5rwkYk= github.com/twmb/franz-go v1.17.0/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= +github.com/twmb/franz-go v1.17.1 h1:0LwPsbbJeJ9R91DPUHSEd4su82WJWcTY1Zzbgbg4CeQ= +github.com/twmb/franz-go v1.17.1/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= github.com/twmb/franz-go/pkg/kadm v1.8.1 h1:SrzL855I7gQTGdMtOYGTHhebs7TPgPN29FPtjusqwlE= github.com/twmb/franz-go/pkg/kadm v1.8.1/go.mod h1:qUSM7pxoMCU1UNu5H4USE64ODcVmeG9LS96mysv1nu8= github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA= From b31e40bf42d61536cfe22a47e825d01214d01fd4 Mon Sep 17 00:00:00 2001 From: Lakshay Kalbhor <lakshay.kalbhor@zerodha.com> Date: Mon, 29 Jul 2024 16:26:26 +0530 Subject: [PATCH 2/2] fix: handle http server initialisation and runtime --- go.sum | 2 -- init.go | 11 +---------- main.go | 20 +++++++++++--------- 3 files changed, 12 insertions(+), 21 deletions(-) diff --git a/go.sum b/go.sum index 1aab83a..f63c57b 100644 --- a/go.sum +++ b/go.sum @@ -32,8 +32,6 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/twmb/franz-go v1.17.0 h1:hawgCx5ejDHkLe6IwAtFWwxi3OU4OztSTl7ZV5rwkYk= -github.com/twmb/franz-go v1.17.0/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= github.com/twmb/franz-go v1.17.1 h1:0LwPsbbJeJ9R91DPUHSEd4su82WJWcTY1Zzbgbg4CeQ= github.com/twmb/franz-go v1.17.1/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= github.com/twmb/franz-go/pkg/kadm v1.8.1 h1:SrzL855I7gQTGdMtOYGTHhebs7TPgPN29FPtjusqwlE= diff --git a/init.go b/init.go index 7bd7d4b..bde3cf2 100644 --- a/init.go +++ b/init.go @@ -276,19 +276,10 @@ func initMetricsServer(metrics *metrics.Set, ko *koanf.Koanf) *http.Server { buf.WriteTo(w) }) - srv := &http.Server{ + return &http.Server{ Addr: ko.MustString("app.metrics_server_addr"), Handler: mux, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, } - - go func() { - err := srv.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - log.Printf("error starting server: %v", err) - } - }() - - return srv } diff --git a/main.go b/main.go index b0f9418..9b4270a 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "net/http" "os" "os/signal" "syscall" @@ -39,13 +40,20 @@ func main() { log.Fatalf("error initializing filter provider: %v", err) } - // Initialize metrics. - metr := metrics.NewSet() - // Create a global context with interrupts signals. globalCtx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() + // Initialize metrics set and start the HTTP server. + metr := metrics.NewSet() + metrSrv := initMetricsServer(metr, ko) + go func() { + if err := metrSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Printf("error starting server: %v", err) + } + }() + defer metrSrv.Shutdown(globalCtx) + // Initialize the source and target Kafka config. consumerCfgs, prodConfig := initKafkaConfig(ko) @@ -78,16 +86,10 @@ func main() { log.Fatalf("error initializing relay controller: %v", err) } - // Start the metrSrv HTTP server. - metrSrv := initMetricsServer(metr, ko) - // Start the relay. This is an indefinitely blocking call. if err := relay.Start(globalCtx); err != nil { log.Fatalf("error starting relay controller: %v", err) } - if metrSrv != nil { - metrSrv.Shutdown(globalCtx) - } lo.Info("bye") }