Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(statsd receiver): enable use of unix domain sockets #16421

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 93 additions & 73 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
)

type Statsd struct {
// Protocol used on listener - udp or tcp
// Protocol used on listener - see net.Dial. Defaults to "tcp".
Protocol string `toml:"protocol"`

// Address & Port to serve from
Expand Down Expand Up @@ -95,8 +95,9 @@ type Statsd struct {

ReadBufferSize int `toml:"read_buffer_size"`
SanitizeNamesMethod string `toml:"sanitize_name_method"`
Templates []string `toml:"templates"` // bucket -> influx templates
MaxTCPConnections int `toml:"max_tcp_connections"`
Templates []string `toml:"templates"` // bucket -> influx templates
MaxTCPConnections int `toml:"max_tcp_connections"` // deprecated. use MaxConnections instead.
MaxConnections int `toml:"max_connections"`
TCPKeepAlive bool `toml:"tcp_keep_alive"`
TCPKeepAlivePeriod *config.Duration `toml:"tcp_keep_alive_period"`

Expand Down Expand Up @@ -130,16 +131,23 @@ type Statsd struct {
distributions []cacheddistributions

// Protocol listeners
UDPlistener *net.UDPConn
TCPlistener *net.TCPListener
UDPConn *net.UDPConn
Listener net.Listener

// track current connections so we can close them in Stop()
conns map[string]*net.TCPConn
conns map[string]net.Conn
graphiteParser *graphite.Parser
acc telegraf.Accumulator
bufPool sync.Pool // pool of byte slices to handle parsing

lastGatherTime time.Time

Stats InternalStats
}

type InternalStats struct {
// Internal statistics counters
MaxTCPConnections selfstat.Stat
MaxConnections selfstat.Stat
CurrentConnections selfstat.Stat
TotalConnections selfstat.Stat
Expand All @@ -151,8 +159,6 @@ type Statsd struct {
ParseTimeNS selfstat.Stat
PendingMessages selfstat.Stat
MaxPendingMessages selfstat.Stat

lastGatherTime time.Time
}

// number will get parsed as an int or float depending on what is passed
Expand Down Expand Up @@ -232,6 +238,16 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
s.DataDogExtensions = true
}

if s.Protocol == "" {
s.Protocol = "tcp"
}
if s.MaxTCPConnections > 0 {
s.Log.Warn("max_tcp_connections is deprecated. Use max_connections instead.")
if s.MaxConnections == 0 {
s.MaxConnections = s.MaxTCPConnections
}
}

s.acc = ac

// Make data structures
Expand All @@ -249,30 +265,32 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
tags := map[string]string{
"address": s.ServiceAddress,
}
s.MaxConnections = selfstat.Register("statsd", "tcp_max_connections", tags)
s.MaxConnections.Set(int64(s.MaxTCPConnections))
s.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags)
s.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags)
s.TCPPacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags)
s.TCPBytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags)
s.UDPPacketsRecv = selfstat.Register("statsd", "udp_packets_received", tags)
s.UDPPacketsDrop = selfstat.Register("statsd", "udp_packets_dropped", tags)
s.UDPBytesRecv = selfstat.Register("statsd", "udp_bytes_received", tags)
s.ParseTimeNS = selfstat.Register("statsd", "parse_time_ns", tags)
s.PendingMessages = selfstat.Register("statsd", "pending_messages", tags)
s.MaxPendingMessages = selfstat.Register("statsd", "max_pending_messages", tags)
s.MaxPendingMessages.Set(int64(s.AllowedPendingMessages))
s.Stats.MaxConnections = selfstat.Register("statsd", "max_connections", tags)
s.Stats.MaxConnections.Set(int64(s.MaxConnections))
s.Stats.MaxTCPConnections = selfstat.Register("statsd", "tcp_max_connections", tags)
s.Stats.MaxTCPConnections.Set(int64(s.MaxConnections))
s.Stats.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags)
s.Stats.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags)
s.Stats.TCPPacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags)
s.Stats.TCPBytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags)
s.Stats.UDPPacketsRecv = selfstat.Register("statsd", "udp_packets_received", tags)
s.Stats.UDPPacketsDrop = selfstat.Register("statsd", "udp_packets_dropped", tags)
s.Stats.UDPBytesRecv = selfstat.Register("statsd", "udp_bytes_received", tags)
s.Stats.ParseTimeNS = selfstat.Register("statsd", "parse_time_ns", tags)
s.Stats.PendingMessages = selfstat.Register("statsd", "pending_messages", tags)
s.Stats.MaxPendingMessages = selfstat.Register("statsd", "max_pending_messages", tags)
s.Stats.MaxPendingMessages.Set(int64(s.AllowedPendingMessages))

