diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 92be5285207aa..d764c3053ca9a 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -40,7 +40,7 @@ const ( ) type Statsd struct { - // Protocol used on listener - udp or tcp + // Protocol used on listener - see net.Dial. Defaults to "tcp". Protocol string `toml:"protocol"` // Address & Port to serve from @@ -95,8 +95,9 @@ type Statsd struct { ReadBufferSize int `toml:"read_buffer_size"` SanitizeNamesMethod string `toml:"sanitize_name_method"` - Templates []string `toml:"templates"` // bucket -> influx templates - MaxTCPConnections int `toml:"max_tcp_connections"` + Templates []string `toml:"templates"` // bucket -> influx templates + MaxTCPConnections int `toml:"max_tcp_connections"` // deprecated. use MaxConnections instead. + MaxConnections int `toml:"max_connections"` TCPKeepAlive bool `toml:"tcp_keep_alive"` TCPKeepAlivePeriod *config.Duration `toml:"tcp_keep_alive_period"` @@ -130,16 +131,23 @@ type Statsd struct { distributions []cacheddistributions // Protocol listeners - UDPlistener *net.UDPConn - TCPlistener *net.TCPListener + UDPConn *net.UDPConn + Listener net.Listener // track current connections so we can close them in Stop() - conns map[string]*net.TCPConn + conns map[string]net.Conn graphiteParser *graphite.Parser acc telegraf.Accumulator bufPool sync.Pool // pool of byte slices to handle parsing + lastGatherTime time.Time + + Stats InternalStats +} + +type InternalStats struct { // Internal statistics counters + MaxTCPConnections selfstat.Stat MaxConnections selfstat.Stat CurrentConnections selfstat.Stat TotalConnections selfstat.Stat @@ -151,8 +159,6 @@ type Statsd struct { ParseTimeNS selfstat.Stat PendingMessages selfstat.Stat MaxPendingMessages selfstat.Stat - - lastGatherTime time.Time } // number will get parsed as an int or float depending on what is passed @@ -232,6 +238,16 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error { s.DataDogExtensions = true } + if s.Protocol == "" { + s.Protocol = "tcp" + } + if s.MaxTCPConnections > 0 { + s.Log.Warn("max_tcp_connections is deprecated. Use max_connections instead.") + if s.MaxConnections == 0 { + s.MaxConnections = s.MaxTCPConnections + } + } + s.acc = ac // Make data structures @@ -249,30 +265,32 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error { tags := map[string]string{ "address": s.ServiceAddress, } - s.MaxConnections = selfstat.Register("statsd", "tcp_max_connections", tags) - s.MaxConnections.Set(int64(s.MaxTCPConnections)) - s.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags) - s.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags) - s.TCPPacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags) - s.TCPBytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags) - s.UDPPacketsRecv = selfstat.Register("statsd", "udp_packets_received", tags) - s.UDPPacketsDrop = selfstat.Register("statsd", "udp_packets_dropped", tags) - s.UDPBytesRecv = selfstat.Register("statsd", "udp_bytes_received", tags) - s.ParseTimeNS = selfstat.Register("statsd", "parse_time_ns", tags) - s.PendingMessages = selfstat.Register("statsd", "pending_messages", tags) - s.MaxPendingMessages = selfstat.Register("statsd", "max_pending_messages", tags) - s.MaxPendingMessages.Set(int64(s.AllowedPendingMessages)) + s.Stats.MaxConnections = selfstat.Register("statsd", "max_connections", tags) + s.Stats.MaxConnections.Set(int64(s.MaxConnections)) + s.Stats.MaxTCPConnections = selfstat.Register("statsd", "tcp_max_connections", tags) + s.Stats.MaxTCPConnections.Set(int64(s.MaxConnections)) + s.Stats.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags) + s.Stats.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags) + s.Stats.TCPPacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags) + s.Stats.TCPBytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags) + s.Stats.UDPPacketsRecv = selfstat.Register("statsd", "udp_packets_received", tags) + s.Stats.UDPPacketsDrop = selfstat.Register("statsd", "udp_packets_dropped", tags) + s.Stats.UDPBytesRecv = selfstat.Register("statsd", "udp_bytes_received", tags) + s.Stats.ParseTimeNS = selfstat.Register("statsd", "parse_time_ns", tags) + s.Stats.PendingMessages = selfstat.Register("statsd", "pending_messages", tags) + s.Stats.MaxPendingMessages = selfstat.Register("statsd", "max_pending_messages", tags) + s.Stats.MaxPendingMessages.Set(int64(s.AllowedPendingMessages)) s.in = make(chan input, s.AllowedPendingMessages) s.done = make(chan struct{}) - s.accept = make(chan bool, s.MaxTCPConnections) - s.conns = make(map[string]*net.TCPConn) + s.accept = make(chan bool, s.MaxConnections) + s.conns = make(map[string]net.Conn) s.bufPool = sync.Pool{ New: func() interface{} { return new(bytes.Buffer) }, } - for i := 0; i < s.MaxTCPConnections; i++ { + for i := 0; i < s.MaxConnections; i++ { s.accept <- true } @@ -292,7 +310,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error { } s.Log.Infof("UDP listening on %q", conn.LocalAddr().String()) - s.UDPlistener = conn + s.UDPConn = conn s.wg.Add(1) go func() { @@ -302,22 +320,18 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error { } }() } else { - address, err := net.ResolveTCPAddr("tcp", s.ServiceAddress) - if err != nil { - return err - } - listener, err := net.ListenTCP("tcp", address) + listener, err := net.Listen(s.Protocol, s.ServiceAddress) if err != nil { return err } - s.Log.Infof("TCP listening on %q", listener.Addr().String()) - s.TCPlistener = listener + s.Log.Infof("listening on %q", listener.Addr().String()) + s.Listener = listener s.wg.Add(1) go func() { defer s.wg.Done() - if err := s.tcpListen(listener); err != nil { + if err := s.serve(listener); err != nil { ac.AddError(err) } }() @@ -446,19 +460,19 @@ func (s *Statsd) Stop() { s.Log.Infof("Stopping the statsd service") close(s.done) if s.isUDP() { - if s.UDPlistener != nil { - s.UDPlistener.Close() + if s.UDPConn != nil { + s.UDPConn.Close() } } else { - if s.TCPlistener != nil { - s.TCPlistener.Close() + if s.Listener != nil { + s.Listener.Close() } - // Close all open TCP connections + // Close all open connections // - get all conns from the s.conns map and put into slice // - this is so the forget() function doesnt conflict with looping // over the s.conns map - var conns []*net.TCPConn + var conns []net.Conn s.cleanup.Lock() for _, conn := range s.conns { conns = append(conns, conn) @@ -478,28 +492,31 @@ func (s *Statsd) Stop() { s.Unlock() } -// tcpListen() starts listening for TCP packets on the configured port. -func (s *Statsd) tcpListen(listener *net.TCPListener) error { +// serve() accepts connections on the listerner +func (s *Statsd) serve(listener net.Listener) error { for { select { case <-s.done: return nil default: // Accept connection: - conn, err := listener.AcceptTCP() + conn, err := listener.Accept() if err != nil { return err } - if s.TCPKeepAlive { - if err := conn.SetKeepAlive(true); err != nil { - return err - } - - if s.TCPKeepAlivePeriod != nil { - if err := conn.SetKeepAlivePeriod(time.Duration(*s.TCPKeepAlivePeriod)); err != nil { + switch conn := conn.(type) { + case *net.TCPConn: + if s.TCPKeepAlive { + if err := conn.SetKeepAlive(true); err != nil { return err } + + if s.TCPKeepAlivePeriod != nil { + if err := conn.SetKeepAlivePeriod(time.Duration(*s.TCPKeepAlivePeriod)); err != nil { + return err + } + } } } @@ -526,7 +543,7 @@ func (s *Statsd) tcpListen(listener *net.TCPListener) error { // udpListen starts listening for UDP packets on the configured port. func (s *Statsd) udpListen(conn *net.UDPConn) error { if s.ReadBufferSize > 0 { - if err := s.UDPlistener.SetReadBuffer(s.ReadBufferSize); err != nil { + if err := s.UDPConn.SetReadBuffer(s.ReadBufferSize); err != nil { return err } } @@ -545,8 +562,8 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error { } return nil } - s.UDPPacketsRecv.Incr(1) - s.UDPBytesRecv.Incr(int64(n)) + s.Stats.UDPPacketsRecv.Incr(1) + s.Stats.UDPBytesRecv.Incr(int64(n)) b, ok := s.bufPool.Get().(*bytes.Buffer) if !ok { return errors.New("bufPool is not a bytes buffer") @@ -558,9 +575,9 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error { Buffer: b, Time: time.Now(), Addr: addr.IP.String()}: - s.PendingMessages.Set(int64(len(s.in))) + s.Stats.PendingMessages.Set(int64(len(s.in))) default: - s.UDPPacketsDrop.Incr(1) + s.Stats.UDPPacketsDrop.Incr(1) s.drops++ if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 { s.Log.Errorf("Statsd message queue full. "+ @@ -581,7 +598,7 @@ func (s *Statsd) parser() error { case <-s.done: return nil case in := <-s.in: - s.PendingMessages.Set(int64(len(s.in))) + s.Stats.PendingMessages.Set(int64(len(s.in))) start := time.Now() lines := strings.Split(in.Buffer.String(), "\n") s.bufPool.Put(in.Buffer) @@ -608,7 +625,7 @@ func (s *Statsd) parser() error { } } elapsed := time.Since(start) - s.ParseTimeNS.Set(elapsed.Nanoseconds()) + s.Stats.ParseTimeNS.Set(elapsed.Nanoseconds()) } } } @@ -956,10 +973,10 @@ func (s *Statsd) aggregate(m metric) { } } -// handler handles a single TCP Connection -func (s *Statsd) handler(conn *net.TCPConn, id string) { - s.CurrentConnections.Incr(1) - s.TotalConnections.Incr(1) +// handler handles a single Connection +func (s *Statsd) handler(conn net.Conn, id string) { + s.Stats.CurrentConnections.Incr(1) + s.Stats.TotalConnections.Incr(1) // connection cleanup function defer func() { s.wg.Done() @@ -968,12 +985,15 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { // Add one connection potential back to channel when this one closes s.accept <- true s.forget(id) - s.CurrentConnections.Incr(-1) + s.Stats.CurrentConnections.Incr(-1) }() var remoteIP string - if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok { + switch addr := conn.RemoteAddr().(type) { + case *net.TCPAddr: remoteIP = addr.IP.String() + default: + remoteIP = addr.String() } var n int @@ -990,8 +1010,8 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { if n == 0 { continue } - s.TCPBytesRecv.Incr(int64(n)) - s.TCPPacketsRecv.Incr(1) + s.Stats.TCPBytesRecv.Incr(int64(n)) + s.Stats.TCPPacketsRecv.Incr(1) b := s.bufPool.Get().(*bytes.Buffer) b.Reset() @@ -1000,7 +1020,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { select { case s.in <- input{Buffer: b, Time: time.Now(), Addr: remoteIP}: - s.PendingMessages.Set(int64(len(s.in))) + s.Stats.PendingMessages.Set(int64(len(s.in))) default: s.drops++ if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 { @@ -1013,22 +1033,22 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { } } -// refuser refuses a TCP connection -func (s *Statsd) refuser(conn *net.TCPConn) { +// refuser refuses a connection +func (s *Statsd) refuser(conn net.Conn) { conn.Close() - s.Log.Infof("Refused TCP Connection from %s", conn.RemoteAddr()) - s.Log.Warn("Maximum TCP Connections reached, you may want to adjust max_tcp_connections") + s.Log.Infof("Refused Connection from %s", conn.RemoteAddr()) + s.Log.Warn("Maximum Connections reached, you may want to adjust max_tcp_connections") } -// forget a TCP connection +// forget a connection func (s *Statsd) forget(id string) { s.cleanup.Lock() defer s.cleanup.Unlock() delete(s.conns, id) } -// remember a TCP connection -func (s *Statsd) remember(id string, conn *net.TCPConn) { +// remember a connection +func (s *Statsd) remember(id string, conn net.Conn) { s.cleanup.Lock() defer s.cleanup.Unlock() s.conns[id] = conn @@ -1077,7 +1097,7 @@ func init() { return &Statsd{ Protocol: defaultProtocol, ServiceAddress: ":8125", - MaxTCPConnections: 250, + MaxConnections: 250, MetricSeparator: "_", AllowedPendingMessages: defaultAllowPendingMessage, DeleteCounters: true, diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index df36b1b8feead..fd8cdf992b349 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -3,6 +3,8 @@ package statsd import ( "fmt" "net" + "os" + "path/filepath" "sync" "testing" "time" @@ -46,7 +48,7 @@ func TestConcurrentConns(t *testing.T) { Protocol: "tcp", ServiceAddress: "localhost:8125", AllowedPendingMessages: 10000, - MaxTCPConnections: 2, + MaxConnections: 2, NumberWorkerThreads: 5, } @@ -78,7 +80,7 @@ func TestConcurrentConns1(t *testing.T) { Protocol: "tcp", ServiceAddress: "localhost:8125", AllowedPendingMessages: 10000, - MaxTCPConnections: 1, + MaxConnections: 1, NumberWorkerThreads: 5, } @@ -108,7 +110,7 @@ func TestCloseConcurrentConns(t *testing.T) { Protocol: "tcp", ServiceAddress: "localhost:8125", AllowedPendingMessages: 10000, - MaxTCPConnections: 2, + MaxConnections: 2, NumberWorkerThreads: 5, } @@ -310,7 +312,7 @@ func BenchmarkTCP(b *testing.B) { Protocol: "tcp", ServiceAddress: "localhost:8125", AllowedPendingMessages: 250000, - MaxTCPConnections: 250, + MaxConnections: 250, NumberWorkerThreads: 5, } acc := &testutil.Accumulator{Discard: true} @@ -2061,6 +2063,59 @@ func testValidateGauge( return nil } +func TestUnix(t *testing.T) { + tempDir, err := os.MkdirTemp(os.TempDir(), "") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + statsd := Statsd{ + Log: testutil.Logger{}, + Protocol: "unix", + ServiceAddress: filepath.Join(tempDir, "statsd.sock"), + AllowedPendingMessages: 10000, + MaxConnections: 2, + NumberWorkerThreads: 5, + } + var acc testutil.Accumulator + require.NoError(t, statsd.Start(&acc)) + defer statsd.Stop() + + addr := statsd.Listener.Addr().String() + + conn, err := net.Dial("unix", addr) + require.NoError(t, err) + + _, err = conn.Write([]byte("cpu.time_idle:42|c\n")) + require.NoError(t, err) + require.NoError(t, conn.Close()) + + for { + require.NoError(t, statsd.Gather(&acc)) + + if len(acc.Metrics) > 0 { + break + } + } + + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + testutil.MustMetric( + "cpu_time_idle", + map[string]string{ + "metric_type": "counter", + }, + map[string]interface{}{ + "value": 42, + }, + time.Now(), + telegraf.Counter, + ), + }, + acc.GetTelegrafMetrics(), + testutil.IgnoreTime(), + ) +} + func TestTCP(t *testing.T) { statsd := Statsd{ Log: testutil.Logger{}, @@ -2074,7 +2129,7 @@ func TestTCP(t *testing.T) { require.NoError(t, statsd.Start(&acc)) defer statsd.Stop() - addr := statsd.TCPlistener.Addr().String() + addr := statsd.Listener.Addr().String() conn, err := net.Dial("tcp", addr) require.NoError(t, err) @@ -2168,7 +2223,7 @@ func TestUdpFillQueue(t *testing.T) { var acc testutil.Accumulator require.NoError(t, plugin.Start(&acc)) - conn, err := net.Dial("udp", plugin.UDPlistener.LocalAddr().String()) + conn, err := net.Dial("udp", plugin.UDPConn.LocalAddr().String()) require.NoError(t, err) numberToSend := plugin.AllowedPendingMessages for i := 0; i < numberToSend; i++ { @@ -2177,7 +2232,7 @@ func TestUdpFillQueue(t *testing.T) { require.NoError(t, conn.Close()) require.Eventually(t, func() bool { - return plugin.UDPPacketsRecv.Get() >= int64(numberToSend) + return plugin.Stats.UDPPacketsRecv.Get() >= int64(numberToSend) }, 1*time.Second, 100*time.Millisecond) defer plugin.Stop() @@ -2291,7 +2346,7 @@ func TestParse_InvalidAndRecoverIntegration(t *testing.T) { Protocol: "tcp", ServiceAddress: "localhost:8125", AllowedPendingMessages: 10000, - MaxTCPConnections: 250, + MaxConnections: 250, TCPKeepAlive: true, NumberWorkerThreads: 5, } @@ -2300,7 +2355,7 @@ func TestParse_InvalidAndRecoverIntegration(t *testing.T) { require.NoError(t, statsd.Start(acc)) defer statsd.Stop() - addr := statsd.TCPlistener.Addr().String() + addr := statsd.Listener.Addr().String() conn, err := net.Dial("tcp", addr) require.NoError(t, err) @@ -2344,7 +2399,7 @@ func TestParse_DeltaCounter(t *testing.T) { Protocol: "tcp", ServiceAddress: "localhost:8125", AllowedPendingMessages: 10000, - MaxTCPConnections: 250, + MaxConnections: 250, TCPKeepAlive: true, NumberWorkerThreads: 5, // Delete Counters causes Delta temporality to be added @@ -2357,7 +2412,7 @@ func TestParse_DeltaCounter(t *testing.T) { require.NoError(t, statsd.Start(acc)) defer statsd.Stop() - addr := statsd.TCPlistener.Addr().String() + addr := statsd.Listener.Addr().String() conn, err := net.Dial("tcp", addr) require.NoError(t, err)