Skip to content

Commit

Permalink
Perform heartbeats on connection (#23)
Browse files Browse the repository at this point in the history
* Perform heartbeats on connection

* Use TCPListener in order to disable keepalive

* Update comments for connect

* Move heartbeat to clientconn and make interval along with idle timeout configurable

* Add heartbeat config to tests

* Cleanup heartbeats and add tests

* Update tests per code review comments
  • Loading branch information
dougwettlaufer authored Sep 2, 2021
1 parent 6fd3560 commit 262cb1a
Show file tree
Hide file tree
Showing 15 changed files with 395 additions and 84 deletions.
8 changes: 5 additions & 3 deletions astra/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"encoding/json"
"errors"
"fmt"
"cql-proxy/proxycore"
"github.com/datastax/go-cassandra-native-protocol/primitive"
"io/ioutil"
"net/http"
"sync"
"time"

"github.com/datastax/go-cassandra-native-protocol/primitive"

"cql-proxy/proxycore"
)

type astraResolver struct {
Expand Down Expand Up @@ -176,4 +178,4 @@ type astraMetadata struct {
Version int `json:"version"`
Region string `json:"region"`
ContactInfo contactInfo `json:"contact_info"`
}
}
15 changes: 13 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
github.com/alecthomas/kong v0.2.17 h1:URDISCI96MIgcIlQyoCAlhOmrSw6pZScBNkctg8r0W0=
github.com/alecthomas/kong v0.2.17/go.mod h1:ka3VZ8GZNPXv9Ov+j4YNLkI8mTuhXyr/0ktSlqIydQQ=

github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521184019-c5ad59b459ec h1:EEyRvzmpEUZ+I8WmD5cw/vY8EqhambkOqy5iFr0908A=
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521184019-c5ad59b459ec/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
Expand All @@ -11,6 +12,7 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/golang/snappy v0.0.2 h1:aeE13tS0IiQgFjYdoL8qN3K1N2bXXtI6Vi51/y7BpMw=
github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/pierrec/lz4/v4 v4.0.3 h1:vNQKSVZNYUEAvRY9FaUXAF1XPbSOHJtDTiP41kzDz2E=

github.com/pierrec/lz4/v4 v4.0.3/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand All @@ -19,12 +21,14 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.20.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=


github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=

go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.8.0 h1:CUhrE4N1rqSE6FM9ecihEjRkLQu8cDfgDyoOs83mEY4=
go.uber.org/atomic v1.8.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand All @@ -34,16 +38,23 @@ go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95a
go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U=
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=

golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=

golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=

golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=

golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=

gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
35 changes: 22 additions & 13 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net"
"net/http"
_ "net/http/pprof"
"time"

"cql-proxy/astra"
"cql-proxy/proxy"
Expand All @@ -30,13 +31,15 @@ import (
)

var cli struct {
Bundle string `help:"Path to secure connect bundle" short:"b" env:"BUNDLE"`
Username string `help:"Username to use for authentication" short:"u" env:"USERNAME"`
Password string `help:"Password to use for authentication" short:"p" env:"PASSWORD"`
ContactPoints []string `help:"Contact points for cluster. Ignored if using the bundle path option." short:"c" env:"CONTACT_POINTS"`
Bind string `help:"Address to use to bind serve" short:"a" env:"BIND"`
Debug bool `help:"Show debug logging" env:"DEBUG"`
Profiling bool `help:"Enable profiling" env:"PROFILING"`
Bundle string `help:"Path to secure connect bundle" short:"b" env:"BUNDLE"`
Username string `help:"Username to use for authentication" short:"u" env:"USERNAME"`
Password string `help:"Password to use for authentication" short:"p" env:"PASSWORD"`
ContactPoints []string `help:"Contact points for cluster. Ignored if using the bundle path option." short:"c" env:"CONTACT_POINTS"`
Bind string `help:"Address to use to bind serve" short:"a" env:"BIND"`
Debug bool `help:"Show debug logging" env:"DEBUG"`
Profiling bool `help:"Enable profiling" env:"PROFILING"`
HeartbeatInterval time.Duration `help:"Interval between performing heartbeats to the cluster" default:"30s" env:"HEARTBEAT_INTERVAL"`
IdleTimeout time.Duration `help:"Time between successful heartbeats before a connection to the cluster is considered unresponsive and closed" default:"60s" env:"IDLE_TIMEOUT"`
}