s.in = make(chan input, s.AllowedPendingMessages)
s.done = make(chan struct{})
s.accept = make(chan bool, s.MaxTCPConnections)
s.conns = make(map[string]*net.TCPConn)
s.accept = make(chan bool, s.MaxConnections)
s.conns = make(map[string]net.Conn)
s.bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
for i := 0; i < s.MaxTCPConnections; i++ {
for i := 0; i < s.MaxConnections; i++ {
s.accept <- true
}

Expand All @@ -292,7 +310,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
}

s.Log.Infof("UDP listening on %q", conn.LocalAddr().String())
s.UDPlistener = conn
s.UDPConn = conn

s.wg.Add(1)
go func() {
Expand All @@ -302,22 +320,18 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
}
}()
} else {
address, err := net.ResolveTCPAddr("tcp", s.ServiceAddress)
if err != nil {
return err
}
listener, err := net.ListenTCP("tcp", address)
listener, err := net.Listen(s.Protocol, s.ServiceAddress)
if err != nil {
return err
}

s.Log.Infof("TCP listening on %q", listener.Addr().String())
s.TCPlistener = listener
s.Log.Infof("listening on %q", listener.Addr().String())
s.Listener = listener

s.wg.Add(1)
go func() {
defer s.wg.Done()
if err := s.tcpListen(listener); err != nil {
if err := s.serve(listener); err != nil {
ac.AddError(err)
}
}()
Expand Down Expand Up @@ -446,19 +460,19 @@ func (s *Statsd) Stop() {
s.Log.Infof("Stopping the statsd service")
close(s.done)
if s.isUDP() {
if s.UDPlistener != nil {
s.UDPlistener.Close()
if s.UDPConn != nil {
s.UDPConn.Close()
}
} else {
if s.TCPlistener != nil {
s.TCPlistener.Close()
if s.Listener != nil {
s.Listener.Close()
}

// Close all open TCP connections
// Close all open connections
// - get all conns from the s.conns map and put into slice
// - this is so the forget() function doesnt conflict with looping
// over the s.conns map
var conns []*net.TCPConn
var conns []net.Conn
s.cleanup.Lock()
for _, conn := range s.conns {
conns = append(conns, conn)
Expand All @@ -478,28 +492,31 @@ func (s *Statsd) Stop() {
s.Unlock()
}

// tcpListen() starts listening for TCP packets on the configured port.
func (s *Statsd) tcpListen(listener *net.TCPListener) error {
// serve() accepts connections on the listerner
func (s *Statsd) serve(listener net.Listener) error {
for {
select {
case <-s.done:
return nil
default:
// Accept connection:
conn, err := listener.AcceptTCP()
conn, err := listener.Accept()
if err != nil {
return err
}

if s.TCPKeepAlive {
if err := conn.SetKeepAlive(true); err != nil {
return err
}

if s.TCPKeepAlivePeriod != nil {
if err := conn.SetKeepAlivePeriod(time.Duration(*s.TCPKeepAlivePeriod)); err != nil {
switch conn := conn.(type) {
case *net.TCPConn:
if s.TCPKeepAlive {
if err := conn.SetKeepAlive(true); err != nil {
return err
}

if s.TCPKeepAlivePeriod != nil {
if err := conn.SetKeepAlivePeriod(time.Duration(*s.TCPKeepAlivePeriod)); err != nil {
return err
}
}
}
}

Expand All @@ -526,7 +543,7 @@ func (s *Statsd) tcpListen(listener *net.TCPListener) error {
// udpListen starts listening for UDP packets on the configured port.
func (s *Statsd) udpListen(conn *net.UDPConn) error {
if s.ReadBufferSize > 0 {
if err := s.UDPlistener.SetReadBuffer(s.ReadBufferSize); err != nil {
if err := s.UDPConn.SetReadBuffer(s.ReadBufferSize); err != nil {
return err
}
}
Expand All @@ -545,8 +562,8 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error {
}
return nil
}
s.UDPPacketsRecv.Incr(1)
s.UDPBytesRecv.Incr(int64(n))
s.Stats.UDPPacketsRecv.Incr(1)
s.Stats.UDPBytesRecv.Incr(int64(n))
b, ok := s.bufPool.Get().(*bytes.Buffer)
if !ok {
return errors.New("bufPool is not a bytes buffer")
Expand All @@ -558,9 +575,9 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error {
Buffer: b,
Time: time.Now(),
Addr: addr.IP.String()}:
s.PendingMessages.Set(int64(len(s.in)))
s.Stats.PendingMessages.Set(int64(len(s.in)))
default:
s.UDPPacketsDrop.Incr(1)
s.Stats.UDPPacketsDrop.Incr(1)
s.drops++
if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 {
s.Log.Errorf("Statsd message queue full. "+
Expand All @@ -581,7 +598,7 @@ func (s *Statsd) parser() error {
case <-s.done:
return nil
case in := <-s.in:
s.PendingMessages.Set(int64(len(s.in)))
s.Stats.PendingMessages.Set(int64(len(s.in)))
start := time.Now()
lines := strings.Split(in.Buffer.String(), "\n")
s.bufPool.Put(in.Buffer)
Expand All @@ -608,7 +625,7 @@ func (s *Statsd) parser() error {
}
}
elapsed := time.Since(start)
s.ParseTimeNS.Set(elapsed.Nanoseconds())
s.Stats.ParseTimeNS.Set(elapsed.Nanoseconds())
}
}
}
Expand Down Expand Up @@ -956,10 +973,10 @@ func (s *Statsd) aggregate(m metric) {
}
}

// handler handles a single TCP Connection
func (s *Statsd) handler(conn *net.TCPConn, id string) {
s.CurrentConnections.Incr(1)
s.TotalConnections.Incr(1)
// handler handles a single Connection
func (s *Statsd) handler(conn net.Conn, id string) {
s.Stats.CurrentConnections.Incr(1)
s.Stats.TotalConnections.Incr(1)
// connection cleanup function
defer func() {
s.wg.Done()
Expand All @@ -968,12 +985,15 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
// Add one connection potential back to channel when this one closes
s.accept <- true
s.forget(id)
s.CurrentConnections.Incr(-1)
s.Stats.CurrentConnections.Incr(-1)
}()

var remoteIP string
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
switch addr := conn.RemoteAddr().(type) {
case *net.TCPAddr:
remoteIP = addr.IP.String()
default:
remoteIP = addr.String()
}

var n int
Expand All @@ -990,8 +1010,8 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
if n == 0 {
continue
}
s.TCPBytesRecv.Incr(int64(n))
s.TCPPacketsRecv.Incr(1)
s.Stats.TCPBytesRecv.Incr(int64(n))
s.Stats.TCPPacketsRecv.Incr(1)

b := s.bufPool.Get().(*bytes.Buffer)
b.Reset()
Expand All @@ -1000,7 +1020,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {

select {
case s.in <- input{Buffer: b, Time: time.Now(), Addr: remoteIP}:
s.PendingMessages.Set(int64(len(s.in)))
s.Stats.PendingMessages.Set(int64(len(s.in)))
default:
s.drops++
if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 {
Expand All @@ -1013,22 +1033,22 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
}
}

// refuser refuses a TCP connection
func (s *Statsd) refuser(conn *net.TCPConn) {
// refuser refuses a connection
func (s *Statsd) refuser(conn net.Conn) {
conn.Close()
s.Log.Infof("Refused TCP Connection from %s", conn.RemoteAddr())
s.Log.Warn("Maximum TCP Connections reached, you may want to adjust max_tcp_connections")
s.Log.Infof("Refused Connection from %s", conn.RemoteAddr())
s.Log.Warn("Maximum Connections reached, you may want to adjust max_tcp_connections")
}

// forget a TCP connection
// forget a connection
func (s *Statsd) forget(id string) {
s.cleanup.Lock()
defer s.cleanup.Unlock()
delete(s.conns, id)
}

// remember a TCP connection
func (s *Statsd) remember(id string, conn *net.TCPConn) {
// remember a connection
func (s *Statsd) remember(id string, conn net.Conn) {
s.cleanup.Lock()
defer s.cleanup.Unlock()
s.conns[id] = conn
Expand Down Expand Up @@ -1077,7 +1097,7 @@ func init() {
return &Statsd{
Protocol: defaultProtocol,
ServiceAddress: ":8125",
MaxTCPConnections: 250,
MaxConnections: 250,
MetricSeparator: "_",
AllowedPendingMessages: defaultAllowPendingMessage,
DeleteCounters: true,
Expand Down
Loading
Loading