From b03a1017ecff2f1118ce175991b78ad532158dca Mon Sep 17 00:00:00 2001 From: James Ribe Date: Tue, 21 Jan 2025 13:49:06 -0800 Subject: [PATCH 1/3] refactor(statsd receiver): encapsulate internal stats in their own struct Summary: Later in this stack of diffs, I add a field named `MaxConnections` to `Statsd` because it controls the max number of connections for all listener types, not just TCP listeners. That creates a naming collision with the existing field named `MaxConnections`, which is an internal stat tracker. This diff eliminates that name collision by encapsulating all of `Statsd`'s internal stat trackers in a struct. Test Plan: Existing unit tests pass. --- plugins/inputs/statsd/statsd.go | 58 +++++++++++++++------------- plugins/inputs/statsd/statsd_test.go | 2 +- 2 files changed, 32 insertions(+), 28 deletions(-) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 92be5285207aa..63b157ed0da63 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -139,6 +139,12 @@ type Statsd struct { acc telegraf.Accumulator bufPool sync.Pool // pool of byte slices to handle parsing + lastGatherTime time.Time + + Stats InternalStats +} + +type InternalStats struct { // Internal statistics counters MaxConnections selfstat.Stat CurrentConnections selfstat.Stat @@ -151,8 +157,6 @@ type Statsd struct { ParseTimeNS selfstat.Stat PendingMessages selfstat.Stat MaxPendingMessages selfstat.Stat - - lastGatherTime time.Time } // number will get parsed as an int or float depending on what is passed @@ -249,19 +253,19 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error { tags := map[string]string{ "address": s.ServiceAddress, } - s.MaxConnections = selfstat.Register("statsd", "tcp_max_connections", tags) - s.MaxConnections.Set(int64(s.MaxTCPConnections)) - s.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags) - s.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags) - s.TCPPacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags) - s.TCPBytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags) - s.UDPPacketsRecv = selfstat.Register("statsd", "udp_packets_received", tags) - s.UDPPacketsDrop = selfstat.Register("statsd", "udp_packets_dropped", tags) - s.UDPBytesRecv = selfstat.Register("statsd", "udp_bytes_received", tags) - s.ParseTimeNS = selfstat.Register("statsd", "parse_time_ns", tags) - s.PendingMessages = selfstat.Register("statsd", "pending_messages", tags) - s.MaxPendingMessages = selfstat.Register("statsd", "max_pending_messages", tags) - s.MaxPendingMessages.Set(int64(s.AllowedPendingMessages)) + s.Stats.MaxConnections = selfstat.Register("statsd", "tcp_max_connections", tags) + s.Stats.MaxConnections.Set(int64(s.MaxTCPConnections)) + s.Stats.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags) + s.Stats.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags) + s.Stats.TCPPacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags) + s.Stats.TCPBytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags) + s.Stats.UDPPacketsRecv = selfstat.Register("statsd", "udp_packets_received", tags) + s.Stats.UDPPacketsDrop = selfstat.Register("statsd", "udp_packets_dropped", tags) + s.Stats.UDPBytesRecv = selfstat.Register("statsd", "udp_bytes_received", tags) + s.Stats.ParseTimeNS = selfstat.Register("statsd", "parse_time_ns", tags) + s.Stats.PendingMessages = selfstat.Register("statsd", "pending_messages", tags) + s.Stats.MaxPendingMessages = selfstat.Register("statsd", "max_pending_messages", tags) + s.Stats.MaxPendingMessages.Set(int64(s.AllowedPendingMessages)) s.in = make(chan input, s.AllowedPendingMessages) s.done = make(chan struct{}) @@ -545,8 +549,8 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error { } return nil } - s.UDPPacketsRecv.Incr(1) - s.UDPBytesRecv.Incr(int64(n)) + s.Stats.UDPPacketsRecv.Incr(1) + s.Stats.UDPBytesRecv.Incr(int64(n)) b, ok := s.bufPool.Get().(*bytes.Buffer) if !ok { return errors.New("bufPool is not a bytes buffer") @@ -558,9 +562,9 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error { Buffer: b, Time: time.Now(), Addr: addr.IP.String()}: - s.PendingMessages.Set(int64(len(s.in))) + s.Stats.PendingMessages.Set(int64(len(s.in))) default: - s.UDPPacketsDrop.Incr(1) + s.Stats.UDPPacketsDrop.Incr(1) s.drops++ if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 { s.Log.Errorf("Statsd message queue full. "+ @@ -581,7 +585,7 @@ func (s *Statsd) parser() error { case <-s.done: return nil case in := <-s.in: - s.PendingMessages.Set(int64(len(s.in))) + s.Stats.PendingMessages.Set(int64(len(s.in))) start := time.Now() lines := strings.Split(in.Buffer.String(), "\n") s.bufPool.Put(in.Buffer) @@ -608,7 +612,7 @@ func (s *Statsd) parser() error { } } elapsed := time.Since(start) - s.ParseTimeNS.Set(elapsed.Nanoseconds()) + s.Stats.ParseTimeNS.Set(elapsed.Nanoseconds()) } } } @@ -958,8 +962,8 @@ func (s *Statsd) aggregate(m metric) { // handler handles a single TCP Connection func (s *Statsd) handler(conn *net.TCPConn, id string) { - s.CurrentConnections.Incr(1) - s.TotalConnections.Incr(1) + s.Stats.CurrentConnections.Incr(1) + s.Stats.TotalConnections.Incr(1) // connection cleanup function defer func() { s.wg.Done() @@ -968,7 +972,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { // Add one connection potential back to channel when this one closes s.accept <- true s.forget(id) - s.CurrentConnections.Incr(-1) + s.Stats.CurrentConnections.Incr(-1) }() var remoteIP string @@ -990,8 +994,8 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { if n == 0 { continue } - s.TCPBytesRecv.Incr(int64(n)) - s.TCPPacketsRecv.Incr(1) + s.Stats.TCPBytesRecv.Incr(int64(n)) + s.Stats.TCPPacketsRecv.Incr(1) b := s.bufPool.Get().(*bytes.Buffer) b.Reset() @@ -1000,7 +1004,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { select { case s.in <- input{Buffer: b, Time: time.Now(), Addr: remoteIP}: - s.PendingMessages.Set(int64(len(s.in))) + s.Stats.PendingMessages.Set(int64(len(s.in))) default: s.drops++ if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 { diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index df36b1b8feead..879a0fa96d5fd 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -2177,7 +2177,7 @@ func TestUdpFillQueue(t *testing.T) { require.NoError(t, conn.Close()) require.Eventually(t, func() bool { - return plugin.UDPPacketsRecv.Get() >= int64(numberToSend) + return plugin.Stats.UDPPacketsRecv.Get() >= int64(numberToSend) }, 1*time.Second, 100*time.Millisecond) defer plugin.Stop() From e5dcec1650a939894312854123f6a8590dfc1993 Mon Sep 17 00:00:00 2001 From: James Ribe Date: Tue, 21 Jan 2025 13:52:43 -0800 Subject: [PATCH 2/3] refactor(statsd receiver): rename Statsd.UDPlistener => UDPConn Summary: Later in this stack of diffs, I rename `TCPlistener` to `Listener` because it becomes a generic listener for all networks supported by `net.Listen`. That makes `UDPlistener`'s naming stand out because: - It isn't a listener. It's a `*net.UDPConn`. - It isn't the UDP counterpart to `Listener`. This diff renames it to `UDPConn` to improve clarity. Test Plan: Unit tests pass. --- plugins/inputs/statsd/statsd.go | 10 +++++----- plugins/inputs/statsd/statsd_test.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 63b157ed0da63..e33dcf5a785f2 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -130,7 +130,7 @@ type Statsd struct { distributions []cacheddistributions // Protocol listeners - UDPlistener *net.UDPConn + UDPConn *net.UDPConn TCPlistener *net.TCPListener // track current connections so we can close them in Stop() @@ -296,7 +296,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error { } s.Log.Infof("UDP listening on %q", conn.LocalAddr().String()) - s.UDPlistener = conn + s.UDPConn = conn s.wg.Add(1) go func() { @@ -450,8 +450,8 @@ func (s *Statsd) Stop() { s.Log.Infof("Stopping the statsd service") close(s.done) if s.isUDP() { - if s.UDPlistener != nil { - s.UDPlistener.Close() + if s.UDPConn != nil { + s.UDPConn.Close() } } else { if s.TCPlistener != nil { @@ -530,7 +530,7 @@ func (s *Statsd) tcpListen(listener *net.TCPListener) error { // udpListen starts listening for UDP packets on the configured port. func (s *Statsd) udpListen(conn *net.UDPConn) error { if s.ReadBufferSize > 0 { - if err := s.UDPlistener.SetReadBuffer(s.ReadBufferSize); err != nil { + if err := s.UDPConn.SetReadBuffer(s.ReadBufferSize); err != nil { return err } } diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index 879a0fa96d5fd..1354b04eae236 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -2168,7 +2168,7 @@ func TestUdpFillQueue(t *testing.T) { var acc testutil.Accumulator require.NoError(t, plugin.Start(&acc)) - conn, err := net.Dial("udp", plugin.UDPlistener.LocalAddr().String()) + conn, err := net.Dial("udp", plugin.UDPConn.LocalAddr().String()) require.NoError(t, err) numberToSend := plugin.AllowedPendingMessages for i := 0; i < numberToSend; i++ { From ed20be1f6ae844aaae5f5f98de4680bf214fb007 Mon Sep 17 00:00:00 2001 From: James Ribe Date: Tue, 21 Jan 2025 14:01:47 -0800 Subject: [PATCH 3/3] feat(statsd receiver): enable use of unix domain sockets Summary: The statsd receiver currently only supports UDP and TCP connections, but the TCP code does very little that's actually unique to TCP. This diff adds support for unix domain sockets and other flavors of TCP (`tcp4`, `tcp6`) by refactoring the statsd receiver to use `net.Listener` instead of `*net.TCPListener`. Test Plan: Added unit test coverage for unix domain sockets and the tests pass. --- plugins/inputs/statsd/statsd.go | 104 +++++++++++++++------------ plugins/inputs/statsd/statsd_test.go | 73 ++++++++++++++++--- 2 files changed, 124 insertions(+), 53 deletions(-) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index e33dcf5a785f2..d764c3053ca9a 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -40,7 +40,7 @@ const ( ) type Statsd struct { - // Protocol used on listener - udp or tcp + // Protocol used on listener - see net.Dial. Defaults to "tcp". Protocol string `toml:"protocol"` // Address & Port to serve from @@ -95,8 +95,9 @@ type Statsd struct { ReadBufferSize int `toml:"read_buffer_size"` SanitizeNamesMethod string `toml:"sanitize_name_method"` - Templates []string `toml:"templates"` // bucket -> influx templates - MaxTCPConnections int `toml:"max_tcp_connections"` + Templates []string `toml:"templates"` // bucket -> influx templates + MaxTCPConnections int `toml:"max_tcp_connections"` // deprecated. use MaxConnections instead. + MaxConnections int `toml:"max_connections"` TCPKeepAlive bool `toml:"tcp_keep_alive"` TCPKeepAlivePeriod *config.Duration `toml:"tcp_keep_alive_period"` @@ -130,11 +131,11 @@ type Statsd struct { distributions []cacheddistributions // Protocol listeners - UDPConn *net.UDPConn - TCPlistener *net.TCPListener + UDPConn *net.UDPConn + Listener net.Listener // track current connections so we can close them in Stop() - conns map[string]*net.TCPConn + conns map[string]net.Conn graphiteParser *graphite.Parser acc telegraf.Accumulator bufPool sync.Pool // pool of byte slices to handle parsing @@ -146,6 +147,7 @@ type Statsd struct { type InternalStats struct { // Internal statistics counters + MaxTCPConnections selfstat.Stat MaxConnections selfstat.Stat CurrentConnections selfstat.Stat TotalConnections selfstat.Stat @@ -236,6 +238,16 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error { s.DataDogExtensions = true } + if s.Protocol == "" { + s.Protocol = "tcp" + } + if s.MaxTCPConnections > 0 { + s.Log.Warn("max_tcp_connections is deprecated. Use max_connections instead.") + if s.MaxConnections == 0 { + s.MaxConnections = s.MaxTCPConnections + } + } + s.acc = ac // Make data structures @@ -253,8 +265,10 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error { tags := map[string]string{ "address": s.ServiceAddress, } - s.Stats.MaxConnections = selfstat.Register("statsd", "tcp_max_connections", tags) - s.Stats.MaxConnections.Set(int64(s.MaxTCPConnections)) + s.Stats.MaxConnections = selfstat.Register("statsd", "max_connections", tags) + s.Stats.MaxConnections.Set(int64(s.MaxConnections)) + s.Stats.MaxTCPConnections = selfstat.Register("statsd", "tcp_max_connections", tags) + s.Stats.MaxTCPConnections.Set(int64(s.MaxConnections)) s.Stats.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags) s.Stats.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags) s.Stats.TCPPacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags) @@ -269,14 +283,14 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error { s.in = make(chan input, s.AllowedPendingMessages) s.done = make(chan struct{}) - s.accept = make(chan bool, s.MaxTCPConnections) - s.conns = make(map[string]*net.TCPConn) + s.accept = make(chan bool, s.MaxConnections) + s.conns = make(map[string]net.Conn) s.bufPool = sync.Pool{ New: func() interface{} { return new(bytes.Buffer) }, } - for i := 0; i < s.MaxTCPConnections; i++ { + for i := 0; i < s.MaxConnections; i++ { s.accept <- true } @@ -306,22 +320,18 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error { } }() } else { - address, err := net.ResolveTCPAddr("tcp", s.ServiceAddress) - if err != nil { - return err - } - listener, err := net.ListenTCP("tcp", address) + listener, err := net.Listen(s.Protocol, s.ServiceAddress) if err != nil { return err } - s.Log.Infof("TCP listening on %q", listener.Addr().String()) - s.TCPlistener = listener + s.Log.Infof("listening on %q", listener.Addr().String()) + s.Listener = listener s.wg.Add(1) go func() { defer s.wg.Done() - if err := s.tcpListen(listener); err != nil { + if err := s.serve(listener); err != nil { ac.AddError(err) } }() @@ -454,15 +464,15 @@ func (s *Statsd) Stop() { s.UDPConn.Close() } } else { - if s.TCPlistener != nil { - s.TCPlistener.Close() + if s.Listener != nil { + s.Listener.Close() } - // Close all open TCP connections + // Close all open connections // - get all conns from the s.conns map and put into slice // - this is so the forget() function doesnt conflict with looping // over the s.conns map - var conns []*net.TCPConn + var conns []net.Conn s.cleanup.Lock() for _, conn := range s.conns { conns = append(conns, conn) @@ -482,28 +492,31 @@ func (s *Statsd) Stop() { s.Unlock() } -// tcpListen() starts listening for TCP packets on the configured port. -func (s *Statsd) tcpListen(listener *net.TCPListener) error { +// serve() accepts connections on the listerner +func (s *Statsd) serve(listener net.Listener) error { for { select { case <-s.done: return nil default: // Accept connection: - conn, err := listener.AcceptTCP() + conn, err := listener.Accept() if err != nil { return err } - if s.TCPKeepAlive { - if err := conn.SetKeepAlive(true); err != nil { - return err - } - - if s.TCPKeepAlivePeriod != nil { - if err := conn.SetKeepAlivePeriod(time.Duration(*s.TCPKeepAlivePeriod)); err != nil { + switch conn := conn.(type) { + case *net.TCPConn: + if s.TCPKeepAlive { + if err := conn.SetKeepAlive(true); err != nil { return err } + + if s.TCPKeepAlivePeriod != nil { + if err := conn.SetKeepAlivePeriod(time.Duration(*s.TCPKeepAlivePeriod)); err != nil { + return err + } + } } } @@ -960,8 +973,8 @@ func (s *Statsd) aggregate(m metric) { } } -// handler handles a single TCP Connection -func (s *Statsd) handler(conn *net.TCPConn, id string) { +// handler handles a single Connection +func (s *Statsd) handler(conn net.Conn, id string) { s.Stats.CurrentConnections.Incr(1) s.Stats.TotalConnections.Incr(1) // connection cleanup function @@ -976,8 +989,11 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { }() var remoteIP string - if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok { + switch addr := conn.RemoteAddr().(type) { + case *net.TCPAddr: remoteIP = addr.IP.String() + default: + remoteIP = addr.String() } var n int @@ -1017,22 +1033,22 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { } } -// refuser refuses a TCP connection -func (s *Statsd) refuser(conn *net.TCPConn) { +// refuser refuses a connection +func (s *Statsd) refuser(conn net.Conn) { conn.Close() - s.Log.Infof("Refused TCP Connection from %s", conn.RemoteAddr()) - s.Log.Warn("Maximum TCP Connections reached, you may want to adjust max_tcp_connections") + s.Log.Infof("Refused Connection from %s", conn.RemoteAddr()) + s.Log.Warn("Maximum Connections reached, you may want to adjust max_tcp_connections") } -// forget a TCP connection +// forget a connection func (s *Statsd) forget(id string) { s.cleanup.Lock() defer s.cleanup.Unlock() delete(s.conns, id) } -// remember a TCP connection -func (s *Statsd) remember(id string, conn *net.TCPConn) { +// remember a connection +func (s *Statsd) remember(id string, conn net.Conn) { s.cleanup.Lock() defer s.cleanup.Unlock() s.conns[id] = conn @@ -1081,7 +1097,7 @@ func init() { return &Statsd{ Protocol: defaultProtocol, ServiceAddress: ":8125", - MaxTCPConnections: 250, + MaxConnections: 250, MetricSeparator: "_", AllowedPendingMessages: defaultAllowPendingMessage, DeleteCounters: true, diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index 1354b04eae236..fd8cdf992b349 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -3,6 +3,8 @@ package statsd import ( "fmt" "net" + "os" + "path/filepath" "sync" "testing" "time" @@ -46,7 +48,7 @@ func TestConcurrentConns(t *testing.T) { Protocol: "tcp", ServiceAddress: "localhost:8125", AllowedPendingMessages: 10000, - MaxTCPConnections: 2, + MaxConnections: 2, NumberWorkerThreads: 5, } @@ -78,7 +80,7 @@ func TestConcurrentConns1(t *testing.T) { Protocol: "tcp", ServiceAddress: "localhost:8125", AllowedPendingMessages: 10000, - MaxTCPConnections: 1, + MaxConnections: 1, NumberWorkerThreads: 5, } @@ -108,7 +110,7 @@ func TestCloseConcurrentConns(t *testing.T) { Protocol: "tcp", ServiceAddress: "localhost:8125", AllowedPendingMessages: 10000, - MaxTCPConnections: 2, + MaxConnections: 2, NumberWorkerThreads: 5, } @@ -310,7 +312,7 @@ func BenchmarkTCP(b *testing.B) { Protocol: "tcp", ServiceAddress: "localhost:8125", AllowedPendingMessages: 250000, - MaxTCPConnections: 250, + MaxConnections: 250, NumberWorkerThreads: 5, } acc := &testutil.Accumulator{Discard: true} @@ -2061,6 +2063,59 @@ func testValidateGauge( return nil } +func TestUnix(t *testing.T) { + tempDir, err := os.MkdirTemp(os.TempDir(), "") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + statsd := Statsd{ + Log: testutil.Logger{}, + Protocol: "unix", + ServiceAddress: filepath.Join(tempDir, "statsd.sock"), + AllowedPendingMessages: 10000, + MaxConnections: 2, + NumberWorkerThreads: 5, + } + var acc testutil.Accumulator + require.NoError(t, statsd.Start(&acc)) + defer statsd.Stop() + + addr := statsd.Listener.Addr().String() + + conn, err := net.Dial("unix", addr) + require.NoError(t, err) + + _, err = conn.Write([]byte("cpu.time_idle:42|c\n")) + require.NoError(t, err) + require.NoError(t, conn.Close()) + + for { + require.NoError(t, statsd.Gather(&acc)) + + if len(acc.Metrics) > 0 { + break + } + } + + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + testutil.MustMetric( + "cpu_time_idle", + map[string]string{ + "metric_type": "counter", + }, + map[string]interface{}{ + "value": 42, + }, + time.Now(), + telegraf.Counter, + ), + }, + acc.GetTelegrafMetrics(), + testutil.IgnoreTime(), + ) +} + func TestTCP(t *testing.T) { statsd := Statsd{ Log: testutil.Logger{}, @@ -2074,7 +2129,7 @@ func TestTCP(t *testing.T) { require.NoError(t, statsd.Start(&acc)) defer statsd.Stop() - addr := statsd.TCPlistener.Addr().String() + addr := statsd.Listener.Addr().String() conn, err := net.Dial("tcp", addr) require.NoError(t, err) @@ -2291,7 +2346,7 @@ func TestParse_InvalidAndRecoverIntegration(t *testing.T) { Protocol: "tcp", ServiceAddress: "localhost:8125", AllowedPendingMessages: 10000, - MaxTCPConnections: 250, + MaxConnections: 250, TCPKeepAlive: true, NumberWorkerThreads: 5, } @@ -2300,7 +2355,7 @@ func TestParse_InvalidAndRecoverIntegration(t *testing.T) { require.NoError(t, statsd.Start(acc)) defer statsd.Stop() - addr := statsd.TCPlistener.Addr().String() + addr := statsd.Listener.Addr().String() conn, err := net.Dial("tcp", addr) require.NoError(t, err) @@ -2344,7 +2399,7 @@ func TestParse_DeltaCounter(t *testing.T) { Protocol: "tcp", ServiceAddress: "localhost:8125", AllowedPendingMessages: 10000, - MaxTCPConnections: 250, + MaxConnections: 250, TCPKeepAlive: true, NumberWorkerThreads: 5, // Delete Counters causes Delta temporality to be added @@ -2357,7 +2412,7 @@ func TestParse_DeltaCounter(t *testing.T) { require.NoError(t, statsd.Start(acc)) defer statsd.Stop() - addr := statsd.TCPlistener.Addr().String() + addr := statsd.Listener.Addr().String() conn, err := net.Dial("tcp", addr) require.NoError(t, err)