Skip to content

Commit

Permalink
Tweak NATS setup
Browse files Browse the repository at this point in the history
  • Loading branch information
neilalexander committed Mar 24, 2024
1 parent 023788b commit da69fa2
Showing 1 changed file with 21 additions and 25 deletions.
46 changes: 21 additions & 25 deletions setup/jetstream/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/neilalexander/harmony/setup/process"

natsserver "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
natsclient "github.com/nats-io/nats.go"
)

Expand All @@ -36,17 +35,20 @@ func DeleteAllStreams(js natsclient.JetStreamContext, cfg *config.JetStream) {
func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
natsLock.Lock()
defer natsLock.Unlock()
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {
// reuse existing connections
if s.nc != nil {
return s.js, s.nc
}
var err error

// If an existing connection exists, return it.
if s.nc != nil && s.js != nil {
return s.js, s.nc
}

// For connecting to an external NATS server.
if len(cfg.Addresses) > 0 {
s.js, s.nc = setupNATS(process, cfg, nil)
return s.js, s.nc
}
if s.Server == nil {
var err error

if len(cfg.Addresses) == 0 && s.Server == nil {
opts := &natsserver.Options{
ServerName: "monolith",
DontListen: true,
Expand All @@ -57,8 +59,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
NoSigs: true,
NoLog: cfg.NoLog,
}
s.Server, err = natsserver.NewServer(opts)
if err != nil {
if s.Server, err = natsserver.NewServer(opts); err != nil {
panic(err)
}
if !cfg.NoLog {
Expand All @@ -74,30 +75,25 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
s.WaitForShutdown()
process.ComponentFinished()
}()
if !s.ReadyForConnections(time.Second * 60) {
logrus.Fatalln("NATS did not start in time")
}
}
if !s.ReadyForConnections(time.Second * 60) {
logrus.Fatalln("NATS did not start in time")
}
// reuse existing connections
if s.nc != nil {
return s.js, s.nc
}
nc, err := natsclient.Connect("", natsclient.InProcessServer(s))
if err != nil {

// No existing process connection, create a new one.
if s.nc, err = natsclient.Connect("", natsclient.InProcessServer(s.Server)); err != nil {
logrus.Fatalln("Failed to create NATS client")
}
js, _ := setupNATS(process, cfg, nc)
s.js = js
s.nc = nc
return js, nc
s.js, s.nc = setupNATS(process, cfg, s.nc)
return s.js, s.nc
}

// nolint:gocyclo
func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
if nc == nil {
var err error
opts := []natsclient.Option{
nats.Name("Harmony"),
natsclient.Name("Harmony"),
}
if cfg.DisableTLSValidation {
opts = append(opts, natsclient.Secure(&tls.Config{
Expand Down

0 comments on commit da69fa2

Please sign in to comment.