diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go index 0dcd4448dfd10..1cf8d81d61a49 100644 --- a/plugins/inputs/socket_listener/socket_listener_test.go +++ b/plugins/inputs/socket_listener/socket_listener_test.go @@ -136,8 +136,9 @@ func TestSocketListener(t *testing.T) { } // Setup plugin according to test specification + logger := &testutil.CaptureLogger{} plugin := &SocketListener{ - Log: &testutil.Logger{}, + Log: logger, ServiceAddress: proto + "://" + serverAddr, ContentEncoding: tt.encoding, ReadBufferSize: tt.buffersize, @@ -158,10 +159,17 @@ func TestSocketListener(t *testing.T) { require.NoError(t, plugin.Start(&acc)) defer plugin.Stop() - // Setup the client for submitting data addr := plugin.listener.addr() + + // Create a noop client + // Server is async, so verify no errors at the end. client, err := createClient(plugin.ServiceAddress, addr, tlsCfg) require.NoError(t, err) + require.NoError(t, client.Close()) + + // Setup the client for submitting data + client, err = createClient(plugin.ServiceAddress, addr, tlsCfg) + require.NoError(t, err) // Send the data with the correct encoding encoder, err := internal.NewContentEncoder(tt.encoding) @@ -189,6 +197,8 @@ func TestSocketListener(t *testing.T) { plugin.Stop() + // Make sure we clear out old messages + logger.Clear() if _, ok := plugin.listener.(*streamListener); ok { // Verify that plugin.Stop() closed the client's connection _ = client.SetReadDeadline(time.Now().Add(time.Second)) @@ -196,6 +206,9 @@ func TestSocketListener(t *testing.T) { _, err = client.Read(buf) require.Equal(t, err, io.EOF) } + + require.Empty(t, logger.Errors()) + require.Empty(t, logger.Warnings()) }) } } diff --git a/plugins/inputs/socket_listener/stream_listener.go b/plugins/inputs/socket_listener/stream_listener.go index 0a77c9429aca3..d50a8b594a022 100644 --- a/plugins/inputs/socket_listener/stream_listener.go +++ b/plugins/inputs/socket_listener/stream_listener.go @@ -5,11 +5,13 @@ import ( "crypto/tls" "errors" "fmt" + "io" "net" "net/url" "os" "strconv" "sync" + "syscall" "time" "github.com/influxdata/telegraf" @@ -133,7 +135,7 @@ func (l *streamListener) setupConnection(conn net.Conn) error { func (l *streamListener) closeConnection(conn net.Conn) { addr := conn.RemoteAddr().String() - if err := conn.Close(); err != nil { + if err := conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, syscall.EPIPE) { l.Log.Warnf("Cannot close connection to %q: %v", addr, err) } delete(l.connections, conn) @@ -190,7 +192,9 @@ func (l *streamListener) listen(acc telegraf.Accumulator) { go func(c net.Conn) { defer wg.Done() if err := l.read(acc, c); err != nil { - acc.AddError(err) + if !errors.Is(err, io.EOF) && !errors.Is(err, syscall.ECONNRESET) { + acc.AddError(err) + } } l.Lock() l.closeConnection(conn) diff --git a/testutil/capturelog.go b/testutil/capturelog.go index 47bc6c904b670..2c21885525301 100644 --- a/testutil/capturelog.go +++ b/testutil/capturelog.go @@ -127,3 +127,9 @@ func (l *CaptureLogger) LastError() string { } return "" } + +func (l *CaptureLogger) Clear() { + l.Lock() + defer l.Unlock() + l.messages = make([]Entry, 0) +}