From f7ff90bddb7e1085ec7d80ee2a306547d6ee3fa2 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Sun, 15 Dec 2024 13:40:23 -0600 Subject: [PATCH 01/30] #32732 - Initial implementation This adds the implementation for netflow receiver Introduces: * The OtelLogsProducerWrapper * A parser function to convert proto to otel semantics * The listener implementation as well as the error handling function --- receiver/netflowreceiver/factory.go | 2 +- receiver/netflowreceiver/go.mod | 4 +- receiver/netflowreceiver/go.sum | 4 + receiver/netflowreceiver/listener.go | 139 +++++++++++++++++++++- receiver/netflowreceiver/listener_test.go | 2 +- receiver/netflowreceiver/parser.go | 138 +++++++++++++++++++++ receiver/netflowreceiver/producer.go | 92 ++++++++++++++ receiver/netflowreceiver/receiver.go | 18 ++- 8 files changed, 388 insertions(+), 11 deletions(-) create mode 100644 receiver/netflowreceiver/parser.go create mode 100644 receiver/netflowreceiver/producer.go diff --git a/receiver/netflowreceiver/factory.go b/receiver/netflowreceiver/factory.go index 87c27e9b76ed..f5da854f02e3 100644 --- a/receiver/netflowreceiver/factory.go +++ b/receiver/netflowreceiver/factory.go @@ -16,7 +16,7 @@ import ( const ( defaultSockets = 1 defaultWorkers = 2 - defaultQueueSize = 1_000_000 + defaultQueueSize = 1_000 ) // NewFactory creates a factory for netflow receiver. diff --git a/receiver/netflowreceiver/go.mod b/receiver/netflowreceiver/go.mod index fab4afb00086..3481e6a91c5a 100644 --- a/receiver/netflowreceiver/go.mod +++ b/receiver/netflowreceiver/go.mod @@ -3,12 +3,14 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflo go 1.22.0 require ( + github.com/netsampler/goflow2/v2 v2.2.1 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.115.0 go.opentelemetry.io/collector/component/componenttest v0.115.0 go.opentelemetry.io/collector/confmap v1.21.0 go.opentelemetry.io/collector/consumer v1.21.0 go.opentelemetry.io/collector/consumer/consumertest v0.115.0 + go.opentelemetry.io/collector/pdata v1.21.0 go.opentelemetry.io/collector/receiver v0.115.0 go.opentelemetry.io/collector/receiver/receivertest v0.115.0 go.uber.org/goleak v1.3.0 @@ -26,6 +28,7 @@ require ( github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.2 // indirect + github.com/libp2p/go-reuseport v0.4.0 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -34,7 +37,6 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.115.0 // indirect go.opentelemetry.io/collector/consumer/consumerprofiles v0.115.0 // indirect - go.opentelemetry.io/collector/pdata v1.21.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.115.0 // indirect go.opentelemetry.io/collector/pipeline v0.115.0 // indirect go.opentelemetry.io/collector/receiver/receiverprofiles v0.115.0 // indirect diff --git a/receiver/netflowreceiver/go.sum b/receiver/netflowreceiver/go.sum index ebf24515df5a..dd1aa3730549 100644 --- a/receiver/netflowreceiver/go.sum +++ b/receiver/netflowreceiver/go.sum @@ -29,6 +29,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQscQm2s= +github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= @@ -38,6 +40,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/netsampler/goflow2/v2 v2.2.1 h1:QzrtWS/meXsqCLv68hdouL+09NfuLKrCoVDJ1xfmuoE= +github.com/netsampler/goflow2/v2 v2.2.1/go.mod h1:057wOc/Xp7c+hUwRDB7wRqrx55m0r3vc7J0k4NrlFbM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= diff --git a/receiver/netflowreceiver/listener.go b/receiver/netflowreceiver/listener.go index d4507cadec8f..09e0b43db96e 100644 --- a/receiver/netflowreceiver/listener.go +++ b/receiver/netflowreceiver/listener.go @@ -3,9 +3,140 @@ package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver" +import ( + "errors" + "fmt" + "net" + + "github.com/netsampler/goflow2/v2/decoders/netflow" + + protoproducer "github.com/netsampler/goflow2/v2/producer/proto" + "github.com/netsampler/goflow2/v2/utils" + "github.com/netsampler/goflow2/v2/utils/debug" + + "go.opentelemetry.io/collector/consumer" + "go.uber.org/zap" +) + type Listener struct { - // config Config - // logger *zap.Logger - // recv *utils.UDPReceiver - // logConsumer consumer.Logs + config Config + logger *zap.Logger + recv *utils.UDPReceiver + logConsumer consumer.Logs +} + +func (l *Listener) Dropped(msg utils.Message) { + l.logger.Warn("Dropped netflow message", zap.Any("msg", msg)) +} + +func NewListener(config Config, logger *zap.Logger, logConsumer consumer.Logs) *Listener { + return &Listener{config: config, logger: logger, logConsumer: logConsumer} +} + +func (l *Listener) Start() error { + l.logger.Info("Creating the netflow UDP listener", zap.Any("config", l.config)) + cfg := &utils.UDPReceiverConfig{ + Sockets: l.config.Sockets, + Workers: l.config.Workers, + QueueSize: l.config.QueueSize, + Blocking: false, + ReceiverCallback: l, + } + recv, err := utils.NewUDPReceiver(cfg) + if err != nil { + return err + } + l.recv = recv + + decodeFunc, err := l.buildDecodeFunc() + if err != nil { + return err + } + + l.logger.Info("Start listening for NetFlow", zap.Any("config", l.config)) + if err := l.recv.Start(l.config.Hostname, l.config.Port, decodeFunc); err != nil { + return err + } + + go l.handleErrors() + + return nil +} + +// handleErrors handles errors from the listener +func (l *Listener) handleErrors() { + for err := range l.recv.Errors() { + if errors.Is(err, net.ErrClosed) { + l.logger.Info("receiver closed") + continue + } else if !errors.Is(err, netflow.ErrorTemplateNotFound) && !errors.Is(err, debug.PanicError) { + l.logger.Error("receiver error", zap.Error(err)) + continue + } else if errors.Is(err, netflow.ErrorTemplateNotFound) { + l.logger.Warn("template was not found for this message") + continue + } else if errors.Is(err, debug.PanicError) { + var pErrMsg *debug.PanicErrorMessage + if errors.As(err, &pErrMsg) { + l.logger.Error("panic error", zap.String("panic", pErrMsg.Inner)) + l.logger.Error("receiver stacktrace", zap.String("stack", string(pErrMsg.Stacktrace))) + l.logger.Error("receiver msg", zap.Any("error", pErrMsg.Msg)) + } + l.logger.Error("receiver panic", zap.Error(err)) + + continue + } + } +} + +// buildDecodeFunc creates a decode function based on the scheme +func (l *Listener) buildDecodeFunc() (utils.DecoderFunc, error) { + + // Eventually this can be used to configure mappings + cfgProducer := &protoproducer.ProducerConfig{} + cfgm, err := cfgProducer.Compile() // converts configuration into a format that can be used by a protobuf producer + if err != nil { + return nil, err + } + // We use a goflow2 proto producer to produce messages using protobuf format + protoProducer, err := protoproducer.CreateProtoProducer(cfgm, protoproducer.CreateSamplingSystem) + if err != nil { + return nil, err + } + + // the otel log producer converts those messages into OpenTelemetry logs + otelLogsProducer := NewOtelLogsProducer(protoProducer, l.logConsumer) + + cfgPipe := &utils.PipeConfig{ + Producer: otelLogsProducer, + // Format: &format.Format{ + // FormatDriver: &format.JSONFormatDriver{}, + // }, + } + + var decodeFunc utils.DecoderFunc + var p utils.FlowPipe + if l.config.Scheme == "sflow" { + p = utils.NewSFlowPipe(cfgPipe) + } else if l.config.Scheme == "netflow" { + p = utils.NewNetFlowPipe(cfgPipe) + } else if l.config.Scheme == "flow" { + p = utils.NewFlowPipe(cfgPipe) + } else { + return nil, fmt.Errorf("scheme does not exist: %s", l.config.Scheme) + } + + decodeFunc = p.DecodeFlow + + // We wrap panics while decoding the message to habndle them later + decodeFunc = debug.PanicDecoderWrapper(decodeFunc) + + return decodeFunc, nil +} + +func (l *Listener) Shutdown() error { + if l.recv != nil { + return l.recv.Stop() + } + return nil } diff --git a/receiver/netflowreceiver/listener_test.go b/receiver/netflowreceiver/listener_test.go index 9fbae90c700c..599e7928cd3d 100644 --- a/receiver/netflowreceiver/listener_test.go +++ b/receiver/netflowreceiver/listener_test.go @@ -23,5 +23,5 @@ func TestCreateValidDefaultListener(t *testing.T) { assert.Equal(t, 2055, receiver.(*netflowReceiver).config.Port) assert.Equal(t, 1, receiver.(*netflowReceiver).config.Sockets) assert.Equal(t, 2, receiver.(*netflowReceiver).config.Workers) - assert.Equal(t, 1_000_000, receiver.(*netflowReceiver).config.QueueSize) + assert.Equal(t, 1_000, receiver.(*netflowReceiver).config.QueueSize) } diff --git a/receiver/netflowreceiver/parser.go b/receiver/netflowreceiver/parser.go new file mode 100644 index 000000000000..fdbbdaa461ce --- /dev/null +++ b/receiver/netflowreceiver/parser.go @@ -0,0 +1,138 @@ +package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver" + +import ( + "errors" + "net/netip" + "time" + + "github.com/netsampler/goflow2/v2/producer" + protoproducer "github.com/netsampler/goflow2/v2/producer/proto" +) + +var ( + etypeName = map[uint32]string{ + 0x806: "ARP", + 0x800: "IPv4", + 0x86dd: "IPv6", + } + protoName = map[uint32]string{ + 1: "ICMP", + 6: "TCP", + 17: "UDP", + 58: "ICMPv6", + 132: "SCTP", + } + + flowTypeName = map[int32]string{ + 0: "UNKNOWN", + 1: "SFLOW_5", + 2: "NETFLOW_V5", + 3: "NETFLOW_V9", + 4: "IPFIX", + } +) + +type NetworkAddress struct { + Address string `json:"address,omitempty"` + Port uint32 `json:"port,omitempty"` +} + +type Flow struct { + Type string `json:"type,omitempty"` + TimeReceived time.Time `json:"time_received,omitempty"` + Start time.Time `json:"start,omitempty"` + End time.Time `json:"end,omitempty"` + SequenceNum uint32 `json:"sequence_num,omitempty"` + SamplingRate uint64 `json:"sampling_rate,omitempty"` + SamplerAddress string `json:"sampler_address,omitempty"` +} + +type Protocol struct { + Name []byte `json:"name,omitempty"` // Layer 7 +} + +type NetworkIO struct { + Bytes uint64 `json:"bytes,omitempty"` + Packets uint64 `json:"packets,omitempty"` +} + +type OtelNetworkMessage struct { + Source NetworkAddress `json:"source,omitempty"` + Destination NetworkAddress `json:"destination,omitempty"` + Transport string `json:"transport,omitempty"` // Layer 4 + Type string `json:"type,omitempty"` // Layer 3 + IO NetworkIO `json:"io,omitempty"` + Flow Flow `json:"flow,omitempty"` +} + +func getEtypeName(etype uint32) string { + if name, ok := etypeName[etype]; ok { + return name + } + return "unknown" +} + +func getProtoName(proto uint32) string { + if name, ok := protoName[proto]; ok { + return name + } + return "unknown" +} + +func getFlowTypeName(flowType int32) string { + if name, ok := flowTypeName[flowType]; ok { + return name + } + return "unknown" +} + +// ConvertToOtel converts a ProtoProducerMessage to an OtelNetworkMessage +func ConvertToOtel(m producer.ProducerMessage) (*OtelNetworkMessage, error) { + + // we know msg is ProtoProducerMessage because that is the parent producer + pm, ok := m.(*protoproducer.ProtoProducerMessage) + if !ok { + return nil, errors.New("message is not ProtoProducerMessage") + } + + // Parse IP addresses bytes to netip.Addr + srcAddr, _ := netip.AddrFromSlice(pm.SrcAddr) + dstAddr, _ := netip.AddrFromSlice(pm.DstAddr) + samplerAddr, _ := netip.AddrFromSlice(pm.SamplerAddress) + + // Time the receiver received the message + receivedTime := time.Unix(0, int64(pm.TimeReceivedNs)) + startTime := time.Unix(0, int64(pm.TimeFlowStartNs)) + endTime := time.Unix(0, int64(pm.TimeFlowEndNs)) + + // Construct the actual log record based on the otel semantic conventions + // see https://opentelemetry.io/docs/specs/semconv/general/attributes/ + otelMessage := OtelNetworkMessage{ + Source: NetworkAddress{ + Address: srcAddr.String(), + Port: pm.SrcPort, + }, + Destination: NetworkAddress{ + Address: dstAddr.String(), + Port: pm.DstPort, + }, + Type: getEtypeName(pm.Etype), // Layer 3 + Transport: getProtoName(pm.Proto), // Layer 4 + IO: NetworkIO{ + Bytes: pm.Bytes, + Packets: pm.Packets, + }, + Flow: Flow{ + Type: getFlowTypeName(int32(pm.Type)), + TimeReceived: receivedTime, + Start: startTime, + End: endTime, + SequenceNum: pm.SequenceNum, + SamplingRate: pm.SamplingRate, + SamplerAddress: samplerAddr.String(), + }, + } + + return &otelMessage, nil + +} diff --git a/receiver/netflowreceiver/producer.go b/receiver/netflowreceiver/producer.go new file mode 100644 index 000000000000..0eff95256e94 --- /dev/null +++ b/receiver/netflowreceiver/producer.go @@ -0,0 +1,92 @@ +package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver" + +import ( + "context" + "encoding/json" + + "github.com/netsampler/goflow2/v2/producer" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver/internal/metadata" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" +) + +// OtelLogsProducerWrapper is a wrapper around a producer.ProducerInterface that sends the messages to a log consumer +type OtelLogsProducerWrapper struct { + wrapped producer.ProducerInterface + logConsumer consumer.Logs +} + +// Produce converts the message into a list log records and sends them to log consumer +func (o *OtelLogsProducerWrapper) Produce(msg interface{}, args *producer.ProduceArgs) ([]producer.ProducerMessage, error) { + + // First we let the proto producer parse the message + // All the netflow protocol and structure is handled by the proto producer + flowMessageSet, err := o.wrapped.Produce(msg, args) + if err != nil { + return flowMessageSet, err + } + + // Create the otel log structure to hold our messages + log := plog.NewLogs() + scopeLog := log.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty() + scopeLog.Scope().SetName(metadata.ScopeName) + scopeLog.Scope().Attributes().PutStr("receiver", metadata.Type.String()) + logRecords := scopeLog.LogRecords() + + // A single netflow packet can contain multiple flow messages + for _, msg := range flowMessageSet { + + // Convert each one to the Otel semantic dictionary format + otelMessage, err := ConvertToOtel(msg) + if err != nil { + continue + } + + logRecord := logRecords.AppendEmpty() + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(otelMessage.Flow.Start)) + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(otelMessage.Flow.TimeReceived)) + + // The bytes of the message in JSON format + m, err := json.Marshal(otelMessage) + if err != nil { + continue + } + + // Convert to a map[string] + // https://opentelemetry.io/docs/specs/otel/logs/data-model/#type-mapstring-any + sec := map[string]interface{}{} + if err = json.Unmarshal(m, &sec); err != nil { + continue + } + + // Set the map to the log record body + err = logRecord.Body().SetEmptyMap().FromRaw(sec) + if err != nil { + continue + } + } + + // Send the logs to the collector, it is difficult to pass the context here + err = o.logConsumer.ConsumeLogs(context.TODO(), log) + if err != nil { + return flowMessageSet, err + } + + return flowMessageSet, nil +} + +func (o *OtelLogsProducerWrapper) Close() { + o.wrapped.Close() +} + +func (o *OtelLogsProducerWrapper) Commit(flowMessageSet []producer.ProducerMessage) { + o.wrapped.Commit(flowMessageSet) +} + +func NewOtelLogsProducer(wrapped producer.ProducerInterface, logConsumer consumer.Logs) producer.ProducerInterface { + return &OtelLogsProducerWrapper{ + wrapped: wrapped, + logConsumer: logConsumer, + } +} diff --git a/receiver/netflowreceiver/receiver.go b/receiver/netflowreceiver/receiver.go index c31bcbe7baaf..8e813a6c01bf 100644 --- a/receiver/netflowreceiver/receiver.go +++ b/receiver/netflowreceiver/receiver.go @@ -12,18 +12,28 @@ import ( ) type netflowReceiver struct { - // host component.Host - // cancel context.CancelFunc config *Config logConsumer consumer.Logs logger *zap.Logger - // listeners []*Listener + listener *Listener } -func (nr *netflowReceiver) Start(_ context.Context, _ component.Host) error { +func (nr *netflowReceiver) Start(ctx context.Context, host component.Host) error { + // TODO - Pass ctx and host here + listener := NewListener(*nr.config, nr.logger, nr.logConsumer) + if err := listener.Start(); err != nil { + return err + } + nr.listener = listener + nr.logger.Info("NetFlow receiver started") return nil } func (nr *netflowReceiver) Shutdown(_ context.Context) error { + nr.logger.Info("NetFlow receiver is shutting down") + err := nr.listener.Shutdown() + if err != nil { + return err + } return nil } From 9871936136401800a54dc6491d3b60dc5c819bef Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Sun, 15 Dec 2024 13:43:51 -0600 Subject: [PATCH 02/30] #32732 - Make internal methods private --- receiver/netflowreceiver/listener.go | 4 ++-- receiver/netflowreceiver/parser.go | 4 ++-- receiver/netflowreceiver/producer.go | 4 ++-- receiver/netflowreceiver/receiver.go | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/receiver/netflowreceiver/listener.go b/receiver/netflowreceiver/listener.go index 09e0b43db96e..0ddc7d09bdf1 100644 --- a/receiver/netflowreceiver/listener.go +++ b/receiver/netflowreceiver/listener.go @@ -29,7 +29,7 @@ func (l *Listener) Dropped(msg utils.Message) { l.logger.Warn("Dropped netflow message", zap.Any("msg", msg)) } -func NewListener(config Config, logger *zap.Logger, logConsumer consumer.Logs) *Listener { +func newListener(config Config, logger *zap.Logger, logConsumer consumer.Logs) *Listener { return &Listener{config: config, logger: logger, logConsumer: logConsumer} } @@ -105,7 +105,7 @@ func (l *Listener) buildDecodeFunc() (utils.DecoderFunc, error) { } // the otel log producer converts those messages into OpenTelemetry logs - otelLogsProducer := NewOtelLogsProducer(protoProducer, l.logConsumer) + otelLogsProducer := newOtelLogsProducer(protoProducer, l.logConsumer) cfgPipe := &utils.PipeConfig{ Producer: otelLogsProducer, diff --git a/receiver/netflowreceiver/parser.go b/receiver/netflowreceiver/parser.go index fdbbdaa461ce..d7b4d9ebc625 100644 --- a/receiver/netflowreceiver/parser.go +++ b/receiver/netflowreceiver/parser.go @@ -86,8 +86,8 @@ func getFlowTypeName(flowType int32) string { return "unknown" } -// ConvertToOtel converts a ProtoProducerMessage to an OtelNetworkMessage -func ConvertToOtel(m producer.ProducerMessage) (*OtelNetworkMessage, error) { +// convertToOtel converts a ProtoProducerMessage to an OtelNetworkMessage +func convertToOtel(m producer.ProducerMessage) (*OtelNetworkMessage, error) { // we know msg is ProtoProducerMessage because that is the parent producer pm, ok := m.(*protoproducer.ProtoProducerMessage) diff --git a/receiver/netflowreceiver/producer.go b/receiver/netflowreceiver/producer.go index 0eff95256e94..2cbd727add10 100644 --- a/receiver/netflowreceiver/producer.go +++ b/receiver/netflowreceiver/producer.go @@ -38,7 +38,7 @@ func (o *OtelLogsProducerWrapper) Produce(msg interface{}, args *producer.Produc for _, msg := range flowMessageSet { // Convert each one to the Otel semantic dictionary format - otelMessage, err := ConvertToOtel(msg) + otelMessage, err := convertToOtel(msg) if err != nil { continue } @@ -84,7 +84,7 @@ func (o *OtelLogsProducerWrapper) Commit(flowMessageSet []producer.ProducerMessa o.wrapped.Commit(flowMessageSet) } -func NewOtelLogsProducer(wrapped producer.ProducerInterface, logConsumer consumer.Logs) producer.ProducerInterface { +func newOtelLogsProducer(wrapped producer.ProducerInterface, logConsumer consumer.Logs) producer.ProducerInterface { return &OtelLogsProducerWrapper{ wrapped: wrapped, logConsumer: logConsumer, diff --git a/receiver/netflowreceiver/receiver.go b/receiver/netflowreceiver/receiver.go index 8e813a6c01bf..daa9f72a89c5 100644 --- a/receiver/netflowreceiver/receiver.go +++ b/receiver/netflowreceiver/receiver.go @@ -20,7 +20,7 @@ type netflowReceiver struct { func (nr *netflowReceiver) Start(ctx context.Context, host component.Host) error { // TODO - Pass ctx and host here - listener := NewListener(*nr.config, nr.logger, nr.logConsumer) + listener := newListener(*nr.config, nr.logger, nr.logConsumer) if err := listener.Start(); err != nil { return err } From 09a91d1ce3a6726f56355b2a5a9901fd44c83475 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Sun, 15 Dec 2024 13:51:44 -0600 Subject: [PATCH 03/30] #32732 - netflow - make checks --- receiver/netflowreceiver/listener.go | 1 - receiver/netflowreceiver/parser.go | 2 -- receiver/netflowreceiver/producer.go | 4 ++-- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/receiver/netflowreceiver/listener.go b/receiver/netflowreceiver/listener.go index 0ddc7d09bdf1..9c361908e88a 100644 --- a/receiver/netflowreceiver/listener.go +++ b/receiver/netflowreceiver/listener.go @@ -91,7 +91,6 @@ func (l *Listener) handleErrors() { // buildDecodeFunc creates a decode function based on the scheme func (l *Listener) buildDecodeFunc() (utils.DecoderFunc, error) { - // Eventually this can be used to configure mappings cfgProducer := &protoproducer.ProducerConfig{} cfgm, err := cfgProducer.Compile() // converts configuration into a format that can be used by a protobuf producer diff --git a/receiver/netflowreceiver/parser.go b/receiver/netflowreceiver/parser.go index d7b4d9ebc625..4004608b2431 100644 --- a/receiver/netflowreceiver/parser.go +++ b/receiver/netflowreceiver/parser.go @@ -88,7 +88,6 @@ func getFlowTypeName(flowType int32) string { // convertToOtel converts a ProtoProducerMessage to an OtelNetworkMessage func convertToOtel(m producer.ProducerMessage) (*OtelNetworkMessage, error) { - // we know msg is ProtoProducerMessage because that is the parent producer pm, ok := m.(*protoproducer.ProtoProducerMessage) if !ok { @@ -134,5 +133,4 @@ func convertToOtel(m producer.ProducerMessage) (*OtelNetworkMessage, error) { } return &otelMessage, nil - } diff --git a/receiver/netflowreceiver/producer.go b/receiver/netflowreceiver/producer.go index 2cbd727add10..48f9db6fa75b 100644 --- a/receiver/netflowreceiver/producer.go +++ b/receiver/netflowreceiver/producer.go @@ -5,10 +5,11 @@ import ( "encoding/json" "github.com/netsampler/goflow2/v2/producer" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver/internal/metadata" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver/internal/metadata" ) // OtelLogsProducerWrapper is a wrapper around a producer.ProducerInterface that sends the messages to a log consumer @@ -19,7 +20,6 @@ type OtelLogsProducerWrapper struct { // Produce converts the message into a list log records and sends them to log consumer func (o *OtelLogsProducerWrapper) Produce(msg interface{}, args *producer.ProduceArgs) ([]producer.ProducerMessage, error) { - // First we let the proto producer parse the message // All the netflow protocol and structure is handled by the proto producer flowMessageSet, err := o.wrapped.Produce(msg, args) From 26d5aec5f30bc4fe914bc0751a7f700f9e970b05 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Mon, 16 Dec 2024 13:06:17 -0600 Subject: [PATCH 04/30] #32732 - netflow - simplify implementation, remove listener --- receiver/netflowreceiver/config_test.go | 2 +- receiver/netflowreceiver/factory.go | 15 ++- receiver/netflowreceiver/listener.go | 141 ------------------- receiver/netflowreceiver/listener_test.go | 27 ---- receiver/netflowreceiver/receiver.go | 157 ++++++++++++++++++++-- 5 files changed, 155 insertions(+), 187 deletions(-) delete mode 100644 receiver/netflowreceiver/listener.go delete mode 100644 receiver/netflowreceiver/listener_test.go diff --git a/receiver/netflowreceiver/config_test.go b/receiver/netflowreceiver/config_test.go index 167eaadaaf1e..086083122e94 100644 --- a/receiver/netflowreceiver/config_test.go +++ b/receiver/netflowreceiver/config_test.go @@ -36,7 +36,7 @@ func TestLoadConfig(t *testing.T) { Port: 2055, Sockets: 1, Workers: 1, - QueueSize: 1000000, + QueueSize: 1000, }, }, } diff --git a/receiver/netflowreceiver/factory.go b/receiver/netflowreceiver/factory.go index f5da854f02e3..a77f52716508 100644 --- a/receiver/netflowreceiver/factory.go +++ b/receiver/netflowreceiver/factory.go @@ -27,6 +27,8 @@ func NewFactory() receiver.Factory { receiver.WithLogs(createLogsReceiver, metadata.LogsStability)) } +// Config defines configuration for netflow receiver. +// By default we listen for netflow traffic on port 2055 func createDefaultConfig() component.Config { return &Config{ Scheme: "netflow", @@ -37,14 +39,15 @@ func createDefaultConfig() component.Config { } } +// createLogsReceiver creates a netflow receiver. +// We also create the UDP receiver, which is the piece of software that actually listens +// for incoming netflow traffic on an UDP port. func createLogsReceiver(_ context.Context, params receiver.Settings, cfg component.Config, consumer consumer.Logs) (receiver.Logs, error) { - logger := params.Logger - conf := cfg.(*Config) + conf := *(cfg.(*Config)) - nr := &netflowReceiver{ - logger: logger, - logConsumer: consumer, - config: conf, + nr, err := newNetflowLogsReceiver(params, conf, consumer) + if err != nil { + return nil, err } return nr, nil diff --git a/receiver/netflowreceiver/listener.go b/receiver/netflowreceiver/listener.go deleted file mode 100644 index 9c361908e88a..000000000000 --- a/receiver/netflowreceiver/listener.go +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver" - -import ( - "errors" - "fmt" - "net" - - "github.com/netsampler/goflow2/v2/decoders/netflow" - - protoproducer "github.com/netsampler/goflow2/v2/producer/proto" - "github.com/netsampler/goflow2/v2/utils" - "github.com/netsampler/goflow2/v2/utils/debug" - - "go.opentelemetry.io/collector/consumer" - "go.uber.org/zap" -) - -type Listener struct { - config Config - logger *zap.Logger - recv *utils.UDPReceiver - logConsumer consumer.Logs -} - -func (l *Listener) Dropped(msg utils.Message) { - l.logger.Warn("Dropped netflow message", zap.Any("msg", msg)) -} - -func newListener(config Config, logger *zap.Logger, logConsumer consumer.Logs) *Listener { - return &Listener{config: config, logger: logger, logConsumer: logConsumer} -} - -func (l *Listener) Start() error { - l.logger.Info("Creating the netflow UDP listener", zap.Any("config", l.config)) - cfg := &utils.UDPReceiverConfig{ - Sockets: l.config.Sockets, - Workers: l.config.Workers, - QueueSize: l.config.QueueSize, - Blocking: false, - ReceiverCallback: l, - } - recv, err := utils.NewUDPReceiver(cfg) - if err != nil { - return err - } - l.recv = recv - - decodeFunc, err := l.buildDecodeFunc() - if err != nil { - return err - } - - l.logger.Info("Start listening for NetFlow", zap.Any("config", l.config)) - if err := l.recv.Start(l.config.Hostname, l.config.Port, decodeFunc); err != nil { - return err - } - - go l.handleErrors() - - return nil -} - -// handleErrors handles errors from the listener -func (l *Listener) handleErrors() { - for err := range l.recv.Errors() { - if errors.Is(err, net.ErrClosed) { - l.logger.Info("receiver closed") - continue - } else if !errors.Is(err, netflow.ErrorTemplateNotFound) && !errors.Is(err, debug.PanicError) { - l.logger.Error("receiver error", zap.Error(err)) - continue - } else if errors.Is(err, netflow.ErrorTemplateNotFound) { - l.logger.Warn("template was not found for this message") - continue - } else if errors.Is(err, debug.PanicError) { - var pErrMsg *debug.PanicErrorMessage - if errors.As(err, &pErrMsg) { - l.logger.Error("panic error", zap.String("panic", pErrMsg.Inner)) - l.logger.Error("receiver stacktrace", zap.String("stack", string(pErrMsg.Stacktrace))) - l.logger.Error("receiver msg", zap.Any("error", pErrMsg.Msg)) - } - l.logger.Error("receiver panic", zap.Error(err)) - - continue - } - } -} - -// buildDecodeFunc creates a decode function based on the scheme -func (l *Listener) buildDecodeFunc() (utils.DecoderFunc, error) { - // Eventually this can be used to configure mappings - cfgProducer := &protoproducer.ProducerConfig{} - cfgm, err := cfgProducer.Compile() // converts configuration into a format that can be used by a protobuf producer - if err != nil { - return nil, err - } - // We use a goflow2 proto producer to produce messages using protobuf format - protoProducer, err := protoproducer.CreateProtoProducer(cfgm, protoproducer.CreateSamplingSystem) - if err != nil { - return nil, err - } - - // the otel log producer converts those messages into OpenTelemetry logs - otelLogsProducer := newOtelLogsProducer(protoProducer, l.logConsumer) - - cfgPipe := &utils.PipeConfig{ - Producer: otelLogsProducer, - // Format: &format.Format{ - // FormatDriver: &format.JSONFormatDriver{}, - // }, - } - - var decodeFunc utils.DecoderFunc - var p utils.FlowPipe - if l.config.Scheme == "sflow" { - p = utils.NewSFlowPipe(cfgPipe) - } else if l.config.Scheme == "netflow" { - p = utils.NewNetFlowPipe(cfgPipe) - } else if l.config.Scheme == "flow" { - p = utils.NewFlowPipe(cfgPipe) - } else { - return nil, fmt.Errorf("scheme does not exist: %s", l.config.Scheme) - } - - decodeFunc = p.DecodeFlow - - // We wrap panics while decoding the message to habndle them later - decodeFunc = debug.PanicDecoderWrapper(decodeFunc) - - return decodeFunc, nil -} - -func (l *Listener) Shutdown() error { - if l.recv != nil { - return l.recv.Stop() - } - return nil -} diff --git a/receiver/netflowreceiver/listener_test.go b/receiver/netflowreceiver/listener_test.go deleted file mode 100644 index 599e7928cd3d..000000000000 --- a/receiver/netflowreceiver/listener_test.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package netflowreceiver - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/receiver/receivertest" -) - -func TestCreateValidDefaultListener(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - set := receivertest.NewNopSettings() - receiver, err := factory.CreateLogs(context.Background(), set, cfg, consumertest.NewNop()) - assert.NoError(t, err, "receiver creation failed") - assert.NotNil(t, receiver, "receiver creation failed") - assert.Equal(t, "netflow", receiver.(*netflowReceiver).config.Scheme) - assert.Equal(t, 2055, receiver.(*netflowReceiver).config.Port) - assert.Equal(t, 1, receiver.(*netflowReceiver).config.Sockets) - assert.Equal(t, 2, receiver.(*netflowReceiver).config.Workers) - assert.Equal(t, 1_000, receiver.(*netflowReceiver).config.QueueSize) -} diff --git a/receiver/netflowreceiver/receiver.go b/receiver/netflowreceiver/receiver.go index daa9f72a89c5..d524d976ee13 100644 --- a/receiver/netflowreceiver/receiver.go +++ b/receiver/netflowreceiver/receiver.go @@ -5,35 +5,168 @@ package netflowreceiver // import "github.com/open-telemetry/opentelemetry-colle import ( "context" + "errors" + "fmt" + "net" + "github.com/netsampler/goflow2/v2/decoders/netflow" + protoproducer "github.com/netsampler/goflow2/v2/producer/proto" + "github.com/netsampler/goflow2/v2/utils" + "github.com/netsampler/goflow2/v2/utils/debug" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" ) +type dropHandler struct { + logger *zap.Logger +} + +func (d *dropHandler) Dropped(msg utils.Message) { + d.logger.Warn("Dropped netflow message", zap.Any("msg", msg)) +} + type netflowReceiver struct { - config *Config - logConsumer consumer.Logs + config Config logger *zap.Logger - listener *Listener + udpReceiver *utils.UDPReceiver + logConsumer consumer.Logs +} + +func newNetflowLogsReceiver(params receiver.Settings, cfg Config, consumer consumer.Logs) (receiver.Logs, error) { + // UDP receiver configuration + udpCfg := &utils.UDPReceiverConfig{ + Sockets: cfg.Sockets, + Workers: cfg.Workers, + QueueSize: cfg.QueueSize, + Blocking: false, + ReceiverCallback: &dropHandler{ + logger: params.Logger, + }, + } + udpReceiver, err := utils.NewUDPReceiver(udpCfg) + if err != nil { + return nil, err + } + + nr := &netflowReceiver{ + logger: params.Logger, + config: cfg, + logConsumer: consumer, + udpReceiver: udpReceiver, + } + + return nr, nil } func (nr *netflowReceiver) Start(ctx context.Context, host component.Host) error { - // TODO - Pass ctx and host here - listener := newListener(*nr.config, nr.logger, nr.logConsumer) - if err := listener.Start(); err != nil { + nr.logger.Info("NetFlow receiver is starting...") + + // The function that will decode packets + decodeFunc, err := nr.buildDecodeFunc() + if err != nil { return err } - nr.listener = listener - nr.logger.Info("NetFlow receiver started") + + nr.logger.Info("Start listening for NetFlow on UDP", zap.Any("config", nr.config)) + if err := nr.udpReceiver.Start(nr.config.Hostname, nr.config.Port, decodeFunc); err != nil { + return err + } + + // This runs until the receiver is stoppped, consuming from an error channel + go nr.handleErrors() + return nil } -func (nr *netflowReceiver) Shutdown(_ context.Context) error { - nr.logger.Info("NetFlow receiver is shutting down") - err := nr.listener.Shutdown() +func (nr *netflowReceiver) Shutdown(context.Context) error { + nr.logger.Info("NetFlow receiver is shutting down...") + if nr.udpReceiver == nil { + return nil + } + err := nr.udpReceiver.Stop() if err != nil { - return err + nr.logger.Warn("Error stopping UDP receiver", zap.Error(err)) } + return nil } + +// buildDecodeFunc creates a decode function based on the scheme +// This is the fuction that will be invoked for every netflow packet received +// The function depends on the type of schema (netflow, sflow, flow) +func (nr *netflowReceiver) buildDecodeFunc() (utils.DecoderFunc, error) { + // Eventually this can be used to configure mappings + cfgProducer := &protoproducer.ProducerConfig{} + cfgm, err := cfgProducer.Compile() // converts configuration into a format that can be used by a protobuf producer + if err != nil { + return nil, err + } + // We use a goflow2 proto producer to produce messages using protobuf format + protoProducer, err := protoproducer.CreateProtoProducer(cfgm, protoproducer.CreateSamplingSystem) + if err != nil { + return nil, err + } + + // the otel log producer converts those messages into OpenTelemetry logs + // it is a wrapper around the protobuf producer + otelLogsProducer := newOtelLogsProducer(protoProducer, nr.logConsumer) + + cfgPipe := &utils.PipeConfig{ + Producer: otelLogsProducer, + } + + var decodeFunc utils.DecoderFunc + var p utils.FlowPipe + if nr.config.Scheme == "sflow" { + p = utils.NewSFlowPipe(cfgPipe) + } else if nr.config.Scheme == "netflow" { + p = utils.NewNetFlowPipe(cfgPipe) + } else if nr.config.Scheme == "flow" { + p = utils.NewFlowPipe(cfgPipe) + } else { + return nil, fmt.Errorf("scheme does not exist: %s", nr.config.Scheme) + } + + decodeFunc = p.DecodeFlow + + // We wrap panics while decoding the message to habndle them later + decodeFunc = debug.PanicDecoderWrapper(decodeFunc) + + return decodeFunc, nil +} + +// handleErrors handles errors from the listener +// These come from the panic decoder wrapper around the decode function +// We don't want the receiver to stop if there is an error processing a packet +func (nr *netflowReceiver) handleErrors() { + for err := range nr.udpReceiver.Errors() { + // Receiver was closed, exit + if errors.Is(err, net.ErrClosed) { + nr.logger.Info("UDP receiver closed, exiting error handler") + return + + // This is the type of error we want to log + } else if !errors.Is(err, netflow.ErrorTemplateNotFound) && !errors.Is(err, debug.PanicError) { + nr.logger.Error("receiver error", zap.Error(err)) + continue + + // A template was not found, this is not a big deal and can happen + } else if errors.Is(err, netflow.ErrorTemplateNotFound) { + nr.logger.Warn("template was not found for this message") + continue + + // These are pretty bad and should not happen + } else if errors.Is(err, debug.PanicError) { + var pErrMsg *debug.PanicErrorMessage + if errors.As(err, &pErrMsg) { + nr.logger.Error("panic error", zap.String("panic", pErrMsg.Inner)) + nr.logger.Error("receiver stacktrace", zap.String("stack", string(pErrMsg.Stacktrace))) + nr.logger.Error("receiver msg", zap.Any("error", pErrMsg.Msg)) + } + nr.logger.Error("receiver panic", zap.Error(err)) + continue + } + } +} From 64acc947a27e4a01d6fc66e52e731009ffe8bfbd Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Mon, 16 Dec 2024 14:50:04 -0600 Subject: [PATCH 05/30] netflow - add parser tests --- receiver/netflowreceiver/parser_test.go | 102 ++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 receiver/netflowreceiver/parser_test.go diff --git a/receiver/netflowreceiver/parser_test.go b/receiver/netflowreceiver/parser_test.go new file mode 100644 index 000000000000..16259e4fbc79 --- /dev/null +++ b/receiver/netflowreceiver/parser_test.go @@ -0,0 +1,102 @@ +package netflowreceiver + +import ( + "net/netip" + "testing" + "time" + + flowpb "github.com/netsampler/goflow2/v2/pb" + protoproducer "github.com/netsampler/goflow2/v2/producer/proto" + "github.com/stretchr/testify/assert" +) + +func TestGetProtoName(t *testing.T) { + tests := []struct { + proto uint32 + want string + }{ + {proto: 1, want: "ICMP"}, + {proto: 6, want: "TCP"}, + {proto: 17, want: "UDP"}, + {proto: 58, want: "ICMPv6"}, + {proto: 132, want: "SCTP"}, + {proto: 0, want: "unknown"}, + } + + for _, tt := range tests { + t.Run(tt.want, func(t *testing.T) { + got := getProtoName(tt.proto) + if got != tt.want { + t.Errorf("getProtoName(%d) = %s; want %s", tt.proto, got, tt.want) + } + }) + } +} + +func TestConvertToOtel(t *testing.T) { + pm := &protoproducer.ProtoProducerMessage{ + FlowMessage: flowpb.FlowMessage{ + SrcAddr: netip.MustParseAddr("192.168.1.1").AsSlice(), + SrcPort: 0, + DstAddr: netip.MustParseAddr("192.168.1.2").AsSlice(), + DstPort: 2055, + SamplerAddress: netip.MustParseAddr("192.168.1.100").AsSlice(), + Type: 3, + Etype: 0x800, + Proto: 6, + Bytes: 100, + Packets: 1, + TimeReceivedNs: 1000000000, + TimeFlowStartNs: 1000000000, + TimeFlowEndNs: 1000000100, + SequenceNum: 1, + SamplingRate: 1, + }, + } + + otel, err := convertToOtel(pm) + if err != nil { + t.Errorf("convertToOtel() error = %v", err) + return + } + assert.Equal(t, "192.168.1.1", otel.Source.Address) + assert.Equal(t, uint32(0), otel.Source.Port) + assert.Equal(t, "192.168.1.2", otel.Destination.Address) + assert.Equal(t, uint32(2055), otel.Destination.Port) + assert.Equal(t, uint64(100), otel.IO.Bytes) + assert.Equal(t, uint64(1), otel.IO.Packets) + assert.Equal(t, "TCP", otel.Transport) + assert.Equal(t, "IPv4", otel.Type) + assert.Equal(t, "NETFLOW_V9", otel.Flow.Type) + assert.Equal(t, "192.168.1.100", otel.Flow.SamplerAddress) + assert.Equal(t, uint32(1), otel.Flow.SequenceNum) + assert.Equal(t, uint64(1), otel.Flow.SamplingRate) + assert.Equal(t, time.Unix(0, 1000000000), otel.Flow.Start) + assert.Equal(t, time.Unix(0, 1000000100), otel.Flow.End) + assert.Equal(t, time.Unix(0, 1000000000), otel.Flow.TimeReceived) +} + +func TestEmptyConvertToOtel(t *testing.T) { + pm := &protoproducer.ProtoProducerMessage{} + + otel, err := convertToOtel(pm) + if err != nil { + t.Errorf("convertToOtel() error = %v", err) + return + } + assert.Equal(t, "invalid IP", otel.Source.Address) + assert.Equal(t, uint32(0), otel.Source.Port) + assert.Equal(t, "invalid IP", otel.Destination.Address) + assert.Equal(t, uint32(0), otel.Destination.Port) + assert.Equal(t, uint64(0), otel.IO.Bytes) + assert.Equal(t, uint64(0), otel.IO.Packets) + assert.Equal(t, "unknown", otel.Transport) + assert.Equal(t, "unknown", otel.Type) + assert.Equal(t, "UNKNOWN", otel.Flow.Type) + assert.Equal(t, "invalid IP", otel.Flow.SamplerAddress) + assert.Equal(t, uint32(0), otel.Flow.SequenceNum) + assert.Equal(t, uint64(0), otel.Flow.SamplingRate) + assert.Equal(t, time.Unix(0, 0), otel.Flow.Start) + assert.Equal(t, time.Unix(0, 0), otel.Flow.End) + assert.Equal(t, time.Unix(0, 0), otel.Flow.TimeReceived) +} From 658c414e9b5532abe110f023209402d052008087 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Mon, 16 Dec 2024 15:47:14 -0600 Subject: [PATCH 06/30] netflow - add more tests --- receiver/netflowreceiver/config_test.go | 48 ++++++++++++++ receiver/netflowreceiver/producer_test.go | 66 +++++++++++++++++++ receiver/netflowreceiver/receiver.go | 1 - receiver/netflowreceiver/testdata/config.yaml | 34 ++++++++++ 4 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 receiver/netflowreceiver/producer_test.go diff --git a/receiver/netflowreceiver/config_test.go b/receiver/netflowreceiver/config_test.go index 086083122e94..4afc38ce68ee 100644 --- a/receiver/netflowreceiver/config_test.go +++ b/receiver/netflowreceiver/config_test.go @@ -39,6 +39,46 @@ func TestLoadConfig(t *testing.T) { QueueSize: 1000, }, }, + { + id: component.NewIDWithName(metadata.Type, "zero_queue"), + expected: &Config{ + Scheme: "netflow", + Port: 2055, + Sockets: 1, + Workers: 1, + QueueSize: 1000, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "zero_queue"), + expected: &Config{ + Scheme: "netflow", + Port: 2055, + Sockets: 1, + Workers: 1, + QueueSize: 1000, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "sflow"), + expected: &Config{ + Scheme: "sflow", + Port: 2055, + Sockets: 1, + Workers: 1, + QueueSize: 1000, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "flow"), + expected: &Config{ + Scheme: "flow", + Port: 2055, + Sockets: 1, + Workers: 1, + QueueSize: 1000, + }, + }, } for _, tt := range tests { @@ -74,6 +114,14 @@ func TestInvalidConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, "invalid_port"), err: "port must be greater than 0", }, + { + id: component.NewIDWithName(metadata.Type, "zero_sockets"), + err: "sockets must be greater than 0", + }, + { + id: component.NewIDWithName(metadata.Type, "zero_workers"), + err: "workers must be greater than 0", + }, } for _, tt := range tests { diff --git a/receiver/netflowreceiver/producer_test.go b/receiver/netflowreceiver/producer_test.go new file mode 100644 index 000000000000..d9d3648f918b --- /dev/null +++ b/receiver/netflowreceiver/producer_test.go @@ -0,0 +1,66 @@ +package netflowreceiver + +import ( + "testing" + + "github.com/netsampler/goflow2/v2/decoders/netflow" + flowpb "github.com/netsampler/goflow2/v2/pb" + "github.com/netsampler/goflow2/v2/producer" + protoproducer "github.com/netsampler/goflow2/v2/producer/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" +) + +func TestProduce(t *testing.T) { + + // list of netflow.DataFlowSet + message := &netflow.NFv9Packet{ + Version: 9, + Count: 1, + SystemUptime: 0xb3bff683, + UnixSeconds: 0x618aa3a8, + SequenceNumber: 838987416, + SourceId: 256, + FlowSets: []interface{}{ + netflow.DataFlowSet{ + FlowSetHeader: netflow.FlowSetHeader{ + Id: 260, + Length: 1372, + }, + Records: []netflow.DataRecord{ + {Values: []netflow.DataField{ + { + PenProvided: false, + Type: 2, + Pen: 0, + Value: []uint8{0x00, 0x00, 0x00, 0x01}, + }, + }, + }, + }, + }, + }, + } + + cfgProducer := &protoproducer.ProducerConfig{} + cfgm, err := cfgProducer.Compile() // converts configuration into a format that can be used by a protobuf producer + require.NoError(t, err) + // We use a goflow2 proto producer to produce messages using protobuf format + protoProducer, err := protoproducer.CreateProtoProducer(cfgm, protoproducer.CreateSamplingSystem) + require.NoError(t, err) + + otelLogsProducer := newOtelLogsProducer(protoProducer, consumertest.NewNop()) + messages, err := otelLogsProducer.Produce(message, &producer.ProduceArgs{}) + require.NoError(t, err) + require.NotNil(t, messages) + assert.Equal(t, 1, len(messages)) + + pm, ok := messages[0].(*protoproducer.ProtoProducerMessage) + assert.True(t, ok) + assert.Equal(t, flowpb.FlowMessage_NETFLOW_V9, pm.Type) + assert.Equal(t, uint64(1), pm.Packets) + assert.Equal(t, uint32(256), pm.ObservationDomainId) + assert.Equal(t, uint32(838987416), pm.SequenceNum) + +} diff --git a/receiver/netflowreceiver/receiver.go b/receiver/netflowreceiver/receiver.go index d524d976ee13..2bbd302665d8 100644 --- a/receiver/netflowreceiver/receiver.go +++ b/receiver/netflowreceiver/receiver.go @@ -89,7 +89,6 @@ func (nr *netflowReceiver) Shutdown(context.Context) error { if err != nil { nr.logger.Warn("Error stopping UDP receiver", zap.Error(err)) } - return nil } diff --git a/receiver/netflowreceiver/testdata/config.yaml b/receiver/netflowreceiver/testdata/config.yaml index cbc5f657b145..467ed8fefaa0 100644 --- a/receiver/netflowreceiver/testdata/config.yaml +++ b/receiver/netflowreceiver/testdata/config.yaml @@ -14,3 +14,37 @@ netflow/invalid_port: sockets: 1 workers: 1 port: 0 + +netflow/zero_sockets: + scheme: netflow + port: 2055 + sockets: 0 + workers: 1 + +netflow/zero_workers: + scheme: netflow + port: 2055 + sockets: 1 + workers: 0 + +netflow/zero_queue: + scheme: netflow + port: 2055 + sockets: 1 + workers: 1 + queue_size: 0 + +netflow/sflow: + scheme: sflow + port: 2055 + sockets: 1 + workers: 1 + queue_size: 0 + +netflow/flow: + scheme: flow + port: 2055 + sockets: 1 + workers: 1 + queue_size: 0 + From 0a50dbb360585ddfea46a85aa541fe45fe2cd0a2 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Mon, 16 Dec 2024 15:48:45 -0600 Subject: [PATCH 07/30] netflow - add test to check for udp receiver creation --- receiver/netflowreceiver/receiver_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/receiver/netflowreceiver/receiver_test.go b/receiver/netflowreceiver/receiver_test.go index ad37d7feebf0..1f96f96e507d 100644 --- a/receiver/netflowreceiver/receiver_test.go +++ b/receiver/netflowreceiver/receiver_test.go @@ -19,6 +19,5 @@ func TestCreateValidDefaultReceiver(t *testing.T) { receiver, err := factory.CreateLogs(context.Background(), set, cfg, consumertest.NewNop()) assert.NoError(t, err, "receiver creation failed") assert.NotNil(t, receiver, "receiver creation failed") - // TODO - Will be added on the following PR - // assert.NotNil(t, "sflow", receiver.(*netflowReceiver).listeners[0].recv) + assert.NotNil(t, receiver.(*netflowReceiver).udpReceiver) } From dd6a0068f72bf26c7dfcc69230b0970b725c515f Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Mon, 16 Dec 2024 17:12:50 -0600 Subject: [PATCH 08/30] netflow - gofmt --- receiver/netflowreceiver/producer_test.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/receiver/netflowreceiver/producer_test.go b/receiver/netflowreceiver/producer_test.go index d9d3648f918b..07c4859e4737 100644 --- a/receiver/netflowreceiver/producer_test.go +++ b/receiver/netflowreceiver/producer_test.go @@ -13,7 +13,6 @@ import ( ) func TestProduce(t *testing.T) { - // list of netflow.DataFlowSet message := &netflow.NFv9Packet{ Version: 9, @@ -29,15 +28,16 @@ func TestProduce(t *testing.T) { Length: 1372, }, Records: []netflow.DataRecord{ - {Values: []netflow.DataField{ - { - PenProvided: false, - Type: 2, - Pen: 0, - Value: []uint8{0x00, 0x00, 0x00, 0x01}, + { + Values: []netflow.DataField{ + { + PenProvided: false, + Type: 2, + Pen: 0, + Value: []uint8{0x00, 0x00, 0x00, 0x01}, + }, }, }, - }, }, }, }, @@ -62,5 +62,4 @@ func TestProduce(t *testing.T) { assert.Equal(t, uint64(1), pm.Packets) assert.Equal(t, uint32(256), pm.ObservationDomainId) assert.Equal(t, uint32(838987416), pm.SequenceNum) - } From 38542699682ab21bf974287a189a6aa0ada10ee2 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Mon, 16 Dec 2024 17:44:54 -0600 Subject: [PATCH 09/30] netflow - make checks --- receiver/netflowreceiver/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/netflowreceiver/go.mod b/receiver/netflowreceiver/go.mod index 33424bb42fcc..38c26e50542e 100644 --- a/receiver/netflowreceiver/go.mod +++ b/receiver/netflowreceiver/go.mod @@ -10,6 +10,7 @@ require ( go.opentelemetry.io/collector/confmap v1.22.0 go.opentelemetry.io/collector/consumer v1.22.0 go.opentelemetry.io/collector/consumer/consumertest v0.116.0 + go.opentelemetry.io/collector/pdata v1.22.0 go.opentelemetry.io/collector/receiver v0.116.0 go.opentelemetry.io/collector/receiver/receivertest v0.116.0 go.uber.org/goleak v1.3.0 @@ -36,7 +37,6 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.116.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.116.0 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.116.0 // indirect - go.opentelemetry.io/collector/pdata v1.22.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.116.0 // indirect go.opentelemetry.io/collector/pipeline v0.116.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.116.0 // indirect From 0fe206ee649ca48692c33060005f7d863bf471a0 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Mon, 16 Dec 2024 18:03:21 -0600 Subject: [PATCH 10/30] netflowreceiver - add changelog for implementation --- .../netflow-receiver-implementation.yaml | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/netflow-receiver-implementation.yaml diff --git a/.chloggen/netflow-receiver-implementation.yaml b/.chloggen/netflow-receiver-implementation.yaml new file mode 100644 index 000000000000..304f564d25c8 --- /dev/null +++ b/.chloggen/netflow-receiver-implementation.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: new_component + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: netflowreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds the implementation of the netflow receiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32732, 97279] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Implement the UDP listener and the producer, as well as adding tests. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] From d7c078206d462baafe5c744cce737d75b8b93dbe Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Mon, 16 Dec 2024 18:12:39 -0600 Subject: [PATCH 11/30] netflow - fix license header --- receiver/netflowreceiver/parser.go | 3 +++ receiver/netflowreceiver/parser_test.go | 3 +++ receiver/netflowreceiver/producer.go | 3 +++ receiver/netflowreceiver/producer_test.go | 3 +++ 4 files changed, 12 insertions(+) diff --git a/receiver/netflowreceiver/parser.go b/receiver/netflowreceiver/parser.go index 4004608b2431..cc050d95d2dc 100644 --- a/receiver/netflowreceiver/parser.go +++ b/receiver/netflowreceiver/parser.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver" import ( diff --git a/receiver/netflowreceiver/parser_test.go b/receiver/netflowreceiver/parser_test.go index 16259e4fbc79..f397a8858406 100644 --- a/receiver/netflowreceiver/parser_test.go +++ b/receiver/netflowreceiver/parser_test.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package netflowreceiver import ( diff --git a/receiver/netflowreceiver/producer.go b/receiver/netflowreceiver/producer.go index 48f9db6fa75b..a5f3b060bb8e 100644 --- a/receiver/netflowreceiver/producer.go +++ b/receiver/netflowreceiver/producer.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver" import ( diff --git a/receiver/netflowreceiver/producer_test.go b/receiver/netflowreceiver/producer_test.go index 07c4859e4737..ef4ad77c313c 100644 --- a/receiver/netflowreceiver/producer_test.go +++ b/receiver/netflowreceiver/producer_test.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package netflowreceiver import ( From b343ab429282455972425baa59aa7392c8f81dc4 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Mon, 16 Dec 2024 18:38:11 -0600 Subject: [PATCH 12/30] netflowreceiver - lint --- receiver/netflowreceiver/producer.go | 19 +++++++++---------- receiver/netflowreceiver/producer_test.go | 4 ++-- receiver/netflowreceiver/receiver.go | 22 ++++++++++------------ 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/receiver/netflowreceiver/producer.go b/receiver/netflowreceiver/producer.go index a5f3b060bb8e..58fe890990e1 100644 --- a/receiver/netflowreceiver/producer.go +++ b/receiver/netflowreceiver/producer.go @@ -22,7 +22,7 @@ type OtelLogsProducerWrapper struct { } // Produce converts the message into a list log records and sends them to log consumer -func (o *OtelLogsProducerWrapper) Produce(msg interface{}, args *producer.ProduceArgs) ([]producer.ProducerMessage, error) { +func (o *OtelLogsProducerWrapper) Produce(msg any, args *producer.ProduceArgs) ([]producer.ProducerMessage, error) { // First we let the proto producer parse the message // All the netflow protocol and structure is handled by the proto producer flowMessageSet, err := o.wrapped.Produce(msg, args) @@ -39,10 +39,9 @@ func (o *OtelLogsProducerWrapper) Produce(msg interface{}, args *producer.Produc // A single netflow packet can contain multiple flow messages for _, msg := range flowMessageSet { - // Convert each one to the Otel semantic dictionary format - otelMessage, err := convertToOtel(msg) - if err != nil { + otelMessage, innerErr := convertToOtel(msg) + if innerErr != nil { continue } @@ -51,21 +50,21 @@ func (o *OtelLogsProducerWrapper) Produce(msg interface{}, args *producer.Produc logRecord.SetTimestamp(pcommon.NewTimestampFromTime(otelMessage.Flow.TimeReceived)) // The bytes of the message in JSON format - m, err := json.Marshal(otelMessage) - if err != nil { + m, innerErr := json.Marshal(otelMessage) + if innerErr != nil { continue } // Convert to a map[string] // https://opentelemetry.io/docs/specs/otel/logs/data-model/#type-mapstring-any - sec := map[string]interface{}{} - if err = json.Unmarshal(m, &sec); err != nil { + sec := map[string]any{} + if innerErr = json.Unmarshal(m, &sec); innerErr != nil { continue } // Set the map to the log record body - err = logRecord.Body().SetEmptyMap().FromRaw(sec) - if err != nil { + innerErr = logRecord.Body().SetEmptyMap().FromRaw(sec) + if innerErr != nil { continue } } diff --git a/receiver/netflowreceiver/producer_test.go b/receiver/netflowreceiver/producer_test.go index ef4ad77c313c..cb1419b37ad2 100644 --- a/receiver/netflowreceiver/producer_test.go +++ b/receiver/netflowreceiver/producer_test.go @@ -24,7 +24,7 @@ func TestProduce(t *testing.T) { UnixSeconds: 0x618aa3a8, SequenceNumber: 838987416, SourceId: 256, - FlowSets: []interface{}{ + FlowSets: []any{ netflow.DataFlowSet{ FlowSetHeader: netflow.FlowSetHeader{ Id: 260, @@ -57,7 +57,7 @@ func TestProduce(t *testing.T) { messages, err := otelLogsProducer.Produce(message, &producer.ProduceArgs{}) require.NoError(t, err) require.NotNil(t, messages) - assert.Equal(t, 1, len(messages)) + assert.Len(t, messages, 1) pm, ok := messages[0].(*protoproducer.ProtoProducerMessage) assert.True(t, ok) diff --git a/receiver/netflowreceiver/receiver.go b/receiver/netflowreceiver/receiver.go index 2bbd302665d8..01257f398b9a 100644 --- a/receiver/netflowreceiver/receiver.go +++ b/receiver/netflowreceiver/receiver.go @@ -118,13 +118,14 @@ func (nr *netflowReceiver) buildDecodeFunc() (utils.DecoderFunc, error) { var decodeFunc utils.DecoderFunc var p utils.FlowPipe - if nr.config.Scheme == "sflow" { + switch nr.config.Scheme { + case "sflow": p = utils.NewSFlowPipe(cfgPipe) - } else if nr.config.Scheme == "netflow" { + case "netflow": p = utils.NewNetFlowPipe(cfgPipe) - } else if nr.config.Scheme == "flow" { + case "flow": p = utils.NewFlowPipe(cfgPipe) - } else { + default: return nil, fmt.Errorf("scheme does not exist: %s", nr.config.Scheme) } @@ -141,23 +142,20 @@ func (nr *netflowReceiver) buildDecodeFunc() (utils.DecoderFunc, error) { // We don't want the receiver to stop if there is an error processing a packet func (nr *netflowReceiver) handleErrors() { for err := range nr.udpReceiver.Errors() { - // Receiver was closed, exit - if errors.Is(err, net.ErrClosed) { + switch { + case errors.Is(err, net.ErrClosed): nr.logger.Info("UDP receiver closed, exiting error handler") return - // This is the type of error we want to log - } else if !errors.Is(err, netflow.ErrorTemplateNotFound) && !errors.Is(err, debug.PanicError) { + case !errors.Is(err, netflow.ErrorTemplateNotFound) && !errors.Is(err, debug.PanicError): nr.logger.Error("receiver error", zap.Error(err)) continue - // A template was not found, this is not a big deal and can happen - } else if errors.Is(err, netflow.ErrorTemplateNotFound) { + case errors.Is(err, netflow.ErrorTemplateNotFound): nr.logger.Warn("template was not found for this message") continue - // These are pretty bad and should not happen - } else if errors.Is(err, debug.PanicError) { + case errors.Is(err, debug.PanicError): var pErrMsg *debug.PanicErrorMessage if errors.As(err, &pErrMsg) { nr.logger.Error("panic error", zap.String("panic", pErrMsg.Inner)) From 5e178ac87d6dc2b8b7a1a6a6566556988087f54c Mon Sep 17 00:00:00 2001 From: David Lopes Date: Thu, 2 Jan 2025 12:26:30 -0600 Subject: [PATCH 13/30] netflow - update .chloggen/netflow-receiver-implementation.yaml Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> --- .chloggen/netflow-receiver-implementation.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/netflow-receiver-implementation.yaml b/.chloggen/netflow-receiver-implementation.yaml index 304f564d25c8..8f852f843b12 100644 --- a/.chloggen/netflow-receiver-implementation.yaml +++ b/.chloggen/netflow-receiver-implementation.yaml @@ -1,7 +1,7 @@ # Use this changelog template to create an entry for release notes. # One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' -change_type: new_component +change_type: enhancement # The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) component: netflowreceiver From ebfa73d32c6fac862288c9584207b6b75fe2c83b Mon Sep 17 00:00:00 2001 From: David Lopes Date: Thu, 2 Jan 2025 12:27:02 -0600 Subject: [PATCH 14/30] netflow - remove scaffold issue Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> --- .chloggen/netflow-receiver-implementation.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/netflow-receiver-implementation.yaml b/.chloggen/netflow-receiver-implementation.yaml index 8f852f843b12..a7a0d7f718bf 100644 --- a/.chloggen/netflow-receiver-implementation.yaml +++ b/.chloggen/netflow-receiver-implementation.yaml @@ -10,7 +10,7 @@ component: netflowreceiver note: Adds the implementation of the netflow receiver # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. -issues: [32732, 97279] +issues: [32732] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. From 86701b9d20acdc2511187638ad446fa958de4734 Mon Sep 17 00:00:00 2001 From: David Lopes Date: Thu, 2 Jan 2025 12:28:28 -0600 Subject: [PATCH 15/30] netflow - add comment about default queue size value Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> --- receiver/netflowreceiver/factory.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/receiver/netflowreceiver/factory.go b/receiver/netflowreceiver/factory.go index a77f52716508..6f65bc466747 100644 --- a/receiver/netflowreceiver/factory.go +++ b/receiver/netflowreceiver/factory.go @@ -16,6 +16,9 @@ import ( const ( defaultSockets = 1 defaultWorkers = 2 + // The default UDP packet buffer size in GoFlow2 is 9000 bytes, which means + // that for a full queue of 1000 messages, the size in memory will be 9MB. + // Source: https://github.com/netsampler/goflow2/blob/v2.2.1/README.md#security-notes-and-assumptions defaultQueueSize = 1_000 ) From 52bca594dce86e7c9e25aa54504b1224e5034148 Mon Sep 17 00:00:00 2001 From: David Lopes Date: Thu, 2 Jan 2025 12:28:56 -0600 Subject: [PATCH 16/30] netflow - fix duplicate test case Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> --- receiver/netflowreceiver/config_test.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/receiver/netflowreceiver/config_test.go b/receiver/netflowreceiver/config_test.go index 4afc38ce68ee..347282f2404f 100644 --- a/receiver/netflowreceiver/config_test.go +++ b/receiver/netflowreceiver/config_test.go @@ -49,16 +49,6 @@ func TestLoadConfig(t *testing.T) { QueueSize: 1000, }, }, - { - id: component.NewIDWithName(metadata.Type, "zero_queue"), - expected: &Config{ - Scheme: "netflow", - Port: 2055, - Sockets: 1, - Workers: 1, - QueueSize: 1000, - }, - }, { id: component.NewIDWithName(metadata.Type, "sflow"), expected: &Config{ From edbbbc03ecc868926ce3bd7fafa823618fef200e Mon Sep 17 00:00:00 2001 From: David Lopes Date: Thu, 2 Jan 2025 12:29:45 -0600 Subject: [PATCH 17/30] netflow - fix typo Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> --- receiver/netflowreceiver/receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/netflowreceiver/receiver.go b/receiver/netflowreceiver/receiver.go index 01257f398b9a..914b0e83c73b 100644 --- a/receiver/netflowreceiver/receiver.go +++ b/receiver/netflowreceiver/receiver.go @@ -131,7 +131,7 @@ func (nr *netflowReceiver) buildDecodeFunc() (utils.DecoderFunc, error) { decodeFunc = p.DecodeFlow - // We wrap panics while decoding the message to habndle them later + // We wrap panics while decoding the message to handle them later decodeFunc = debug.PanicDecoderWrapper(decodeFunc) return decodeFunc, nil From d68e82628f7072b980170637da64f98fecb2f097 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Tue, 7 Jan 2025 19:54:24 -0600 Subject: [PATCH 18/30] netflow - better error handling messages --- receiver/netflowreceiver/receiver.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/receiver/netflowreceiver/receiver.go b/receiver/netflowreceiver/receiver.go index 914b0e83c73b..7c1676c82e8e 100644 --- a/receiver/netflowreceiver/receiver.go +++ b/receiver/netflowreceiver/receiver.go @@ -148,21 +148,20 @@ func (nr *netflowReceiver) handleErrors() { return case !errors.Is(err, netflow.ErrorTemplateNotFound) && !errors.Is(err, debug.PanicError): - nr.logger.Error("receiver error", zap.Error(err)) + nr.logger.Error("received a generic error while processing a flow message via GoFlow2 for the netflow receiver", zap.Error(err)) continue case errors.Is(err, netflow.ErrorTemplateNotFound): - nr.logger.Warn("template was not found for this message") + nr.logger.Warn("we could not find a template for a flow message, this error is expected from time to time until the device sends a template", zap.Error(err)) continue case errors.Is(err, debug.PanicError): var pErrMsg *debug.PanicErrorMessage if errors.As(err, &pErrMsg) { - nr.logger.Error("panic error", zap.String("panic", pErrMsg.Inner)) - nr.logger.Error("receiver stacktrace", zap.String("stack", string(pErrMsg.Stacktrace))) - nr.logger.Error("receiver msg", zap.Any("error", pErrMsg.Msg)) + nr.logger.Error("unexpected error found decoding a flow message via GoFlow2 for the netflow receiver", zap.Any("error", pErrMsg.Msg)) + } else { + nr.logger.Error("could not process a flow message, received an error from GoFlow2, this is a netflow receiver error", zap.Error(err)) } - nr.logger.Error("receiver panic", zap.Error(err)) continue } } From 053543c7dffedf61df021a0e4b65dceafbca9a09 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Tue, 7 Jan 2025 23:32:18 -0600 Subject: [PATCH 19/30] netflow - use a simple log record instead of struct Use the semconv constants for the attribute keys where applicable --- receiver/netflowreceiver/README.md | 55 +++-- receiver/netflowreceiver/go.mod | 1 + receiver/netflowreceiver/go.sum | 2 + receiver/netflowreceiver/parser.go | 285 +++++++++++++++++------- receiver/netflowreceiver/parser_test.go | 98 ++++---- receiver/netflowreceiver/producer.go | 27 +-- 6 files changed, 286 insertions(+), 182 deletions(-) diff --git a/receiver/netflowreceiver/README.md b/receiver/netflowreceiver/README.md index 30443c6d1199..2370c33d11b4 100644 --- a/receiver/netflowreceiver/README.md +++ b/receiver/netflowreceiver/README.md @@ -70,33 +70,30 @@ You would then configure your network devices to send netflow, sflow, or ipfix d ## Data format -The netflow data is standardized for the different schemas and is converted to OpenTelemetry logs following the [semantic conventions](https://opentelemetry.io/docs/specs/semconv/general/attributes/#server-client-and-shared-network-attributes) - -The output will adhere the format: - -```json -{ - "destination": { - "address": "192.168.0.1", - "port": 22 - }, - "flow": { - "end": 1731073104662487000, - "sampler_address": "192.168.0.2", - "sequence_num": 49, - "start": 1731073077662487000, - "time_received": 1731073138662487000, - "type": "NETFLOW_V5" - }, - "io": { - "bytes": 529, - "packets": 378 - }, - "source": { - "address": "192.168.0.3", - "port": 40 - }, - "transport": "TCP", - "type": "IPv4" -} +The netflow data is standardized for the different schemas and is converted to OpenTelemetry log records following the [semantic conventions](https://opentelemetry.io/docs/specs/semconv/general/attributes/#server-client-and-shared-network-attributes) + +The log record will have the following attributes: + +``` +ObservedTimestamp: 2025-01-08 04:14:49.8308464 +0000 UTC +Timestamp: 2025-01-08 04:14:49.918929427 +0000 UTC +SeverityText: +SeverityNumber: Unspecified(0) +Body: Empty() +Attributes: + -> source.address: Str(132.189.238.100) + -> source.port: Int(1255) + -> destination.address: Str(241.171.33.110) + -> destination.port: Int(64744) + -> network.transport: Str(tcp) + -> network.type: Str(ipv4) + -> network.io.bytes: Int(853) + -> network.io.packets: Int(83) + -> network.flow.type: Str(netflow_v5) + -> network.flow.sequence_num: Int(191) + -> network.flow.time_received: Int(1736309689918929427) + -> network.flow.start: Int(1736309689830846400) + -> network.flow.end: Int(1736309689871846400) + -> network.flow.sampling_rate: Int(0) + -> network.flow.sampler_address: Str(172.28.176.1) ``` diff --git a/receiver/netflowreceiver/go.mod b/receiver/netflowreceiver/go.mod index 5df281fb2ae0..f4a08da8fe85 100644 --- a/receiver/netflowreceiver/go.mod +++ b/receiver/netflowreceiver/go.mod @@ -13,6 +13,7 @@ require ( go.opentelemetry.io/collector/pdata v1.22.1-0.20241220212031-7c2639723f67 go.opentelemetry.io/collector/receiver v0.116.1-0.20241220212031-7c2639723f67 go.opentelemetry.io/collector/receiver/receivertest v0.116.1-0.20241220212031-7c2639723f67 + go.opentelemetry.io/collector/semconv v0.117.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 ) diff --git a/receiver/netflowreceiver/go.sum b/receiver/netflowreceiver/go.sum index bd6f4c2d5d2b..8366e9772b74 100644 --- a/receiver/netflowreceiver/go.sum +++ b/receiver/netflowreceiver/go.sum @@ -84,6 +84,8 @@ go.opentelemetry.io/collector/receiver/receivertest v0.116.1-0.20241220212031-7c go.opentelemetry.io/collector/receiver/receivertest v0.116.1-0.20241220212031-7c2639723f67/go.mod h1:B2EVj9VPLn484MngVq/53+XRv2fFBa/kXL3K2aum5pc= go.opentelemetry.io/collector/receiver/xreceiver v0.116.1-0.20241220212031-7c2639723f67 h1:bSP9NT4CF6Jw0PHtL3tsVt2/HoS00hHRhVIyxG6t2kY= go.opentelemetry.io/collector/receiver/xreceiver v0.116.1-0.20241220212031-7c2639723f67/go.mod h1:TzpQxAe+ZDfYjkww0L0lVoJ17pnieUOR4KHRk0shXYM= +go.opentelemetry.io/collector/semconv v0.117.0 h1:SavOvSbHPVD/QdAnXlI/cMca+yxCNyXStY1mQzerHs4= +go.opentelemetry.io/collector/semconv v0.117.0/go.mod h1:N6XE8Q0JKgBN2fAhkUQtqK9LT7rEGR6+Wu/Rtbal1iI= go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= diff --git a/receiver/netflowreceiver/parser.go b/receiver/netflowreceiver/parser.go index cc050d95d2dc..e5e109d1109d 100644 --- a/receiver/netflowreceiver/parser.go +++ b/receiver/netflowreceiver/parser.go @@ -10,91 +10,206 @@ import ( "github.com/netsampler/goflow2/v2/producer" protoproducer "github.com/netsampler/goflow2/v2/producer/proto" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + semconv "go.opentelemetry.io/collector/semconv/v1.27.0" ) var ( - etypeName = map[uint32]string{ - 0x806: "ARP", - 0x800: "IPv4", - 0x86dd: "IPv6", + + // https://en.wikipedia.org/wiki/EtherType + etypeNames = map[uint32]string{ + 0x806: "arp", + 0x800: "ipv4", + 0x86dd: "ipv6", } - protoName = map[uint32]string{ - 1: "ICMP", - 6: "TCP", - 17: "UDP", - 58: "ICMPv6", - 132: "SCTP", + + // https://en.wikipedia.org/wiki/List_of_IP_protocol_numbers + transportProtocolNames = map[uint32]string{ + 0: "hopopt", + 1: "icmp", + 2: "igmp", + 3: "ggp", + 4: "ipv4", + 5: "st", + 6: "tcp", + 7: "cbt", + 8: "egp", + 9: "igp", + 10: "bbn-rcc-mon", + 11: "nvp-ii", + 12: "pup", + 13: "argus", + 14: "emcon", + 15: "xnet", + 16: "chaos", + 17: "udp", + 18: "mux", + 19: "dcn-meas", + 20: "hmp", + 21: "prm", + 22: "xns-idp", + 23: "trunk-1", + 24: "trunk-2", + 25: "leaf-1", + 26: "leaf-2", + 27: "rdp", + 28: "irtp", + 29: "iso-tp4", + 30: "netblt", + 31: "mfe-nsp", + 32: "merit-inp", + 33: "dccp", + 34: "3pc", + 35: "idpr", + 36: "xtp", + 37: "ddp", + 38: "idpr-cmtp", + 39: "tp++", + 40: "il", + 41: "ipv6", + 42: "sdrp", + 43: "ipv6-route", + 44: "ipv6-frag", + 45: "idrp", + 46: "rsvp", + 47: "gre", + 48: "dsr", + 49: "bna", + 50: "esp", + 51: "ah", + 52: "i-nlsp", + 53: "swipe", + 54: "narp", + 55: "min-ipv4", + 56: "tlsp", + 57: "skip", + 58: "ipv6-icmp", + 59: "ipv6-nonxt", + 60: "ipv6-opts", + 61: "any-host-internal-protocol", + 62: "cftp", + 63: "any-local-network", + 64: "sat-expak", + 65: "kryptolan", + 66: "rvd", + 67: "ippc", + 68: "any-distributed-file-system", + 69: "sat-mon", + 70: "visa", + 71: "ipcv", + 72: "cpnx", + 73: "cphb", + 74: "wsn", + 75: "pvp", + 76: "br-sat-mon", + 77: "sun-nd", + 78: "wb-mon", + 79: "wb-expak", + 80: "iso-ip", + 81: "vmtp", + 82: "secure-vmtp", + 83: "vines", + 84: "iptm", + 85: "nsfnet-igp", + 86: "dgp", + 87: "tcf", + 88: "eigrp", + 89: "ospfigp", + 90: "sprite-rpc", + 91: "larp", + 92: "mtp", + 93: "ax.25", + 94: "ipip", + 95: "micp", + 96: "scc-sp", + 97: "etherip", + 98: "encap", + 99: "any-private-encryption-scheme", + 100: "gmtp", + 101: "ifmp", + 102: "pnni", + 103: "pim", + 104: "aris", + 105: "scps", + 106: "qnx", + 107: "a/n", + 108: "ipcomp", + 109: "snp", + 110: "compaq-peer", + 111: "ipx-in-ip", + 112: "vrrp", + 113: "pgm", + 114: "any-0-hop-protocol", + 115: "l2tp", + 116: "ddx", + 117: "iatp", + 118: "stp", + 119: "srp", + 120: "uti", + 121: "smp", + 122: "sm", + 123: "ptp", + 124: "isis over ipv4", + 125: "fire", + 126: "crtp", + 127: "crudp", + 128: "sscopmce", + 129: "iplt", + 130: "sps", + 131: "pipe", + 132: "sctp", + 133: "fc", + 134: "rsvp-e2e-ignore", + 135: "mobility header", + 136: "udplite", + 137: "mpls-in-ip", + 138: "manet", + 139: "hip", + 140: "shim6", + 141: "wesp", + 142: "rohc", + 143: "ethernet", + 144: "aggfrag", + 145: "nsh", } - flowTypeName = map[int32]string{ - 0: "UNKNOWN", - 1: "SFLOW_5", - 2: "NETFLOW_V5", - 3: "NETFLOW_V9", - 4: "IPFIX", + flowTypeNames = map[int32]string{ + 0: "unknown", + 1: "sflow_5", + 2: "netflow_v5", + 3: "netflow_v9", + 4: "ipfix", } ) -type NetworkAddress struct { - Address string `json:"address,omitempty"` - Port uint32 `json:"port,omitempty"` -} - -type Flow struct { - Type string `json:"type,omitempty"` - TimeReceived time.Time `json:"time_received,omitempty"` - Start time.Time `json:"start,omitempty"` - End time.Time `json:"end,omitempty"` - SequenceNum uint32 `json:"sequence_num,omitempty"` - SamplingRate uint64 `json:"sampling_rate,omitempty"` - SamplerAddress string `json:"sampler_address,omitempty"` -} - -type Protocol struct { - Name []byte `json:"name,omitempty"` // Layer 7 -} - -type NetworkIO struct { - Bytes uint64 `json:"bytes,omitempty"` - Packets uint64 `json:"packets,omitempty"` -} - -type OtelNetworkMessage struct { - Source NetworkAddress `json:"source,omitempty"` - Destination NetworkAddress `json:"destination,omitempty"` - Transport string `json:"transport,omitempty"` // Layer 4 - Type string `json:"type,omitempty"` // Layer 3 - IO NetworkIO `json:"io,omitempty"` - Flow Flow `json:"flow,omitempty"` -} - func getEtypeName(etype uint32) string { - if name, ok := etypeName[etype]; ok { + if name, ok := etypeNames[etype]; ok { return name } return "unknown" } -func getProtoName(proto uint32) string { - if name, ok := protoName[proto]; ok { +func getTransportName(proto uint32) string { + if name, ok := transportProtocolNames[proto]; ok { return name } return "unknown" } func getFlowTypeName(flowType int32) string { - if name, ok := flowTypeName[flowType]; ok { + if name, ok := flowTypeNames[flowType]; ok { return name } return "unknown" } -// convertToOtel converts a ProtoProducerMessage to an OtelNetworkMessage -func convertToOtel(m producer.ProducerMessage) (*OtelNetworkMessage, error) { +// addMessageAttributes parses the message attributes and adds them to the log record +func addMessageAttributes(m producer.ProducerMessage, r *plog.LogRecord) error { // we know msg is ProtoProducerMessage because that is the parent producer pm, ok := m.(*protoproducer.ProtoProducerMessage) if !ok { - return nil, errors.New("message is not ProtoProducerMessage") + return errors.New("this flow message is not ProtoProducerMessage, this is not expected") } // Parse IP addresses bytes to netip.Addr @@ -105,35 +220,33 @@ func convertToOtel(m producer.ProducerMessage) (*OtelNetworkMessage, error) { // Time the receiver received the message receivedTime := time.Unix(0, int64(pm.TimeReceivedNs)) startTime := time.Unix(0, int64(pm.TimeFlowStartNs)) - endTime := time.Unix(0, int64(pm.TimeFlowEndNs)) - - // Construct the actual log record based on the otel semantic conventions - // see https://opentelemetry.io/docs/specs/semconv/general/attributes/ - otelMessage := OtelNetworkMessage{ - Source: NetworkAddress{ - Address: srcAddr.String(), - Port: pm.SrcPort, - }, - Destination: NetworkAddress{ - Address: dstAddr.String(), - Port: pm.DstPort, - }, - Type: getEtypeName(pm.Etype), // Layer 3 - Transport: getProtoName(pm.Proto), // Layer 4 - IO: NetworkIO{ - Bytes: pm.Bytes, - Packets: pm.Packets, - }, - Flow: Flow{ - Type: getFlowTypeName(int32(pm.Type)), - TimeReceived: receivedTime, - Start: startTime, - End: endTime, - SequenceNum: pm.SequenceNum, - SamplingRate: pm.SamplingRate, - SamplerAddress: samplerAddr.String(), - }, - } - return &otelMessage, nil + r.SetObservedTimestamp(pcommon.NewTimestampFromTime(startTime)) + r.SetTimestamp(pcommon.NewTimestampFromTime(receivedTime)) + + // Source and destination attributes + r.Attributes().PutStr(semconv.AttributeSourceAddress, srcAddr.String()) + r.Attributes().PutInt(semconv.AttributeSourcePort, int64(pm.SrcPort)) + r.Attributes().PutStr(semconv.AttributeDestinationAddress, dstAddr.String()) + r.Attributes().PutInt(semconv.AttributeDestinationPort, int64(pm.DstPort)) + + // Network attributes + r.Attributes().PutStr(semconv.AttributeNetworkTransport, getTransportName(pm.Proto)) + r.Attributes().PutStr(semconv.AttributeNetworkType, getEtypeName(pm.Etype)) + + // There is no semconv as of today for these + // Network IO attributes + r.Attributes().PutInt("network.io.bytes", int64(pm.Bytes)) + r.Attributes().PutInt("network.io.packets", int64(pm.Packets)) + + // Flow attributes + r.Attributes().PutStr("network.flow.type", getFlowTypeName(int32(pm.Type))) + r.Attributes().PutInt("network.flow.sequence_num", int64(pm.SequenceNum)) + r.Attributes().PutInt("network.flow.time_received", int64(pm.TimeReceivedNs)) + r.Attributes().PutInt("network.flow.start", int64(pm.TimeFlowStartNs)) + r.Attributes().PutInt("network.flow.end", int64(pm.TimeFlowEndNs)) + r.Attributes().PutInt("network.flow.sampling_rate", int64(pm.SamplingRate)) + r.Attributes().PutStr("network.flow.sampler_address", samplerAddr.String()) + + return nil } diff --git a/receiver/netflowreceiver/parser_test.go b/receiver/netflowreceiver/parser_test.go index f397a8858406..91e444f98c30 100644 --- a/receiver/netflowreceiver/parser_test.go +++ b/receiver/netflowreceiver/parser_test.go @@ -6,11 +6,13 @@ package netflowreceiver import ( "net/netip" "testing" - "time" flowpb "github.com/netsampler/goflow2/v2/pb" protoproducer "github.com/netsampler/goflow2/v2/producer/proto" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + semconv "go.opentelemetry.io/collector/semconv/v1.27.0" ) func TestGetProtoName(t *testing.T) { @@ -18,17 +20,18 @@ func TestGetProtoName(t *testing.T) { proto uint32 want string }{ - {proto: 1, want: "ICMP"}, - {proto: 6, want: "TCP"}, - {proto: 17, want: "UDP"}, - {proto: 58, want: "ICMPv6"}, - {proto: 132, want: "SCTP"}, - {proto: 0, want: "unknown"}, + {proto: 1, want: "icmp"}, + {proto: 6, want: "tcp"}, + {proto: 17, want: "udp"}, + {proto: 58, want: "ipv6-icmp"}, + {proto: 132, want: "sctp"}, + {proto: 0, want: "hopopt"}, + {proto: 400, want: "unknown"}, } for _, tt := range tests { t.Run(tt.want, func(t *testing.T) { - got := getProtoName(tt.proto) + got := getTransportName(tt.proto) if got != tt.want { t.Errorf("getProtoName(%d) = %s; want %s", tt.proto, got, tt.want) } @@ -57,49 +60,60 @@ func TestConvertToOtel(t *testing.T) { }, } - otel, err := convertToOtel(pm) + record := plog.NewLogRecord() + err := addMessageAttributes(pm, &record) if err != nil { - t.Errorf("convertToOtel() error = %v", err) + t.Errorf("TestConvertToOtel() error = %v", err) return } - assert.Equal(t, "192.168.1.1", otel.Source.Address) - assert.Equal(t, uint32(0), otel.Source.Port) - assert.Equal(t, "192.168.1.2", otel.Destination.Address) - assert.Equal(t, uint32(2055), otel.Destination.Port) - assert.Equal(t, uint64(100), otel.IO.Bytes) - assert.Equal(t, uint64(1), otel.IO.Packets) - assert.Equal(t, "TCP", otel.Transport) - assert.Equal(t, "IPv4", otel.Type) - assert.Equal(t, "NETFLOW_V9", otel.Flow.Type) - assert.Equal(t, "192.168.1.100", otel.Flow.SamplerAddress) - assert.Equal(t, uint32(1), otel.Flow.SequenceNum) - assert.Equal(t, uint64(1), otel.Flow.SamplingRate) - assert.Equal(t, time.Unix(0, 1000000000), otel.Flow.Start) - assert.Equal(t, time.Unix(0, 1000000100), otel.Flow.End) - assert.Equal(t, time.Unix(0, 1000000000), otel.Flow.TimeReceived) + + expectedAttributes := pcommon.NewMap() + expectedAttributes.PutStr(semconv.AttributeSourceAddress, "192.168.1.1") + expectedAttributes.PutInt(semconv.AttributeSourcePort, 0) + expectedAttributes.PutStr(semconv.AttributeDestinationAddress, "192.168.1.2") + expectedAttributes.PutInt(semconv.AttributeDestinationPort, 2055) + expectedAttributes.PutStr(semconv.AttributeNetworkTransport, getTransportName(6)) + expectedAttributes.PutStr(semconv.AttributeNetworkType, getEtypeName(0x800)) + expectedAttributes.PutInt("network.io.bytes", 100) + expectedAttributes.PutInt("network.io.packets", 1) + expectedAttributes.PutStr("network.flow.type", getFlowTypeName(3)) + expectedAttributes.PutInt("network.flow.sequence_num", 1) + expectedAttributes.PutInt("network.flow.time_received", 1000000000) + expectedAttributes.PutInt("network.flow.start", 1000000000) + expectedAttributes.PutInt("network.flow.end", 1000000100) + expectedAttributes.PutInt("network.flow.sampling_rate", 1) + expectedAttributes.PutStr("network.flow.sampler_address", "192.168.1.100") + + assert.Equal(t, expectedAttributes, record.Attributes()) + } func TestEmptyConvertToOtel(t *testing.T) { pm := &protoproducer.ProtoProducerMessage{} - otel, err := convertToOtel(pm) + record := plog.NewLogRecord() + err := addMessageAttributes(pm, &record) if err != nil { - t.Errorf("convertToOtel() error = %v", err) + t.Errorf("TestConvertToOtel() error = %v", err) return } - assert.Equal(t, "invalid IP", otel.Source.Address) - assert.Equal(t, uint32(0), otel.Source.Port) - assert.Equal(t, "invalid IP", otel.Destination.Address) - assert.Equal(t, uint32(0), otel.Destination.Port) - assert.Equal(t, uint64(0), otel.IO.Bytes) - assert.Equal(t, uint64(0), otel.IO.Packets) - assert.Equal(t, "unknown", otel.Transport) - assert.Equal(t, "unknown", otel.Type) - assert.Equal(t, "UNKNOWN", otel.Flow.Type) - assert.Equal(t, "invalid IP", otel.Flow.SamplerAddress) - assert.Equal(t, uint32(0), otel.Flow.SequenceNum) - assert.Equal(t, uint64(0), otel.Flow.SamplingRate) - assert.Equal(t, time.Unix(0, 0), otel.Flow.Start) - assert.Equal(t, time.Unix(0, 0), otel.Flow.End) - assert.Equal(t, time.Unix(0, 0), otel.Flow.TimeReceived) + + expectedAttributes := pcommon.NewMap() + expectedAttributes.PutStr(semconv.AttributeSourceAddress, "invalid IP") + expectedAttributes.PutInt(semconv.AttributeSourcePort, 0) + expectedAttributes.PutStr(semconv.AttributeDestinationAddress, "invalid IP") + expectedAttributes.PutInt(semconv.AttributeDestinationPort, 0) + expectedAttributes.PutStr(semconv.AttributeNetworkTransport, "hopopt") + expectedAttributes.PutStr(semconv.AttributeNetworkType, "unknown") + expectedAttributes.PutInt("network.io.bytes", 0) + expectedAttributes.PutInt("network.io.packets", 0) + expectedAttributes.PutStr("network.flow.type", "unknown") + expectedAttributes.PutInt("network.flow.sequence_num", 0) + expectedAttributes.PutInt("network.flow.time_received", 0) + expectedAttributes.PutInt("network.flow.start", 0) + expectedAttributes.PutInt("network.flow.end", 0) + expectedAttributes.PutInt("network.flow.sampling_rate", 0) + expectedAttributes.PutStr("network.flow.sampler_address", "invalid IP") + + assert.Equal(t, expectedAttributes, record.Attributes()) } diff --git a/receiver/netflowreceiver/producer.go b/receiver/netflowreceiver/producer.go index 58fe890990e1..0248b212a9d3 100644 --- a/receiver/netflowreceiver/producer.go +++ b/receiver/netflowreceiver/producer.go @@ -5,11 +5,9 @@ package netflowreceiver // import "github.com/open-telemetry/opentelemetry-colle import ( "context" - "encoding/json" "github.com/netsampler/goflow2/v2/producer" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver/internal/metadata" @@ -39,34 +37,13 @@ func (o *OtelLogsProducerWrapper) Produce(msg any, args *producer.ProduceArgs) ( // A single netflow packet can contain multiple flow messages for _, msg := range flowMessageSet { - // Convert each one to the Otel semantic dictionary format - otelMessage, innerErr := convertToOtel(msg) - if innerErr != nil { - continue - } logRecord := logRecords.AppendEmpty() - logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(otelMessage.Flow.Start)) - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(otelMessage.Flow.TimeReceived)) - - // The bytes of the message in JSON format - m, innerErr := json.Marshal(otelMessage) - if innerErr != nil { - continue - } - - // Convert to a map[string] - // https://opentelemetry.io/docs/specs/otel/logs/data-model/#type-mapstring-any - sec := map[string]any{} - if innerErr = json.Unmarshal(m, &sec); innerErr != nil { + parseErr := addMessageAttributes(msg, &logRecord) + if parseErr != nil { continue } - // Set the map to the log record body - innerErr = logRecord.Body().SetEmptyMap().FromRaw(sec) - if innerErr != nil { - continue - } } // Send the logs to the collector, it is difficult to pass the context here From 169373f9f6f3584468a4d1987346a71ba6933605 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Tue, 7 Jan 2025 23:33:18 -0600 Subject: [PATCH 20/30] netflow - use context.Background() on producer --- receiver/netflowreceiver/producer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/receiver/netflowreceiver/producer.go b/receiver/netflowreceiver/producer.go index 0248b212a9d3..34bcfa38636f 100644 --- a/receiver/netflowreceiver/producer.go +++ b/receiver/netflowreceiver/producer.go @@ -46,8 +46,7 @@ func (o *OtelLogsProducerWrapper) Produce(msg any, args *producer.ProduceArgs) ( } - // Send the logs to the collector, it is difficult to pass the context here - err = o.logConsumer.ConsumeLogs(context.TODO(), log) + err = o.logConsumer.ConsumeLogs(context.Background(), log) if err != nil { return flowMessageSet, err } From 5cf7625a031808b396fec5f1ece9ac700e9c920a Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Tue, 7 Jan 2025 23:38:04 -0600 Subject: [PATCH 21/30] netflow - type the dropHandler interface --- receiver/netflowreceiver/receiver.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/receiver/netflowreceiver/receiver.go b/receiver/netflowreceiver/receiver.go index 7c1676c82e8e..3bb02020507e 100644 --- a/receiver/netflowreceiver/receiver.go +++ b/receiver/netflowreceiver/receiver.go @@ -19,11 +19,13 @@ import ( "go.uber.org/zap" ) +var _ utils.ReceiverCallback = (*dropHandler)(nil) + type dropHandler struct { logger *zap.Logger } -func (d *dropHandler) Dropped(msg utils.Message) { +func (d dropHandler) Dropped(msg utils.Message) { d.logger.Warn("Dropped netflow message", zap.Any("msg", msg)) } From 10fc878aaa33d16fd07616733198432c9ae73e86 Mon Sep 17 00:00:00 2001 From: David Lopes Date: Tue, 7 Jan 2025 23:39:46 -0600 Subject: [PATCH 22/30] Update .chloggen/netflow-receiver-implementation.yaml Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> --- .chloggen/netflow-receiver-implementation.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/netflow-receiver-implementation.yaml b/.chloggen/netflow-receiver-implementation.yaml index a7a0d7f718bf..9d1216640442 100644 --- a/.chloggen/netflow-receiver-implementation.yaml +++ b/.chloggen/netflow-receiver-implementation.yaml @@ -15,7 +15,7 @@ issues: [32732] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. # Use pipe (|) for multiline entries. -subtext: Implement the UDP listener and the producer, as well as adding tests. +subtext: The receiver now supports receiving NetFlow v5, NetFow v9, IPFIX, and sFlow v5 logs. # If your change doesn't affect end users or the exported elements of any package, # you should instead start your pull request title with [chore] or use the "Skip Changelog" label. From 54d8b60e83623ea11b5a48a17573827328e8b8b3 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Tue, 7 Jan 2025 23:46:21 -0600 Subject: [PATCH 23/30] netflow - more concise startup log message --- receiver/netflowreceiver/receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/netflowreceiver/receiver.go b/receiver/netflowreceiver/receiver.go index 3bb02020507e..10443d52c63d 100644 --- a/receiver/netflowreceiver/receiver.go +++ b/receiver/netflowreceiver/receiver.go @@ -71,7 +71,7 @@ func (nr *netflowReceiver) Start(ctx context.Context, host component.Host) error return err } - nr.logger.Info("Start listening for NetFlow on UDP", zap.Any("config", nr.config)) + nr.logger.Info("Start listening for NetFlow over UDP with port", zap.Int("port", nr.config.Port)) if err := nr.udpReceiver.Start(nr.config.Hostname, nr.config.Port, decodeFunc); err != nil { return err } From d2c1c1f5cfe09b363be682878c9e11825cff2ba9 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Wed, 8 Jan 2025 09:03:31 -0600 Subject: [PATCH 24/30] netflow - sflow and netflow schema documentation * Log empty sflow packets, which can happen if a packet only contains counter samples, which are not yet supported --- receiver/netflowreceiver/README.md | 63 ++++++++++++++--------- receiver/netflowreceiver/producer.go | 9 +++- receiver/netflowreceiver/producer_test.go | 3 +- receiver/netflowreceiver/receiver.go | 4 +- 4 files changed, 50 insertions(+), 29 deletions(-) diff --git a/receiver/netflowreceiver/README.md b/receiver/netflowreceiver/README.md index 2370c33d11b4..ec8e5528c8e5 100644 --- a/receiver/netflowreceiver/README.md +++ b/receiver/netflowreceiver/README.md @@ -72,28 +72,41 @@ You would then configure your network devices to send netflow, sflow, or ipfix d The netflow data is standardized for the different schemas and is converted to OpenTelemetry log records following the [semantic conventions](https://opentelemetry.io/docs/specs/semconv/general/attributes/#server-client-and-shared-network-attributes) -The log record will have the following attributes: - -``` -ObservedTimestamp: 2025-01-08 04:14:49.8308464 +0000 UTC -Timestamp: 2025-01-08 04:14:49.918929427 +0000 UTC -SeverityText: -SeverityNumber: Unspecified(0) -Body: Empty() -Attributes: - -> source.address: Str(132.189.238.100) - -> source.port: Int(1255) - -> destination.address: Str(241.171.33.110) - -> destination.port: Int(64744) - -> network.transport: Str(tcp) - -> network.type: Str(ipv4) - -> network.io.bytes: Int(853) - -> network.io.packets: Int(83) - -> network.flow.type: Str(netflow_v5) - -> network.flow.sequence_num: Int(191) - -> network.flow.time_received: Int(1736309689918929427) - -> network.flow.start: Int(1736309689830846400) - -> network.flow.end: Int(1736309689871846400) - -> network.flow.sampling_rate: Int(0) - -> network.flow.sampler_address: Str(172.28.176.1) -``` +The log record will have the following attributes (with examples): + +* **source.address**: Str(132.189.238.100) +* **source.port**: Int(1255) +* **destination.address**: Str(241.171.33.110) +* **destination.port**: Int(64744) +* **network.transport**: Str(tcp) +* **network.type**: Str(ipv4) +* **network.io.bytes**: Int(853) +* **network.io.packets**: Int(83) +* **network.flow.type**: Str(netflow_v5) +* **network.flow.sequence_num**: Int(191) +* **network.flow.time_received**: Int(1736309689918929427) +* **network.flow.start**: Int(1736309689830846400) +* **network.flow.end**: Int(1736309689871846400) +* **network.flow.sampling_rate**: Int(0) +* **network.flow.sampler_address**: Str(172.28.176.1) + +The log record timestamps will be: + +* **Observed timestamp**: The time the flow was received. +* **Timestamp**: The flow `start` field. + +### Schema support + +#### netflow + +* Process [Template Records](https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html) if present +* Process Netflow V5, V9, and IPFIX messages +* Extract the attributes documented above +* Mapping of custom fields is not yet supported + +#### sflow + +* Process [sFlow version 5](https://sflow.org/sflow_version_5.txt) datagrams +* `flow_sample` and `flow_sample_expanded` are supported. +* `counter_sample` and `counter_sample_expanded` are NOT yet supported. +* Mapping of custom fields is not yet supported diff --git a/receiver/netflowreceiver/producer.go b/receiver/netflowreceiver/producer.go index 34bcfa38636f..971e13fd5b3f 100644 --- a/receiver/netflowreceiver/producer.go +++ b/receiver/netflowreceiver/producer.go @@ -9,6 +9,7 @@ import ( "github.com/netsampler/goflow2/v2/producer" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver/internal/metadata" ) @@ -17,6 +18,7 @@ import ( type OtelLogsProducerWrapper struct { wrapped producer.ProducerInterface logConsumer consumer.Logs + logger *zap.Logger } // Produce converts the message into a list log records and sends them to log consumer @@ -46,6 +48,10 @@ func (o *OtelLogsProducerWrapper) Produce(msg any, args *producer.ProduceArgs) ( } + if len(flowMessageSet) == 0 { + o.logger.Info("received a packet with no flow messages from", zap.String("agent", args.SamplerAddress.String())) + } + err = o.logConsumer.ConsumeLogs(context.Background(), log) if err != nil { return flowMessageSet, err @@ -62,9 +68,10 @@ func (o *OtelLogsProducerWrapper) Commit(flowMessageSet []producer.ProducerMessa o.wrapped.Commit(flowMessageSet) } -func newOtelLogsProducer(wrapped producer.ProducerInterface, logConsumer consumer.Logs) producer.ProducerInterface { +func newOtelLogsProducer(wrapped producer.ProducerInterface, logConsumer consumer.Logs, logger *zap.Logger) producer.ProducerInterface { return &OtelLogsProducerWrapper{ wrapped: wrapped, logConsumer: logConsumer, + logger: logger, } } diff --git a/receiver/netflowreceiver/producer_test.go b/receiver/netflowreceiver/producer_test.go index cb1419b37ad2..2b0b718f4dfe 100644 --- a/receiver/netflowreceiver/producer_test.go +++ b/receiver/netflowreceiver/producer_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer/consumertest" + "go.uber.org/zap" ) func TestProduce(t *testing.T) { @@ -53,7 +54,7 @@ func TestProduce(t *testing.T) { protoProducer, err := protoproducer.CreateProtoProducer(cfgm, protoproducer.CreateSamplingSystem) require.NoError(t, err) - otelLogsProducer := newOtelLogsProducer(protoProducer, consumertest.NewNop()) + otelLogsProducer := newOtelLogsProducer(protoProducer, consumertest.NewNop(), zap.NewNop()) messages, err := otelLogsProducer.Produce(message, &producer.ProduceArgs{}) require.NoError(t, err) require.NotNil(t, messages) diff --git a/receiver/netflowreceiver/receiver.go b/receiver/netflowreceiver/receiver.go index 10443d52c63d..0fae8b9fd786 100644 --- a/receiver/netflowreceiver/receiver.go +++ b/receiver/netflowreceiver/receiver.go @@ -71,7 +71,7 @@ func (nr *netflowReceiver) Start(ctx context.Context, host component.Host) error return err } - nr.logger.Info("Start listening for NetFlow over UDP with port", zap.Int("port", nr.config.Port)) + nr.logger.Info("Start listening over UDP", zap.String("scheme", nr.config.Scheme), zap.Int("port", nr.config.Port)) if err := nr.udpReceiver.Start(nr.config.Hostname, nr.config.Port, decodeFunc); err != nil { return err } @@ -112,7 +112,7 @@ func (nr *netflowReceiver) buildDecodeFunc() (utils.DecoderFunc, error) { // the otel log producer converts those messages into OpenTelemetry logs // it is a wrapper around the protobuf producer - otelLogsProducer := newOtelLogsProducer(protoProducer, nr.logConsumer) + otelLogsProducer := newOtelLogsProducer(protoProducer, nr.logConsumer, nr.logger) cfgPipe := &utils.PipeConfig{ Producer: otelLogsProducer, From 41b68a282fad13db437616c1f18f3ec228fec8bc Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Wed, 8 Jan 2025 09:08:51 -0600 Subject: [PATCH 25/30] netflow - remove 'flow' scheme The flow scheme was strange as it was just netflow + sflow in a single pipe which checked the protocol before forwarding tp the actual pipe We remove that option for clarity, so users can only chose netflow or sflow as the scheme --- receiver/netflowreceiver/README.md | 2 +- receiver/netflowreceiver/config.go | 4 ++-- receiver/netflowreceiver/config_test.go | 14 ++------------ receiver/netflowreceiver/receiver.go | 2 -- receiver/netflowreceiver/testdata/config.yaml | 10 +--------- 5 files changed, 6 insertions(+), 26 deletions(-) diff --git a/receiver/netflowreceiver/README.md b/receiver/netflowreceiver/README.md index ec8e5528c8e5..ea0ab39c3b12 100644 --- a/receiver/netflowreceiver/README.md +++ b/receiver/netflowreceiver/README.md @@ -61,7 +61,7 @@ You would then configure your network devices to send netflow, sflow, or ipfix d | Field | Description | Examples | Default | |-------|-------------|--------| ------- | -| scheme | The type of flow data that to receive | `sflow`, `netflow`, `flow` | `netflow` | +| scheme | The type of flow data that to receive | `sflow`, `netflow` | `netflow` | | hostname | The hostname or IP address to bind to | `localhost` | `0.0.0.0` | | port | The port to bind to | `2055` or `6343` | `2055` | | sockets | The number of sockets to use | 1 | 1 | diff --git a/receiver/netflowreceiver/config.go b/receiver/netflowreceiver/config.go index e92eb79feb2e..5ecc4be91d21 100644 --- a/receiver/netflowreceiver/config.go +++ b/receiver/netflowreceiver/config.go @@ -32,7 +32,7 @@ type Config struct { // Validate checks if the receiver configuration is valid func (cfg *Config) Validate() error { - validSchemes := [3]string{"sflow", "netflow", "flow"} + validSchemes := [3]string{"sflow", "netflow"} validScheme := false for _, scheme := range validSchemes { @@ -42,7 +42,7 @@ func (cfg *Config) Validate() error { } } if !validScheme { - return fmt.Errorf("scheme must be one of sflow, netflow, or flow") + return fmt.Errorf("scheme must be netflow or sflow") } if cfg.Sockets <= 0 { diff --git a/receiver/netflowreceiver/config_test.go b/receiver/netflowreceiver/config_test.go index 347282f2404f..2a51b8b783e1 100644 --- a/receiver/netflowreceiver/config_test.go +++ b/receiver/netflowreceiver/config_test.go @@ -53,17 +53,7 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, "sflow"), expected: &Config{ Scheme: "sflow", - Port: 2055, - Sockets: 1, - Workers: 1, - QueueSize: 1000, - }, - }, - { - id: component.NewIDWithName(metadata.Type, "flow"), - expected: &Config{ - Scheme: "flow", - Port: 2055, + Port: 6343, Sockets: 1, Workers: 1, QueueSize: 1000, @@ -98,7 +88,7 @@ func TestInvalidConfig(t *testing.T) { }{ { id: component.NewIDWithName(metadata.Type, "invalid_schema"), - err: "scheme must be one of sflow, netflow, or flow", + err: "scheme must be netflow or sflow", }, { id: component.NewIDWithName(metadata.Type, "invalid_port"), diff --git a/receiver/netflowreceiver/receiver.go b/receiver/netflowreceiver/receiver.go index 0fae8b9fd786..ff43c2930bcd 100644 --- a/receiver/netflowreceiver/receiver.go +++ b/receiver/netflowreceiver/receiver.go @@ -125,8 +125,6 @@ func (nr *netflowReceiver) buildDecodeFunc() (utils.DecoderFunc, error) { p = utils.NewSFlowPipe(cfgPipe) case "netflow": p = utils.NewNetFlowPipe(cfgPipe) - case "flow": - p = utils.NewFlowPipe(cfgPipe) default: return nil, fmt.Errorf("scheme does not exist: %s", nr.config.Scheme) } diff --git a/receiver/netflowreceiver/testdata/config.yaml b/receiver/netflowreceiver/testdata/config.yaml index 467ed8fefaa0..1cc4164c1fbd 100644 --- a/receiver/netflowreceiver/testdata/config.yaml +++ b/receiver/netflowreceiver/testdata/config.yaml @@ -36,15 +36,7 @@ netflow/zero_queue: netflow/sflow: scheme: sflow - port: 2055 + port: 6343 sockets: 1 workers: 1 queue_size: 0 - -netflow/flow: - scheme: flow - port: 2055 - sockets: 1 - workers: 1 - queue_size: 0 - From 7dbde9bd57c77e81032a525d8521202e0a113438 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Wed, 8 Jan 2025 09:11:17 -0600 Subject: [PATCH 26/30] netflow - add sflow to sample yaml config --- receiver/netflowreceiver/README.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/receiver/netflowreceiver/README.md b/receiver/netflowreceiver/README.md index ea0ab39c3b12..c2afe326a07f 100644 --- a/receiver/netflowreceiver/README.md +++ b/receiver/netflowreceiver/README.md @@ -32,6 +32,11 @@ receivers: port: 2055 sockets: 16 workers: 32 + netflow/sflow: + - scheme: sflow + port: 6343 + sockets: 16 + workers: 32 processors: batch: @@ -45,7 +50,7 @@ exporters: service: pipelines: logs: - receivers: [netflow] + receivers: [netflow, netflow/sflow] processors: [batch] exporters: [debug] telemetry: From b33920b8d72d7de84ba72a751521a0baa4c6b1a8 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Wed, 8 Jan 2025 09:12:02 -0600 Subject: [PATCH 27/30] netflow - fix default queue size --- receiver/netflowreceiver/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/netflowreceiver/README.md b/receiver/netflowreceiver/README.md index c2afe326a07f..95b1ad4628e2 100644 --- a/receiver/netflowreceiver/README.md +++ b/receiver/netflowreceiver/README.md @@ -71,7 +71,7 @@ You would then configure your network devices to send netflow, sflow, or ipfix d | port | The port to bind to | `2055` or `6343` | `2055` | | sockets | The number of sockets to use | 1 | 1 | | workers | The number of workers used to decode incoming flow messages | 2 | 2 | -| queue_size | The size of the incoming netflow packets queue | 1000 | 1000000 | +| queue_size | The size of the incoming netflow packets queue | 1000 | 1000 | ## Data format From 77ec06a7f56ec3748aadceaea84e4fcaf31edb50 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Wed, 8 Jan 2025 09:16:49 -0600 Subject: [PATCH 28/30] netflow - go mod tidy --- receiver/netflowreceiver/go.mod | 3 ++- receiver/netflowreceiver/go.sum | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/receiver/netflowreceiver/go.mod b/receiver/netflowreceiver/go.mod index 9895ff7dada6..5d8a46c78c8e 100644 --- a/receiver/netflowreceiver/go.mod +++ b/receiver/netflowreceiver/go.mod @@ -10,8 +10,10 @@ require ( go.opentelemetry.io/collector/confmap v1.23.0 go.opentelemetry.io/collector/consumer v1.23.0 go.opentelemetry.io/collector/consumer/consumertest v0.117.0 + go.opentelemetry.io/collector/pdata v1.23.0 go.opentelemetry.io/collector/receiver v0.117.0 go.opentelemetry.io/collector/receiver/receivertest v0.117.0 + go.opentelemetry.io/collector/semconv v0.117.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 ) @@ -36,7 +38,6 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.117.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.117.0 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.117.0 // indirect - go.opentelemetry.io/collector/pdata v1.23.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.117.0 // indirect go.opentelemetry.io/collector/pipeline v0.117.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.117.0 // indirect diff --git a/receiver/netflowreceiver/go.sum b/receiver/netflowreceiver/go.sum index f7a5942dc457..ffab4f024807 100644 --- a/receiver/netflowreceiver/go.sum +++ b/receiver/netflowreceiver/go.sum @@ -84,6 +84,8 @@ go.opentelemetry.io/collector/receiver/receivertest v0.117.0 h1:aN4zOuWsiARa+RG9 go.opentelemetry.io/collector/receiver/receivertest v0.117.0/go.mod h1:1wnGEowDmlO89feq1P+b4tQI2G/+iJxRrMallw7zeJE= go.opentelemetry.io/collector/receiver/xreceiver v0.117.0 h1:HJjBj6P3/WQoYaRKZkWZHnUUCVFpBieqGKzKHcT6HUw= go.opentelemetry.io/collector/receiver/xreceiver v0.117.0/go.mod h1:K1qMjIiAg6i3vHA+/EpM8nkhna3uIgoEellE2yuhz7A= +go.opentelemetry.io/collector/semconv v0.117.0 h1:SavOvSbHPVD/QdAnXlI/cMca+yxCNyXStY1mQzerHs4= +go.opentelemetry.io/collector/semconv v0.117.0/go.mod h1:N6XE8Q0JKgBN2fAhkUQtqK9LT7rEGR6+Wu/Rtbal1iI= go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= From a1f642212d22647dde30ef5a59ff0a5b19c9e393 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Wed, 8 Jan 2025 09:25:57 -0600 Subject: [PATCH 29/30] netflow - format --- receiver/netflowreceiver/factory.go | 4 ++-- receiver/netflowreceiver/parser_test.go | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/receiver/netflowreceiver/factory.go b/receiver/netflowreceiver/factory.go index 6f65bc466747..f9a150f3c9e4 100644 --- a/receiver/netflowreceiver/factory.go +++ b/receiver/netflowreceiver/factory.go @@ -14,8 +14,8 @@ import ( ) const ( - defaultSockets = 1 - defaultWorkers = 2 + defaultSockets = 1 + defaultWorkers = 2 // The default UDP packet buffer size in GoFlow2 is 9000 bytes, which means // that for a full queue of 1000 messages, the size in memory will be 9MB. // Source: https://github.com/netsampler/goflow2/blob/v2.2.1/README.md#security-notes-and-assumptions diff --git a/receiver/netflowreceiver/parser_test.go b/receiver/netflowreceiver/parser_test.go index 91e444f98c30..ac385a08ad31 100644 --- a/receiver/netflowreceiver/parser_test.go +++ b/receiver/netflowreceiver/parser_test.go @@ -85,7 +85,6 @@ func TestConvertToOtel(t *testing.T) { expectedAttributes.PutStr("network.flow.sampler_address", "192.168.1.100") assert.Equal(t, expectedAttributes, record.Attributes()) - } func TestEmptyConvertToOtel(t *testing.T) { From c7383985a4a638b0bf109b19a83652833f654ecd Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Wed, 8 Jan 2025 09:48:10 -0600 Subject: [PATCH 30/30] netflow - lint and typing --- receiver/netflowreceiver/producer.go | 2 -- receiver/netflowreceiver/receiver.go | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/receiver/netflowreceiver/producer.go b/receiver/netflowreceiver/producer.go index 971e13fd5b3f..eee58d9fb92d 100644 --- a/receiver/netflowreceiver/producer.go +++ b/receiver/netflowreceiver/producer.go @@ -39,13 +39,11 @@ func (o *OtelLogsProducerWrapper) Produce(msg any, args *producer.ProduceArgs) ( // A single netflow packet can contain multiple flow messages for _, msg := range flowMessageSet { - logRecord := logRecords.AppendEmpty() parseErr := addMessageAttributes(msg, &logRecord) if parseErr != nil { continue } - } if len(flowMessageSet) == 0 { diff --git a/receiver/netflowreceiver/receiver.go b/receiver/netflowreceiver/receiver.go index ff43c2930bcd..18d8a5a6b9ce 100644 --- a/receiver/netflowreceiver/receiver.go +++ b/receiver/netflowreceiver/receiver.go @@ -62,7 +62,7 @@ func newNetflowLogsReceiver(params receiver.Settings, cfg Config, consumer consu return nr, nil } -func (nr *netflowReceiver) Start(ctx context.Context, host component.Host) error { +func (nr *netflowReceiver) Start(_ context.Context, _ component.Host) error { nr.logger.Info("NetFlow receiver is starting...") // The function that will decode packets