func main() {
Expand All @@ -56,6 +59,10 @@ func main() {
cliCtx.Fatalf("must provide either bundle path or contact points")
}

if cli.HeartbeatInterval >= cli.IdleTimeout {
cliCtx.Fatalf("idle-timeout must be greater than heartbeat-interval")
}

ctx := context.Background()

var logger *zap.Logger
Expand All @@ -76,12 +83,14 @@ func main() {
}

p := proxy.NewProxy(ctx, proxy.Config{
Version: primitive.ProtocolVersion4,
Resolver: resolver,
ReconnectPolicy: proxycore.NewReconnectPolicy(),
NumConns: 1,
Auth: auth,
Logger: logger,
Version: primitive.ProtocolVersion4,
Resolver: resolver,
ReconnectPolicy: proxycore.NewReconnectPolicy(),
NumConns: 1,
Auth: auth,
Logger: logger,
HeartBeatInterval: cli.HeartbeatInterval,
IdleTimeout: cli.IdleTimeout,
})

bind, _, err := net.SplitHostPort(cli.Bind)
Expand Down
69 changes: 46 additions & 23 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (
"net"
"sync"
"sync/atomic"
"time"

"cql-proxy/parser"
"cql-proxy/proxycore"

"github.com/datastax/go-cassandra-native-protocol/datatype"
"github.com/datastax/go-cassandra-native-protocol/frame"
"github.com/datastax/go-cassandra-native-protocol/message"
Expand All @@ -40,19 +42,21 @@ var (
)

type Config struct {
Version primitive.ProtocolVersion
Auth proxycore.Authenticator
Resolver proxycore.EndpointResolver
ReconnectPolicy proxycore.ReconnectPolicy
NumConns int
Logger *zap.Logger
Version primitive.ProtocolVersion
Auth proxycore.Authenticator
Resolver proxycore.EndpointResolver
ReconnectPolicy proxycore.ReconnectPolicy
NumConns int
Logger *zap.Logger
HeartBeatInterval time.Duration
IdleTimeout time.Duration
}

type Proxy struct {
ctx context.Context
config Config
logger *zap.Logger
listener net.Listener
listener *net.TCPListener
cluster *proxycore.Cluster
sessions sync.Map
sessMu *sync.Mutex
Expand Down Expand Up @@ -107,10 +111,12 @@ func (p *Proxy) Listen(address string) error {
var err error

p.cluster, err = proxycore.ConnectCluster(p.ctx, proxycore.ClusterConfig{
Version: p.config.Version,
Auth: p.config.Auth,
Resolver: p.config.Resolver,
ReconnectPolicy: p.config.ReconnectPolicy,
Version: p.config.Version,
Auth: p.config.Auth,
Resolver: p.config.Resolver,
ReconnectPolicy: p.config.ReconnectPolicy,
HeartBeatInterval: p.config.HeartBeatInterval,
IdleTimeout: p.config.IdleTimeout,
})

if err != nil {
Expand All @@ -131,10 +137,13 @@ func (p *Proxy) Listen(address string) error {
}

sess, err := proxycore.ConnectSession(p.ctx, p.cluster, proxycore.SessionConfig{
ReconnectPolicy: p.config.ReconnectPolicy,
NumConns: p.config.NumConns,
Version: p.cluster.NegotiatedVersion,
Auth: p.config.Auth,
ReconnectPolicy: p.config.ReconnectPolicy,
NumConns: p.config.NumConns,
Version: p.cluster.NegotiatedVersion,
Auth: p.config.Auth,
Logger: p.config.Logger,
HeartBeatInterval: p.config.HeartBeatInterval,
IdleTimeout: p.config.IdleTimeout,
})

if err != nil {
Expand All @@ -143,7 +152,11 @@ func (p *Proxy) Listen(address string) error {

p.sessions.Store("", sess) // No keyspace

p.listener, err = net.Listen("tcp", address)
tcpAddr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return err
}
p.listener, err = net.ListenTCP("tcp", tcpAddr)
if err != nil {
return err
}
Expand All @@ -155,15 +168,23 @@ func (p *Proxy) Listen(address string) error {

func (p *Proxy) Serve() error {
for {
conn, err := p.listener.Accept()
conn, err := p.listener.AcceptTCP()
if err != nil {
return err
}
p.handle(conn)
}
}

func (p *Proxy) handle(conn net.Conn) {
func (p *Proxy) handle(conn *net.TCPConn) {
if err := conn.SetKeepAlive(false); err != nil {
p.logger.Warn("failed to disable keepalive on connection", zap.Error(err))
}

if err := conn.SetNoDelay(true); err != nil {
p.logger.Warn("failed to set TCP_NODELAY on connection", zap.Error(err))
}

cl := &client{
ctx: p.ctx,
proxy: p,
Expand All @@ -180,11 +201,13 @@ func (p *Proxy) maybeCreateSession(keyspace string) error {
defer p.sessMu.Unlock()
if _, ok := p.sessions.Load(keyspace); !ok {
sess, err := proxycore.ConnectSession(p.ctx, p.cluster, proxycore.SessionConfig{
ReconnectPolicy: p.config.ReconnectPolicy,
NumConns: p.config.NumConns,
Version: p.cluster.NegotiatedVersion,
Auth: p.config.Auth,
Keyspace: keyspace,
ReconnectPolicy: p.config.ReconnectPolicy,
NumConns: p.config.NumConns,
Version: p.cluster.NegotiatedVersion,
Auth: p.config.Auth,
Keyspace: keyspace,
HeartBeatInterval: p.config.HeartBeatInterval,
IdleTimeout: p.config.IdleTimeout,
})
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"cql-proxy/proxycore"

"github.com/datastax/go-cassandra-native-protocol/datatype"
"github.com/datastax/go-cassandra-native-protocol/frame"
"github.com/datastax/go-cassandra-native-protocol/message"
Expand Down Expand Up @@ -85,6 +86,8 @@ func TestProxy_ListenAndServe(t *testing.T) {
Resolver: proxycore.NewResolverWithDefaultPort([]string{clusterContactPoint}, clusterPort),
ReconnectPolicy: proxycore.NewReconnectPolicyWithDelays(200*time.Millisecond, time.Second),
NumConns: 2,
HeartBeatInterval: 30 * time.Second,
IdleTimeout: 60 * time.Second,
})

err = proxy.Listen(proxyContactPoint)
Expand Down
40 changes: 40 additions & 0 deletions proxycore/clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/datastax/go-cassandra-native-protocol/frame"
"github.com/datastax/go-cassandra-native-protocol/message"
"github.com/datastax/go-cassandra-native-protocol/primitive"

"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -53,6 +56,7 @@ type ClientConn struct {
closingMu *sync.RWMutex
}

// ConnectClient creates a new connection to an endpoint within a downstream cluster using TLS if specified.
func ConnectClient(ctx context.Context, endpoint Endpoint) (*ClientConn, error) {
return ConnectClientWithEvents(ctx, endpoint, nil)
}
Expand Down Expand Up @@ -330,6 +334,42 @@ func (c *ClientConn) Err() error {
return c.conn.Err()
}

// Heartbeats sends an OPTIONS request to the endpoint in order to keep the connection alive.
func (c *ClientConn) Heartbeats(connectTimeout time.Duration, version primitive.ProtocolVersion, heartbeatInterval time.Duration, idleTimeout time.Duration, logger *zap.Logger) {
idleTimer := time.NewTimer(idleTimeout)

for {
select {
case <-c.conn.IsClosed():
return
case <-time.After(heartbeatInterval):
ctx, cancel := context.WithTimeout(context.Background(), connectTimeout)
response, err := c.SendAndReceive(ctx, frame.NewFrame(version, -1, &message.Options{}))
cancel()
if err != nil {
logger.Warn("error occurred performing heartbeat", zap.Error(err))
continue
}

switch response.Body.Message.(type) {
case *message.Supported:
logger.Debug("successfully performed a heartbeat", zap.Stringer("remoteAddress", c.conn.RemoteAddr()))
if idleTimer.Stop() {
idleTimer.Reset(idleTimeout)
}
case message.Error:
logger.Warn("error occurred performing heartbeat", zap.String("optionsError", response.Body.String()))
default:
logger.Warn("unexpected message received while performing heartbeat", zap.String("optionsError", response.Body.String()))
}
case <-idleTimer.C:
_ = c.Close()
logger.Sugar().Errorf("error connection didn't perform heartbeats within %v", idleTimeout)
return
}
}
}

type requestSender struct {
request Request
stream int16
Expand Down
Loading

0 comments on commit 262cb1a

Please sign in to comment.