Skip to content

Commit

Permalink
Add metrics and tracing (#129)
Browse files Browse the repository at this point in the history
Most of this is copied from `node-go`.

Expose a metrics server on a port even though no custom metrics are
provided yet.
  • Loading branch information
mkysel authored Sep 5, 2024
1 parent 775f2a9 commit 6bb1a4f
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 16 deletions.
24 changes: 17 additions & 7 deletions cmd/replication/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@ func main() {
return
}

log, _, err := buildLogger(options)
logger, _, err := buildLogger(options)
if err != nil {
fatal("Could not build logger: %s", err)
}

if options.Tracing.Enable {
logger.Info("starting tracer")
tracing.Start(Commit, logger)
defer func() {
logger.Info("stopping tracer")
tracing.Stop()
}()
}

ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
Expand All @@ -46,7 +55,7 @@ func main() {
)

if err != nil {
log.Fatal("initializing database", zap.Error(err))
logger.Fatal("initializing database", zap.Error(err))
}

privateKey, err := utils.ParseEcdsaPrivateKey(options.SignerPrivateKey)
Expand All @@ -56,7 +65,7 @@ func main() {

s, err := server.NewReplicationServer(
ctx,
log,
logger,
options,
// TODO:nm replace with real node registry
registry.NewFixedNodeRegistry(
Expand All @@ -75,11 +84,12 @@ func main() {
if err != nil {
log.Fatal("initializing server", zap.Error(err))
}

s.WaitForShutdown()
doneC <- true
})

<-doneC

cancel()
wg.Wait()
}
Expand Down Expand Up @@ -112,12 +122,12 @@ func buildLogger(options config.ServerOptions) (*zap.Logger, *zap.Config, error)
EncodeCaller: zapcore.ShortCallerEncoder,
},
}
log, err := cfg.Build()
logger, err := cfg.Build()
if err != nil {
return nil, nil, err
}

log = log.Named("replication")
logger = logger.Named("replication")

return log, &cfg, nil
return logger, &cfg, nil
}
30 changes: 21 additions & 9 deletions pkg/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,31 @@ type DbOptions struct {
WaitForDB time.Duration `long:"wait-for" env:"XMTPD_DB_WAIT_FOR" description:"wait for DB on start, up to specified duration"`
}

// MetricsOptions are settings used to start a prometheus server
type MetricsOptions struct {
Enable bool `long:"enable" env:"XMTPD_METRICS_ENABLE" description:"Enable the metrics server"`
Address string `long:"metrics-address" env:"XMTPD_METRICS_METRICS_ADDRESS" description:"Listening address of the metrics server" default:"127.0.0.1"`
Port int `long:"metrics-port" env:"XMTPD_METRICS_METRICS_PORT" description:"Listening HTTP port of the metrics server" default:"8008"`
}

type PayerOptions struct {
PrivateKey string `long:"private-key" env:"XMTPD_PAYER_PRIVATE_KEY" description:"Private key used to sign blockchain transactions"`
}

type ServerOptions struct {
LogLevel string `short:"l" long:"log-level" env:"XMTPD_LOG_LEVEL" description:"Define the logging level, supported strings are: DEBUG, INFO, WARN, ERROR, DPANIC, PANIC, FATAL, and their lower-case forms." default:"INFO"`
//nolint:staticcheck
LogEncoding string ` long:"log-encoding" env:"XMTPD_LOG_ENCODING" description:"Log encoding format. Either console or json" default:"console" choice:"console"`

SignerPrivateKey string `long:"signer-private-key" env:"XMTPD_SIGNER_PRIVATE_KEY" description:"Private key used to sign messages" required:"true"`
LogLevel string `short:"l" long:"log-level" env:"XMTPD_LOG_LEVEL" description:"Define the logging level, supported strings are: DEBUG, INFO, WARN, ERROR, DPANIC, PANIC, FATAL, and their lower-case forms." default:"INFO"`
LogEncoding string ` long:"log-encoding" env:"XMTPD_LOG_ENCODING" description:"Log encoding format. Either console or json" default:"console" choice:"console"`
SignerPrivateKey string ` long:"signer-private-key" env:"XMTPD_SIGNER_PRIVATE_KEY" description:"Private key used to sign messages" required:"true"`

API ApiOptions `group:"API Options" namespace:"api"`
DB DbOptions `group:"Database Options" namespace:"db"`
Contracts ContractsOptions `group:"Contracts Options" namespace:"contracts"`
Metrics MetricsOptions `group:"Metrics Options" namespace:"metrics"`
Payer PayerOptions `group:"Payer Options" namespace:"payer"`
Tracing TracingOptions `group:"DD APM Tracing Options" namespace:"tracing"`
}

