Skip to content

Commit

Permalink
eBPF procs monitor method: fixed race conditions
Browse files Browse the repository at this point in the history
It'd probably be a good idea to write a module and encapsulate all the
functionality of the fields in funcs(), to lock them properly
(get/set maps, etc).

TODO: replace monitorLocalAddress() by
netlink.AddrSubscribeWithoptions(), to receive addresses' events
asynchronously.
  • Loading branch information
gustavo-iniguez-goya committed May 26, 2021
1 parent 6041493 commit 1db03b5
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 24 deletions.
8 changes: 6 additions & 2 deletions daemon/procmon/ebpf/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,17 @@ func PrintEverything() {
_, _ = exec.Command(bash, "-c", "bpftool map dump id "+strconv.Itoa(j)+" > dump"+strconv.Itoa(j)).Output()
}

for sock1, v := range alreadyEstablishedTCP {
alreadyEstablished.RLock()
for sock1, v := range alreadyEstablished.TCP {
fmt.Println(*sock1, v)
}

fmt.Println("---------------------")
for sock1, v := range alreadyEstablishedTCPv6 {
for sock1, v := range alreadyEstablished.TCPv6 {
fmt.Println(*sock1, v)
}
alreadyEstablished.RUnlock()

fmt.Println("---------------------")
sockets, _ := daemonNetlink.SocketsDump(syscall.AF_INET, syscall.IPPROTO_TCP)
for idx := range sockets {
Expand Down
40 changes: 33 additions & 7 deletions daemon/procmon/ebpf/ebpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,30 @@ type bpf_lookup_elem_t struct {
value uintptr
}

type alreadyEstablishedConns struct {
sync.RWMutex
TCP map[*daemonNetlink.Socket]int
TCPv6 map[*daemonNetlink.Socket]int
}

var (
m *elf.Module
lock = sync.RWMutex{}
mapSize = 12000
ebpfMaps map[string]*ebpfMapsForProto
//connections which were established at the time when opensnitch started
alreadyEstablishedTCP = make(map[*daemonNetlink.Socket]int)
alreadyEstablishedTCPv6 = make(map[*daemonNetlink.Socket]int)
alreadyEstablished = alreadyEstablishedConns{
TCP: make(map[*daemonNetlink.Socket]int),
TCPv6: make(map[*daemonNetlink.Socket]int),
}

//stop == true is a signal for all goroutines to stop
stop = false

// list of local addresses of this machine
localAddresses []net.IP
localAddressesLock sync.RWMutex
hostByteOrder binary.ByteOrder
localAddresses []net.IP

hostByteOrder binary.ByteOrder
)

//Start installs ebpf kprobes
Expand Down Expand Up @@ -80,6 +91,7 @@ func Start() error {
}
}

lock.Lock()
//determine host byte order
buf := [2]byte{}
*(*uint16)(unsafe.Pointer(&buf[0])) = uint16(0xABCD)
Expand All @@ -91,6 +103,7 @@ func Start() error {
default:
log.Error("Could not determine host byte order.")
}
lock.Unlock()

ebpfMaps = map[string]*ebpfMapsForProto{
"tcp": {lastPurgedMax: 0,
Expand All @@ -117,7 +130,9 @@ func Start() error {
inode := int((*sock).INode)
pid := procmon.GetPIDFromINode(inode, fmt.Sprint(inode,
(*sock).ID.Source, (*sock).ID.SourcePort, (*sock).ID.Destination, (*sock).ID.DestinationPort))
alreadyEstablishedTCP[sock] = pid
alreadyEstablished.Lock()
alreadyEstablished.TCP[sock] = pid
alreadyEstablished.Unlock()
}

socketListTCPv6, err := daemonNetlink.SocketsDump(uint8(syscall.AF_INET6), uint8(syscall.IPPROTO_TCP))
Expand All @@ -129,7 +144,9 @@ func Start() error {
inode := int((*sock).INode)
pid := procmon.GetPIDFromINode(inode, fmt.Sprint(inode,
(*sock).ID.Source, (*sock).ID.SourcePort, (*sock).ID.Destination, (*sock).ID.DestinationPort))
alreadyEstablishedTCPv6[sock] = pid
alreadyEstablished.Lock()
alreadyEstablished.TCPv6[sock] = pid
alreadyEstablished.Unlock()
}

go monitorMaps()
Expand All @@ -140,12 +157,21 @@ func Start() error {

// Stop stops monitoring connections using kprobes
func Stop() {
lock.Lock()
stop = true
lock.Unlock()
if m != nil {
m.Close()
}
}

func isStopped() bool {
lock.RLock()
defer lock.RUnlock()

return stop
}

//make bpf() syscall with bpf_lookup prepared by the caller
func makeBpfSyscall(bpf_lookup *bpf_lookup_elem_t) uintptr {
BPF_MAP_LOOKUP_ELEM := 1 //cmd number
Expand Down
18 changes: 12 additions & 6 deletions daemon/procmon/ebpf/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,19 @@ func getPidFromEbpf(proto string, srcPort uint, srcIP net.IP, dstIP net.IP, dstP
// FindInAlreadyEstablishedTCP searches those TCP connections which were already established at the time
// when opensnitch started
func findInAlreadyEstablishedTCP(proto string, srcPort uint, srcIP net.IP, dstIP net.IP, dstPort uint) (int, int, error) {
var alreadyEstablished map[*daemonNetlink.Socket]int
lock.RLock()
defer lock.RUnlock()

alreadyEstablished.RLock()
var _alreadyEstablished map[*daemonNetlink.Socket]int
if proto == "tcp" {
alreadyEstablished = alreadyEstablishedTCP
_alreadyEstablished = alreadyEstablished.TCP
} else if proto == "tcp6" {
alreadyEstablished = alreadyEstablishedTCPv6
_alreadyEstablished = alreadyEstablished.TCPv6
}
for sock, v := range alreadyEstablished {
alreadyEstablished.RUnlock()

for sock, v := range _alreadyEstablished {
if (*sock).ID.SourcePort == uint16(srcPort) && (*sock).ID.Source.Equal(srcIP) &&
(*sock).ID.Destination.Equal(dstIP) && (*sock).ID.DestinationPort == uint16(dstPort) {
return v, int((*sock).UID), nil
Expand All @@ -131,8 +137,8 @@ func findInAlreadyEstablishedTCP(proto string, srcPort uint, srcIP net.IP, dstIP

//returns true if addr is in the list of this machine's addresses
func findAddressInLocalAddresses(addr net.IP) bool {
localAddressesLock.Lock()
defer localAddressesLock.Unlock()
lock.Lock()
defer lock.Unlock()
for _, a := range localAddresses {
if addr.String() == a.String() {
return true
Expand Down
27 changes: 18 additions & 9 deletions daemon/procmon/ebpf/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func monitorMaps() {
zeroKey := make([]byte, 4)
for {
time.Sleep(time.Second * 1)
if stop {
if isStopped() {
return
}
for name, ebpfMap := range ebpfMaps {
Expand All @@ -26,7 +26,9 @@ func monitorMaps() {
unsafe.Pointer(&zeroKey[0]), unsafe.Pointer(&value[0])); err != nil {
log.Error("eBPF m.LookupElement error: %v", err)
}
lock.RLock()
counterValue := hostByteOrder.Uint64(value)
lock.RUnlock()
if counterValue-ebpfMap.lastPurgedMax > 10000 {
ebpfMap.lastPurgedMax = counterValue - 5000
deleteOld(ebpfMap.bpfmap, name == "tcp6" || name == "udp6", ebpfMap.lastPurgedMax)
Expand All @@ -36,21 +38,22 @@ func monitorMaps() {
}

// maintains a list of this machine's local addresses
// TODO: use netlink.AddrSubscribeWithOptions()
func monitorLocalAddresses() {
for {
addr, err := netlink.AddrList(nil, netlink.FAMILY_ALL)
if err != nil {
log.Error("eBPF error looking up this machine's addresses via netlink: %v", err)
continue
}
localAddressesLock.Lock()
lock.Lock()
localAddresses = nil
for _, a := range addr {
localAddresses = append(localAddresses, a.IP)
}
localAddressesLock.Unlock()
lock.Unlock()
time.Sleep(time.Second * 1)
if stop {
if isStopped() {
return
}
}
Expand All @@ -62,15 +65,16 @@ func monitorLocalAddresses() {
func monitorAlreadyEstablished() {
for {
time.Sleep(time.Second * 1)
if stop {
if isStopped() {
return
}
socketListTCP, err := daemonNetlink.SocketsDump(uint8(syscall.AF_INET), uint8(syscall.IPPROTO_TCP))
if err != nil {
log.Error("eBPF error in dumping TCP sockets via netlink")
continue
}
for aesock := range alreadyEstablishedTCP {
alreadyEstablished.Lock()
for aesock := range alreadyEstablished.TCP {
found := false
for _, sock := range socketListTCP {
if (*aesock).INode == (*sock).INode &&
Expand All @@ -85,16 +89,18 @@ func monitorAlreadyEstablished() {
}
}
if !found {
delete(alreadyEstablishedTCP, aesock)
delete(alreadyEstablished.TCP, aesock)
}
}
alreadyEstablished.Unlock()

socketListTCPv6, err := daemonNetlink.SocketsDump(uint8(syscall.AF_INET6), uint8(syscall.IPPROTO_TCP))
if err != nil {
log.Error("eBPF error in dumping TCPv6 sockets via netlink")
continue
}
for aesock := range alreadyEstablishedTCPv6 {
alreadyEstablished.Lock()
for aesock := range alreadyEstablished.TCPv6 {
found := false
for _, sock := range socketListTCPv6 {
if (*aesock).INode == (*sock).INode &&
Expand All @@ -109,9 +115,10 @@ func monitorAlreadyEstablished() {
}
}
if !found {
delete(alreadyEstablishedTCPv6, aesock)
delete(alreadyEstablished.TCPv6, aesock)
}
}
alreadyEstablished.Unlock()
}
}

Expand Down Expand Up @@ -153,7 +160,9 @@ func deleteOld(bpfmap *elf.Map, isIPv6 bool, maxToDelete uint64) {
continue
}
// last 8 bytes of value is counter value
lock.RLock()
counterValue := hostByteOrder.Uint64(value[16:24])
lock.RUnlock()
if counterValue > maxToDelete {
copy(lookupKey, nextKey)
continue
Expand Down

0 comments on commit 1db03b5

Please sign in to comment.