Skip to content

Commit

Permalink
Add STAN configuration options
Browse files Browse the repository at this point in the history
- Add configuration options
- Add OnConnectionLost handler
- Update README / go.mod
  • Loading branch information
im-kulikov committed Oct 27, 2019
1 parent 57f0512 commit e953a5c
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 17 deletions.
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ nats:
user: string
password: string
token: string
stan:
connect_wait: duration
pub_ack_wait: duration
max_pub_acks_inflight: int
ping_max_out: int
ping_interval: int
```
- env example
```
Expand All @@ -59,4 +65,9 @@ NATS_SUB_CHAN_LEN=int
NATS_USER=string
NATS_PASSWORD=string
NATS_TOKEN=string
```
NATS_STAN_CONNECT_WAIT=duration
NATS_STAN_PUB_ACK_WAIT=duration
NATS_STAN_MAX_PUB_ACKS_INFLIGHT=int
NATS_STAN_PING_MAX_OUT=int
NATS_STAN_PING_INTERVAL=int
```
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ require (
github.com/nats-io/stan.go v0.5.0
github.com/spf13/viper v1.4.0
github.com/stretchr/testify v1.4.0
go.uber.org/dig v1.7.0
)
49 changes: 44 additions & 5 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
"github.com/spf13/viper"
"go.uber.org/dig"
)

