Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Linearize UDP packet filtering #7081

Merged
merged 3 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions pkg/gatewayserver/io/udp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@ package udp

import (
"time"

"go.thethings.network/lorawan-stack/v3/pkg/config"
)

// RateLimitingConfig contains configuration settings for the rate limiting
// capabilities of the UDP gateway frontend firewall.
type RateLimitingConfig struct {
Enable bool `name:"enable" description:"Enable rate limiting for gateways"`
Messages int `name:"messages" description:"Number of past messages to check timestamp for"`
Threshold time.Duration `name:"threshold" description:"Filter packet if timestamp is not newer than the older timestamps of the previous messages by this threshold"` //nolint:lll
Provider string `name:"provider" description:"Rate limiting store provider (memory, redis)"`
Redis config.RateLimitingRedis `name:"redis" description:"In-Redis rate limiting store configuration"`
Enable bool `name:"enable" description:"Enable rate limiting for gateways"`
Messages int `name:"messages" description:"Number of past messages to check timestamp for"`
Threshold time.Duration `name:"threshold" description:"Filter packet if timestamp is not newer than the older timestamps of the previous messages by this threshold"` //nolint:lll
}

// Config contains configuration settings for the UDP gateway frontend.
Expand Down
91 changes: 76 additions & 15 deletions pkg/gatewayserver/io/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ type srv struct {

limitLogs ratelimit.Interface

filterPool workerpool.WorkerPool[filteringItem]
processPool workerpool.WorkerPool[encoding.Packet]
filterQueues sync.Map // netip.AddrPort -> filterQueue
filterPool workerpool.WorkerPool[filteringItem]
processPool workerpool.WorkerPool[encoding.Packet]
}

func (*srv) Protocol() string { return "udp" }
Expand Down Expand Up @@ -105,7 +106,7 @@ func Serve(ctx context.Context, server io.Server, conn *net.UDPConn, conf Config
Component: server,
Context: ctx,
Name: "udp_filter",
Handler: s.filterPacket,
Handler: s.preFilterPacket,
MaxWorkers: conf.PacketHandlers,
QueueSize: conf.PacketBuffer,
})
Expand Down Expand Up @@ -143,6 +144,65 @@ func (s *srv) read() error {
}
}

func (s *srv) preFilterPacket(ctx context.Context, item filteringItem) {
addrPort := item.addr.AddrPort()

type filterQueue struct {
items chan filteringItem
closed chan struct{}
}
q := filterQueue{
items: make(chan filteringItem),
closed: make(chan struct{}),
}

for {
actual, loaded := s.filterQueues.LoadOrStore(addrPort, q)
if loaded {
// At this point we're not the goroutine which handles this address.
// As such, we attempt to send the item to the channel.
q := actual.(filterQueue) // nolint: revive
select {
case <-ctx.Done():
return
case q.items <- item:
// The goroutine which handles this address has received the item.
return
case <-q.closed:
// Between the actual load and store, and the channel write, the channel might have been closed.
continue
}
}
break
}

defer func() {
if !s.filterQueues.CompareAndDelete(addrPort, q) {
panic("unreachable")
}
close(q.closed)
}()

// At this point we are the goroutine which handles this address.
// As such, we start by handling our own item.
s.filterPacket(ctx, item)
// Clear the item to allow GC to collect it.
item = filteringItem{}

for {
select {
case <-ctx.Done():
return
case item := <-q.items:
// We have received an item from another goroutine which we need to handle.
s.filterPacket(ctx, item)
default:
// We don't have any more work items.
return
}
}
}

func (s *srv) filterPacket(ctx context.Context, item filteringItem) {
logger := log.FromContext(ctx)
now, packetBuf, addr := item.now, item.packetBuf, item.addr
Expand Down Expand Up @@ -177,6 +237,19 @@ func (s *srv) filterPacket(ctx context.Context, item filteringItem) {
return
}

if err := s.firewall.Filter(packet); err != nil {
registerMessageDropped(ctx, err)
if !errors.IsResourceExhausted(err) {
goto filtered
}
if ratelimit.Require(s.limitLogs, ratelimit.NewCustomResource(packet.GatewayEUI.String())) != nil {
return
}
filtered:
logger.WithError(err).Warn("Packet filtered")
return
}

if err := s.processPool.Publish(ctx, packet); err != nil {
logger.WithError(err).Warn("UDP packet publishing failed")
registerMessageDropped(ctx, err)
Expand All @@ -197,18 +270,6 @@ func (s *srv) handlePacket(ctx context.Context, packet encoding.Packet) {
}
}

if err := s.firewall.Filter(packet); err != nil {
if !errors.IsResourceExhausted(err) {
goto filtered
}
if ratelimit.Require(s.limitLogs, ratelimit.NewCustomResource(eui.String())) != nil {
return
}
filtered:
logger.WithError(err).Warn("Packet filtered")
return
}

cs, err := s.connect(ctx, eui, packet.GatewayAddr)
if err != nil {
logger.WithError(err).Warn("Failed to connect")
Expand Down
Loading