Skip to content

Commit

Permalink
Merge pull request #7077 from TheThingsNetwork/fix/gs-udp-throughput
Browse files Browse the repository at this point in the history
Improve Gateway Server throughput
  • Loading branch information
adriansmares authored May 15, 2024
2 parents f092c97 + dc7e194 commit ba8be7d
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 64 deletions.
43 changes: 26 additions & 17 deletions pkg/gatewayserver/io/udp/firewall.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"net"
"sync"
"sync/atomic"
"time"

"go.thethings.network/lorawan-stack/v3/pkg/errors"
Expand All @@ -35,8 +36,8 @@ type noopFirewall struct{}
func (noopFirewall) Filter(encoding.Packet) error { return nil }

type addrTime struct {
net.IP
lastSeen time.Time
ip net.IP
lastSeen atomic.Pointer[time.Time]
}

type memoryFirewall struct {
Expand All @@ -51,10 +52,10 @@ func NewMemoryFirewall(ctx context.Context, addrChangeBlock time.Duration) Firew
}
go func() {
ticker := time.NewTicker(addrChangeBlock)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
f.gc()
Expand All @@ -79,28 +80,36 @@ func (f *memoryFirewall) Filter(packet encoding.Packet) error {
}
now := time.Now().UTC()
eui := *packet.GatewayEUI
val, ok := f.m.Load(eui)
if ok {
a := val.(addrTime)
if !a.IP.Equal(packet.GatewayAddr.IP) && a.lastSeen.Add(f.addrChangeBlock).After(now) {
return errAlreadyConnected.WithAttributes(
"connected_ip", a.IP.String(),
"connecting_ip", packet.GatewayAddr.IP.String(),
)
entry := &addrTime{
ip: packet.GatewayAddr.IP,
}
entry.lastSeen.Store(&now)
actual, loaded := f.m.LoadOrStore(eui, entry)
if !loaded {
// This is a new entry. There are no checks or updates to be done.
return nil
}
a := actual.(*addrTime) // nolint:revive
lastSeen := a.lastSeen.Load()
if !a.ip.Equal(packet.GatewayAddr.IP) && lastSeen.Add(f.addrChangeBlock).After(now) {
return errAlreadyConnected.WithAttributes(
"connected_ip", a.ip.String(),
"connecting_ip", packet.GatewayAddr.IP.String(),
)
}
for ; lastSeen.Before(now); lastSeen = a.lastSeen.Load() {
if a.lastSeen.CompareAndSwap(lastSeen, &now) {
return nil
}
}
f.m.Store(eui, addrTime{
IP: packet.GatewayAddr.IP,
lastSeen: now,
})
return nil
}

func (f *memoryFirewall) gc() {
now := time.Now().UTC()
f.m.Range(func(k, val any) bool {
a := val.(addrTime)
if a.lastSeen.Add(f.addrChangeBlock).Before(now) {
a := val.(*addrTime) // nolint:revive
if a.lastSeen.Load().Add(f.addrChangeBlock).Before(now) {
f.m.Delete(k)
}
return true
Expand Down
8 changes: 4 additions & 4 deletions pkg/gatewayserver/io/udp/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,22 @@ import (
const subsystem = "gs_io_udp"

var udpMetrics = &messageMetrics{
messageReceived: prometheus.NewCounter(
messageReceived: metrics.NewCounter(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "message_received_total",
Help: "Total number of received UDP messages",
},
),
messageForwarded: prometheus.NewCounterVec(
messageForwarded: metrics.NewCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "message_forwarded_total",
Help: "Total number of forwarded UDP messages",
},
[]string{"type"},
),
messageDropped: prometheus.NewCounterVec(
messageDropped: metrics.NewCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "message_dropped_total",
Expand All @@ -51,7 +51,7 @@ var udpMetrics = &messageMetrics{
[]string{"error"},
),

unmarshalTypeErrors: prometheus.NewCounterVec(
unmarshalTypeErrors: metrics.NewCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "unmarshal_type_errors_total",
Expand Down
110 changes: 67 additions & 43 deletions pkg/gatewayserver/io/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

type filteringItem struct {
now time.Time
packetBuf []byte
addr *net.UDPAddr
}

type srv struct {
ctx context.Context
config Config
Expand All @@ -48,6 +54,9 @@ type srv struct {
firewall Firewall

limitLogs ratelimit.Interface

filterPool workerpool.WorkerPool[filteringItem]
processPool workerpool.WorkerPool[encoding.Packet]
}

func (*srv) Protocol() string { return "udp" }
Expand All @@ -56,7 +65,7 @@ func (*srv) DutyCycleStyle() scheduling.DutyCycleStyle { return scheduling.Defau

var (
limitLogsConfig = config.RateLimitingProfile{MaxPerMin: 1}
limitLogsSize uint = 1 << 13
limitLogsStoreConfig = ratelimit.StoreConfig{Memory: config.RateLimitingMemory{MaxSize: 1 << 13}}
)

// Serve serves the UDP frontend.
Expand All @@ -71,10 +80,7 @@ func Serve(ctx context.Context, server io.Server, conn *net.UDPConn, conf Config
if conf.RateLimiting.Enable {
firewall = NewRateLimitingFirewall(firewall, conf.RateLimiting.Messages, conf.RateLimiting.Threshold)
}
limitLogs, err := ratelimit.NewProfile(ctx, limitLogsConfig, ratelimit.StoreConfig{
Provider: conf.RateLimiting.Provider,
Redis: conf.RateLimiting.Redis.Client,
})
limitLogs, err := ratelimit.NewProfile(ctx, limitLogsConfig, limitLogsStoreConfig)
if err != nil {
return err
}
Expand All @@ -87,25 +93,33 @@ func Serve(ctx context.Context, server io.Server, conn *net.UDPConn, conf Config

limitLogs: limitLogs,
}
wp := workerpool.NewWorkerPool(workerpool.Config[encoding.Packet]{
s.processPool = workerpool.NewWorkerPool(workerpool.Config[encoding.Packet]{
Component: server,
Context: ctx,
Name: "udp",
Name: "udp_process",
Handler: s.handlePacket,
MaxWorkers: conf.PacketHandlers,
QueueSize: conf.PacketBuffer,
})
s.filterPool = workerpool.NewWorkerPool(workerpool.Config[filteringItem]{
Component: server,
Context: ctx,
Name: "udp_filter",
Handler: s.filterPacket,
MaxWorkers: conf.PacketHandlers,
QueueSize: conf.PacketBuffer,
})
go s.gc()
go func() {
<-ctx.Done()
s.conn.Close()
}()
return s.read(wp)
return s.read()
}

var errPacketType = errors.DefineInvalidArgument("packet_type", "invalid packet type")

func (s *srv) read(wp workerpool.WorkerPool[encoding.Packet]) error {
func (s *srv) read() error {
var buf [65507]byte
for {
n, addr, err := s.conn.ReadFromUDP(buf[:])
Expand All @@ -115,50 +129,60 @@ func (s *srv) read(wp workerpool.WorkerPool[encoding.Packet]) error {
}
return err
}
now := time.Now()
ctx := log.NewContextWithField(s.ctx, "remote_addr", addr.String())
logger := log.FromContext(ctx)

registerMessageReceived(ctx)
if err := ratelimit.Require(s.server.RateLimiter(), ratelimit.GatewayUDPTrafficResource(addr)); err != nil {
if ratelimit.Require(s.limitLogs, ratelimit.NewCustomResource(addr.IP.String())) == nil {
logger.WithError(err).Warn("Drop packet")
}
if err := s.filterPool.Publish(ctx, filteringItem{
now: time.Now(),
packetBuf: slices.Clone(buf[:n]),
addr: addr,
}); err != nil {
log.FromContext(ctx).WithError(err).Warn("Drop packet")
registerMessageDropped(ctx, err)
continue
}
}
}

packetBuf := slices.Clone(buf[:n])
func (s *srv) filterPacket(ctx context.Context, item filteringItem) {
logger := log.FromContext(ctx)
now, packetBuf, addr := item.now, item.packetBuf, item.addr

packet := encoding.Packet{
GatewayAddr: addr,
ReceivedAt: now,
}
if err := packet.UnmarshalBinary(packetBuf); err != nil {
logger.WithError(err).Debug("Failed to unmarshal packet")
registerMessageDropped(ctx, err)
continue
}
switch packet.PacketType {
case encoding.PullData, encoding.PushData, encoding.TxAck:
default:
logger.WithField("packet_type", packet.PacketType).Debug("Invalid packet type for uplink")
registerMessageDropped(ctx, errPacketType)
continue
}
if packet.GatewayEUI == nil {
logger.Debug("No gateway EUI in uplink message")
registerMessageDropped(ctx, errNoEUI)
continue
if err := ratelimit.Require(s.server.RateLimiter(), ratelimit.GatewayUDPTrafficResource(addr)); err != nil {
if ratelimit.Require(s.limitLogs, ratelimit.NewCustomResource(addr.IP.String())) == nil {
logger.WithError(err).Warn("Drop packet")
}
registerMessageDropped(ctx, err)
return
}

if err := wp.Publish(ctx, packet); err != nil {
logger.WithError(err).Warn("UDP packet publishing failed")
registerMessageDropped(ctx, err)
continue
}
registerMessageForwarded(ctx, packet.PacketType)
packet := encoding.Packet{
GatewayAddr: addr,
ReceivedAt: now,
}
if err := packet.UnmarshalBinary(packetBuf); err != nil {
logger.WithError(err).Debug("Failed to unmarshal packet")
registerMessageDropped(ctx, err)
return
}
switch packet.PacketType {
case encoding.PullData, encoding.PushData, encoding.TxAck:
default:
logger.WithField("packet_type", packet.PacketType).Debug("Invalid packet type for uplink")
registerMessageDropped(ctx, errPacketType)
return
}
if packet.GatewayEUI == nil {
logger.Debug("No gateway EUI in uplink message")
registerMessageDropped(ctx, errNoEUI)
return
}

if err := s.processPool.Publish(ctx, packet); err != nil {
logger.WithError(err).Warn("UDP packet publishing failed")
registerMessageDropped(ctx, err)
return
}
registerMessageForwarded(ctx, packet.PacketType)
}

func (s *srv) handlePacket(ctx context.Context, packet encoding.Packet) {
Expand Down

0 comments on commit ba8be7d

Please sign in to comment.