type (
Expand All @@ -18,6 +19,14 @@ type (
Options []stan.Option
}

StreamerParams struct {
dig.In

Bus *Client
Viper *viper.Viper
OnConnectionLost stan.ConnectionLostHandler `optional:"true"`
}

// Client alias
Client = nats.Conn

Expand Down Expand Up @@ -86,24 +95,54 @@ func NewDefaultConfig(v *viper.Viper) (*Config, error) {
}

// NewDefaultStreamerConfig default settings for streaming connection
func NewDefaultStreamerConfig(v *viper.Viper, bus *Client) (*StreamerConfig, error) {
if !v.IsSet("nats") {
func NewDefaultStreamerConfig(p StreamerParams) (*StreamerConfig, error) {
if !p.Viper.IsSet("nats") {
return nil, ErrEmptyConfig
}

var clusterID, clientID string
if clusterID = v.GetString("nats.cluster_id"); clusterID == "" {
if clusterID = p.Viper.GetString("nats.cluster_id"); clusterID == "" {
return nil, ErrClusterIDEmpty
}

if clientID = v.GetString("nats.client_id"); clientID == "" {
if clientID = p.Viper.GetString("nats.client_id"); clientID == "" {
return nil, ErrClientIDEmpty
}

// set options:
options := []stan.Option{stan.NatsConn(p.Bus)}

// ConnectWait(t time.Duration)
if v := p.Viper.GetDuration("nats.stan.connect_wait"); v > 0 {
options = append(options, stan.ConnectWait(v))
}

// PubAckWait(t time.Duration)
if v := p.Viper.GetDuration("nats.stan.pub_ack_wait"); v > 0 {
options = append(options, stan.PubAckWait(v))
}

// MaxPubAcksInflight(max int)
if v := p.Viper.GetInt("nats.stan.max_pub_acks_inflight"); v > 0 {
options = append(options, stan.MaxPubAcksInflight(v))
}

// Pings(interval, maxOut int)
pingMaxOut := p.Viper.GetInt("nats.stan.ping_max_out")
pingInterval := p.Viper.GetInt("nats.stan.ping_interval")
if pingMaxOut > 0 && pingInterval > 0 {
options = append(options, stan.Pings(pingInterval, pingMaxOut))
}

// SetConnectionLostHandler(handler ConnectionLostHandler)
if p.OnConnectionLost != nil {
options = append(options, stan.SetConnectionLostHandler(p.OnConnectionLost))
}

return &StreamerConfig{
ClientID: clientID,
ClusterID: clusterID,
Options: []stan.Option{stan.NatsConn(bus)},
Options: options,
}, nil
}

Expand Down
33 changes: 22 additions & 11 deletions nats_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package nats

import (
"github.com/nats-io/nats.go"
"github.com/spf13/viper"
"github.com/stretchr/testify/require"
"testing"

"github.com/nats-io/nats-streaming-server/server"
"github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
"github.com/spf13/viper"
"github.com/stretchr/testify/require"
)

func RunServer(ID string) *server.StanServer {
Expand All @@ -18,7 +19,6 @@ func RunServer(ID string) *server.StanServer {
}

func TestNewDefaultConfig(t *testing.T) {

t.Run("must fail on empty", func(t *testing.T) {
v := viper.New()
c, err := NewDefaultConfig(v)
Expand Down Expand Up @@ -98,23 +98,23 @@ func TestNewDefaultConfig(t *testing.T) {

t.Run("should fail with empty config", func(t *testing.T) {
v := viper.New()
cfg, err := NewDefaultStreamerConfig(v, nil)
cfg, err := NewDefaultStreamerConfig(StreamerParams{Viper: v})
require.Nil(t, cfg)
require.EqualError(t, err, ErrEmptyConfig.Error())
})

t.Run("should fail with empty clusterID", func(t *testing.T) {
v := viper.New()
v.SetDefault("nats.cluster_id", "")
cfg, err := NewDefaultStreamerConfig(v, nil)
cfg, err := NewDefaultStreamerConfig(StreamerParams{Viper: v})
require.Nil(t, cfg)
require.EqualError(t, err, ErrClusterIDEmpty.Error())
})

t.Run("should fail with empty clientID", func(t *testing.T) {
v := viper.New()
v.SetDefault("nats.cluster_id", "myCluster")
cfg, err := NewDefaultStreamerConfig(v, nil)
cfg, err := NewDefaultStreamerConfig(StreamerParams{Viper: v})
require.Nil(t, cfg)
require.EqualError(t, err, ErrClientIDEmpty.Error())
})
Expand All @@ -125,13 +125,13 @@ func TestNewDefaultConfig(t *testing.T) {
v.SetDefault("nats.client_id", "myClient")
v.SetDefault("nats.cluster_id", "myCluster")

cfg, err := NewDefaultStreamerConfig(v, nil)
cfg, err := NewDefaultStreamerConfig(StreamerParams{Viper: v})
require.NoError(t, err)

cfg.Options = nil

stan, err := NewStreamer(cfg)
require.Nil(t, stan)
serve, err := NewStreamer(cfg)
require.Nil(t, serve)
require.EqualError(t, err, ErrEmptyConnection.Error())
})

Expand All @@ -140,6 +140,13 @@ func TestNewDefaultConfig(t *testing.T) {
v.SetDefault("nats.client_id", "myClient")
v.SetDefault("nats.cluster_id", "myCluster")

// stan options:
v.SetDefault("nats.stan.connect_wait", stan.DefaultConnectWait)
v.SetDefault("nats.stan.pub_ack_wait", stan.DefaultAckWait)
v.SetDefault("nats.stan.ping_max_out", stan.DefaultPingMaxOut)
v.SetDefault("nats.stan.ping_interval", stan.DefaultPingInterval)
v.SetDefault("nats.stan.max_pub_acks_inflight", stan.DefaultMaxPubAcksInflight)

// Run a NATS Streaming server
s := RunServer("myCluster")
defer s.Shutdown()
Expand All @@ -149,7 +156,11 @@ func TestNewDefaultConfig(t *testing.T) {

defer con.Close()

cfg, err := NewDefaultStreamerConfig(v, con)
cfg, err := NewDefaultStreamerConfig(StreamerParams{
Viper: v,
Bus: con,
OnConnectionLost: func(stan.Conn, error) {},
})
require.NoError(t, err)

st, err := NewStreamer(cfg)
Expand Down

0 comments on commit e953a5c

Please sign in to comment.