Skip to content

Commit

Permalink
Prevent the tcp sink from blocking (#79)
Browse files Browse the repository at this point in the history
Prevent tcpStatsdSink.Flush() from hanging when there is nothing 
to flush and add test to replicate Flush() deadlock.

* tcp sink: ensure Flush() exits

Previously, continuous writes could prevent calls to Flush() from
exiting.  This commit changes Flush() to only flush buffers that
are pending at the time Flush() is called.

* tcp sink: Add connect and write timeouts

This will prevent establishing or writing to a net.Conn from hanging the
program.
  • Loading branch information
Charlie Vieth authored Jun 20, 2019
1 parent 9b96f33 commit 0943cd5
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 44 deletions.
120 changes: 76 additions & 44 deletions tcp_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import (
// e.g. `func NewTCPStatsdSinkWithOptions(opts ...Option) Sink`

const (
defaultRetryInterval = time.Second * 3
defaultDialTimeout = defaultRetryInterval / 2
defaultWriteTimeout = time.Second

flushInterval = time.Second
logOnEveryNDroppedBytes = 1 << 15 // Log once per 32kb of dropped stats
defaultBufferSize = 1 << 16
Expand All @@ -34,23 +38,25 @@ func NewTCPStatsdSink() FlushableSink {
outc: outc,
}
s := &tcpStatsdSink{
outc: outc,
bufWriter: bufio.NewWriterSize(&writer, defaultBufferSize), // TODO(btc): parameterize size
outc: outc,
// TODO(btc): parameterize size
bufWriter: bufio.NewWriterSize(&writer, defaultBufferSize),
// arbitrarily buffered
doFlush: make(chan struct{}, 8),
}
s.flushCond = sync.NewCond(&s.mu)
go s.run()
return s
}

type tcpStatsdSink struct {
conn net.Conn
outc chan *bytes.Buffer

mu sync.Mutex
droppedBytes uint64
bufWriter *bufio.Writer
flushCond *sync.Cond
lastFlushTime time.Time
conn net.Conn
outc chan *bytes.Buffer
mu sync.Mutex
bufWriter *bufio.Writer
flushCond *sync.Cond
doFlush chan struct{}
droppedBytes uint64
}

type sinkWriter struct {
Expand All @@ -70,27 +76,26 @@ func (w *sinkWriter) Write(p []byte) (int, error) {
}

func (s *tcpStatsdSink) Flush() {
now := time.Now()
if err := s.flush(); err != nil {
// Not much we can do here; we don't know how/why we failed.
return
if s.flush() != nil {
return // nothing we can do
}

s.doFlush <- struct{}{}
s.mu.Lock()
for now.After(s.lastFlushTime) {
for len(s.outc) != 0 {
s.flushCond.Wait()
}
s.mu.Unlock()
}

func (s *tcpStatsdSink) flush() error {
s.mu.Lock()
defer s.mu.Unlock()
err := s.bufWriter.Flush()
if err != nil {
s.handleFlushError(err, s.bufWriter.Buffered())
return err
}
return nil
s.mu.Unlock()
return err
}

// s.mu should be held
Expand Down Expand Up @@ -166,51 +171,78 @@ func (s *tcpStatsdSink) FlushTimer(name string, value float64) {
}

func (s *tcpStatsdSink) run() {
settings := GetSettings()
conf := GetSettings()
addr := net.JoinHostPort(conf.StatsdHost, strconv.Itoa(conf.StatsdPort))

t := time.NewTicker(flushInterval)
defer t.Stop()
for {
if s.conn == nil {
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", settings.StatsdHost,
settings.StatsdPort))
if err != nil {
if err := s.connect(addr); err != nil {
logger.Warnf("statsd connection error: %s", err)

// TODO (CEV): don't sleep on the first retry
time.Sleep(3 * time.Second)
continue
}
s.conn = conn
}

select {
case <-t.C:
s.flush()
case buf, ok := <-s.outc: // Receive from the channel and check if the channel has been closed
if !ok {
logger.Warnf("Closing statsd client")
s.conn.Close()
return
}
lenbuf := len(buf.Bytes())
_, err := buf.WriteTo(s.conn)
if len(s.outc) == 0 {
// We've at least tried to write all the data we have. Wake up anyone waiting on flush.
s.mu.Lock()
s.lastFlushTime = time.Now()
s.mu.Unlock()
s.flushCond.Broadcast()
}
if err != nil {
s.mu.Lock()
s.handleFlushError(err, lenbuf)
s.mu.Unlock()
_ = s.conn.Close() // Ignore close failures
s.conn = nil
case <-s.doFlush:
// Only flush pending buffers, this prevents an issue where
// continuous writes prevent the flush loop from exiting.
//
// If there is an error writeBuffer() will set the conn to
// nil thus breaking the loop.
//
n := len(s.outc)
for i := 0; i < n && s.conn != nil; i++ {
buf := <-s.outc
s.writeBuffer(buf)
putBuffer(buf)
}
// Signal every blocked Flush() call. We don't handle multiple
// pending Flush() calls independently as we cannot allow this
// to block. This is best effort only.
//
s.flushCond.Broadcast()
case buf := <-s.outc:
s.writeBuffer(buf)
putBuffer(buf)
}
}
}

// writeBuffer writes buf to the underlying conn, bounding the write with
// defaultWriteTimeout so that a stalled peer cannot block the caller forever.
//
// On a write error the failure is reported through handleFlushError (called
// with s.mu held), the conn is closed and s.conn is reset to nil.
//
// May only be called from run(), and s.conn must be non-nil on entry.
func (s *tcpStatsdSink) writeBuffer(buf *bytes.Buffer) {
	// Record the size before writing: WriteTo drains the buffer, so the
	// length would no longer be available when reporting an error.
	n := buf.Len()

	// TODO (CEV): parameterize timeout
	//
	// Deadline errors are deliberately ignored: if the conn is unusable the
	// write below fails and is handled there.
	_ = s.conn.SetWriteDeadline(time.Now().Add(defaultWriteTimeout))
	_, err := buf.WriteTo(s.conn)
	_ = s.conn.SetWriteDeadline(time.Time{}) // clear

	if err == nil {
		return
	}

	s.mu.Lock()
	s.handleFlushError(err, n)
	s.mu.Unlock()

	_ = s.conn.Close() // best effort; the conn is being discarded anyway
	s.conn = nil       // a nil conn stops the flush loop in run() and forces a reconnect
}

// connect dials address over TCP, giving up after defaultDialTimeout.
// On success the new connection is stored in s.conn and nil is returned;
// on failure s.conn is left untouched and the dial error is returned.
func (s *tcpStatsdSink) connect(address string) error {
	// TODO (CEV): parameterize timeout
	c, dialErr := net.DialTimeout("tcp", address, defaultDialTimeout)
	if dialErr != nil {
		return dialErr
	}
	s.conn = c
	return nil
}

var bufferPool sync.Pool

func getBuffer() *bytes.Buffer {
Expand Down
97 changes: 97 additions & 0 deletions tcp_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ package stats
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"net"
"os"
"strings"
"sync"
"testing"
"time"
)

type testStatSink struct {
Expand Down Expand Up @@ -355,6 +360,98 @@ func TestStatGenerator(t *testing.T) {
}
}

// TestTCPStatsdSink_Flush checks that Flush() on a TCP statsd sink returns
// within one second when called once, ten times in sequence, and from twenty
// goroutines concurrently. The sink is pointed at a local listener that
// accepts connections and discards whatever is written to them.
func TestTCPStatsdSink_Flush(t *testing.T) {
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	defer listener.Close()

	_, port, err := net.SplitHostPort(listener.Addr().String())
	if err != nil {
		t.Fatal(err)
	}

	// Point STATSD_PORT at the listener for the duration of the test and
	// restore (or remove) the previous value on exit. Presumably the sink
	// picks this up through its settings lookup.
	if prev, had := os.LookupEnv("STATSD_PORT"); had {
		defer os.Setenv("STATSD_PORT", prev)
	} else {
		defer os.Unsetenv("STATSD_PORT")
	}
	os.Setenv("STATSD_PORT", port)

	// Drain every accepted connection; stop once Accept reports an error
	// (e.g. when the listener is closed).
	go func() {
		for {
			c, acceptErr := listener.Accept()
			if c != nil {
				_, _ = io.Copy(ioutil.Discard, c)
			}
			if acceptErr != nil {
				return
			}
		}
	}()

	sink := NewTCPStatsdSink()

	// flushN calls Flush() on the shared sink n times in a row.
	flushN := func(n int) {
		for i := 0; i < n; i++ {
			sink.Flush()
		}
	}

	// expectDone fails the test unless done is closed within one second.
	expectDone := func(t *testing.T, done <-chan struct{}) {
		select {
		case <-done:
			// ok
		case <-time.After(time.Second):
			t.Fatal("Flush blocked")
		}
	}

	t.Run("One", func(t *testing.T) {
		done := make(chan struct{})
		go func() {
			defer close(done)
			flushN(1)
		}()
		expectDone(t, done)
	})

	t.Run("Ten", func(t *testing.T) {
		done := make(chan struct{})
		go func() {
			defer close(done)
			flushN(10)
		}()
		expectDone(t, done)
	})

	t.Run("Parallel", func(t *testing.T) {
		const workers = 20

		begin := make(chan struct{})
		var wg sync.WaitGroup
		for w := 0; w < workers; w++ {
			wg.Add(1)
			go func() {
				// Hold every worker until begin is closed so they all
				// start flushing at the same time.
				<-begin
				defer wg.Done()
				flushN(10)
			}()
		}

		done := make(chan struct{})
		go func() {
			close(begin)
			wg.Wait()
			close(done)
		}()
		expectDone(t, done)
	})
}

type nopWriter struct{}

func (nopWriter) Write(b []byte) (int, error) {
Expand Down

0 comments on commit 0943cd5

Please sign in to comment.