diff --git a/README.md b/README.md index f47bd30..25e67b7 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,12 @@ nats: user: string password: string token: string + stan: + connect_wait: duration + pub_ack_wait: duration + max_pub_acks_inflight: int + ping_max_out: int + ping_interval: int ``` - env example ``` @@ -59,4 +65,9 @@ NATS_SUB_CHAN_LEN=int NATS_USER=string NATS_PASSWORD=string NATS_TOKEN=string -``` \ No newline at end of file +NATS_STAN_CONNECT_WAIT=duration +NATS_STAN_PUB_ACK_WAIT=duration +NATS_STAN_MAX_PUB_ACKS_INFLIGHT=int +NATS_STAN_PING_MAX_OUT=int +NATS_STAN_PING_INTERVAL=int +``` diff --git a/go.mod b/go.mod index 6daa446..a9df004 100644 --- a/go.mod +++ b/go.mod @@ -10,4 +10,5 @@ require ( github.com/nats-io/stan.go v0.5.0 github.com/spf13/viper v1.4.0 github.com/stretchr/testify v1.4.0 + go.uber.org/dig v1.7.0 ) diff --git a/nats.go b/nats.go index 636aec5..7c3dac2 100644 --- a/nats.go +++ b/nats.go @@ -5,6 +5,7 @@ import ( "github.com/nats-io/nats.go" "github.com/nats-io/stan.go" "github.com/spf13/viper" + "go.uber.org/dig" ) type ( @@ -18,6 +19,14 @@ type ( Options []stan.Option } + StreamerParams struct { + dig.In + + Bus *Client + Viper *viper.Viper + OnConnectionLost stan.ConnectionLostHandler `optional:"true"` + } + // Client alias Client = nats.Conn @@ -86,24 +95,54 @@ func NewDefaultConfig(v *viper.Viper) (*Config, error) { } // NewDefaultStreamerConfig default settings for streaming connection -func NewDefaultStreamerConfig(v *viper.Viper, bus *Client) (*StreamerConfig, error) { - if !v.IsSet("nats") { +func NewDefaultStreamerConfig(p StreamerParams) (*StreamerConfig, error) { + if !p.Viper.IsSet("nats") { return nil, ErrEmptyConfig } var clusterID, clientID string - if clusterID = v.GetString("nats.cluster_id"); clusterID == "" { + if clusterID = p.Viper.GetString("nats.cluster_id"); clusterID == "" { return nil, ErrClusterIDEmpty } - if clientID = v.GetString("nats.client_id"); clientID == "" { + if clientID = p.Viper.GetString("nats.client_id"); clientID == "" { return nil, ErrClientIDEmpty } + // set options: + options := []stan.Option{stan.NatsConn(p.Bus)} + + // ConnectWait(t time.Duration) + if v := p.Viper.GetDuration("nats.stan.connect_wait"); v > 0 { + options = append(options, stan.ConnectWait(v)) + } + + // PubAckWait(t time.Duration) + if v := p.Viper.GetDuration("nats.stan.pub_ack_wait"); v > 0 { + options = append(options, stan.PubAckWait(v)) + } + + // MaxPubAcksInflight(max int) + if v := p.Viper.GetInt("nats.stan.max_pub_acks_inflight"); v > 0 { + options = append(options, stan.MaxPubAcksInflight(v)) + } + + // Pings(interval, maxOut int) + pingMaxOut := p.Viper.GetInt("nats.stan.ping_max_out") + pingInterval := p.Viper.GetInt("nats.stan.ping_interval") + if pingMaxOut > 0 && pingInterval > 0 { + options = append(options, stan.Pings(pingInterval, pingMaxOut)) + } + + // SetConnectionLostHandler(handler ConnectionLostHandler) + if p.OnConnectionLost != nil { + options = append(options, stan.SetConnectionLostHandler(p.OnConnectionLost)) + } + return &StreamerConfig{ ClientID: clientID, ClusterID: clusterID, - Options: []stan.Option{stan.NatsConn(bus)}, + Options: options, }, nil } diff --git a/nats_test.go b/nats_test.go index 1083e85..e0cc097 100644 --- a/nats_test.go +++ b/nats_test.go @@ -1,12 +1,13 @@ package nats import ( - "github.com/nats-io/nats.go" - "github.com/spf13/viper" - "github.com/stretchr/testify/require" "testing" "github.com/nats-io/nats-streaming-server/server" + "github.com/nats-io/nats.go" + "github.com/nats-io/stan.go" + "github.com/spf13/viper" + "github.com/stretchr/testify/require" ) func RunServer(ID string) *server.StanServer { @@ -18,7 +19,6 @@ func RunServer(ID string) *server.StanServer { } func TestNewDefaultConfig(t *testing.T) { - t.Run("must fail on empty", func(t *testing.T) { v := viper.New() c, err := NewDefaultConfig(v) @@ -98,7 +98,7 @@ func TestNewDefaultConfig(t *testing.T) { t.Run("should fail with empty config", func(t *testing.T) { v := viper.New() - cfg, err := NewDefaultStreamerConfig(v, nil) + cfg, err := NewDefaultStreamerConfig(StreamerParams{Viper: v}) require.Nil(t, cfg) require.EqualError(t, err, ErrEmptyConfig.Error()) }) @@ -106,7 +106,7 @@ func TestNewDefaultConfig(t *testing.T) { t.Run("should fail with empty clusterID", func(t *testing.T) { v := viper.New() v.SetDefault("nats.cluster_id", "") - cfg, err := NewDefaultStreamerConfig(v, nil) + cfg, err := NewDefaultStreamerConfig(StreamerParams{Viper: v}) require.Nil(t, cfg) require.EqualError(t, err, ErrClusterIDEmpty.Error()) }) @@ -114,7 +114,7 @@ func TestNewDefaultConfig(t *testing.T) { t.Run("should fail with empty clientID", func(t *testing.T) { v := viper.New() v.SetDefault("nats.cluster_id", "myCluster") - cfg, err := NewDefaultStreamerConfig(v, nil) + cfg, err := NewDefaultStreamerConfig(StreamerParams{Viper: v}) require.Nil(t, cfg) require.EqualError(t, err, ErrClientIDEmpty.Error()) }) @@ -125,13 +125,13 @@ func TestNewDefaultConfig(t *testing.T) { v.SetDefault("nats.client_id", "myClient") v.SetDefault("nats.cluster_id", "myCluster") - cfg, err := NewDefaultStreamerConfig(v, nil) + cfg, err := NewDefaultStreamerConfig(StreamerParams{Viper: v}) require.NoError(t, err) cfg.Options = nil - stan, err := NewStreamer(cfg) - require.Nil(t, stan) + serve, err := NewStreamer(cfg) + require.Nil(t, serve) require.EqualError(t, err, ErrEmptyConnection.Error()) }) @@ -140,6 +140,13 @@ func TestNewDefaultConfig(t *testing.T) { v.SetDefault("nats.client_id", "myClient") v.SetDefault("nats.cluster_id", "myCluster") + // stan options: + v.SetDefault("nats.stan.connect_wait", stan.DefaultConnectWait) + v.SetDefault("nats.stan.pub_ack_wait", stan.DefaultAckWait) + v.SetDefault("nats.stan.ping_max_out", stan.DefaultPingMaxOut) + v.SetDefault("nats.stan.ping_interval", stan.DefaultPingInterval) + v.SetDefault("nats.stan.max_pub_acks_inflight", stan.DefaultMaxPubAcksInflight) + // Run a NATS Streaming server s := RunServer("myCluster") defer s.Shutdown() @@ -149,7 +156,11 @@ func TestNewDefaultConfig(t *testing.T) { defer con.Close() - cfg, err := NewDefaultStreamerConfig(v, con) + cfg, err := NewDefaultStreamerConfig(StreamerParams{ + Viper: v, + Bus: con, + OnConnectionLost: func(stan.Conn, error) {}, + }) require.NoError(t, err) st, err := NewStreamer(cfg)