Skip to content

Commit

Permalink
Allow sending upstream netflow to multiple servers (#672)
Browse files Browse the repository at this point in the history
  • Loading branch information
i3149 authored Feb 26, 2024
1 parent 1fc7356 commit a8acc55
Showing 1 changed file with 43 additions and 33 deletions.
76 changes: 43 additions & 33 deletions pkg/sinks/net/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"net"
"strings"

go_metrics "github.com/kentik/go-metrics"
"github.com/kentik/ktranslate"
Expand All @@ -19,13 +20,13 @@ var (
)

func init() {
flag.StringVar(&server, "net_server", "", "Write flows seen to this address (host and port)")
flag.StringVar(&server, "net_server", "", "Write flows seen to this address (host and port). Comma seperate to send to multiple servers.")
flag.StringVar(&protocol, "net_protocol", "udp", "Use this protocol for writing data (udp|tcp|unix)")
}

type NetSink struct {
logger.ContextL
conn net.Conn
conns []net.Conn
registry go_metrics.Registry
metrics *NetMetric
config *ktranslate.NetSinkConfig
Expand Down Expand Up @@ -53,47 +54,56 @@ func (s *NetSink) Init(ctx context.Context, format formats.Format, compression k
return fmt.Errorf("Net requires -net_server or NetSink.Endpoint to be set")
}

var serverAddr net.Addr
var err error
switch s.config.Protocol {
case "udp":
serverAddr, err = net.ResolveUDPAddr(s.config.Protocol, s.config.Endpoint)
case "tcp":
serverAddr, err = net.ResolveTCPAddr(s.config.Protocol, s.config.Endpoint)
case "unix":
serverAddr, err = net.ResolveUnixAddr(s.config.Protocol, s.config.Endpoint)
default:
err = fmt.Errorf("Invalid protocol: %s. Supported: udp|tcp|unix", s.config.Protocol)

}
if err != nil {
return err
}

conn, err := (&net.Dialer{}).DialContext(ctx, s.config.Protocol, serverAddr.String())
if err != nil {
return err
for _, endpoint := range strings.Split(s.config.Endpoint, ",") {
var serverAddr net.Addr
var err error
switch s.config.Protocol {
case "udp":
serverAddr, err = net.ResolveUDPAddr(s.config.Protocol, endpoint)
case "tcp":
serverAddr, err = net.ResolveTCPAddr(s.config.Protocol, endpoint)
case "unix":
serverAddr, err = net.ResolveUnixAddr(s.config.Protocol, endpoint)
default:
err = fmt.Errorf("Invalid protocol: %s. Supported: udp|tcp|unix", s.config.Protocol)

}
if err != nil {
return err
}

conn, err := (&net.Dialer{}).DialContext(ctx, s.config.Protocol, serverAddr.String())
if err != nil {
return err
}

s.conns = append(s.conns, conn)
s.Infof("Network: sending to %s:%s", s.config.Protocol, endpoint)
}

s.conn = conn
s.Infof("Network: sending to %s:%s", s.config.Protocol, s.config.Endpoint)

return nil
}

func (s *NetSink) Send(ctx context.Context, payload *kt.Output) {
_, err := s.conn.Write(payload.Body)
if err != nil {
s.Errorf("There was an error when writing: %v.", err)
s.metrics.DeliveryErr.Mark(1)
} else {
s.metrics.DeliveryWin.Mark(1)
// Don't block on any long sends here.
go s.sendNet(ctx, payload)
}

func (s *NetSink) sendNet(ctx context.Context, payload *kt.Output) {
for _, conn := range s.conns {
_, err := conn.Write(payload.Body)
if err != nil {
s.Errorf("There was an error when writing: %v.", err)
s.metrics.DeliveryErr.Mark(1)
} else {
s.metrics.DeliveryWin.Mark(1)
}
}
}

func (s *NetSink) Close() {
if s.conn != nil {
s.conn.Close()
for _, conn := range s.conns {
conn.Close()
}
}

Expand Down

0 comments on commit a8acc55

Please sign in to comment.