Skip to content

Commit

Permalink
fix(inputs.socket_listener): Avoid noisy logs on closed connection (i…
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored May 19, 2023
1 parent 727533e commit ad4df21
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 4 deletions.
17 changes: 15 additions & 2 deletions plugins/inputs/socket_listener/socket_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ func TestSocketListener(t *testing.T) {
}

// Setup plugin according to test specification
logger := &testutil.CaptureLogger{}
plugin := &SocketListener{
Log: &testutil.Logger{},
Log: logger,
ServiceAddress: proto + "://" + serverAddr,
ContentEncoding: tt.encoding,
ReadBufferSize: tt.buffersize,
Expand All @@ -158,10 +159,17 @@ func TestSocketListener(t *testing.T) {
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()

// Setup the client for submitting data
addr := plugin.listener.addr()

// Create a noop client
// Server is async, so verify no errors at the end.
client, err := createClient(plugin.ServiceAddress, addr, tlsCfg)
require.NoError(t, err)
require.NoError(t, client.Close())

// Setup the client for submitting data
client, err = createClient(plugin.ServiceAddress, addr, tlsCfg)
require.NoError(t, err)

// Send the data with the correct encoding
encoder, err := internal.NewContentEncoder(tt.encoding)
Expand Down Expand Up @@ -189,13 +197,18 @@ func TestSocketListener(t *testing.T) {

plugin.Stop()

// Make sure we clear out old messages
logger.Clear()
if _, ok := plugin.listener.(*streamListener); ok {
// Verify that plugin.Stop() closed the client's connection
_ = client.SetReadDeadline(time.Now().Add(time.Second))
buf := []byte{1}
_, err = client.Read(buf)
require.Equal(t, err, io.EOF)
}

require.Empty(t, logger.Errors())
require.Empty(t, logger.Warnings())
})
}
}
Expand Down
8 changes: 6 additions & 2 deletions plugins/inputs/socket_listener/stream_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"net/url"
"os"
"strconv"
"sync"
"syscall"
"time"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -133,7 +135,7 @@ func (l *streamListener) setupConnection(conn net.Conn) error {

func (l *streamListener) closeConnection(conn net.Conn) {
addr := conn.RemoteAddr().String()
if err := conn.Close(); err != nil {
if err := conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, syscall.EPIPE) {
l.Log.Warnf("Cannot close connection to %q: %v", addr, err)
}
delete(l.connections, conn)
Expand Down Expand Up @@ -190,7 +192,9 @@ func (l *streamListener) listen(acc telegraf.Accumulator) {
go func(c net.Conn) {
defer wg.Done()
if err := l.read(acc, c); err != nil {
acc.AddError(err)
if !errors.Is(err, io.EOF) && !errors.Is(err, syscall.ECONNRESET) {
acc.AddError(err)
}
}
l.Lock()
l.closeConnection(conn)
Expand Down
6 changes: 6 additions & 0 deletions testutil/capturelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,9 @@ func (l *CaptureLogger) LastError() string {
}
return ""
}

func (l *CaptureLogger) Clear() {
l.Lock()
defer l.Unlock()
l.messages = make([]Entry, 0)
}

0 comments on commit ad4df21

Please sign in to comment.