diff --git a/pkg/gatewayserver/io/udp/config.go b/pkg/gatewayserver/io/udp/config.go index 5ec36a60d4..5df21321f1 100644 --- a/pkg/gatewayserver/io/udp/config.go +++ b/pkg/gatewayserver/io/udp/config.go @@ -16,18 +16,14 @@ package udp import ( "time" - - "go.thethings.network/lorawan-stack/v3/pkg/config" ) // RateLimitingConfig contains configuration settings for the rate limiting // capabilities of the UDP gateway frontend firewall. type RateLimitingConfig struct { - Enable bool `name:"enable" description:"Enable rate limiting for gateways"` - Messages int `name:"messages" description:"Number of past messages to check timestamp for"` - Threshold time.Duration `name:"threshold" description:"Filter packet if timestamp is not newer than the older timestamps of the previous messages by this threshold"` //nolint:lll - Provider string `name:"provider" description:"Rate limiting store provider (memory, redis)"` - Redis config.RateLimitingRedis `name:"redis" description:"In-Redis rate limiting store configuration"` + Enable bool `name:"enable" description:"Enable rate limiting for gateways"` + Messages int `name:"messages" description:"Number of past messages to check timestamp for"` + Threshold time.Duration `name:"threshold" description:"Filter packet if timestamp is not newer than the older timestamps of the previous messages by this threshold"` //nolint:lll } // Config contains configuration settings for the UDP gateway frontend. diff --git a/pkg/gatewayserver/io/udp/udp.go b/pkg/gatewayserver/io/udp/udp.go index 8797a7a2fa..56705d2dcf 100644 --- a/pkg/gatewayserver/io/udp/udp.go +++ b/pkg/gatewayserver/io/udp/udp.go @@ -55,8 +55,9 @@ type srv struct { limitLogs ratelimit.Interface - filterPool workerpool.WorkerPool[filteringItem] - processPool workerpool.WorkerPool[encoding.Packet] + filterQueues sync.Map // netip.AddrPort -> filterQueue + filterPool workerpool.WorkerPool[filteringItem] + processPool workerpool.WorkerPool[encoding.Packet] } func (*srv) Protocol() string { return "udp" } @@ -105,7 +106,7 @@ func Serve(ctx context.Context, server io.Server, conn *net.UDPConn, conf Config Component: server, Context: ctx, Name: "udp_filter", - Handler: s.filterPacket, + Handler: s.preFilterPacket, MaxWorkers: conf.PacketHandlers, QueueSize: conf.PacketBuffer, }) @@ -143,6 +144,65 @@ func (s *srv) read() error { } } +func (s *srv) preFilterPacket(ctx context.Context, item filteringItem) { + addrPort := item.addr.AddrPort() + + type filterQueue struct { + items chan filteringItem + closed chan struct{} + } + q := filterQueue{ + items: make(chan filteringItem), + closed: make(chan struct{}), + } + + for { + actual, loaded := s.filterQueues.LoadOrStore(addrPort, q) + if loaded { + // At this point we're not the goroutine which handles this address. + // As such, we attempt to send the item to the channel. + q := actual.(filterQueue) // nolint: revive + select { + case <-ctx.Done(): + return + case q.items <- item: + // The goroutine which handles this address has received the item. + return + case <-q.closed: + // Between the actual load and store, and the channel write, the channel might have been closed. + continue + } + } + break + } + + defer func() { + if !s.filterQueues.CompareAndDelete(addrPort, q) { + panic("unreachable") + } + close(q.closed) + }() + + // At this point we are the goroutine which handles this address. + // As such, we start by handling our own item. + s.filterPacket(ctx, item) + // Clear the item to allow GC to collect it. + item = filteringItem{} + + for { + select { + case <-ctx.Done(): + return + case item := <-q.items: + // We have received an item from another goroutine which we need to handle. + s.filterPacket(ctx, item) + default: + // We don't have any more work items. + return + } + } +} + func (s *srv) filterPacket(ctx context.Context, item filteringItem) { logger := log.FromContext(ctx) now, packetBuf, addr := item.now, item.packetBuf, item.addr @@ -177,6 +237,19 @@ func (s *srv) filterPacket(ctx context.Context, item filteringItem) { return } + if err := s.firewall.Filter(packet); err != nil { + registerMessageDropped(ctx, err) + if !errors.IsResourceExhausted(err) { + goto filtered + } + if ratelimit.Require(s.limitLogs, ratelimit.NewCustomResource(packet.GatewayEUI.String())) != nil { + return + } + filtered: + logger.WithError(err).Warn("Packet filtered") + return + } + if err := s.processPool.Publish(ctx, packet); err != nil { logger.WithError(err).Warn("UDP packet publishing failed") registerMessageDropped(ctx, err) @@ -197,18 +270,6 @@ func (s *srv) handlePacket(ctx context.Context, packet encoding.Packet) { } } - if err := s.firewall.Filter(packet); err != nil { - if !errors.IsResourceExhausted(err) { - goto filtered - } - if ratelimit.Require(s.limitLogs, ratelimit.NewCustomResource(eui.String())) != nil { - return - } - filtered: - logger.WithError(err).Warn("Packet filtered") - return - } - cs, err := s.connect(ctx, eui, packet.GatewayAddr) if err != nil { logger.WithError(err).Warn("Failed to connect")