Skip to content

Commit

Permalink
Connect timeout as CLI param (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
absurdfarce authored May 16, 2023
1 parent 4207850 commit 769499e
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 11 deletions.
4 changes: 4 additions & 0 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Config struct {
NumConns int
Logger *zap.Logger
HeartBeatInterval time.Duration
ConnectTimeout time.Duration
IdleTimeout time.Duration
RPCAddr string
DC string
Expand Down Expand Up @@ -170,6 +171,7 @@ func (p *Proxy) Connect() error {
Resolver: p.config.Resolver,
ReconnectPolicy: p.config.ReconnectPolicy,
HeartBeatInterval: p.config.HeartBeatInterval,
ConnectTimeout: p.config.ConnectTimeout,
IdleTimeout: p.config.IdleTimeout,
Logger: p.logger,
})
Expand Down Expand Up @@ -202,6 +204,7 @@ func (p *Proxy) Connect() error {
Version: p.cluster.NegotiatedVersion,
Auth: p.config.Auth,
HeartBeatInterval: p.config.HeartBeatInterval,
ConnectTimeout: p.config.ConnectTimeout,
IdleTimeout: p.config.IdleTimeout,
PreparedCache: p.preparedCache,
Logger: p.logger,
Expand Down Expand Up @@ -325,6 +328,7 @@ func (p *Proxy) maybeCreateSession(version primitive.ProtocolVersion, keyspace s
PreparedCache: p.preparedCache,
Keyspace: keyspace,
HeartBeatInterval: p.config.HeartBeatInterval,
ConnectTimeout: p.config.ConnectTimeout,
IdleTimeout: p.config.IdleTimeout,
Logger: p.logger,
})
Expand Down
1 change: 1 addition & 0 deletions proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ func setupProxyTestWithConfig(ctx context.Context, numNodes int, cfg *proxyTestC
ReconnectPolicy: proxycore.NewReconnectPolicyWithDelays(200*time.Millisecond, time.Second),
NumConns: 2,
HeartBeatInterval: 30 * time.Second,
ConnectTimeout: 10 * time.Second,
IdleTimeout: 60 * time.Second,
RPCAddr: cfg.rpcAddr,
Peers: cfg.peers,
Expand Down
2 changes: 2 additions & 0 deletions proxy/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type runConfig struct {
HealthCheck bool `yaml:"health-check" help:"Enable liveness and readiness checks" default:"false" env:"HEALTH_CHECK"`
HttpBind string `yaml:"http-bind" help:"Address to use to bind HTTP server used for health checks" default:":8000" env:"HTTP_BIND"`
HeartbeatInterval time.Duration `yaml:"heartbeat-interval" help:"Interval between performing heartbeats to the cluster" default:"30s" env:"HEARTBEAT_INTERVAL"`
ConnectTimeout time.Duration `yaml:"connect-timeout" help:"Duration before an attempt to connect to a cluster is considered timed out" default:"10s" env:"CONNECT_TIMEOUT"`
IdleTimeout time.Duration `yaml:"idle-timeout" help:"Duration between successful heartbeats before a connection to the cluster is considered unresponsive and closed" default:"60s" env:"IDLE_TIMEOUT"`
ReadinessTimeout time.Duration `yaml:"readiness-timeout" help:"Duration the proxy is unable to connect to the backend cluster before it is considered not ready" default:"30s" env:"READINESS_TIMEOUT"`
IdempotentGraph bool `yaml:"idempotent-graph" help:"If true it will treat all graph queries as idempotent by default and retry them automatically. It may be dangerous to retry some graph queries -- use with caution." default:"false" env:"IDEMPOTENT_GRAPH"`
Expand Down Expand Up @@ -178,6 +179,7 @@ func Run(ctx context.Context, args []string) int {
Auth: auth,
Logger: logger,
HeartBeatInterval: cfg.HeartbeatInterval,
ConnectTimeout: cfg.ConnectTimeout,
IdleTimeout: cfg.IdleTimeout,
RPCAddr: cfg.RpcAddress,
DC: cfg.DataCenter,
Expand Down
11 changes: 5 additions & 6 deletions proxycore/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

const (
DefaultRefreshWindow = 10 * time.Second
DefaultConnectTimeout = 10 * time.Second
DefaultRefreshTimeout = 5 * time.Second
)

Expand Down Expand Up @@ -102,11 +101,11 @@ type ClusterConfig struct {
Resolver EndpointResolver
ReconnectPolicy ReconnectPolicy
RefreshWindow time.Duration
HeartBeatInterval time.Duration
ConnectTimeout time.Duration
RefreshTimeout time.Duration
Logger *zap.Logger
HeartBeatInterval time.Duration
IdleTimeout time.Duration
Logger *zap.Logger
}

type ClusterInfo struct {
Expand Down Expand Up @@ -190,8 +189,8 @@ func (c *Cluster) OnEvent(frame *frame.Frame) {
}

func (c *Cluster) connect(ctx context.Context, endpoint Endpoint, initial bool) (err error) {
timeout := getOrUseDefault(c.config.ConnectTimeout, DefaultConnectTimeout)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
c.logger.Debug("connecting to cluster", zap.Stringer("connect timeout", c.config.ConnectTimeout))
ctx, cancel := context.WithTimeout(context.Background(), c.config.ConnectTimeout)
defer cancel()

conn, err := ConnectClient(ctx, endpoint, ClientConnConfig{Handler: c, Logger: c.logger})
Expand Down Expand Up @@ -233,7 +232,7 @@ func (c *Cluster) connect(ctx context.Context, endpoint Endpoint, initial bool)
c.Info = info
}

go conn.Heartbeats(timeout, version, c.config.HeartBeatInterval, c.config.IdleTimeout, c.logger)
go conn.Heartbeats(c.config.ConnectTimeout, version, c.config.HeartBeatInterval, c.config.IdleTimeout, c.logger)

return c.mergeHosts(hosts)
}
Expand Down
10 changes: 6 additions & 4 deletions proxycore/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,10 @@ func (p *connPool) leastBusyConn() *ClientConn {
}

func (p *connPool) connect() (conn *ClientConn, err error) {
timeout := getOrUseDefault(p.config.ConnectTimeout, DefaultConnectTimeout)
ctx, cancel := context.WithTimeout(p.ctx, timeout)
p.logger.Debug("creating connection pool",
zap.Stringer("endpoint", p.config.Endpoint),
zap.Stringer("connect timeout", p.config.ConnectTimeout))
ctx, cancel := context.WithTimeout(p.ctx, p.config.ConnectTimeout)
defer cancel()
conn, err = ConnectClient(ctx, p.config.Endpoint, ClientConnConfig{
PreparedCache: p.preparedCache,
Expand All @@ -149,7 +151,7 @@ func (p *connPool) connect() (conn *ClientConn, err error) {
version, err = conn.Handshake(ctx, p.config.Version, p.config.Auth)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil, fmt.Errorf("handshake took longer than %s to complete", timeout)
return nil, fmt.Errorf("handshake took longer than %s to complete", p.config.ConnectTimeout)
}
return nil, err
}
Expand All @@ -165,7 +167,7 @@ func (p *connPool) connect() (conn *ClientConn, err error) {
}
}

go conn.Heartbeats(timeout, p.config.Version, p.config.HeartBeatInterval, p.config.IdleTimeout, p.logger)
go conn.Heartbeats(p.config.ConnectTimeout, p.config.Version, p.config.HeartBeatInterval, p.config.IdleTimeout, p.logger)
return conn, nil
}

Expand Down
4 changes: 4 additions & 0 deletions proxycore/connpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func TestConnectPool_InvalidAuth(t *testing.T) {
ReconnectPolicy: NewReconnectPolicy(),
NumConns: 2,
Version: supported,
ConnectTimeout: 20 * time.Second,
},
})
if assert.Error(t, err) {
Expand Down Expand Up @@ -162,6 +163,7 @@ func TestConnectPool_AuthExpected(t *testing.T) {
ReconnectPolicy: NewReconnectPolicy(),
NumConns: 2,
Version: supported,
ConnectTimeout: 20 * time.Second,
},
})
if assert.Error(t, err) {
Expand Down Expand Up @@ -190,6 +192,7 @@ func TestConnectPool_InvalidProtocolVersion(t *testing.T) {
ReconnectPolicy: NewReconnectPolicy(),
NumConns: 2,
Version: wanted,
ConnectTimeout: 20 * time.Second,
},
})
if assert.Error(t, err) {
Expand Down Expand Up @@ -241,6 +244,7 @@ func TestConnectPool_InvalidKeyspace(t *testing.T) {
ReconnectPolicy: NewReconnectPolicy(),
NumConns: 2,
Version: supported,
ConnectTimeout: 20 * time.Second,
},
})
if assert.Error(t, err) {
Expand Down
2 changes: 1 addition & 1 deletion proxycore/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ type SessionConfig struct {
Keyspace string
Version primitive.ProtocolVersion
Auth Authenticator
Logger *zap.Logger
// PreparedCache a global cache share across sessions for storing previously prepared queries
PreparedCache PreparedCache
ConnectTimeout time.Duration
HeartBeatInterval time.Duration
IdleTimeout time.Duration
Logger *zap.Logger
}

type Session struct {
Expand Down

0 comments on commit 769499e

Please sign in to comment.