Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/netflow] Netflow receiver implementation - PR 2 #36865

Open
wants to merge 41 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
f7ff90b
#32732 - Initial implementation
dlopes7 Dec 15, 2024
9871936
#32732 - Make internal methods private
dlopes7 Dec 15, 2024
09a91d1
#32732 - netflow - make checks
dlopes7 Dec 15, 2024
26d5aec
#32732 - netflow - simplify implementation, remove listener
dlopes7 Dec 16, 2024
64acc94
netflow - add parser tests
dlopes7 Dec 16, 2024
658c414
netflow - add more tests
dlopes7 Dec 16, 2024
0a50dbb
netflow - add test to check for udp receiver creation
dlopes7 Dec 16, 2024
dd6a006
netflow - gofmt
dlopes7 Dec 16, 2024
363f93b
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Dec 16, 2024
3854269
netflow - make checks
dlopes7 Dec 16, 2024
0fe206e
netflowreceiver - add changelog for implementation
dlopes7 Dec 17, 2024
d7c0782
netflow - fix license header
dlopes7 Dec 17, 2024
b343ab4
netflowreceiver - lint
dlopes7 Dec 17, 2024
f504466
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Dec 17, 2024
d31a81e
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Dec 19, 2024
8d07a6a
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Dec 19, 2024
f7b1512
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Dec 20, 2024
33d0cee
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Dec 27, 2024
5e178ac
netflow - update .chloggen/netflow-receiver-implementation.yaml
dlopes7 Jan 2, 2025
ebfa73d
netflow - remove scaffold issue
dlopes7 Jan 2, 2025
86701b9
netflow - add comment about default queue size value
dlopes7 Jan 2, 2025
52bca59
netflow - fix duplicate test case
dlopes7 Jan 2, 2025
edbbbc0
netflow - fix typo
dlopes7 Jan 2, 2025
d68e826
netflow - better error handling messages
dlopes7 Jan 8, 2025
053543c
netflow - use a simple log record instead of struct
dlopes7 Jan 8, 2025
169373f
netflow - use context.Background() on producer
dlopes7 Jan 8, 2025
5cf7625
netflow - type the dropHandler interface
dlopes7 Jan 8, 2025
10fc878
Update .chloggen/netflow-receiver-implementation.yaml
dlopes7 Jan 8, 2025
2281a06
Merge branch 'netflow-receiver-implementation' of github.com:dlopes7/…
dlopes7 Jan 8, 2025
54d8b60
netflow - more concise startup log message
dlopes7 Jan 8, 2025
d2c1c1f
netflow - sflow and netflow schema documentation
dlopes7 Jan 8, 2025
41b68a2
netflow - remove 'flow' scheme
dlopes7 Jan 8, 2025
7dbde9b
netflow - add sflow to sample yaml config
dlopes7 Jan 8, 2025
b33920b
netflow - fix default queue size
dlopes7 Jan 8, 2025
12e8425
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Jan 8, 2025
77ec06a
netflow - go mod tidy
dlopes7 Jan 8, 2025
a1f6422
netflow - format
dlopes7 Jan 8, 2025
c738398
netflow - lint and typing
dlopes7 Jan 8, 2025
ccf6e7c
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Jan 8, 2025
c090043
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Jan 9, 2025
3ef5c92
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Jan 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/netflow-receiver-implementation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: netflowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds the implementation of the netflow receiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32732, 97279]
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Implement the UDP listener and the producer, as well as adding tests.
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
50 changes: 49 additions & 1 deletion receiver/netflowreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,47 @@ func TestLoadConfig(t *testing.T) {
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000000,
QueueSize: 1000,
},
},
{
id: component.NewIDWithName(metadata.Type, "zero_queue"),
expected: &Config{
Scheme: "netflow",
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000,
},
},
{
id: component.NewIDWithName(metadata.Type, "zero_queue"),
expected: &Config{
Scheme: "netflow",
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000,
},
},
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved
{
id: component.NewIDWithName(metadata.Type, "sflow"),
expected: &Config{
Scheme: "sflow",
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000,
},
},
{
id: component.NewIDWithName(metadata.Type, "flow"),
expected: &Config{
Scheme: "flow",
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000,
},
},
}
Expand Down Expand Up @@ -74,6 +114,14 @@ func TestInvalidConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "invalid_port"),
err: "port must be greater than 0",
},
{
id: component.NewIDWithName(metadata.Type, "zero_sockets"),
err: "sockets must be greater than 0",
},
{
id: component.NewIDWithName(metadata.Type, "zero_workers"),
err: "workers must be greater than 0",
},
}