API ApiOptions `group:"API Options" namespace:"api"`
DB DbOptions `group:"Database Options" namespace:"db"`
Contracts ContractsOptions `group:"Contracts Options" namespace:"contracts"`
Payer PayerOptions `group:"Payer Options" namespace:"payer"`
// TracingOptions are settings controlling collection of DD APM traces and error tracking.
type TracingOptions struct {
Enable bool `long:"enable" env:"XMTPD_TRACING_ENABLE" description:"Enable DD APM trace collection"`
}
67 changes: 67 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package metrics

import (
"context"
"fmt"
"github.com/xmtp/xmtpd/pkg/tracing"
"net"
"net/http"

"github.com/pires/go-proxyproto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)

type Server struct {
ctx context.Context
log *zap.Logger
http net.Listener
}

func NewMetricsServer(
ctx context.Context,
address string,
port int,
log *zap.Logger,
reg *prometheus.Registry,
) (*Server, error) {
s := &Server{
ctx: ctx,
log: log.Named("metrics"),
}

addr := fmt.Sprintf("%s:%d", address, port)
httpListener, err := net.Listen("tcp", addr)
s.http = &proxyproto.Listener{Listener: httpListener}
if err != nil {
return nil, err
}
registerCollectors(reg)
srv := http.Server{
Addr: addr,
Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}),
}

go tracing.PanicWrap(ctx, "metrics server", func(_ context.Context) {
s.log.Info("serving metrics http", zap.String("address", s.http.Addr().String()))
err = srv.Serve(s.http)
if err != nil {
s.log.Error("serving http", zap.Error(err))
}
})

return s, nil
}

func (s *Server) Close() error {
return s.http.Close()
}

func registerCollectors(reg prometheus.Registerer) {
//TODO: add metrics here
var cols []prometheus.Collector
for _, col := range cols {
reg.MustRegister(col)
}
}
30 changes: 30 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package server
import (
"context"
"database/sql"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/xmtp/xmtpd/pkg/metrics"
"net"
"os"
"os/signal"
Expand All @@ -24,6 +27,7 @@ type ReplicationServer struct {
registrant *registrant.Registrant
nodeRegistry registry.NodeRegistry
options config.ServerOptions
metrics *metrics.Server
writerDB *sql.DB
// Can add reader DB later if needed
}
Expand All @@ -37,11 +41,30 @@ func NewReplicationServer(
) (*ReplicationServer, error) {
var err error

var mtcs *metrics.Server
if options.Metrics.Enable {
promReg := prometheus.NewRegistry()
promReg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
promReg.MustRegister(collectors.NewGoCollector())

mtcs, err = metrics.NewMetricsServer(ctx,
options.Metrics.Address,
options.Metrics.Port,
log,
promReg,
)
if err != nil {
log.Fatal("initializing metrics server", zap.Error(err))
return nil, err
}
}

s := &ReplicationServer{
options: options,
log: log,
nodeRegistry: nodeRegistry,
writerDB: writerDB,
metrics: mtcs,
}
s.ctx, s.cancel = context.WithCancel(ctx)

Expand Down Expand Up @@ -76,6 +99,13 @@ func (s *ReplicationServer) WaitForShutdown() {
}

func (s *ReplicationServer) Shutdown() {
// Close metrics server.
if s.metrics != nil {
if err := s.metrics.Close(); err != nil {
s.log.Error("stopping metrics", zap.Error(err))
}
}

if s.apiServer != nil {
s.apiServer.Close()
}
Expand Down

0 comments on commit 6bb1a4f

Please sign in to comment.