Skip to content

Commit

Permalink
Merge pull request #7080 from TheThingsNetwork/feature/upgrade-thrott…
Browse files Browse the repository at this point in the history
…led-usage

Upgrade throttled usage
  • Loading branch information
adriansmares authored May 15, 2024
2 parents 5d83cbd + b6ce6e2 commit 2457908
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pkg/applicationserver/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func registerDropUp(ctx context.Context, msg *ttnpb.ApplicationUp, err error) {
func registerUplinkLatency(ctx context.Context, msg *ttnpb.ApplicationUplink) {
asMetrics.nsAsUplinkLatency.WithLabelValues(ctx).Observe(time.Since(*ttnpb.StdTime(msg.ReceivedAt)).Seconds())
for _, meta := range msg.RxMetadata {
if stdTime := ttnpb.StdTime(meta.Time); meta.Time != nil && !stdTime.IsZero() {
if stdTime := ttnpb.StdTime(meta.ReceivedAt); stdTime != nil && !stdTime.IsZero() {
asMetrics.gtwAsUplinkLatency.WithLabelValues(ctx).Observe(time.Since(*stdTime).Seconds())
}
}
Expand Down
26 changes: 20 additions & 6 deletions pkg/ratelimit/rate_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ratelimit

import (
"context"
"fmt"

"github.com/throttled/throttled/v2"
"go.thethings.network/lorawan-stack/v3/pkg/log"
Expand All @@ -40,18 +41,20 @@ func (*NoopRateLimiter) RateLimit(Resource) (bool, Result) {

type rateLimiter struct {
ctx context.Context
limiter throttled.RateLimiter
limiter throttled.RateLimiterCtx
}

// RateLimit implements ratelimit.Interface.
func (l *rateLimiter) RateLimit(resource Resource) (bool, Result) {
ok, result, err := l.limiter.RateLimit(resource.Key(), 1)
ok, result, err := l.limiter.RateLimitCtx(l.ctx, resource.Key(), 1)
if err != nil {
// NOTE: The memstore.MemStore implementation does not fail.
log.FromContext(l.ctx).Error("Rate limiter failed")
return true, Result{}
log.FromContext(l.ctx).
WithError(err).
WithFields(logFields(resource)).
Error("Rate limiter failed")
// NOTE: The missing return here is intentional - the underlying implementation
// will still return a valid ok and result even on error.
}

return ok, Result{
Limit: result.Limit,
Remaining: result.Remaining,
Expand Down Expand Up @@ -94,3 +97,14 @@ func Require(limiter Interface, resource Resource) error {
type RateLimitKeyer interface {
RateLimitKey() string
}

func logFields(resource Resource) log.Fielder {
fields := append(
make([]any, 0, 2+2*len(resource.Classes())),
"resource_key", resource.Key(),
)
for i, c := range resource.Classes() {
fields = append(fields, fmt.Sprintf("resource_class_%d", i), c)
}
return log.Fields(fields...)
}
6 changes: 3 additions & 3 deletions pkg/ratelimit/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func grpcStreamAcceptResource(ctx context.Context, fullMethod string) Resource {
// grpcStreamUpResource represents client messages for a gRPC server stream.
func grpcStreamUpResource(ctx context.Context, fullMethod string) Resource {
return &resource{
key: fmt.Sprintf("grpc:stream:up:%s:streamID:%s", fullMethod, events.NewCorrelationID()),
key: fmt.Sprintf("grpc:stream:up:%s:id:%s", fullMethod, events.NewCorrelationID()),
classes: []string{fmt.Sprintf("grpc:stream:up:%s", fullMethod), "grpc:stream:up"},
}
}
Expand All @@ -140,10 +140,10 @@ func GatewayAcceptMQTTConnectionResource(remoteAddr string) Resource {
}
}

// GatewayUDPTrafficResource represents UDP gateway traffic from a remote IP address.
// GatewayUDPTrafficResource represents UDP gateway traffic from a remote address.
func GatewayUDPTrafficResource(addr *net.UDPAddr) Resource {
return &resource{
key: fmt.Sprintf("gs:up:udp:ip:%s", addr.IP.String()),
key: fmt.Sprintf("gs:up:udp:addr:%s", addr.String()),
classes: []string{"gs:up:udp", "gs:up"},
}
}
Expand Down

0 comments on commit 2457908

Please sign in to comment.