Skip to content

Commit

Permalink
Observation domain (cloudflare#98)
Browse files Browse the repository at this point in the history
* netflow: Add observation domain and point to message

The ObservationDomainID and ObservationPointID are two IPFIX fields that
identify the entity that is capturing flows and can be used to enrich
the context around a specific sample.

Parse these fields from the sample and add them to the FlowMessage.

Signed-off-by: Adrian Moreno <[email protected]>
Co-authored-by: Adrian Moreno <[email protected]>
  • Loading branch information
lspgn and amorenoz authored Jul 21, 2022
1 parent 838505b commit 0020934
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 50 deletions.
2 changes: 1 addition & 1 deletion cmd/enricher/pb/flowext.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion format/common/text.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package common
import (
"encoding/binary"
"fmt"
"github.com/golang/protobuf/proto"
"net"
"reflect"
"strings"

"github.com/golang/protobuf/proto"
)

const (
Expand Down Expand Up @@ -52,6 +53,8 @@ var (

TextFields = []string{
"Type",
"ObservationPointID",
"ObservationDomainID",
"TimeReceived",
"SequenceNum",
"SamplingRate",
Expand Down Expand Up @@ -99,6 +102,8 @@ var (
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_IP,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
Expand Down
112 changes: 67 additions & 45 deletions pb/flow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pb/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ message FlowMessage {
uint32 MPLSLastTTL = 61; // Last TTL
uint32 MPLSLastLabel = 62; // Last Label

uint32 ObservationDomainID = 70;
uint32 ObservationPointID = 71;

// Custom fields: start after ID 1000:
// uint32 MyCustomField = 1000;

Expand Down
4 changes: 4 additions & 0 deletions producer/producer_nf.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ func ConvertNetFlowDataSet(version uint16, baseTime uint32, uptime uint32, recor

switch df.Type {

case netflow.IPFIX_FIELD_observationPointId:
DecodeUNumber(v, &(flowMessage.ObservationPointID))

// Statistics
case netflow.NFV9_FIELD_IN_BYTES:
DecodeUNumber(v, &(flowMessage.Bytes))
Expand Down Expand Up @@ -560,6 +563,7 @@ func ProcessMessageNetFlowConfig(msgDec interface{}, samplingRateSys SamplingRat
for _, fmsg := range flowMessageSet {
fmsg.SequenceNum = seqnum
fmsg.SamplingRate = uint64(samplingRate)
fmsg.ObservationDomainID = obsDomainId
}
default:
return flowMessageSet, errors.New("Bad NetFlow/IPFIX version")
Expand Down
4 changes: 2 additions & 2 deletions producer/producer_nflegacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ func ConvertNetFlowLegacyRecord(baseTime uint32, uptime uint32, record netflowle

timeDiffFirst := (uptime - record.First)
timeDiffLast := (uptime - record.Last)
flowMessage.TimeFlowStart = uint64(baseTime - timeDiffFirst / 1000)
flowMessage.TimeFlowStart = uint64(baseTime - timeDiffFirst/1000)
flowMessage.TimeFlowStartMs = uint64(baseTime)*1000 - uint64(timeDiffFirst)
flowMessage.TimeFlowEnd = uint64(baseTime - timeDiffLast / 1000)
flowMessage.TimeFlowEnd = uint64(baseTime - timeDiffLast/1000)
flowMessage.TimeFlowEndMs = uint64(baseTime)*1000 - uint64(timeDiffLast)

v := make(net.IP, 4)
Expand Down
7 changes: 6 additions & 1 deletion utils/stopper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ func (s *stopper) start() error {

func (s *stopper) Shutdown() {
if s.stopCh != nil {
close(s.stopCh)
select {
case <-s.stopCh:
default:
close(s.stopCh)
}

s.stopCh = nil
}
}

0 comments on commit 0020934

Please sign in to comment.