for _, tt := range tests {
Expand Down
17 changes: 10 additions & 7 deletions receiver/netflowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
const (
defaultSockets = 1
defaultWorkers = 2
defaultQueueSize = 1_000_000
defaultQueueSize = 1_000
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved
)

// NewFactory creates a factory for netflow receiver.
Expand All @@ -27,6 +27,8 @@ func NewFactory() receiver.Factory {
receiver.WithLogs(createLogsReceiver, metadata.LogsStability))
}

// Config defines configuration for netflow receiver.
// By default we listen for netflow traffic on port 2055
func createDefaultConfig() component.Config {
return &Config{
Scheme: "netflow",
Expand All @@ -37,14 +39,15 @@ func createDefaultConfig() component.Config {
}
}

// createLogsReceiver creates a netflow receiver.
// We also create the UDP receiver, which is the piece of software that actually listens
// for incoming netflow traffic on an UDP port.
func createLogsReceiver(_ context.Context, params receiver.Settings, cfg component.Config, consumer consumer.Logs) (receiver.Logs, error) {
logger := params.Logger
conf := cfg.(*Config)
conf := *(cfg.(*Config))

nr := &netflowReceiver{
logger: logger,
logConsumer: consumer,
config: conf,
nr, err := newNetflowLogsReceiver(params, conf, consumer)
if err != nil {
return nil, err
}

return nr, nil
Expand Down
4 changes: 3 additions & 1 deletion receiver/netflowreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflo
go 1.22.0

require (
github.com/netsampler/goflow2/v2 v2.2.1
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.116.1-0.20241220212031-7c2639723f67
go.opentelemetry.io/collector/component/componenttest v0.116.1-0.20241220212031-7c2639723f67
go.opentelemetry.io/collector/confmap v1.22.1-0.20241220212031-7c2639723f67
go.opentelemetry.io/collector/consumer v1.22.1-0.20241220212031-7c2639723f67
go.opentelemetry.io/collector/consumer/consumertest v0.116.1-0.20241220212031-7c2639723f67
go.opentelemetry.io/collector/pdata v1.22.1-0.20241220212031-7c2639723f67
go.opentelemetry.io/collector/receiver v0.116.1-0.20241220212031-7c2639723f67
go.opentelemetry.io/collector/receiver/receivertest v0.116.1-0.20241220212031-7c2639723f67
go.uber.org/goleak v1.3.0
Expand All @@ -26,6 +28,7 @@ require (
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.2 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -34,7 +37,6 @@ require (
go.opentelemetry.io/collector/config/configtelemetry v0.116.1-0.20241220212031-7c2639723f67 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.116.1-0.20241220212031-7c2639723f67 // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.116.1-0.20241220212031-7c2639723f67 // indirect
go.opentelemetry.io/collector/pdata v1.22.1-0.20241220212031-7c2639723f67 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.116.1-0.20241220212031-7c2639723f67 // indirect
go.opentelemetry.io/collector/pipeline v0.116.1-0.20241220212031-7c2639723f67 // indirect
go.opentelemetry.io/collector/receiver/xreceiver v0.116.1-0.20241220212031-7c2639723f67 // indirect
Expand Down
4 changes: 4 additions & 0 deletions receiver/netflowreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions receiver/netflowreceiver/listener.go

This file was deleted.

27 changes: 0 additions & 27 deletions receiver/netflowreceiver/listener_test.go

This file was deleted.

139 changes: 139 additions & 0 deletions receiver/netflowreceiver/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver"

import (
"errors"
"net/netip"
"time"

"github.com/netsampler/goflow2/v2/producer"
protoproducer "github.com/netsampler/goflow2/v2/producer/proto"
)

var (
etypeName = map[uint32]string{
0x806: "ARP",
0x800: "IPv4",
0x86dd: "IPv6",
}
protoName = map[uint32]string{
1: "ICMP",
6: "TCP",
17: "UDP",
58: "ICMPv6",
132: "SCTP",
}

flowTypeName = map[int32]string{
0: "UNKNOWN",
1: "SFLOW_5",
2: "NETFLOW_V5",
3: "NETFLOW_V9",
4: "IPFIX",
}
)

type NetworkAddress struct {
Address string `json:"address,omitempty"`
Port uint32 `json:"port,omitempty"`
}

type Flow struct {
Type string `json:"type,omitempty"`
TimeReceived time.Time `json:"time_received,omitempty"`
Start time.Time `json:"start,omitempty"`
End time.Time `json:"end,omitempty"`
SequenceNum uint32 `json:"sequence_num,omitempty"`
SamplingRate uint64 `json:"sampling_rate,omitempty"`
SamplerAddress string `json:"sampler_address,omitempty"`
}

type Protocol struct {
Name []byte `json:"name,omitempty"` // Layer 7
}

type NetworkIO struct {
Bytes uint64 `json:"bytes,omitempty"`
Packets uint64 `json:"packets,omitempty"`
}

type OtelNetworkMessage struct {
Source NetworkAddress `json:"source,omitempty"`
Destination NetworkAddress `json:"destination,omitempty"`
Transport string `json:"transport,omitempty"` // Layer 4
Type string `json:"type,omitempty"` // Layer 3
IO NetworkIO `json:"io,omitempty"`
Flow Flow `json:"flow,omitempty"`
}

func getEtypeName(etype uint32) string {
if name, ok := etypeName[etype]; ok {
return name
}
return "unknown"
}

func getProtoName(proto uint32) string {
if name, ok := protoName[proto]; ok {
return name
}
return "unknown"
}

func getFlowTypeName(flowType int32) string {
if name, ok := flowTypeName[flowType]; ok {
return name
}
return "unknown"
}

// convertToOtel converts a ProtoProducerMessage to an OtelNetworkMessage
func convertToOtel(m producer.ProducerMessage) (*OtelNetworkMessage, error) {
// we know msg is ProtoProducerMessage because that is the parent producer
pm, ok := m.(*protoproducer.ProtoProducerMessage)
if !ok {
return nil, errors.New("message is not ProtoProducerMessage")
}

// Parse IP addresses bytes to netip.Addr
srcAddr, _ := netip.AddrFromSlice(pm.SrcAddr)
dstAddr, _ := netip.AddrFromSlice(pm.DstAddr)
samplerAddr, _ := netip.AddrFromSlice(pm.SamplerAddress)

// Time the receiver received the message
receivedTime := time.Unix(0, int64(pm.TimeReceivedNs))
startTime := time.Unix(0, int64(pm.TimeFlowStartNs))
endTime := time.Unix(0, int64(pm.TimeFlowEndNs))

// Construct the actual log record based on the otel semantic conventions
// see https://opentelemetry.io/docs/specs/semconv/general/attributes/
otelMessage := OtelNetworkMessage{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of this looks like metadata on the log to me. Instead of creating this struct, could we create a plog.LogRecord and set these values as attributes using the corresponding semconv key for each value? We can probably use the network semantic conventions here. I'm having trouble linking to the Go registry copy of the docs, but you can use the go.opentelemetry.io/collector/semconv module to get constants for these, as seen here.

For the timestamps, we could just set those on the log record. I believe the body will be empty since the incoming messages don't seem to include any kind of equivalent arbitrary-text field.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dug into it some more and I see that the network semconv isn't necessarily a 1:1 fit here. I think we should do as much as we can, and we can create attributes for the pieces that don't fit so long as they're documented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented here

Source: NetworkAddress{
Address: srcAddr.String(),
Port: pm.SrcPort,
},
Destination: NetworkAddress{
Address: dstAddr.String(),
Port: pm.DstPort,
},
Type: getEtypeName(pm.Etype), // Layer 3
Transport: getProtoName(pm.Proto), // Layer 4
IO: NetworkIO{
Bytes: pm.Bytes,
Packets: pm.Packets,
},
Flow: Flow{
Type: getFlowTypeName(int32(pm.Type)),
TimeReceived: receivedTime,
Start: startTime,
End: endTime,
SequenceNum: pm.SequenceNum,
SamplingRate: pm.SamplingRate,
SamplerAddress: samplerAddr.String(),
},
}

return &otelMessage, nil
}
Loading
Loading