diff --git a/.golangci.yml b/.golangci.yml index cbd0fc46f8..445e35f88e 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,7 +1,3 @@ -run: - skip-dirs: - - node_modules - linters: disable-all: true enable: @@ -113,3 +109,6 @@ issues: - linters: - paralleltest text: 'does not use range value in test Run' + exclude-dirs: + - node_modules + diff --git a/CHANGELOG.md b/CHANGELOG.md index b24f23f0e3..40a1e8f2e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,10 +11,16 @@ For details about compatibility between different releases, see the **Commitment ### Added +- Add the attributes field to all ApplicationUp messages. +- Add the locations, version_ids, network_ids fields to ApplicationUp messages that were missing them, i.e. ApplicationJoinAccept, ApplicationDownlink, ApplicationDownlinkFailed, ApplicationInvalidatedDownlinks, ApplicationServiceData. +- Add Timeout and Cache fields in the EndDeviceMetadataStorageConfig of the AS. + ### Changed ### Deprecated +- Deprecate the Location field (and its subfields) in the EndDeviceMetadataStorageConfig of AS. + ### Removed ### Fixed diff --git a/cmd/internal/shared/applicationserver/config.go b/cmd/internal/shared/applicationserver/config.go index 0a680a3819..bc16ad7d96 100644 --- a/cmd/internal/shared/applicationserver/config.go +++ b/cmd/internal/shared/applicationserver/config.go @@ -49,14 +49,12 @@ var DefaultApplicationServerConfig = applicationserver.Config{ Downlinks: web.DownlinksConfig{PublicAddress: shared.DefaultPublicURL + "/api/v3"}, }, EndDeviceMetadataStorage: applicationserver.EndDeviceMetadataStorageConfig{ - Location: applicationserver.EndDeviceLocationStorageConfig{ - Timeout: 5 * time.Second, - Cache: applicationserver.EndDeviceLocationStorageCacheConfig{ - Enable: true, - MinRefreshInterval: 15 * time.Minute, - MaxRefreshInterval: 4 * time.Hour, - TTL: 14 * 24 * time.Hour, - }, + Timeout: 5 * time.Second, + Cache: applicationserver.EndDeviceRegistryStorageCacheConfig{ + Enable: true, + MinRefreshInterval: 15 * time.Minute, + MaxRefreshInterval: 4 * time.Hour, + TTL: 14 * 24 * time.Hour, }, }, Distribution: applicationserver.DistributionConfig{ diff --git a/cmd/ttn-lw-stack/commands/start.go b/cmd/ttn-lw-stack/commands/start.go index ec3eaf7b09..82ff347b4a 100644 --- a/cmd/ttn-lw-stack/commands/start.go +++ b/cmd/ttn-lw-stack/commands/start.go @@ -445,21 +445,21 @@ var startCommand = &cobra.Command{ } config.AS.Webhooks.Registry = webhookRegistry } - if cache := &config.AS.EndDeviceMetadataStorage.Location.Cache; cache.Enable { + if cache := &config.AS.EndDeviceMetadataStorage.Cache; cache.Enable { switch config.Cache.Service { case "redis": - cache.Cache = &asmetaredis.EndDeviceLocationCache{ - Redis: redis.New(config.Cache.Redis.WithNamespace("as", "metadata", "locations")), + cache.Cache = &asmetaredis.EndDeviceCache{ + Redis: redis.New(config.Cache.Redis.WithNamespace("as", "metadata", "end-devices")), } default: cache.Enable = false } } - locationRegistry, err := config.AS.EndDeviceMetadataStorage.Location.NewRegistry(ctx, c) + endDeviceRegistry, err := config.AS.EndDeviceMetadataStorage.NewRegistry(ctx, c) if err != nil { return shared.ErrInitializeApplicationServer.WithCause(err) } - config.AS.EndDeviceMetadataStorage.Location.Registry = locationRegistry + config.AS.EndDeviceMetadataStorage.Registry = endDeviceRegistry as, err := applicationserver.New(c, &config.AS) if err != nil { return shared.ErrInitializeApplicationServer.WithCause(err) diff --git a/config/messages.json b/config/messages.json index a474d02dbf..b1b0cd98ab 100644 --- a/config/messages.json +++ b/config/messages.json @@ -2861,13 +2861,40 @@ "file": "io.go" } }, + "error:pkg/applicationserver/metadata/redis:cache_entry_malformed": { + "translations": { + "en": "cache entry malformed" + }, + "description": { + "package": "pkg/applicationserver/metadata/redis", + "file": "redis.go" + } + }, "error:pkg/applicationserver/metadata/redis:cache_miss": { "translations": { "en": "cache miss" }, "description": { "package": "pkg/applicationserver/metadata/redis", - "file": "location_cache.go" + "file": "redis.go" + } + }, + "error:pkg/applicationserver/metadata:end_device_not_found": { + "translations": { + "en": "end device not found" + }, + "description": { + "package": "pkg/applicationserver/metadata", + "file": "metadata.go" + } + }, + "error:pkg/applicationserver/metadata:field_mask_path_not_supported": { + "translations": { + "en": "field mask path `{path}` is not supported" + }, + "description": { + "package": "pkg/applicationserver/metadata", + "file": "metadata.go" } }, "error:pkg/applicationserver/redis:application_uid": { diff --git a/data/lorawan-devices b/data/lorawan-devices index a1ab482267..d3e556caf0 160000 --- a/data/lorawan-devices +++ b/data/lorawan-devices @@ -1 +1 @@ -Subproject commit a1ab48226792e28cd1dc46f15cc31174caecedcf +Subproject commit d3e556caf03229dbad7458441e28d0674fab22aa diff --git a/data/lorawan-frequency-plans b/data/lorawan-frequency-plans index 258b3f94d5..906ef3079c 160000 --- a/data/lorawan-frequency-plans +++ b/data/lorawan-frequency-plans @@ -1 +1 @@ -Subproject commit 258b3f94d52c11d3191fb29333e7a7920a3bc99e +Subproject commit 906ef3079ccb3ea6ae3f0c29078eee3fe3954440 diff --git a/pkg/applicationserver/applicationserver.go b/pkg/applicationserver/applicationserver.go index 1559c1736f..48aeeb306c 100644 --- a/pkg/applicationserver/applicationserver.go +++ b/pkg/applicationserver/applicationserver.go @@ -81,7 +81,7 @@ type ApplicationServer struct { linkRegistry LinkRegistry deviceRegistry DeviceRegistry - locationRegistry metadata.EndDeviceLocationRegistry + endDeviceRegistry metadata.EndDeviceRegistry formatters messageprocessors.MapPayloadProcessor webhooks ioweb.Webhooks webhookTemplates ioweb.TemplateStore @@ -156,14 +156,14 @@ func New(c *component.Component, conf *Config) (as *ApplicationServer, err error } as = &ApplicationServer{ - Component: c, - ctx: ctx, - config: conf, - linkRegistry: conf.Links, - deviceRegistry: wrapEndDeviceRegistryWithReplacedFields(conf.Devices, replacedEndDeviceFields...), - appPkgRegistry: conf.Packages.Registry, - locationRegistry: conf.EndDeviceMetadataStorage.Location.Registry, - formatters: make(messageprocessors.MapPayloadProcessor), + Component: c, + ctx: ctx, + config: conf, + linkRegistry: conf.Links, + deviceRegistry: wrapEndDeviceRegistryWithReplacedFields(conf.Devices, replacedEndDeviceFields...), + appPkgRegistry: conf.Packages.Registry, + endDeviceRegistry: conf.EndDeviceMetadataStorage.Registry, + formatters: make(messageprocessors.MapPayloadProcessor), clusterDistributor: distribution.NewPubSubDistributor( ctx, c, @@ -1049,6 +1049,14 @@ func (as *ApplicationServer) handleJoinAccept( return err } + if entity, err := as.endDeviceRegistry.Get(ctx, ids, []string{"attributes"}); err != nil { + log.FromContext(ctx).WithError(err).Warn( + "Failed to retrieve end device attributes on join-accept", + ) + } else { + joinAccept.Attributes = entity.Attributes + } + // Publish last seen event. if err := as.deviceLastSeenPool.Publish(ctx, lastSeenAtInfo{ ids: ids, @@ -1159,6 +1167,7 @@ func (as *ApplicationServer) publishNormalizedUplink(ctx context.Context, info u Locations: info.uplink.Locations, VersionIds: info.uplink.VersionIds, NetworkIds: info.uplink.NetworkIds, + Attributes: info.uplink.Attributes, }, }, Simulated: info.simulated, @@ -1208,6 +1217,15 @@ func (as *ApplicationServer) handleUplink(ctx context.Context, info uplinkInfo) return err } + if entity, err := as.endDeviceRegistry.Get(ctx, info.ids, []string{"attributes", "locations"}); err != nil { + log.FromContext(ctx).WithError(err).Warn( + "Failed to retrieve the end device attributes and locations on uplink", + ) + } else { + info.uplink.Attributes = entity.Attributes + info.uplink.Locations = entity.Locations + } + if !as.skipPayloadCrypto(ctx, info.link, dev, dev.Session) { if err := as.decryptAndDecodeUplink(ctx, dev, info.uplink, info.link.DefaultFormatters); err != nil { return err @@ -1227,11 +1245,6 @@ func (as *ApplicationServer) handleUplink(ctx context.Context, info uplinkInfo) } // Set location in message and publish location solved if the payload contains location information. - if locations, err := as.locationRegistry.Get(ctx, info.ids); err != nil { - log.FromContext(ctx).WithError(err).Warn("Failed to retrieve end device locations") - } else { - info.uplink.Locations = locations - } loc := as.locationFromPayload(info.uplink) if loc != nil { if info.uplink.Locations == nil { @@ -1285,10 +1298,14 @@ func (as *ApplicationServer) handleSimulatedUplink(ctx context.Context, info upl return err } - if locations, err := as.locationRegistry.Get(ctx, info.ids); err != nil { - log.FromContext(ctx).WithError(err).Warn("Failed to retrieve end device locations") + if entity, err := as.endDeviceRegistry.Get(ctx, info.ids, []string{"attributes", "locations"}); err != nil { + log.FromContext(ctx).WithError(err).Warn( + "Failed to retrieve the end device from entity registry on simulated uplink", + ) } else { - info.uplink.Locations = locations + info.uplink.Attributes = entity.Attributes + info.uplink.Locations = entity.Locations + } if err := as.decodeUplink(ctx, dev, info.uplink, info.link.DefaultFormatters); err != nil { @@ -1370,7 +1387,19 @@ func (as *ApplicationServer) handleDownlinkQueueInvalidated( return dev, mask, nil }, ) - return pass, err + if err != nil { + return pass, err + } + + if entity, err := as.endDeviceRegistry.Get(ctx, ids, []string{"attributes"}); err != nil { + log.FromContext(ctx).WithError(err).Warn( + "Failed to retrieve end device attributes on downlink queue invalidated", + ) + } else { + invalid.Attributes = entity.Attributes + } + + return pass, nil } func (as *ApplicationServer) handleDownlinkNack( @@ -1455,17 +1484,47 @@ func (as *ApplicationServer) handleDownlinkNack( return dev, mask, nil }, ) - return err + if err != nil { + return err + } + + if _, err = as.endDeviceRegistry.Set(ctx, ids, []string{"attributes"}, + func(entity *ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error) { + if entity == nil { + return nil, nil, errDeviceNotFound.WithAttributes("device_uid", unique.ID(ctx, ids)) + } + msg.Attributes = entity.Attributes + return entity, []string{""}, nil + }, + ); err != nil { + log.FromContext(ctx).WithError(err).Warn( + "Failed to retrieve end device attributes on downlink nack", + ) + } + + return nil } -// handleLocationSolved saves the provided *ttnpb.ApplicationLocation in the Entity Registry as part of the device locations. -// Locations provided by other services will be maintained. +// handleLocationSolved saves the provided *ttnpb.ApplicationLocation in the Entity Registry as part of the device +// locations. Locations provided by other services will be maintained. func (as *ApplicationServer) handleLocationSolved(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, msg *ttnpb.ApplicationLocation, link *ttnpb.ApplicationLink) error { defer trace.StartRegion(ctx, "handle location solved").End() - if _, err := as.locationRegistry.Merge(ctx, ids, map[string]*ttnpb.Location{ - msg.Service: msg.Location, - }); err != nil { + if _, err := as.endDeviceRegistry.Set(ctx, ids, []string{"locations"}, + func(stored *ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error) { + if stored == nil { + return nil, nil, errDeviceNotFound.WithAttributes("device_uid", unique.ID(ctx, ids)) + } + + if len(stored.Locations) == 0 { + stored.Locations = make(map[string]*ttnpb.Location) + } + + stored.Locations[msg.Service] = msg.Location + + return stored, []string{"locations"}, nil + }, + ); err != nil { log.FromContext(ctx).WithError(err).Warn("Failed to merge end device locations") } return nil @@ -1486,6 +1545,15 @@ func (as *ApplicationServer) decryptDownlinkMessage(ctx context.Context, ids *tt if err != nil { return err } + + if entity, err := as.endDeviceRegistry.Get(ctx, ids, []string{"attributes"}); err != nil { + log.FromContext(ctx).WithError(err).Warn( + "Failed to retrieve end device attributes on downlink message", + ) + } else { + msg.Attributes = entity.Attributes + } + var session *ttnpb.Session switch { case dev.Session != nil && bytes.Equal(dev.Session.Keys.SessionKeyId, msg.SessionKeyId): diff --git a/pkg/applicationserver/applicationserver_test.go b/pkg/applicationserver/applicationserver_test.go index 0c37b39110..ce5999de30 100644 --- a/pkg/applicationserver/applicationserver_test.go +++ b/pkg/applicationserver/applicationserver_test.go @@ -303,9 +303,7 @@ func TestApplicationServer(t *testing.T) { }, }, EndDeviceMetadataStorage: applicationserver.EndDeviceMetadataStorageConfig{ - Location: applicationserver.EndDeviceLocationStorageConfig{ - Registry: metadata.NewNoopEndDeviceLocationRegistry(), - }, + Registry: metadata.NewNoopEndDeviceRegistry(), }, Downlinks: applicationserver.DownlinksConfig{ ConfirmationConfig: applicationserver.ConfirmationConfig{ @@ -2414,9 +2412,7 @@ func TestSkipPayloadCrypto(t *testing.T) { }, }, EndDeviceMetadataStorage: applicationserver.EndDeviceMetadataStorageConfig{ - Location: applicationserver.EndDeviceLocationStorageConfig{ - Registry: metadata.NewNoopEndDeviceLocationRegistry(), - }, + Registry: metadata.NewNoopEndDeviceRegistry(), }, Downlinks: applicationserver.DownlinksConfig{ ConfirmationConfig: applicationserver.ConfirmationConfig{ @@ -2916,9 +2912,7 @@ func TestLocationFromPayload(t *testing.T) { }, }, EndDeviceMetadataStorage: applicationserver.EndDeviceMetadataStorageConfig{ - Location: applicationserver.EndDeviceLocationStorageConfig{ - Registry: metadata.NewClusterEndDeviceLocationRegistry(c, (1<<4)*Timeout), - }, + Registry: metadata.NewClusterEndDeviceRegistry(c, (1<<4)*Timeout), }, Downlinks: applicationserver.DownlinksConfig{ ConfirmationConfig: applicationserver.ConfirmationConfig{ @@ -3101,9 +3095,7 @@ func TestUplinkNormalized(t *testing.T) { }, }, EndDeviceMetadataStorage: applicationserver.EndDeviceMetadataStorageConfig{ - Location: applicationserver.EndDeviceLocationStorageConfig{ - Registry: metadata.NewClusterEndDeviceLocationRegistry(c, (1<<4)*Timeout), - }, + Registry: metadata.NewClusterEndDeviceRegistry(c, (1<<4)*Timeout), }, Downlinks: applicationserver.DownlinksConfig{ ConfirmationConfig: applicationserver.ConfirmationConfig{ diff --git a/pkg/applicationserver/config.go b/pkg/applicationserver/config.go index 54b46ca378..260ee8c494 100644 --- a/pkg/applicationserver/config.go +++ b/pkg/applicationserver/config.go @@ -49,9 +49,9 @@ type InteropConfig struct { // EndDeviceFetcherConfig represents configuration for the end device fetcher in Application Server. type EndDeviceFetcherConfig struct { - Timeout time.Duration `name:"timeout" description:"Timeout of the end device retrival operation"` - Cache EndDeviceFetcherCacheConfig `name:"cache" description:"Cache configuration options for the end device fetcher"` - CircuitBreaker EndDeviceFetcherCircuitBreakerConfig `name:"circuit-breaker" description:"Circuit breaker options for the end device fetcher"` + Timeout time.Duration `name:"timeout" description:"Timeout of the end device retrival operation"` // nolint:lll + Cache EndDeviceFetcherCacheConfig `name:"cache" description:"Cache configuration options for the end device fetcher"` // nolint:lll + CircuitBreaker EndDeviceFetcherCircuitBreakerConfig `name:"circuit-breaker" description:"Circuit breaker options for the end device fetcher"` // nolint:lll } // EndDeviceFetcherCacheConfig represents configuration for device information caching in Application Server. @@ -65,33 +65,47 @@ type EndDeviceFetcherCacheConfig struct { type EndDeviceFetcherCircuitBreakerConfig struct { Enable bool `name:"enable" description:"Enable circuit breaker behavior on burst errors"` Timeout time.Duration `name:"timeout" description:"Timeout after which the circuit breaker closes"` - Threshold int `name:"threshold" description:"Number of failed fetching attempts after which the circuit breaker opens"` + Threshold int `name:"threshold" description:"Number of failed fetching attempts after which the circuit breaker opens"` // nolint:lll } // EndDeviceMetadataStorageConfig represents the configuration of end device metadata operations. type EndDeviceMetadataStorageConfig struct { - Location EndDeviceLocationStorageConfig `name:"location"` + // DEPRECATED: use the EndDeviceRegistry for location storage. + Location EndDeviceLocationStorageConfig `name:"location" description:"DEPRECATED setting."` + + Registry metadata.EndDeviceRegistry `name:"-"` + Timeout time.Duration `name:"timeout" description:"Timeout of the entity registry retrieval operation"` // nolint:lll + Cache EndDeviceRegistryStorageCacheConfig `name:"cache"` +} + +// EndDeviceRegistryStorageCacheConfig represents the configuration of entity registry caching. +type EndDeviceRegistryStorageCacheConfig struct { + Cache metadata.EndDeviceRegistryCache `name:"-"` + Enable bool `name:"enable" description:"Enable caching of end device items"` // nolint:lll + MinRefreshInterval time.Duration `name:"min-refresh-interval" description:"Minimum time interval between two asynchronous refreshes"` // nolint:lll + MaxRefreshInterval time.Duration `name:"max-refresh-interval" description:"Maximum time interval between two asynchronous refreshes"` // nolint:lll + TTL time.Duration `name:"eviction-ttl" description:"Time to live of cached end devices"` // nolint:lll } // EndDeviceLocationStorageConfig represents the configuration of end device locations storage. +// DEPRECATED: use the metadata.EndDeviceRegistry for location storage. type EndDeviceLocationStorageConfig struct { - Registry metadata.EndDeviceLocationRegistry `name:"-"` - Timeout time.Duration `name:"timeout" description:"Timeout of the end device retrival operation"` - Cache EndDeviceLocationStorageCacheConfig `name:"cache"` + Timeout time.Duration `name:"timeout" description:"Timeout of the end device retrieval operation. DEPRECATED: use the end device metadata storage directly instead."` // nolint:lll + Cache EndDeviceLocationStorageCacheConfig `name:"cache" description:"DEPRECATED: use the end device metadata storage directly instead."` // nolint:lll } // EndDeviceLocationStorageCacheConfig represents the configuration of end device location registry caching. +// DEPRECATED: use the metadata.EndDeviceRegistryCache and the locations field of end devices for caching. type EndDeviceLocationStorageCacheConfig struct { - Cache metadata.EndDeviceLocationCache `name:"-"` - Enable bool `name:"enable" description:"Enable caching of end device locations"` - MinRefreshInterval time.Duration `name:"min-refresh-interval" description:"Minimum time interval between two asynchronous refreshes"` - MaxRefreshInterval time.Duration `name:"max-refresh-interval" description:"Maximum time interval between two asynchronous refreshes"` - TTL time.Duration `name:"eviction-ttl" description:"Time to live of cached locations"` + Enable bool `name:"enable" description:"Enable caching of end device locations. DEPRECATED: use the end device metadata storage directly instead."` // nolint:lll + MinRefreshInterval time.Duration `name:"min-refresh-interval" description:"Minimum time interval between two asynchronous refreshes. DEPRECATED: use the end device metadata storage directly instead."` // nolint:lll + MaxRefreshInterval time.Duration `name:"max-refresh-interval" description:"Maximum time interval between two asynchronous refreshes. DEPRECATED: use the end device metadata storage directly instead."` // nolint:lll + TTL time.Duration `name:"eviction-ttl" description:"Time to live of cached locations. DEPRECATED: use the end device metadata storage directly instead."` // nolint:lll } // FormattersConfig represents the configuration for payload formatters. type FormattersConfig struct { - MaxParameterLength int `name:"max-parameter-length" description:"Maximum allowed size for length of formatter parameters (payload formatter scripts)"` + MaxParameterLength int `name:"max-parameter-length" description:"Maximum allowed size for length of formatter parameters (payload formatter scripts)"` // nolint:lll } // ConfirmationConfig represents the configuration for confirmed downlink. @@ -112,21 +126,21 @@ type PaginationConfig struct { // Config represents the ApplicationServer configuration. type Config struct { - LinkMode string `name:"link-mode" description:"Deprecated - mode to link applications to their Network Server (all, explicit)"` + LinkMode string `name:"link-mode" description:"Deprecated - mode to link applications to their Network Server (all, explicit)"` // nolint:lll Devices DeviceRegistry `name:"-"` Links LinkRegistry `name:"-"` UplinkStorage UplinkStorageConfig `name:"uplink-storage" description:"Application uplinks storage configuration"` Formatters FormattersConfig `name:"formatters" description:"Payload formatters configuration"` Distribution DistributionConfig `name:"distribution" description:"Distribution configuration"` - EndDeviceFetcher EndDeviceFetcherConfig `name:"fetcher" description:"Deprecated - End Device fetcher configuration"` - EndDeviceMetadataStorage EndDeviceMetadataStorageConfig `name:"end-device-metadata-storage" description:"End device metadata storage configuration"` + EndDeviceFetcher EndDeviceFetcherConfig `name:"fetcher" description:"Deprecated - End Device fetcher configuration"` // nolint:lll + EndDeviceMetadataStorage EndDeviceMetadataStorageConfig `name:"end-device-metadata-storage" description:"End device metadata storage configuration"` // nolint:lll MQTT config.MQTT `name:"mqtt" description:"MQTT configuration"` Webhooks WebhooksConfig `name:"webhooks" description:"Webhooks configuration"` PubSub PubSubConfig `name:"pubsub" description:"Pub/sub messaging configuration"` Packages ApplicationPackagesConfig `name:"packages" description:"Application packages configuration"` Interop InteropConfig `name:"interop" description:"Interop client configuration"` - DeviceKEKLabel string `name:"device-kek-label" description:"Label of KEK used to encrypt device keys at rest"` - DeviceLastSeen LastSeenConfig `name:"device-last-seen" description:"End Device last seen batch update configuration"` + DeviceKEKLabel string `name:"device-kek-label" description:"Label of KEK used to encrypt device keys at rest"` // nolint:lll + DeviceLastSeen LastSeenConfig `name:"device-last-seen" description:"End Device last seen batch update configuration"` // nolint:lll Downlinks DownlinksConfig `name:"downlinks" description:"Downlink configuration"` Pagination PaginationConfig `name:"pagination" description:"Pagination configuration"` } @@ -153,10 +167,10 @@ type UplinkStorageConfig struct { type WebhooksConfig struct { Registry web.WebhookRegistry `name:"-"` Target string `name:"target" description:"Target of the integration (direct)"` - Timeout time.Duration `name:"timeout" description:"Wait timeout of the target to process the request"` + Timeout time.Duration `name:"timeout" description:"Wait timeout of the target to process the request"` // nolint:lll QueueSize int `name:"queue-size" description:"Number of requests to queue"` Workers int `name:"workers" description:"Number of workers to process requests"` - UnhealthyAttemptsThreshold int `name:"unhealthy-attempts-threshold" description:"Number of failed webhook attempts before the webhook is disabled"` + UnhealthyAttemptsThreshold int `name:"unhealthy-attempts-threshold" description:"Number of failed webhook attempts before the webhook is disabled"` // nolint:lll UnhealthyRetryInterval time.Duration `name:"unhealthy-retry-interval" description:"Time interval after which disabled webhooks may execute again"` Templates web.TemplatesConfig `name:"templates" description:"The store of the webhook templates"` Downlinks web.DownlinksConfig `name:"downlink" description:"The downlink queue operations configuration"` @@ -292,7 +306,10 @@ func (c PubSubConfig) NewPubSub(comp *component.Component, server io.Server) (*p // NewApplicationPackages returns a new applications packages frontend based on the configuration. // If the registry is nil, it returns nil. -func (c ApplicationPackagesConfig) NewApplicationPackages(ctx context.Context, server io.Server) (packages.Server, error) { +func (c ApplicationPackagesConfig) NewApplicationPackages( + ctx context.Context, + server io.Server, +) (packages.Server, error) { if c.Registry == nil { return nil, nil } @@ -315,21 +332,25 @@ var ( errInvalidTTL = errors.DefineInvalidArgument("invalid_ttl", "invalid TTL `{ttl}`") ) -// NewRegistry returns a new end device location registry based on the configuration. -func (c EndDeviceLocationStorageConfig) NewRegistry(ctx context.Context, comp *component.Component) (metadata.EndDeviceLocationRegistry, error) { +// NewRegistry returns a new end device attributes registry based on the configuration. +func (c EndDeviceMetadataStorageConfig) NewRegistry( + ctx context.Context, + comp *component.Component, +) (metadata.EndDeviceRegistry, error) { if c.Timeout <= 0 { return nil, errInvalidTimeout.WithAttributes("timeout", c.Timeout) } - registry := metadata.NewClusterEndDeviceLocationRegistry(comp, c.Timeout) - registry = metadata.NewMetricsEndDeviceLocationRegistry(registry) + registry := metadata.NewClusterEndDeviceRegistry(comp, c.Timeout) + registry = metadata.NewMetricsEndDeviceRegistry(registry) if c.Cache.Enable { for _, ttl := range []time.Duration{c.Cache.MinRefreshInterval, c.Cache.MaxRefreshInterval, c.Cache.TTL} { if ttl <= 0 { return nil, errInvalidTTL.WithAttributes("ttl", ttl) } } - cache := metadata.NewMetricsEndDeviceLocationCache(c.Cache.Cache) - registry = metadata.NewCachedEndDeviceLocationRegistry(ctx, comp, registry, cache, c.Cache.MinRefreshInterval, c.Cache.MaxRefreshInterval, c.Cache.TTL) + cache := metadata.NewMetricsEndDeviceRegistryCache(c.Cache.Cache) + registry = metadata.NewCachedEndDeviceRegistry( + ctx, comp, registry, cache, c.Cache.MinRefreshInterval, c.Cache.MaxRefreshInterval, c.Cache.TTL) } return registry, nil } diff --git a/pkg/applicationserver/metadata/cluster.go b/pkg/applicationserver/metadata/cluster.go new file mode 100644 index 0000000000..63eee88360 --- /dev/null +++ b/pkg/applicationserver/metadata/cluster.go @@ -0,0 +1,29 @@ +// Copyright © 2025 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metadata + +import ( + "context" + + "go.thethings.network/lorawan-stack/v3/pkg/cluster" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "google.golang.org/grpc" +) + +// ClusterPeerAccess provides access to cluster peers. +type ClusterPeerAccess interface { + GetPeerConn(ctx context.Context, role ttnpb.ClusterRole, ids cluster.EntityIdentifiers) (*grpc.ClientConn, error) + WithClusterAuth() grpc.CallOption +} diff --git a/pkg/applicationserver/metadata/end_device_registry.go b/pkg/applicationserver/metadata/end_device_registry.go new file mode 100644 index 0000000000..0b6db57a22 --- /dev/null +++ b/pkg/applicationserver/metadata/end_device_registry.go @@ -0,0 +1,358 @@ +// Copyright © 2025 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metadata + +import ( + "context" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/errors" + "go.thethings.network/lorawan-stack/v3/pkg/log" + "go.thethings.network/lorawan-stack/v3/pkg/random" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/workerpool" +) + +// allowedFieldMaskPaths defines the allowed field mask paths that can be accessed for end devices from this package. +// Calls to the entity registry require an IS roundtrip call which can be cross-continental. This must be done for a +// high volume of end devices, so we want to limit the amount of data that is being transferred. +var allowedFieldMaskPaths = []string{ + "attributes", + "locations", +} + +// EndDeviceRegistry interface for the identity server. +type EndDeviceRegistry interface { + // Get returns an end device from the entity registry by its identifiers. + Get(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, paths []string) (*ttnpb.EndDevice, error) + // Set creates, updates or deletes an end device from the entity registry by its identifiers. + Set( + ctx context.Context, + ids *ttnpb.EndDeviceIdentifiers, + paths []string, + f func(*ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error), + ) (*ttnpb.EndDevice, error) +} + +type noopEndDeviceRegistry struct{} + +// Get implements EndDeviceRegistry. +func (noopEndDeviceRegistry) Get( + _ context.Context, + _ *ttnpb.EndDeviceIdentifiers, + _ []string, +) (*ttnpb.EndDevice, error) { + return &ttnpb.EndDevice{}, nil // nolint: nilnil +} + +// Set implements EndDeviceRegistry. +func (noopEndDeviceRegistry) Set( + _ context.Context, + _ *ttnpb.EndDeviceIdentifiers, + _ []string, + f func(*ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error), +) (*ttnpb.EndDevice, error) { + if f == nil { + return &ttnpb.EndDevice{}, nil // nolint: nilnil + } + + endDevice, _, err := f(&ttnpb.EndDevice{}) + return endDevice, err +} + +// NewNoopEndDeviceRegistry returns a noop EndDeviceRegistry. +func NewNoopEndDeviceRegistry() EndDeviceRegistry { + return noopEndDeviceRegistry{} +} + +type metricsEndDeviceRegistry struct { + inner EndDeviceRegistry +} + +// Get implements EndDeviceRegistry. +func (m *metricsEndDeviceRegistry) Get( + ctx context.Context, + ids *ttnpb.EndDeviceIdentifiers, + paths []string, +) (*ttnpb.EndDevice, error) { + registerMetadataRegistryRetrieval(ctx, endDeviceLabel) + return m.inner.Get(ctx, ids, paths) +} + +// Set implements EndDeviceRegistry. +func (m *metricsEndDeviceRegistry) Set( + ctx context.Context, + ids *ttnpb.EndDeviceIdentifiers, + paths []string, + f func(*ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error), +) (*ttnpb.EndDevice, error) { + registerMetadataRegistryUpdate(ctx, endDeviceLabel) + return m.inner.Set(ctx, ids, paths, f) +} + +// NewMetricsEndDeviceRegistry returns an EndDeviceRegistry that collects metrics. +func NewMetricsEndDeviceRegistry(inner EndDeviceRegistry) EndDeviceRegistry { + return &metricsEndDeviceRegistry{ + inner: inner, + } +} + +type clusterEndDeviceRegistry struct { + ClusterPeerAccess + timeout time.Duration +} + +// Get implements EndDeviceRegistry. +func (c clusterEndDeviceRegistry) Get( + ctx context.Context, + ids *ttnpb.EndDeviceIdentifiers, + paths []string, +) (*ttnpb.EndDevice, error) { + paths, err := processEndDeviceFieldMaskPaths(paths) + if err != nil { + return nil, err + } + + cc, err := c.GetPeerConn(ctx, ttnpb.ClusterRole_ENTITY_REGISTRY, nil) + if err != nil { + return nil, err + } + + cl := ttnpb.NewEndDeviceRegistryClient(cc) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + + dev, err := cl.Get(ctx, &ttnpb.GetEndDeviceRequest{ + EndDeviceIds: ids, + FieldMask: ttnpb.FieldMask(paths...), + }, c.WithClusterAuth()) + if err != nil { + return nil, err + } + + return dev, nil +} + +// Set implements EndDeviceRegistry. +func (c clusterEndDeviceRegistry) Set( + ctx context.Context, + ids *ttnpb.EndDeviceIdentifiers, + paths []string, + f func(*ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error), +) (*ttnpb.EndDevice, error) { + paths, err := processEndDeviceFieldMaskPaths(paths) + if err != nil { + return nil, err + } + + cc, err := c.GetPeerConn(ctx, ttnpb.ClusterRole_ENTITY_REGISTRY, nil) + if err != nil { + return nil, err + } + + cl := ttnpb.NewEndDeviceRegistryClient(cc) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + + dev, err := cl.Get(ctx, &ttnpb.GetEndDeviceRequest{ + EndDeviceIds: ids, + FieldMask: ttnpb.FieldMask(paths...), + }, c.WithClusterAuth()) + if err != nil { + return nil, err + } + + dev, paths, err = f(dev) + if err != nil || dev == nil { + return nil, err + } + dev, err = cl.Update(ctx, &ttnpb.UpdateEndDeviceRequest{ + EndDevice: dev, + FieldMask: ttnpb.FieldMask(paths...), + }, c.WithClusterAuth()) + if err != nil { + return nil, err + } + + return dev, nil +} + +// NewClusterEndDeviceRegistry returns an EndDeviceRegistry connected to the entity registry of the Identity Server. +func NewClusterEndDeviceRegistry(cluster ClusterPeerAccess, timeout time.Duration) EndDeviceRegistry { + return &clusterEndDeviceRegistry{ + ClusterPeerAccess: cluster, + timeout: timeout, + } +} + +type cachedEndDeviceRegistry struct { + registry EndDeviceRegistry + cache EndDeviceRegistryCache + + minRefreshInterval time.Duration + maxRefreshInterval time.Duration + ttl time.Duration + + replicationPool workerpool.WorkerPool[*ttnpb.EndDeviceIdentifiers] +} + +// Get implements EndDeviceRegistry. +func (c *cachedEndDeviceRegistry) Get( + ctx context.Context, + ids *ttnpb.EndDeviceIdentifiers, + paths []string, +) (*ttnpb.EndDevice, error) { + paths, err := processEndDeviceFieldMaskPaths(paths) + if err != nil { + return nil, err + } + + dev, storedAt, err := c.cache.Get(ctx, ids) + switch { + case err != nil && !errors.IsNotFound(err): + return nil, err + case err != nil && errors.IsNotFound(err): + dev = nil + case err == nil: + age := time.Since(*storedAt) + if age <= c.minRefreshInterval { + // If the object is younger than the minimum refresh interval, just return the cached value. + return dev, nil + } + if remaining := c.maxRefreshInterval - age; remaining > 0 { + // If the objects age is between the minimum and maximum refresh interval, check if we should asynchronously + // refresh the cache. + window := c.maxRefreshInterval - c.minRefreshInterval + threshold := time.Duration(random.Int63n(int64(window))) + // remaining is the remaining window of the refresh interval in the (0, window) interval. + // threshold is a uniformly distributed duration in the [0, window) interval. + if remaining >= threshold { + return dev, nil + } + } + } + if err := c.replicationPool.Publish(ctx, ids); err != nil { + log.FromContext(ctx).WithError(err).Warn("Failed to publish end device replication request") + } + + if dev == nil { + return nil, errEndDeviceNotFound.WithAttributes("ids", ids) + } + + if len(paths) == 0 { + return dev, nil + } + + return ttnpb.FilterGetEndDevice(dev, paths...) +} + +// Set implements EndDeviceRegistry. +func (c *cachedEndDeviceRegistry) Set( + ctx context.Context, + ids *ttnpb.EndDeviceIdentifiers, + paths []string, + f func(*ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error), +) (*ttnpb.EndDevice, error) { + paths, err := processEndDeviceFieldMaskPaths(paths) + if err != nil { + return nil, err + } + + _, err = c.registry.Set(ctx, ids, paths, f) + if err != nil { + return nil, err + } + + // Get the end device with all the allowed field mask paths from the entity registry and store it in the cache to + // avoid calling the registry again for the same object with a different field mask. + dev, err := c.registry.Get(ctx, ids, allowedFieldMaskPaths) + if err != nil { + return nil, err + } + + if err := c.cache.Set(ctx, ids, dev, c.ttl); err != nil { + return nil, err + } + + if len(paths) == 0 { + return dev, nil + } + + return ttnpb.FilterGetEndDevice(dev, paths...) +} + +// NewCachedEndDeviceRegistry returns an EndDeviceRegistry that caches the responses of the provided EndDeviceRegistry +// in the provided EndDeviceCache. On cache miss, the registry will retrieve and cache the end devices asynchronously. +// Items whose TTL is within the soft TTL window have a chance to trigger an asynchronous cache synchronization event on +// end device retrieval. The probability of a synchronization event increases linearly between the soft TTL (0%) and the +// hard TTL (100%). +func NewCachedEndDeviceRegistry( + ctx context.Context, + c workerpool.Component, + registry EndDeviceRegistry, + cache EndDeviceRegistryCache, + minRefreshInterval, maxRefreshInterval, ttl time.Duration, +) EndDeviceRegistry { + st := &cachedEndDeviceRegistry{ + registry: registry, + cache: cache, + + minRefreshInterval: minRefreshInterval, + maxRefreshInterval: maxRefreshInterval, + ttl: ttl, + + replicationPool: workerpool.NewWorkerPool(workerpool.Config[*ttnpb.EndDeviceIdentifiers]{ + Component: c, + Context: ctx, + Name: "replicate_end_device", + Handler: func(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) { + // Retrieve the end device with all the allowed field mask paths to avoid refreshing the cache for separate + // field mask paths. + dev, err := registry.Get(ctx, ids, allowedFieldMaskPaths) + if err != nil { + log.FromContext(ctx).WithError(err).Warn("Failed to retrieve end device") + return + } + + if err := cache.Set(ctx, ids, dev, ttl); err != nil { + log.FromContext(ctx).WithError(err).Warn("Failed to cache end device") + return + } + }, + }), + } + return st +} + +func processEndDeviceFieldMaskPaths(paths []string) ([]string, error) { + if len(paths) == 0 { + return allowedFieldMaskPaths, nil + } + + if err := validateEndDevicePaths(paths); err != nil { + return nil, err + } + + return paths, nil +} + +func validateEndDevicePaths(paths []string) error { + allowedFieldMaskSet := ttnpb.FieldMaskPathsSet(allowedFieldMaskPaths) + if ok, firstNotAllowedPath := ttnpb.FieldMaskPathsSetContainsAll(allowedFieldMaskSet, paths...); !ok { + return errFieldMaskPathNotSupported.WithAttributes("path", firstNotAllowedPath) + } + + return nil +} diff --git a/pkg/applicationserver/metadata/end_device_registry_cache.go b/pkg/applicationserver/metadata/end_device_registry_cache.go new file mode 100644 index 0000000000..d308ff2a26 --- /dev/null +++ b/pkg/applicationserver/metadata/end_device_registry_cache.go @@ -0,0 +1,72 @@ +// Copyright © 2025 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metadata + +import ( + "context" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" +) + +// EndDeviceRegistryCache is a cache for end devices fetched from the entity registry in Identity Server. +type EndDeviceRegistryCache interface { + // Get retrieves the end device and the remaining TTL for the entry. + Get(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) (*ttnpb.EndDevice, *time.Time, error) + // Set sets the end device and the TTL for the entry. + Set(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, dev *ttnpb.EndDevice, ttl time.Duration) error + // Delete removes the end device from the cache. + Delete(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) error +} + +type metricsEndDeviceRegistryCache struct { + inner EndDeviceRegistryCache +} + +// Get implements EndDeviceRegistryCache. +func (c *metricsEndDeviceRegistryCache) Get( + ctx context.Context, + ids *ttnpb.EndDeviceIdentifiers, +) (*ttnpb.EndDevice, *time.Time, error) { + m, storedAt, err := c.inner.Get(ctx, ids) + if storedAt == nil { + registerMetadataCacheMiss(ctx, locationLabel) + } else { + registerMetadataCacheHit(ctx, locationLabel) + } + return m, storedAt, err +} + +// Set implements EndDeviceRegistryCache. +func (c *metricsEndDeviceRegistryCache) Set( + ctx context.Context, + ids *ttnpb.EndDeviceIdentifiers, + dev *ttnpb.EndDevice, + ttl time.Duration, +) error { + return c.inner.Set(ctx, ids, dev, ttl) +} + +// Delete implements EndDeviceRegistryCache. +func (c *metricsEndDeviceRegistryCache) Delete(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) error { + return c.inner.Delete(ctx, ids) +} + +// NewMetricsEndDeviceRegistryCache constructs an EndDeviceRegistryCache that collects metrics. +func NewMetricsEndDeviceRegistryCache(inner EndDeviceRegistryCache) EndDeviceRegistryCache { + return &metricsEndDeviceRegistryCache{ + inner: inner, + } +} diff --git a/pkg/applicationserver/metadata/end_device_registry_test.go b/pkg/applicationserver/metadata/end_device_registry_test.go new file mode 100644 index 0000000000..7ad68ccad2 --- /dev/null +++ b/pkg/applicationserver/metadata/end_device_registry_test.go @@ -0,0 +1,449 @@ +// Copyright © 2025 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metadata_test + +import ( + "testing" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/errors" + + "go.thethings.network/lorawan-stack/v3/pkg/applicationserver/metadata" + "go.thethings.network/lorawan-stack/v3/pkg/applicationserver/metadata/redis" + "go.thethings.network/lorawan-stack/v3/pkg/cluster" + "go.thethings.network/lorawan-stack/v3/pkg/component" + componenttest "go.thethings.network/lorawan-stack/v3/pkg/component/test" + "go.thethings.network/lorawan-stack/v3/pkg/config" + mockis "go.thethings.network/lorawan-stack/v3/pkg/identityserver/mock" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/util/test" + "go.thethings.network/lorawan-stack/v3/pkg/util/test/assertions/should" +) + +var ( + originalLocations = map[string]*ttnpb.Location{ + "baz": { + Altitude: 12, + Latitude: 23, + }, + } + locationsPatch = map[string]*ttnpb.Location{ + "bzz": { + Altitude: 23, + Latitude: 34, + }, + } + + originalAttributes = map[string]string{ + "attr1": "val1", + "attr2": "val2", + } + + attributesPatch = map[string]string{ + "attr3": "val3", + "attr4": "val4", + } +) + +func TestClusterEndDeviceRegistry(t *testing.T) { // nolint:gocyclo + registeredEndDeviceIDs := &ttnpb.EndDeviceIdentifiers{ + ApplicationIds: &ttnpb.ApplicationIdentifiers{ + ApplicationId: "foo", + }, + DeviceId: "bar", + } + + t.Parallel() + + a, ctx := test.New(t) + is, isAddr, closeIS := mockis.New(ctx) + defer closeIS() + + registeredEndDevice := ttnpb.EndDevice{ + Ids: registeredEndDeviceIDs, + Locations: originalLocations, + Attributes: originalAttributes, + } + is.EndDeviceRegistry().Add(ctx, ®isteredEndDevice) + + c := componenttest.NewComponent(t, &component.Config{ + ServiceBase: config.ServiceBase{ + Cluster: cluster.Config{ + IdentityServer: isAddr, + }, + }, + }) + componenttest.StartComponent(t, c) + defer c.Close() + mustHavePeer(ctx, c, ttnpb.ClusterRole_ENTITY_REGISTRY) + + registry := metadata.NewClusterEndDeviceRegistry(c, 10*time.Second) + + _, err := registry.Get(ctx, registeredEndDeviceIDs, []string{ + "network_server_address", "application_server_address", "join_server_address", + }) + a.So(errors.IsInvalidArgument(err), should.BeTrue) + + dev, err := registry.Get(ctx, registeredEndDeviceIDs, []string{"attributes", "locations"}) + if a.So(err, should.BeNil) { + a.So(dev, should.NotBeNil) + + a.So(dev.Locations, should.NotBeNil) + a.So(len(dev.Locations), should.Equal, len(registeredEndDevice.Locations)) + for k, v := range dev.Locations { + a.So(registeredEndDevice.Locations[k], should.Resemble, v) + } + for k, v := range originalLocations { + a.So(dev.Locations[k], should.Resemble, v) + } + + a.So(dev.Attributes, should.NotBeNil) + a.So(len(dev.Attributes), should.Equal, len(registeredEndDevice.Attributes)) + for k, v := range dev.Attributes { + a.So(registeredEndDevice.Attributes[k], should.Equal, v) + } + for k, v := range originalAttributes { + a.So(dev.Attributes[k], should.Equal, v) + } + } + + _, err = registry.Set(ctx, registeredEndDeviceIDs, []string{ + "network_server_address", "application_server_address", "join_server_address", + }, func(_ *ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error) { + return nil, nil, nil // nolint: nilnil + }) + a.So(errors.IsInvalidArgument(err), should.BeTrue) + + // Update location and attributes. + dev, err = registry.Set(ctx, registeredEndDeviceIDs, []string{"locations", "attributes"}, + func(stored *ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error) { + if stored == nil { + return nil, nil, errors.New("not found") + } + + if len(stored.Locations) == 0 { + stored.Locations = make(map[string]*ttnpb.Location, len(locationsPatch)) + } + + for k, l := range locationsPatch { + stored.Locations[k] = l + } + + if len(stored.Attributes) == 0 { + stored.Attributes = make(map[string]string, len(attributesPatch)) + } + + for k, v := range attributesPatch { + stored.Attributes[k] = v + } + + return stored, []string{"locations", "attributes"}, nil + }, + ) + if a.So(err, should.BeNil) { + a.So(dev, should.NotBeNil) + + a.So(dev.Locations, should.NotBeNil) + a.So(len(dev.Locations), should.Equal, len(registeredEndDevice.Locations)) + for k, v := range dev.Locations { + a.So(registeredEndDevice.Locations[k], should.Resemble, v) + } + for k, v := range originalLocations { + a.So(dev.Locations[k], should.Resemble, v) + } + for k, v := range locationsPatch { + a.So(dev.Locations[k], should.Resemble, v) + } + + a.So(dev.Attributes, should.NotBeNil) + a.So(len(dev.Attributes), should.Equal, len(registeredEndDevice.Attributes)) + for k, v := range dev.Attributes { + a.So(registeredEndDevice.Attributes[k], should.Equal, v) + } + for k, v := range originalAttributes { + a.So(dev.Attributes[k], should.Equal, v) + } + for k, v := range attributesPatch { + a.So(dev.Attributes[k], should.Equal, v) + } + } + + dev, err = registry.Get(ctx, registeredEndDeviceIDs, []string{"attributes", "locations"}) + if a.So(err, should.BeNil) { + a.So(dev, should.NotBeNil) + + a.So(dev.Locations, should.NotBeNil) + a.So(len(dev.Locations), should.Equal, len(registeredEndDevice.Locations)) + for k, v := range dev.Locations { + a.So(registeredEndDevice.Locations[k], should.Resemble, v) + } + for k, v := range originalLocations { + a.So(dev.Locations[k], should.Resemble, v) + } + for k, v := range locationsPatch { + a.So(dev.Locations[k], should.Resemble, v) + } + + a.So(dev.Attributes, should.NotBeNil) + a.So(len(dev.Attributes), should.Equal, len(registeredEndDevice.Attributes)) + for k, v := range dev.Attributes { + a.So(registeredEndDevice.Attributes[k], should.Equal, v) + } + for k, v := range originalAttributes { + a.So(dev.Attributes[k], should.Equal, v) + } + for k, v := range attributesPatch { + a.So(dev.Attributes[k], should.Equal, v) + } + } +} + +func TestCachedEndDeviceRegistry(t *testing.T) { // nolint:gocyclo + t.Parallel() + + var ( + registeredEndDeviceIDs = &ttnpb.EndDeviceIdentifiers{ + ApplicationIds: &ttnpb.ApplicationIdentifiers{ + ApplicationId: "foo", + }, + DeviceId: "bar", + } + + Timeout = (1 << 7) * test.Delay + ) + + a, ctx := test.New(t) + is, isAddr, closeIS := mockis.New(ctx) + defer closeIS() + + registeredEndDevice := ttnpb.EndDevice{ + Ids: registeredEndDeviceIDs, + Locations: originalLocations, + Attributes: originalAttributes, + } + is.EndDeviceRegistry().Add(ctx, ®isteredEndDevice) + + c := componenttest.NewComponent(t, &component.Config{ + ServiceBase: config.ServiceBase{ + Cluster: cluster.Config{ + IdentityServer: isAddr, + }, + }, + }) + componenttest.StartComponent(t, c) + defer c.Close() + mustHavePeer(ctx, c, ttnpb.ClusterRole_ENTITY_REGISTRY) + + registry := metadata.NewClusterEndDeviceRegistry(c, 4*Timeout) + cl, flush := test.NewRedis(ctx, "metadata_redis_test") + defer flush() + cache := &redis.EndDeviceCache{Redis: cl} + registry = metadata.NewCachedEndDeviceRegistry( + ctx, c, registry, cache, 4*Timeout, 8*Timeout, 16*Timeout, + ) + + _, err := registry.Get(ctx, registeredEndDeviceIDs, []string{ + "network_server_address", "application_server_address", "join_server_address", + }) + a.So(errors.IsInvalidArgument(err), should.BeTrue) + + dev, err := registry.Get(ctx, registeredEndDeviceIDs, nil) + a.So(errors.IsNotFound(err), should.BeTrue) + a.So(dev, should.BeNil) + + // Wait for the cache to be populated asynchronously. + time.Sleep(Timeout) + + dev, err = registry.Get(ctx, registeredEndDeviceIDs, []string{"attributes", "locations"}) + if a.So(err, should.BeNil) { + a.So(dev, should.NotBeNil) + + a.So(dev.Attributes, should.NotBeNil) + a.So(len(dev.Attributes), should.Equal, len(originalAttributes)) + for k, v := range originalAttributes { + a.So(dev.Attributes[k], should.Equal, v) + } + + a.So(dev.Locations, should.NotBeNil) + a.So(len(dev.Locations), should.Equal, len(originalLocations)) + for k, v := range originalLocations { + a.So(dev.Locations[k], should.Resemble, v) + } + } + + _, err = registry.Set(ctx, registeredEndDeviceIDs, []string{ + "network_server_address", "application_server_address", "join_server_address", + }, func(_ *ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error) { + return nil, nil, nil // nolint: nilnil + }) + a.So(errors.IsInvalidArgument(err), should.BeTrue) + + dev, err = registry.Set(ctx, registeredEndDeviceIDs, []string{"locations", "attributes"}, + func(stored *ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error) { + if stored == nil { + return nil, nil, errors.New("not found") + } + + if len(stored.Attributes) == 0 { + stored.Attributes = make(map[string]string, len(attributesPatch)) + } + + for k, v := range attributesPatch { + stored.Attributes[k] = v + } + + if len(stored.Locations) == 0 { + stored.Locations = make(map[string]*ttnpb.Location, len(locationsPatch)) + } + + for k, l := range locationsPatch { + stored.Locations[k] = l + } + + return stored, []string{"locations", "attributes"}, nil + }, + ) + if a.So(err, should.BeNil) { + a.So(dev, should.NotBeNil) + + a.So(dev.Attributes, should.NotBeNil) + a.So(len(dev.Attributes), should.Equal, len(registeredEndDevice.Attributes)) + for k, v := range dev.Attributes { + a.So(registeredEndDevice.Attributes[k], should.Equal, v) + } + for k, v := range originalAttributes { + a.So(dev.Attributes[k], should.Equal, v) + } + for k, v := range attributesPatch { + a.So(dev.Attributes[k], should.Equal, v) + } + + a.So(dev.Locations, should.NotBeNil) + a.So(len(dev.Locations), should.Equal, len(registeredEndDevice.Locations)) + for k, v := range dev.Locations { + a.So(registeredEndDevice.Locations[k], should.Resemble, v) + } + for k, v := range originalLocations { + a.So(dev.Locations[k], should.Resemble, v) + } + for k, v := range locationsPatch { + a.So(dev.Locations[k], should.Resemble, v) + } + } + + dev, err = registry.Get(ctx, registeredEndDeviceIDs, []string{"attributes"}) + if a.So(err, should.BeNil) { + a.So(dev, should.NotBeNil) + + a.So(dev.Attributes, should.NotBeNil) + a.So(len(dev.Attributes), should.Equal, len(registeredEndDevice.Attributes)) + for k, v := range dev.Attributes { + a.So(registeredEndDevice.Attributes[k], should.Equal, v) + } + for k, v := range originalAttributes { + a.So(dev.Attributes[k], should.Equal, v) + } + for k, v := range attributesPatch { + a.So(dev.Attributes[k], should.Equal, v) + } + } + + dev, err = registry.Get(ctx, registeredEndDeviceIDs, []string{"locations"}) + if a.So(err, should.BeNil) { + a.So(dev, should.NotBeNil) + + a.So(dev.Locations, should.NotBeNil) + a.So(len(dev.Locations), should.Equal, len(registeredEndDevice.Locations)) + for k, v := range dev.Locations { + a.So(registeredEndDevice.Locations[k], should.Resemble, v) + } + for k, v := range originalLocations { + a.So(dev.Locations[k], should.Resemble, v) + } + for k, v := range locationsPatch { + a.So(dev.Locations[k], should.Resemble, v) + } + } + + // Wait for the entry to be evicted. + time.Sleep(20 * Timeout) + + // There is no cached end device anymore, and we have triggered an asynchronous refresh. + dev, err = registry.Get(ctx, registeredEndDeviceIDs, []string{"locations"}) + a.So(errors.IsNotFound(err), should.BeTrue) + a.So(dev, should.BeNil) + + time.Sleep(Timeout) + + dev, err = registry.Get(ctx, registeredEndDeviceIDs, []string{"locations"}) + if a.So(err, should.BeNil) { + a.So(dev, should.NotBeNil) + + a.So(dev.Locations, should.NotBeNil) + a.So(len(dev.Locations), should.Equal, len(registeredEndDevice.Locations)) + for k, v := range dev.Locations { + a.So(registeredEndDevice.Locations[k], should.Resemble, v) + } + for k, v := range originalLocations { + a.So(dev.Locations[k], should.Resemble, v) + } + for k, v := range locationsPatch { + a.So(dev.Locations[k], should.Resemble, v) + } + } + + // Simulate a network partition. + closeIS() + time.Sleep(Timeout) + + // Do a read that will trigger an asynchronous cache refresh. + dev, err = registry.Get(ctx, registeredEndDeviceIDs, []string{"locations"}) + if a.So(err, should.BeNil) { + a.So(dev, should.NotBeNil) + + a.So(dev.Locations, should.NotBeNil) + a.So(len(dev.Locations), should.Equal, len(registeredEndDevice.Locations)) + for k, v := range dev.Locations { + a.So(registeredEndDevice.Locations[k], should.Resemble, v) + } + for k, v := range originalLocations { + a.So(dev.Locations[k], should.Resemble, v) + } + for k, v := range locationsPatch { + a.So(dev.Locations[k], should.Resemble, v) + } + } + + // Wait for the partition to be detected asynchronously. + time.Sleep(Timeout) + + // We now serve stale data. + dev, err = registry.Get(ctx, registeredEndDeviceIDs, []string{"locations"}) + if a.So(err, should.BeNil) { + a.So(dev, should.NotBeNil) + + a.So(dev.Locations, should.NotBeNil) + a.So(len(dev.Locations), should.Equal, len(registeredEndDevice.Locations)) + for k, v := range dev.Locations { + a.So(registeredEndDevice.Locations[k], should.Resemble, v) + } + for k, v := range originalLocations { + a.So(dev.Locations[k], should.Resemble, v) + } + for k, v := range locationsPatch { + a.So(dev.Locations[k], should.Resemble, v) + } + } +} diff --git a/pkg/applicationserver/metadata/location_cache.go b/pkg/applicationserver/metadata/location_cache.go deleted file mode 100644 index 6ad1a3de1a..0000000000 --- a/pkg/applicationserver/metadata/location_cache.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright © 2021 The Things Network Foundation, The Things Industries B.V. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metadata - -import ( - "context" - "time" - - "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" -) - -// EndDeviceLocationCache is a cache for end device locations. -type EndDeviceLocationCache interface { - // Get retrieves the end device locations and the remaining TTL for the entry. - Get(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) (map[string]*ttnpb.Location, *time.Time, error) - // Set sets the end device locations. - Set(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, update map[string]*ttnpb.Location, ttl time.Duration) error - // Delete removes the locations from the cache. - Delete(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) error -} - -type metricsEndDeviceLocationCache struct { - inner EndDeviceLocationCache -} - -// Get implements EndDeviceLocationCache. -func (c *metricsEndDeviceLocationCache) Get(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) (map[string]*ttnpb.Location, *time.Time, error) { - m, storedAt, err := c.inner.Get(ctx, ids) - if storedAt == nil { - registerMetadataCacheMiss(ctx, locationLabel) - } else { - registerMetadataCacheHit(ctx, locationLabel) - } - return m, storedAt, err -} - -// Set implements EndDeviceLocationCache. -func (c *metricsEndDeviceLocationCache) Set(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, update map[string]*ttnpb.Location, ttl time.Duration) error { - return c.inner.Set(ctx, ids, update, ttl) -} - -// Delete implements EndDeviceLocationCache. -func (c *metricsEndDeviceLocationCache) Delete(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) error { - return c.inner.Delete(ctx, ids) -} - -// NewMetricsEndDeviceLocationCache constructs an EndDeviceLocationCache that collects metrics. -func NewMetricsEndDeviceLocationCache(inner EndDeviceLocationCache) EndDeviceLocationCache { - return &metricsEndDeviceLocationCache{ - inner: inner, - } -} diff --git a/pkg/applicationserver/metadata/location_registry.go b/pkg/applicationserver/metadata/location_registry.go deleted file mode 100644 index b1c1a5fcef..0000000000 --- a/pkg/applicationserver/metadata/location_registry.go +++ /dev/null @@ -1,242 +0,0 @@ -// Copyright © 2021 The Things Network Foundation, The Things Industries B.V. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metadata - -import ( - "context" - "time" - - "go.thethings.network/lorawan-stack/v3/pkg/cluster" - "go.thethings.network/lorawan-stack/v3/pkg/errors" - "go.thethings.network/lorawan-stack/v3/pkg/log" - "go.thethings.network/lorawan-stack/v3/pkg/random" - "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" - "go.thethings.network/lorawan-stack/v3/pkg/workerpool" - "google.golang.org/grpc" -) - -// EndDeviceLocationRegistry is a registry for end device locations. -type EndDeviceLocationRegistry interface { - // Get retrieves the end device locations. - Get(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) (map[string]*ttnpb.Location, error) - // Merge merges the end device locations. - Merge(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, update map[string]*ttnpb.Location) (map[string]*ttnpb.Location, error) -} - -type noopEndDeviceLocationRegistry struct{} - -// Get implements EndDeviceLocationRegistry. -func (noopEndDeviceLocationRegistry) Get(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) (map[string]*ttnpb.Location, error) { - return nil, nil -} - -// Merge implements EndDeviceLocationRegistry. -func (noopEndDeviceLocationRegistry) Merge(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, update map[string]*ttnpb.Location) (map[string]*ttnpb.Location, error) { - return update, nil -} - -// NewNoopEndDeviceLocationRegistry returns a noop EndDeviceLocationRegistry. -func NewNoopEndDeviceLocationRegistry() EndDeviceLocationRegistry { - return noopEndDeviceLocationRegistry{} -} - -type metricsEndDeviceLocationRegistry struct { - inner EndDeviceLocationRegistry -} - -// Get implements EndDeviceLocationRegistry. -func (m *metricsEndDeviceLocationRegistry) Get(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) (map[string]*ttnpb.Location, error) { - registerMetadataRegistryRetrieval(ctx, locationLabel) - return m.inner.Get(ctx, ids) -} - -// Merge implements EndDeviceLocationRegistry. -func (m *metricsEndDeviceLocationRegistry) Merge(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, update map[string]*ttnpb.Location) (map[string]*ttnpb.Location, error) { - registerMetadataRegistryUpdate(ctx, locationLabel) - return m.inner.Merge(ctx, ids, update) -} - -// NewMetricsEndDeviceLocationRegistry returns an EndDeviceLocationRegistry that collects metrics. -func NewMetricsEndDeviceLocationRegistry(inner EndDeviceLocationRegistry) EndDeviceLocationRegistry { - return &metricsEndDeviceLocationRegistry{ - inner: inner, - } -} - -// ClusterPeerAccess provides access to cluster peers. -type ClusterPeerAccess interface { - GetPeerConn(ctx context.Context, role ttnpb.ClusterRole, ids cluster.EntityIdentifiers) (*grpc.ClientConn, error) - WithClusterAuth() grpc.CallOption -} - -var endDeviceLocationFieldMask = ttnpb.FieldMask("locations") - -type clusterEndDeviceLocationRegistry struct { - ClusterPeerAccess - timeout time.Duration -} - -// Get implements EndDeviceLocationRegistry. -func (c clusterEndDeviceLocationRegistry) Get(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) (map[string]*ttnpb.Location, error) { - cc, err := c.GetPeerConn(ctx, ttnpb.ClusterRole_ENTITY_REGISTRY, nil) - if err != nil { - return nil, err - } - cl := ttnpb.NewEndDeviceRegistryClient(cc) - ctx, cancel := context.WithTimeout(ctx, c.timeout) - defer cancel() - dev, err := cl.Get(ctx, &ttnpb.GetEndDeviceRequest{ - EndDeviceIds: ids, - FieldMask: endDeviceLocationFieldMask, - }, c.WithClusterAuth()) - if err != nil { - return nil, err - } - return dev.Locations, nil -} - -// Merge implements EndDeviceLocationRegistry. -func (c clusterEndDeviceLocationRegistry) Merge(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, update map[string]*ttnpb.Location) (map[string]*ttnpb.Location, error) { - cc, err := c.GetPeerConn(ctx, ttnpb.ClusterRole_ENTITY_REGISTRY, nil) - if err != nil { - return nil, err - } - cl := ttnpb.NewEndDeviceRegistryClient(cc) - ctx, cancel := context.WithTimeout(ctx, c.timeout) - defer cancel() - dev, err := cl.Get(ctx, &ttnpb.GetEndDeviceRequest{ - EndDeviceIds: ids, - FieldMask: endDeviceLocationFieldMask, - }, c.WithClusterAuth()) - if err != nil { - return nil, err - } - if len(update) == 0 { - return dev.Locations, nil - } - if dev.Locations == nil { - dev.Locations = make(map[string]*ttnpb.Location, len(update)) - } - for k, l := range update { - dev.Locations[k] = l - } - _, err = cl.Update(ctx, &ttnpb.UpdateEndDeviceRequest{ - EndDevice: &ttnpb.EndDevice{ - Ids: ids, - Locations: dev.Locations, - }, - FieldMask: endDeviceLocationFieldMask, - }, c.WithClusterAuth()) - if err != nil { - return nil, err - } - return dev.Locations, nil -} - -// NewClusterEndDeviceLocationRegistry returns an EndDeviceLocationRegistry connected to the Entity Registry. -func NewClusterEndDeviceLocationRegistry(cluster ClusterPeerAccess, timeout time.Duration) EndDeviceLocationRegistry { - return &clusterEndDeviceLocationRegistry{ - ClusterPeerAccess: cluster, - timeout: timeout, - } -} - -type cachedEndDeviceLocationRegistry struct { - registry EndDeviceLocationRegistry - cache EndDeviceLocationCache - - minRefreshInterval time.Duration - maxRefreshInterval time.Duration - ttl time.Duration - - replicationPool workerpool.WorkerPool[*ttnpb.EndDeviceIdentifiers] -} - -// Get implements EndDeviceLocationRegistry. -func (c *cachedEndDeviceLocationRegistry) Get(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) (map[string]*ttnpb.Location, error) { - locations, storedAt, err := c.cache.Get(ctx, ids) - switch { - case err != nil && !errors.IsNotFound(err): - return nil, err - case err != nil && errors.IsNotFound(err): - locations = nil - case err == nil: - age := time.Since(*storedAt) - if age <= c.minRefreshInterval { - // If the object is younger than the minimum refresh interval, just return the cached value. - return locations, nil - } - if remaining := c.maxRefreshInterval - age; remaining > 0 { - // If the objects age is between the minimum and maximum refresh interval, check if we should asynchronously - // refresh the cache. - window := c.maxRefreshInterval - c.minRefreshInterval - threshold := time.Duration(random.Int63n(int64(window))) - // remaining is the remaining window of the refresh interval in the (0, window) interval. - // threshold is a uniformly distributed duration in the [0, window) interval. - if remaining >= threshold { - return locations, nil - } - } - } - if err := c.replicationPool.Publish(ctx, ids); err != nil { - log.FromContext(ctx).WithError(err).Warn("Failed to publish end device locations replication request") - } - return locations, nil -} - -// Merge implements EndDeviceLocationRegistry. -func (c *cachedEndDeviceLocationRegistry) Merge(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, update map[string]*ttnpb.Location) (map[string]*ttnpb.Location, error) { - locations, err := c.registry.Merge(ctx, ids, update) - if err != nil { - return nil, err - } - if err := c.cache.Set(ctx, ids, locations, c.ttl); err != nil { - return nil, err - } - return locations, nil -} - -// NewCachedEndDeviceLocationRegistry returns an EndDeviceLocationRegistry that caches the responses of the provided EndDeviceLocationRegistry in the provided -// EndDeviceLocationCache. On cache miss, the registry will retrieve and cache the locations asynchronously. -// Items whose TTL is within the soft TTL window have a chance to trigger an asynchronous cache synchronization event on location retrieval. -// The probability of a synchronization event increases linearly between the soft TTL (0%) and the hard TTL (100%). -func NewCachedEndDeviceLocationRegistry(ctx context.Context, c workerpool.Component, registry EndDeviceLocationRegistry, cache EndDeviceLocationCache, minRefreshInterval, maxRefreshInterval, ttl time.Duration) EndDeviceLocationRegistry { - st := &cachedEndDeviceLocationRegistry{ - registry: registry, - cache: cache, - - minRefreshInterval: minRefreshInterval, - maxRefreshInterval: maxRefreshInterval, - ttl: ttl, - - replicationPool: workerpool.NewWorkerPool(workerpool.Config[*ttnpb.EndDeviceIdentifiers]{ - Component: c, - Context: ctx, - Name: "replicate_end_device_locations", - Handler: func(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) { - locations, err := registry.Get(ctx, ids) - if err != nil { - log.FromContext(ctx).WithError(err).Warn("Failed to retrieve end device locations") - return - } - if err := cache.Set(ctx, ids, locations, ttl); err != nil { - log.FromContext(ctx).WithError(err).Warn("Failed to cache end device locations") - return - } - }, - }), - } - return st -} diff --git a/pkg/applicationserver/metadata/location_registry_test.go b/pkg/applicationserver/metadata/location_registry_test.go deleted file mode 100644 index f366942a04..0000000000 --- a/pkg/applicationserver/metadata/location_registry_test.go +++ /dev/null @@ -1,274 +0,0 @@ -// Copyright © 2021 The Things Network Foundation, The Things Industries B.V. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metadata_test - -import ( - "context" - "testing" - "time" - - "go.thethings.network/lorawan-stack/v3/pkg/applicationserver/metadata" - "go.thethings.network/lorawan-stack/v3/pkg/applicationserver/metadata/redis" - "go.thethings.network/lorawan-stack/v3/pkg/cluster" - "go.thethings.network/lorawan-stack/v3/pkg/component" - componenttest "go.thethings.network/lorawan-stack/v3/pkg/component/test" - "go.thethings.network/lorawan-stack/v3/pkg/config" - mockis "go.thethings.network/lorawan-stack/v3/pkg/identityserver/mock" - "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" - "go.thethings.network/lorawan-stack/v3/pkg/util/test" - "go.thethings.network/lorawan-stack/v3/pkg/util/test/assertions/should" -) - -func mustHavePeer(ctx context.Context, c *component.Component, role ttnpb.ClusterRole) { - for i := 0; i < 20; i++ { - time.Sleep(20 * time.Millisecond) - if _, err := c.GetPeer(ctx, role, nil); err == nil { - return - } - } - panic("could not connect to peer") -} - -var ( - registeredEndDeviceIDs = &ttnpb.EndDeviceIdentifiers{ - ApplicationIds: &ttnpb.ApplicationIdentifiers{ - ApplicationId: "foo", - }, - DeviceId: "bar", - } - originalLocations = map[string]*ttnpb.Location{ - "baz": { - Altitude: 12, - Latitude: 23, - }, - } - locationsPatch = map[string]*ttnpb.Location{ - "bzz": { - Altitude: 23, - Latitude: 34, - }, - } - Timeout = (1 << 7) * test.Delay -) - -func TestClusterEndDeviceLocationRegistry(t *testing.T) { - a, ctx := test.New(t) - is, isAddr, closeIS := mockis.New(ctx) - defer closeIS() - - registeredEndDevice := ttnpb.EndDevice{ - Ids: registeredEndDeviceIDs, - Locations: originalLocations, - } - is.EndDeviceRegistry().Add(ctx, ®isteredEndDevice) - - c := componenttest.NewComponent(t, &component.Config{ - ServiceBase: config.ServiceBase{ - Cluster: cluster.Config{ - IdentityServer: isAddr, - }, - }, - }) - componenttest.StartComponent(t, c) - defer c.Close() - mustHavePeer(ctx, c, ttnpb.ClusterRole_ENTITY_REGISTRY) - - registry := metadata.NewClusterEndDeviceLocationRegistry(c, 10*time.Second) - - locations, err := registry.Get(ctx, registeredEndDeviceIDs) - if a.So(err, should.BeNil) { - a.So(locations, should.NotBeNil) - a.So(len(locations), should.Equal, len(registeredEndDevice.Locations)) - for k, v := range locations { - a.So(registeredEndDevice.Locations[k], should.Resemble, v) - } - for k, v := range originalLocations { - a.So(locations[k], should.Resemble, v) - } - } - - locations, err = registry.Merge(ctx, registeredEndDeviceIDs, locationsPatch) - if a.So(err, should.BeNil) { - a.So(locations, should.NotBeNil) - a.So(len(locations), should.Equal, len(registeredEndDevice.Locations)) - for k, v := range locations { - a.So(registeredEndDevice.Locations[k], should.Resemble, v) - } - for k, v := range originalLocations { - a.So(locations[k], should.Resemble, v) - } - for k, v := range locationsPatch { - a.So(locations[k], should.Resemble, v) - } - } - - locations, err = registry.Get(ctx, registeredEndDeviceIDs) - if a.So(err, should.BeNil) { - a.So(locations, should.NotBeNil) - a.So(len(locations), should.Equal, len(registeredEndDevice.Locations)) - for k, v := range locations { - a.So(registeredEndDevice.Locations[k], should.Resemble, v) - } - for k, v := range originalLocations { - a.So(locations[k], should.Resemble, v) - } - for k, v := range locationsPatch { - a.So(locations[k], should.Resemble, v) - } - } -} - -func TestCachedEndDeviceLocationRegistry(t *testing.T) { - a, ctx := test.New(t) - is, isAddr, closeIS := mockis.New(ctx) - defer closeIS() - - registeredEndDevice := ttnpb.EndDevice{ - Ids: registeredEndDeviceIDs, - Locations: originalLocations, - } - is.EndDeviceRegistry().Add(ctx, ®isteredEndDevice) - - c := componenttest.NewComponent(t, &component.Config{ - ServiceBase: config.ServiceBase{ - Cluster: cluster.Config{ - IdentityServer: isAddr, - }, - }, - }) - componenttest.StartComponent(t, c) - defer c.Close() - mustHavePeer(ctx, c, ttnpb.ClusterRole_ENTITY_REGISTRY) - - registry := metadata.NewClusterEndDeviceLocationRegistry(c, 4*Timeout) - cl, flush := test.NewRedis(ctx, "metadata_redis_test") - defer flush() - cache := &redis.EndDeviceLocationCache{ - Redis: cl, - } - registry = metadata.NewCachedEndDeviceLocationRegistry( - ctx, c, registry, cache, 4*Timeout, 8*Timeout, 16*Timeout, - ) - - locations, err := registry.Get(ctx, registeredEndDeviceIDs) - a.So(err, should.BeNil) - a.So(locations, should.HaveLength, 0) - - // Wait for the cache to be populated asynchronously. - time.Sleep(Timeout) - - locations, err = registry.Get(ctx, registeredEndDeviceIDs) - if a.So(err, should.BeNil) { - a.So(locations, should.NotBeNil) - a.So(len(locations), should.Equal, len(originalLocations)) - for k, v := range originalLocations { - a.So(locations[k], should.Resemble, v) - } - } - - locations, err = registry.Merge(ctx, registeredEndDeviceIDs, locationsPatch) - if a.So(err, should.BeNil) { - a.So(locations, should.NotBeNil) - a.So(len(locations), should.Equal, len(registeredEndDevice.Locations)) - for k, v := range locations { - a.So(registeredEndDevice.Locations[k], should.Resemble, v) - } - for k, v := range originalLocations { - a.So(locations[k], should.Resemble, v) - } - for k, v := range locationsPatch { - a.So(locations[k], should.Resemble, v) - } - } - - locations, err = registry.Get(ctx, registeredEndDeviceIDs) - if a.So(err, should.BeNil) { - a.So(locations, should.NotBeNil) - a.So(len(locations), should.Equal, len(registeredEndDevice.Locations)) - for k, v := range locations { - a.So(registeredEndDevice.Locations[k], should.Resemble, v) - } - for k, v := range originalLocations { - a.So(locations[k], should.Resemble, v) - } - for k, v := range locationsPatch { - a.So(locations[k], should.Resemble, v) - } - } - - // Wait for the entry to be evicted. - time.Sleep(20 * Timeout) - - // There is no cached location anymore, and we have triggered an asynchronous refresh. - locations, err = registry.Get(ctx, registeredEndDeviceIDs) - a.So(err, should.BeNil) - a.So(locations, should.HaveLength, 0) - - time.Sleep(Timeout) - - locations, err = registry.Get(ctx, registeredEndDeviceIDs) - if a.So(err, should.BeNil) { - a.So(locations, should.NotBeNil) - a.So(len(locations), should.Equal, len(registeredEndDevice.Locations)) - for k, v := range locations { - a.So(registeredEndDevice.Locations[k], should.Resemble, v) - } - for k, v := range originalLocations { - a.So(locations[k], should.Resemble, v) - } - for k, v := range locationsPatch { - a.So(locations[k], should.Resemble, v) - } - } - - // Simulate a network partition. - closeIS() - time.Sleep(Timeout) - - // Do a read that will trigger an asynchronous cache refresh. - locations, err = registry.Get(ctx, registeredEndDeviceIDs) - if a.So(err, should.BeNil) { - a.So(locations, should.NotBeNil) - a.So(len(locations), should.Equal, len(registeredEndDevice.Locations)) - for k, v := range locations { - a.So(registeredEndDevice.Locations[k], should.Resemble, v) - } - for k, v := range originalLocations { - a.So(locations[k], should.Resemble, v) - } - for k, v := range locationsPatch { - a.So(locations[k], should.Resemble, v) - } - } - - // Wait for the partition to be detected asynchronously. - time.Sleep(Timeout) - - // We now serve stale data. - locations, err = registry.Get(ctx, registeredEndDeviceIDs) - if a.So(err, should.BeNil) { - a.So(locations, should.NotBeNil) - a.So(len(locations), should.Equal, len(registeredEndDevice.Locations)) - for k, v := range locations { - a.So(registeredEndDevice.Locations[k], should.Resemble, v) - } - for k, v := range originalLocations { - a.So(locations[k], should.Resemble, v) - } - for k, v := range locationsPatch { - a.So(locations[k], should.Resemble, v) - } - } -} diff --git a/pkg/applicationserver/metadata/metadata.go b/pkg/applicationserver/metadata/metadata.go new file mode 100644 index 0000000000..5a061b22e6 --- /dev/null +++ b/pkg/applicationserver/metadata/metadata.go @@ -0,0 +1,24 @@ +// Copyright © 2025 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package metadata contains the metadata registry clients. +package metadata + +import "go.thethings.network/lorawan-stack/v3/pkg/errors" + +var ( + errEndDeviceNotFound = errors.DefineNotFound("end_device_not_found", "end device not found") + errFieldMaskPathNotSupported = errors.DefineInvalidArgument( + "field_mask_path_not_supported", "field mask path `{path}` is not supported") +) diff --git a/pkg/applicationserver/metadata/metadata_util_test.go b/pkg/applicationserver/metadata/metadata_util_test.go new file mode 100644 index 0000000000..56407cc45b --- /dev/null +++ b/pkg/applicationserver/metadata/metadata_util_test.go @@ -0,0 +1,33 @@ +// Copyright © 2025 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metadata_test + +import ( + "context" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/component" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" +) + +func mustHavePeer(ctx context.Context, c *component.Component, role ttnpb.ClusterRole) { // nolint: unparam + for i := 0; i < 20; i++ { + time.Sleep(20 * time.Millisecond) + if _, err := c.GetPeer(ctx, role, nil); err == nil { + return + } + } + panic("could not connect to peer") +} diff --git a/pkg/applicationserver/metadata/observability.go b/pkg/applicationserver/metadata/observability.go index 64ff2f6bc3..a012bfcf37 100644 --- a/pkg/applicationserver/metadata/observability.go +++ b/pkg/applicationserver/metadata/observability.go @@ -22,9 +22,10 @@ import ( ) const ( - subsystem = "as_metadata" - metadataLabel = "metadata" - locationLabel = "location" + subsystem = "as_metadata" + metadataLabel = "metadata" + locationLabel = "location" + endDeviceLabel = "end_device" ) var metaMetrics = &metadataMetrics{ @@ -87,20 +88,20 @@ func (m metadataMetrics) Collect(ch chan<- prometheus.Metric) { m.registryUpdates.Collect(ch) } -func registerMetadataCacheHit(ctx context.Context, metadata string) { +func registerMetadataCacheHit(_ context.Context, metadata string) { metaMetrics.cacheHits.WithLabelValues(metadata).Inc() metaMetrics.cacheMisses.WithLabelValues(metadata) } -func registerMetadataCacheMiss(ctx context.Context, metadata string) { +func registerMetadataCacheMiss(_ context.Context, metadata string) { metaMetrics.cacheHits.WithLabelValues(metadata) metaMetrics.cacheMisses.WithLabelValues(metadata).Inc() } -func registerMetadataRegistryRetrieval(ctx context.Context, metadata string) { +func registerMetadataRegistryRetrieval(_ context.Context, metadata string) { metaMetrics.registryRetrievals.WithLabelValues(metadata).Inc() } -func registerMetadataRegistryUpdate(ctx context.Context, metadata string) { +func registerMetadataRegistryUpdate(_ context.Context, metadata string) { metaMetrics.registryUpdates.WithLabelValues(metadata).Inc() } diff --git a/pkg/applicationserver/metadata/redis/end_device_cache.go b/pkg/applicationserver/metadata/redis/end_device_cache.go new file mode 100644 index 0000000000..f9e48a1de7 --- /dev/null +++ b/pkg/applicationserver/metadata/redis/end_device_cache.go @@ -0,0 +1,116 @@ +// Copyright © 2025 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package redis + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/redis/go-redis/v9" + ttnredis "go.thethings.network/lorawan-stack/v3/pkg/redis" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/unique" +) + +// EndDeviceCache is a Redis end device cache. +type EndDeviceCache struct { + Redis *ttnredis.Client +} + +func (r *EndDeviceCache) uidKey(uid string) string { + return r.Redis.Key("uid", uid) +} + +// Get returns the end device by the identifiers. +func (r *EndDeviceCache) Get( + ctx context.Context, + ids *ttnpb.EndDeviceIdentifiers, +) (*ttnpb.EndDevice, *time.Time, error) { + uidKey := r.uidKey(unique.ID(ctx, ids)) + m, err := r.Redis.HGetAll(ctx, uidKey).Result() + if err != nil { + return nil, nil, ttnredis.ConvertError(err) + } + if len(m) == 0 { + return nil, nil, errCacheMiss.New() + } + + var storedAt time.Time + if s, ok := m[storedAtMarker]; ok { + n, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return nil, nil, err + } + storedAt = time.Unix(0, n) + } + if s, ok := m[errorMarker]; ok { + details := &ttnpb.ErrorDetails{} + if err := ttnredis.UnmarshalProto(s, details); err != nil { + return nil, nil, err + } + return nil, &storedAt, ttnpb.ErrorDetailsFromProto(details) + } + if marshalledDevice, ok := m[endDeviceMarker]; ok { + dev := new(ttnpb.EndDevice) + if err := ttnredis.UnmarshalProto(marshalledDevice, dev); err != nil { + return nil, nil, err + } + + return dev, &storedAt, nil + } + + return nil, nil, errCacheEntryMalformed +} + +// Set updates the end device by its identifiers. +func (r *EndDeviceCache) Set( + ctx context.Context, + ids *ttnpb.EndDeviceIdentifiers, + dev *ttnpb.EndDevice, + ttl time.Duration, +) error { + marshalledEndDevice, err := ttnredis.MarshalProto(dev) + if err != nil { + return err + } + pairs := []string{ + storedAtMarker, + fmt.Sprintf("%v", time.Now().UnixNano()), + endDeviceMarker, + marshalledEndDevice, + } + + uidKey := r.uidKey(unique.ID(ctx, ids)) + if _, err := r.Redis.Pipelined(ctx, func(p redis.Pipeliner) error { + p.Del(ctx, uidKey) + p.HSet(ctx, uidKey, pairs) + p.PExpire(ctx, uidKey, ttl) + return nil + }); err != nil { + return ttnredis.ConvertError(err) + } + return nil +} + +// Delete removes the end device by the identifiers. +func (r *EndDeviceCache) Delete(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) error { + uidKey := r.uidKey(unique.ID(ctx, ids)) + if err := r.Redis.Del(ctx, uidKey).Err(); err != nil { + return ttnredis.ConvertError(err) + } + return nil +} diff --git a/pkg/applicationserver/metadata/redis/end_device_cache_test.go b/pkg/applicationserver/metadata/redis/end_device_cache_test.go new file mode 100644 index 0000000000..5aed3ad45a --- /dev/null +++ b/pkg/applicationserver/metadata/redis/end_device_cache_test.go @@ -0,0 +1,107 @@ +// Copyright © 2025 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package redis_test + +import ( + "testing" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/applicationserver/metadata/redis" + "go.thethings.network/lorawan-stack/v3/pkg/errors" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/util/test" + "go.thethings.network/lorawan-stack/v3/pkg/util/test/assertions/should" +) + +var ( + attributesA = map[string]string{ + "attr1": "val1", + "attr2": "val2", + } + attributesB = map[string]string{ + "attr3": "val3", + } +) + +func TestEndDeviceCache(t *testing.T) { + t.Parallel() + + endDevice := &ttnpb.EndDevice{ + Ids: registeredEndDeviceIDs, + Attributes: attributesA, + } + + a, ctx := test.New(t) + cl, flush := test.NewRedis(ctx, "metadata_redis_test") + defer flush() + cache := redis.EndDeviceCache{Redis: cl} + + _, _, err := cache.Get(ctx, registeredEndDeviceIDs) + a.So(err, should.NotBeNil) + a.So(errors.IsNotFound(err), should.BeTrue) + + storeTime := time.Now() + err = cache.Set(ctx, registeredEndDeviceIDs, endDevice, Timeout) + a.So(err, should.BeNil) + + dev, storedAt, err := cache.Get(ctx, endDevice.Ids) + if a.So(err, should.BeNil) { + if a.So(storedAt, should.NotBeNil) { + a.So(*storedAt, should.HappenOnOrAfter, storeTime) + } + a.So(dev, should.Resemble, endDevice) + } + + endDevice.Attributes = attributesB + + storeTime = time.Now() + err = cache.Set(ctx, registeredEndDeviceIDs, endDevice, Timeout) + a.So(err, should.BeNil) + + dev, storedAt, err = cache.Get(ctx, registeredEndDeviceIDs) + if a.So(err, should.BeNil) { + if a.So(storedAt, should.NotBeNil) { + a.So(*storedAt, should.HappenOnOrAfter, storeTime) + } + a.So(dev, should.Resemble, endDevice) + } + + err = cache.Delete(ctx, registeredEndDeviceIDs) + a.So(err, should.BeNil) + + _, _, err = cache.Get(ctx, registeredEndDeviceIDs) + a.So(err, should.NotBeNil) + a.So(errors.IsNotFound(err), should.BeTrue) + + endDevice.Attributes = attributesA + + storeTime = time.Now() + err = cache.Set(ctx, registeredEndDeviceIDs, endDevice, Timeout) + a.So(err, should.BeNil) + + dev, storedAt, err = cache.Get(ctx, registeredEndDeviceIDs) + if a.So(err, should.BeNil) { + if a.So(storedAt, should.NotBeNil) { + a.So(*storedAt, should.HappenOnOrAfter, storeTime) + } + a.So(dev, should.Resemble, endDevice) + } + + time.Sleep(2 * Timeout) + + _, _, err = cache.Get(ctx, registeredEndDeviceIDs) + a.So(err, should.NotBeNil) + a.So(errors.IsNotFound(err), should.BeTrue) +} diff --git a/pkg/applicationserver/metadata/redis/location_cache.go b/pkg/applicationserver/metadata/redis/location_cache.go index c8221e109e..d27d6f6469 100644 --- a/pkg/applicationserver/metadata/redis/location_cache.go +++ b/pkg/applicationserver/metadata/redis/location_cache.go @@ -21,7 +21,6 @@ import ( "time" "github.com/redis/go-redis/v9" - "go.thethings.network/lorawan-stack/v3/pkg/errors" ttnredis "go.thethings.network/lorawan-stack/v3/pkg/redis" "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" "go.thethings.network/lorawan-stack/v3/pkg/unique" @@ -36,17 +35,11 @@ func (r *EndDeviceLocationCache) uidKey(uid string) string { return r.Redis.Key("uid", uid) } -const ( - // storedAtMarker is used to store the timestamp of the last Set operation. - storedAtMarker = "_stored_at" - // errorMarker is used to store errors. - errorMarker = "_error" -) - -var errCacheMiss = errors.DefineNotFound("cache_miss", "cache miss") - // Get returns the locations by the end device identifiers. -func (r *EndDeviceLocationCache) Get(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers) (map[string]*ttnpb.Location, *time.Time, error) { +func (r *EndDeviceLocationCache) Get( + ctx context.Context, + ids *ttnpb.EndDeviceIdentifiers, +) (map[string]*ttnpb.Location, *time.Time, error) { uidKey := r.uidKey(unique.ID(ctx, ids)) m, err := r.Redis.HGetAll(ctx, uidKey).Result() if err != nil { @@ -86,7 +79,12 @@ func (r *EndDeviceLocationCache) Get(ctx context.Context, ids *ttnpb.EndDeviceId } // Set updates the locations by the end device identifiers. -func (r *EndDeviceLocationCache) Set(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, update map[string]*ttnpb.Location, ttl time.Duration) error { +func (r *EndDeviceLocationCache) Set( + ctx context.Context, + ids *ttnpb.EndDeviceIdentifiers, + update map[string]*ttnpb.Location, + ttl time.Duration, +) error { pairs := append(make([]string, 0, 2*len(update)+2), storedAtMarker, fmt.Sprintf("%v", time.Now().UnixNano())) for k, v := range update { s, err := ttnredis.MarshalProto(v) diff --git a/pkg/applicationserver/metadata/redis/location_cache_test.go b/pkg/applicationserver/metadata/redis/location_cache_test.go index eee514b062..706e036da6 100644 --- a/pkg/applicationserver/metadata/redis/location_cache_test.go +++ b/pkg/applicationserver/metadata/redis/location_cache_test.go @@ -26,13 +26,6 @@ import ( ) var ( - registeredEndDeviceIDs = &ttnpb.EndDeviceIdentifiers{ - ApplicationIds: &ttnpb.ApplicationIdentifiers{ - ApplicationId: "foo", - }, - DeviceId: "bar", - } - locationA = map[string]*ttnpb.Location{ "foo": { Latitude: 123, @@ -48,13 +41,11 @@ var ( Latitude: 567, }, } - - errUnavailable = errors.DefineUnavailable("unavailable", "unavailable") - - Timeout = (1 << 8) * test.Delay ) func TestLocationCache(t *testing.T) { + t.Parallel() + a, ctx := test.New(t) cl, flush := test.NewRedis(ctx, "metadata_redis_test") defer flush() diff --git a/pkg/applicationserver/metadata/redis/redis.go b/pkg/applicationserver/metadata/redis/redis.go new file mode 100644 index 0000000000..b997c79759 --- /dev/null +++ b/pkg/applicationserver/metadata/redis/redis.go @@ -0,0 +1,37 @@ +// Copyright © 2025 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package redis provides a Redis-based cache for end device metadata. +package redis + +import ( + "go.thethings.network/lorawan-stack/v3/pkg/errors" +) + +const ( + // nolint: godot + // endDeviceMarker is used to store an end device. + endDeviceMarker = "_end_device" + // nolint: godot + // errorMarker is used to store errors. + errorMarker = "_error" + // nolint: godot + // storedAtMarker is used to store the timestamp of the last Set operation. + storedAtMarker = "_stored_at" +) + +var ( + errCacheEntryMalformed = errors.DefineCorruption("cache_entry_malformed", "cache entry malformed") + errCacheMiss = errors.DefineNotFound("cache_miss", "cache miss") +) diff --git a/pkg/applicationserver/metadata/redis/redis_util_test.go b/pkg/applicationserver/metadata/redis/redis_util_test.go new file mode 100644 index 0000000000..5991749652 --- /dev/null +++ b/pkg/applicationserver/metadata/redis/redis_util_test.go @@ -0,0 +1,31 @@ +// Copyright © 2025 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package redis_test + +import ( + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/util/test" +) + +var ( + registeredEndDeviceIDs = &ttnpb.EndDeviceIdentifiers{ + ApplicationIds: &ttnpb.ApplicationIdentifiers{ + ApplicationId: "foo", + }, + DeviceId: "bar", + } + + Timeout = (1 << 8) * test.Delay +) diff --git a/pkg/networkserver/grpc_deviceregistry.go b/pkg/networkserver/grpc_deviceregistry.go index 756ca39e4c..d050c43a3f 100644 --- a/pkg/networkserver/grpc_deviceregistry.go +++ b/pkg/networkserver/grpc_deviceregistry.go @@ -3021,6 +3021,13 @@ func (ns *NetworkServer) Set(ctx context.Context, req *ttnpb.SetEndDeviceRequest "mac_state.pending_application_downlink.f_cnt", "mac_state.pending_application_downlink.f_port", "mac_state.pending_application_downlink.frm_payload", + "mac_state.pending_application_downlink.network_ids", + "mac_state.pending_application_downlink.network_ids.cluster_address", + "mac_state.pending_application_downlink.network_ids.cluster_id", + "mac_state.pending_application_downlink.network_ids.net_id", + "mac_state.pending_application_downlink.network_ids.ns_id", + "mac_state.pending_application_downlink.network_ids.tenant_address", + "mac_state.pending_application_downlink.network_ids.tenant_id", "mac_state.pending_application_downlink.priority", "mac_state.pending_application_downlink.session_key_id", "mac_state.pending_relay_downlink.raw_payload", diff --git a/pkg/ttnpb/end_device.go b/pkg/ttnpb/end_device.go index 41985da6de..1e79c4d18c 100644 --- a/pkg/ttnpb/end_device.go +++ b/pkg/ttnpb/end_device.go @@ -1829,6 +1829,8 @@ func (v *MACState) FieldIsZero(p string) bool { return v.LorawanVersion == 0 case "pending_application_downlink": return v.PendingApplicationDownlink == nil + case "pending_application_downlink.attributes": + return v.PendingApplicationDownlink.FieldIsZero("attributes") case "pending_application_downlink.class_b_c": return v.PendingApplicationDownlink.FieldIsZero("class_b_c") case "pending_application_downlink.class_b_c.absolute_time": @@ -1855,10 +1857,38 @@ func (v *MACState) FieldIsZero(p string) bool { return v.PendingApplicationDownlink.FieldIsZero("f_port") case "pending_application_downlink.frm_payload": return v.PendingApplicationDownlink.FieldIsZero("frm_payload") + case "pending_application_downlink.locations": + return v.PendingApplicationDownlink.FieldIsZero("locations") + case "pending_application_downlink.network_ids": + return v.PendingApplicationDownlink.FieldIsZero("network_ids") + case "pending_application_downlink.network_ids.cluster_address": + return v.PendingApplicationDownlink.FieldIsZero("network_ids.cluster_address") + case "pending_application_downlink.network_ids.cluster_id": + return v.PendingApplicationDownlink.FieldIsZero("network_ids.cluster_id") + case "pending_application_downlink.network_ids.net_id": + return v.PendingApplicationDownlink.FieldIsZero("network_ids.net_id") + case "pending_application_downlink.network_ids.ns_id": + return v.PendingApplicationDownlink.FieldIsZero("network_ids.ns_id") + case "pending_application_downlink.network_ids.tenant_address": + return v.PendingApplicationDownlink.FieldIsZero("network_ids.tenant_address") + case "pending_application_downlink.network_ids.tenant_id": + return v.PendingApplicationDownlink.FieldIsZero("network_ids.tenant_id") case "pending_application_downlink.priority": return v.PendingApplicationDownlink.FieldIsZero("priority") case "pending_application_downlink.session_key_id": return v.PendingApplicationDownlink.FieldIsZero("session_key_id") + case "pending_application_downlink.version_ids": + return v.PendingApplicationDownlink.FieldIsZero("version_ids") + case "pending_application_downlink.version_ids.band_id": + return v.PendingApplicationDownlink.FieldIsZero("version_ids.band_id") + case "pending_application_downlink.version_ids.brand_id": + return v.PendingApplicationDownlink.FieldIsZero("version_ids.brand_id") + case "pending_application_downlink.version_ids.firmware_version": + return v.PendingApplicationDownlink.FieldIsZero("version_ids.firmware_version") + case "pending_application_downlink.version_ids.hardware_version": + return v.PendingApplicationDownlink.FieldIsZero("version_ids.hardware_version") + case "pending_application_downlink.version_ids.model_id": + return v.PendingApplicationDownlink.FieldIsZero("version_ids.model_id") case "pending_join_request": return v.PendingJoinRequest == nil case "pending_join_request.cf_list": @@ -2998,3 +3028,25 @@ func (d *EndDevice) UpdateTimestamps(src *EndDevice) { // EndDeviceFieldPathsNestedWithoutWrappers is the set of EndDevice nested paths without the wrapper paths. var EndDeviceFieldPathsNestedWithoutWrappers = FieldsWithoutWrappers(EndDeviceFieldPathsNested) + +// FieldIsZero returns whether path p is zero. +func (v *NetworkIdentifiers) FieldIsZero(p string) bool { + if v == nil { + return true + } + switch p { + case "cluster_address": + return v.ClusterAddress == "" + case "cluster_id": + return v.ClusterId == "" + case "net_id": + return len(v.NetId) == 0 + case "ns_id": + return len(v.NsId) == 0 + case "tenant_address": + return v.TenantAddress == "" + case "tenant_id": // nolint:goconst + return v.TenantId == "" + } + panic(fmt.Sprintf("unknown path '%s'", p)) +} diff --git a/pkg/ttnpb/field_mask_validation.go b/pkg/ttnpb/field_mask_validation.go index d07e53d09a..42e9acdebc 100644 --- a/pkg/ttnpb/field_mask_validation.go +++ b/pkg/ttnpb/field_mask_validation.go @@ -439,6 +439,7 @@ var nsEndDeviceReadFieldPaths = [...]string{ "mac_state.last_network_initiated_downlink_at", "mac_state.lorawan_version", "mac_state.pending_application_downlink", + "mac_state.pending_application_downlink.attributes", "mac_state.pending_application_downlink.class_b_c", "mac_state.pending_application_downlink.class_b_c.absolute_time", "mac_state.pending_application_downlink.class_b_c.gateways", @@ -447,8 +448,22 @@ var nsEndDeviceReadFieldPaths = [...]string{ "mac_state.pending_application_downlink.f_cnt", "mac_state.pending_application_downlink.f_port", "mac_state.pending_application_downlink.frm_payload", + "mac_state.pending_application_downlink.locations", + "mac_state.pending_application_downlink.network_ids", + "mac_state.pending_application_downlink.network_ids.cluster_address", + "mac_state.pending_application_downlink.network_ids.cluster_id", + "mac_state.pending_application_downlink.network_ids.net_id", + "mac_state.pending_application_downlink.network_ids.ns_id", + "mac_state.pending_application_downlink.network_ids.tenant_address", + "mac_state.pending_application_downlink.network_ids.tenant_id", "mac_state.pending_application_downlink.priority", "mac_state.pending_application_downlink.session_key_id", + "mac_state.pending_application_downlink.version_ids", + "mac_state.pending_application_downlink.version_ids.band_id", + "mac_state.pending_application_downlink.version_ids.brand_id", + "mac_state.pending_application_downlink.version_ids.firmware_version", + "mac_state.pending_application_downlink.version_ids.hardware_version", + "mac_state.pending_application_downlink.version_ids.model_id", "mac_state.pending_relay_downlink", "mac_state.pending_relay_downlink.raw_payload", "mac_state.pending_requests", @@ -1358,6 +1373,13 @@ var RPCFieldMaskPaths = map[string]RPCFieldMaskPathValue{ "mac_state.pending_application_downlink.f_cnt", "mac_state.pending_application_downlink.f_port", "mac_state.pending_application_downlink.frm_payload", + "mac_state.pending_application_downlink.network_ids", + "mac_state.pending_application_downlink.network_ids.cluster_address", + "mac_state.pending_application_downlink.network_ids.cluster_id", + "mac_state.pending_application_downlink.network_ids.net_id", + "mac_state.pending_application_downlink.network_ids.ns_id", + "mac_state.pending_application_downlink.network_ids.tenant_address", + "mac_state.pending_application_downlink.network_ids.tenant_id", "mac_state.pending_application_downlink.priority", "mac_state.pending_application_downlink.session_key_id", "mac_state.pending_relay_downlink", diff --git a/pkg/ttnpb/messages.go b/pkg/ttnpb/messages.go index d7cfc2a436..18e06b66b5 100644 --- a/pkg/ttnpb/messages.go +++ b/pkg/ttnpb/messages.go @@ -71,6 +71,8 @@ func (v *ApplicationDownlink) FieldIsZero(p string) bool { return true } switch p { + case "attributes": + return v.Attributes == nil case "class_b_c": return v.ClassBC == nil case "class_b_c.absolute_time": @@ -79,6 +81,12 @@ func (v *ApplicationDownlink) FieldIsZero(p string) bool { return v.ClassBC.FieldIsZero("gateways") case "confirmed": return !v.Confirmed + case "confirmed_retry": + return v.ConfirmedRetry == nil + case "confirmed_retry.attempt": + return v.ConfirmedRetry.FieldIsZero("attempt") + case "confirmed_retry.max_attempts": + return v.ConfirmedRetry.FieldIsZero("max_attempts") case "correlation_ids": return v.CorrelationIds == nil case "decoded_payload": @@ -91,16 +99,38 @@ func (v *ApplicationDownlink) FieldIsZero(p string) bool { return v.FPort == 0 case "frm_payload": return v.FrmPayload == nil + case "locations": + return v.Locations == nil + case "network_ids": // nolint: goconst + return v.NetworkIds == nil + case "network_ids.cluster_address": + return v.NetworkIds.FieldIsZero("cluster_address") + case "network_ids.cluster_id": + return v.NetworkIds.FieldIsZero("cluster_id") + case "network_ids.net_id": + return v.NetworkIds.FieldIsZero("net_id") + case "network_ids.ns_id": + return v.NetworkIds.FieldIsZero("ns_id") + case "network_ids.tenant_address": + return v.NetworkIds.FieldIsZero("tenant_address") + case "network_ids.tenant_id": + return v.NetworkIds.FieldIsZero("tenant_id") case "priority": return v.Priority == 0 case "session_key_id": return v.SessionKeyId == nil - case "confirmed_retry": - return v.ConfirmedRetry == nil - case "confirmed_retry.attempt": - return v.ConfirmedRetry.FieldIsZero("attempt") - case "confirmed_retry.max_attempts": - return v.ConfirmedRetry.FieldIsZero("max_attempts") + case "version_ids": + return v.VersionIds == nil + case "version_ids.band_id": + return v.VersionIds.FieldIsZero("band_id") + case "version_ids.brand_id": + return v.VersionIds.FieldIsZero("brand_id") + case "version_ids.firmware_version": + return v.VersionIds.FieldIsZero("firmware_version") + case "version_ids.hardware_version": + return v.VersionIds.FieldIsZero("hardware_version") + case "version_ids.model_id": + return v.VersionIds.FieldIsZero("model_id") } panic(fmt.Sprintf("unknown path '%s'", p)) } diff --git a/pkg/webui/locales/ja.json b/pkg/webui/locales/ja.json index 3fbd6fac96..905ec7bc0c 100644 --- a/pkg/webui/locales/ja.json +++ b/pkg/webui/locales/ja.json @@ -2140,7 +2140,10 @@ "error:pkg/applicationserver/io/web:validate_body": "本文を検証する", "error:pkg/applicationserver/io/web:webhook_not_found": "Webhookが見つかりません", "error:pkg/applicationserver/io:buffer_full": "バッファがいっぱいです", + "error:pkg/applicationserver/metadata/redis:cache_entry_malformed": "", "error:pkg/applicationserver/metadata/redis:cache_miss": "キャッシュミス", + "error:pkg/applicationserver/metadata:end_device_not_found": "", + "error:pkg/applicationserver/metadata:field_mask_path_not_supported": "", "error:pkg/applicationserver/redis:application_uid": "無効なアプリケーションUID `{application_uid}`", "error:pkg/applicationserver/redis:invalid_fieldmask": "無効なフィールドマスク", "error:pkg/applicationserver/redis:invalid_identifiers": "無効な識別子", diff --git a/sdk/js/generated/api-definition.json b/sdk/js/generated/api-definition.json index 27bf4962ff..0b300e0563 100644 --- a/sdk/js/generated/api-definition.json +++ b/sdk/js/generated/api-definition.json @@ -745,6 +745,7 @@ "simulated", "up", "up.downlink_ack", + "up.downlink_ack.attributes", "up.downlink_ack.class_b_c", "up.downlink_ack.class_b_c.absolute_time", "up.downlink_ack.class_b_c.gateways", @@ -758,10 +759,26 @@ "up.downlink_ack.f_cnt", "up.downlink_ack.f_port", "up.downlink_ack.frm_payload", + "up.downlink_ack.locations", + "up.downlink_ack.network_ids", + "up.downlink_ack.network_ids.cluster_address", + "up.downlink_ack.network_ids.cluster_id", + "up.downlink_ack.network_ids.net_id", + "up.downlink_ack.network_ids.ns_id", + "up.downlink_ack.network_ids.tenant_address", + "up.downlink_ack.network_ids.tenant_id", "up.downlink_ack.priority", "up.downlink_ack.session_key_id", + "up.downlink_ack.version_ids", + "up.downlink_ack.version_ids.band_id", + "up.downlink_ack.version_ids.brand_id", + "up.downlink_ack.version_ids.firmware_version", + "up.downlink_ack.version_ids.hardware_version", + "up.downlink_ack.version_ids.model_id", "up.downlink_failed", + "up.downlink_failed.attributes", "up.downlink_failed.downlink", + "up.downlink_failed.downlink.attributes", "up.downlink_failed.downlink.class_b_c", "up.downlink_failed.downlink.class_b_c.absolute_time", "up.downlink_failed.downlink.class_b_c.gateways", @@ -775,8 +792,22 @@ "up.downlink_failed.downlink.f_cnt", "up.downlink_failed.downlink.f_port", "up.downlink_failed.downlink.frm_payload", + "up.downlink_failed.downlink.locations", + "up.downlink_failed.downlink.network_ids", + "up.downlink_failed.downlink.network_ids.cluster_address", + "up.downlink_failed.downlink.network_ids.cluster_id", + "up.downlink_failed.downlink.network_ids.net_id", + "up.downlink_failed.downlink.network_ids.ns_id", + "up.downlink_failed.downlink.network_ids.tenant_address", + "up.downlink_failed.downlink.network_ids.tenant_id", "up.downlink_failed.downlink.priority", "up.downlink_failed.downlink.session_key_id", + "up.downlink_failed.downlink.version_ids", + "up.downlink_failed.downlink.version_ids.band_id", + "up.downlink_failed.downlink.version_ids.brand_id", + "up.downlink_failed.downlink.version_ids.firmware_version", + "up.downlink_failed.downlink.version_ids.hardware_version", + "up.downlink_failed.downlink.version_ids.model_id", "up.downlink_failed.error", "up.downlink_failed.error.attributes", "up.downlink_failed.error.cause", @@ -791,7 +822,22 @@ "up.downlink_failed.error.message_format", "up.downlink_failed.error.name", "up.downlink_failed.error.namespace", + "up.downlink_failed.locations", + "up.downlink_failed.network_ids", + "up.downlink_failed.network_ids.cluster_address", + "up.downlink_failed.network_ids.cluster_id", + "up.downlink_failed.network_ids.net_id", + "up.downlink_failed.network_ids.ns_id", + "up.downlink_failed.network_ids.tenant_address", + "up.downlink_failed.network_ids.tenant_id", + "up.downlink_failed.version_ids", + "up.downlink_failed.version_ids.band_id", + "up.downlink_failed.version_ids.brand_id", + "up.downlink_failed.version_ids.firmware_version", + "up.downlink_failed.version_ids.hardware_version", + "up.downlink_failed.version_ids.model_id", "up.downlink_nack", + "up.downlink_nack.attributes", "up.downlink_nack.class_b_c", "up.downlink_nack.class_b_c.absolute_time", "up.downlink_nack.class_b_c.gateways", @@ -805,13 +851,43 @@ "up.downlink_nack.f_cnt", "up.downlink_nack.f_port", "up.downlink_nack.frm_payload", + "up.downlink_nack.locations", + "up.downlink_nack.network_ids", + "up.downlink_nack.network_ids.cluster_address", + "up.downlink_nack.network_ids.cluster_id", + "up.downlink_nack.network_ids.net_id", + "up.downlink_nack.network_ids.ns_id", + "up.downlink_nack.network_ids.tenant_address", + "up.downlink_nack.network_ids.tenant_id", "up.downlink_nack.priority", "up.downlink_nack.session_key_id", + "up.downlink_nack.version_ids", + "up.downlink_nack.version_ids.band_id", + "up.downlink_nack.version_ids.brand_id", + "up.downlink_nack.version_ids.firmware_version", + "up.downlink_nack.version_ids.hardware_version", + "up.downlink_nack.version_ids.model_id", "up.downlink_queue_invalidated", + "up.downlink_queue_invalidated.attributes", "up.downlink_queue_invalidated.downlinks", "up.downlink_queue_invalidated.last_f_cnt_down", + "up.downlink_queue_invalidated.locations", + "up.downlink_queue_invalidated.network_ids", + "up.downlink_queue_invalidated.network_ids.cluster_address", + "up.downlink_queue_invalidated.network_ids.cluster_id", + "up.downlink_queue_invalidated.network_ids.net_id", + "up.downlink_queue_invalidated.network_ids.ns_id", + "up.downlink_queue_invalidated.network_ids.tenant_address", + "up.downlink_queue_invalidated.network_ids.tenant_id", "up.downlink_queue_invalidated.session_key_id", + "up.downlink_queue_invalidated.version_ids", + "up.downlink_queue_invalidated.version_ids.band_id", + "up.downlink_queue_invalidated.version_ids.brand_id", + "up.downlink_queue_invalidated.version_ids.firmware_version", + "up.downlink_queue_invalidated.version_ids.hardware_version", + "up.downlink_queue_invalidated.version_ids.model_id", "up.downlink_queued", + "up.downlink_queued.attributes", "up.downlink_queued.class_b_c", "up.downlink_queued.class_b_c.absolute_time", "up.downlink_queued.class_b_c.gateways", @@ -825,9 +901,24 @@ "up.downlink_queued.f_cnt", "up.downlink_queued.f_port", "up.downlink_queued.frm_payload", + "up.downlink_queued.locations", + "up.downlink_queued.network_ids", + "up.downlink_queued.network_ids.cluster_address", + "up.downlink_queued.network_ids.cluster_id", + "up.downlink_queued.network_ids.net_id", + "up.downlink_queued.network_ids.ns_id", + "up.downlink_queued.network_ids.tenant_address", + "up.downlink_queued.network_ids.tenant_id", "up.downlink_queued.priority", "up.downlink_queued.session_key_id", + "up.downlink_queued.version_ids", + "up.downlink_queued.version_ids.band_id", + "up.downlink_queued.version_ids.brand_id", + "up.downlink_queued.version_ids.firmware_version", + "up.downlink_queued.version_ids.hardware_version", + "up.downlink_queued.version_ids.model_id", "up.downlink_sent", + "up.downlink_sent.attributes", "up.downlink_sent.class_b_c", "up.downlink_sent.class_b_c.absolute_time", "up.downlink_sent.class_b_c.gateways", @@ -841,17 +932,46 @@ "up.downlink_sent.f_cnt", "up.downlink_sent.f_port", "up.downlink_sent.frm_payload", + "up.downlink_sent.locations", + "up.downlink_sent.network_ids", + "up.downlink_sent.network_ids.cluster_address", + "up.downlink_sent.network_ids.cluster_id", + "up.downlink_sent.network_ids.net_id", + "up.downlink_sent.network_ids.ns_id", + "up.downlink_sent.network_ids.tenant_address", + "up.downlink_sent.network_ids.tenant_id", "up.downlink_sent.priority", "up.downlink_sent.session_key_id", + "up.downlink_sent.version_ids", + "up.downlink_sent.version_ids.band_id", + "up.downlink_sent.version_ids.brand_id", + "up.downlink_sent.version_ids.firmware_version", + "up.downlink_sent.version_ids.hardware_version", + "up.downlink_sent.version_ids.model_id", "up.join_accept", "up.join_accept.app_s_key", "up.join_accept.app_s_key.encrypted_key", "up.join_accept.app_s_key.kek_label", "up.join_accept.app_s_key.key", + "up.join_accept.attributes", "up.join_accept.invalidated_downlinks", + "up.join_accept.locations", + "up.join_accept.network_ids", + "up.join_accept.network_ids.cluster_address", + "up.join_accept.network_ids.cluster_id", + "up.join_accept.network_ids.net_id", + "up.join_accept.network_ids.ns_id", + "up.join_accept.network_ids.tenant_address", + "up.join_accept.network_ids.tenant_id", "up.join_accept.pending_session", "up.join_accept.received_at", "up.join_accept.session_key_id", + "up.join_accept.version_ids", + "up.join_accept.version_ids.band_id", + "up.join_accept.version_ids.brand_id", + "up.join_accept.version_ids.firmware_version", + "up.join_accept.version_ids.hardware_version", + "up.join_accept.version_ids.model_id", "up.location_solved", "up.location_solved.attributes", "up.location_solved.location", @@ -862,13 +982,29 @@ "up.location_solved.location.source", "up.location_solved.service", "up.service_data", + "up.service_data.attributes", "up.service_data.data", + "up.service_data.locations", + "up.service_data.network_ids", + "up.service_data.network_ids.cluster_address", + "up.service_data.network_ids.cluster_id", + "up.service_data.network_ids.net_id", + "up.service_data.network_ids.ns_id", + "up.service_data.network_ids.tenant_address", + "up.service_data.network_ids.tenant_id", "up.service_data.service", + "up.service_data.version_ids", + "up.service_data.version_ids.band_id", + "up.service_data.version_ids.brand_id", + "up.service_data.version_ids.firmware_version", + "up.service_data.version_ids.hardware_version", + "up.service_data.version_ids.model_id", "up.uplink_message", "up.uplink_message.app_s_key", "up.uplink_message.app_s_key.encrypted_key", "up.uplink_message.app_s_key.kek_label", "up.uplink_message.app_s_key.key", + "up.uplink_message.attributes", "up.uplink_message.confirmed", "up.uplink_message.consumed_airtime", "up.uplink_message.decoded_payload", @@ -920,6 +1056,7 @@ "up.uplink_message.version_ids.hardware_version", "up.uplink_message.version_ids.model_id", "up.uplink_normalized", + "up.uplink_normalized.attributes", "up.uplink_normalized.confirmed", "up.uplink_normalized.consumed_airtime", "up.uplink_normalized.f_cnt", @@ -4750,6 +4887,7 @@ "mac_state.last_network_initiated_downlink_at", "mac_state.lorawan_version", "mac_state.pending_application_downlink", + "mac_state.pending_application_downlink.attributes", "mac_state.pending_application_downlink.class_b_c", "mac_state.pending_application_downlink.class_b_c.absolute_time", "mac_state.pending_application_downlink.class_b_c.gateways", @@ -4758,8 +4896,22 @@ "mac_state.pending_application_downlink.f_cnt", "mac_state.pending_application_downlink.f_port", "mac_state.pending_application_downlink.frm_payload", + "mac_state.pending_application_downlink.locations", + "mac_state.pending_application_downlink.network_ids", + "mac_state.pending_application_downlink.network_ids.cluster_address", + "mac_state.pending_application_downlink.network_ids.cluster_id", + "mac_state.pending_application_downlink.network_ids.net_id", + "mac_state.pending_application_downlink.network_ids.ns_id", + "mac_state.pending_application_downlink.network_ids.tenant_address", + "mac_state.pending_application_downlink.network_ids.tenant_id", "mac_state.pending_application_downlink.priority", "mac_state.pending_application_downlink.session_key_id", + "mac_state.pending_application_downlink.version_ids", + "mac_state.pending_application_downlink.version_ids.band_id", + "mac_state.pending_application_downlink.version_ids.brand_id", + "mac_state.pending_application_downlink.version_ids.firmware_version", + "mac_state.pending_application_downlink.version_ids.hardware_version", + "mac_state.pending_application_downlink.version_ids.model_id", "mac_state.pending_relay_downlink", "mac_state.pending_relay_downlink.raw_payload", "mac_state.pending_requests", @@ -5366,6 +5518,13 @@ "mac_state.pending_application_downlink.f_cnt", "mac_state.pending_application_downlink.f_port", "mac_state.pending_application_downlink.frm_payload", + "mac_state.pending_application_downlink.network_ids", + "mac_state.pending_application_downlink.network_ids.cluster_address", + "mac_state.pending_application_downlink.network_ids.cluster_id", + "mac_state.pending_application_downlink.network_ids.net_id", + "mac_state.pending_application_downlink.network_ids.ns_id", + "mac_state.pending_application_downlink.network_ids.tenant_address", + "mac_state.pending_application_downlink.network_ids.tenant_id", "mac_state.pending_application_downlink.priority", "mac_state.pending_application_downlink.session_key_id", "mac_state.pending_relay_downlink", @@ -5962,6 +6121,7 @@ "mac_state.last_network_initiated_downlink_at", "mac_state.lorawan_version", "mac_state.pending_application_downlink", + "mac_state.pending_application_downlink.attributes", "mac_state.pending_application_downlink.class_b_c", "mac_state.pending_application_downlink.class_b_c.absolute_time", "mac_state.pending_application_downlink.class_b_c.gateways", @@ -5970,8 +6130,22 @@ "mac_state.pending_application_downlink.f_cnt", "mac_state.pending_application_downlink.f_port", "mac_state.pending_application_downlink.frm_payload", + "mac_state.pending_application_downlink.locations", + "mac_state.pending_application_downlink.network_ids", + "mac_state.pending_application_downlink.network_ids.cluster_address", + "mac_state.pending_application_downlink.network_ids.cluster_id", + "mac_state.pending_application_downlink.network_ids.net_id", + "mac_state.pending_application_downlink.network_ids.ns_id", + "mac_state.pending_application_downlink.network_ids.tenant_address", + "mac_state.pending_application_downlink.network_ids.tenant_id", "mac_state.pending_application_downlink.priority", "mac_state.pending_application_downlink.session_key_id", + "mac_state.pending_application_downlink.version_ids", + "mac_state.pending_application_downlink.version_ids.band_id", + "mac_state.pending_application_downlink.version_ids.brand_id", + "mac_state.pending_application_downlink.version_ids.firmware_version", + "mac_state.pending_application_downlink.version_ids.hardware_version", + "mac_state.pending_application_downlink.version_ids.model_id", "mac_state.pending_relay_downlink", "mac_state.pending_relay_downlink.raw_payload", "mac_state.pending_requests", diff --git a/sdk/js/generated/device-entity-map.json b/sdk/js/generated/device-entity-map.json index 0d65c728ec..62da12da47 100644 --- a/sdk/js/generated/device-entity-map.json +++ b/sdk/js/generated/device-entity-map.json @@ -2091,6 +2091,10 @@ "ns", "ns" ], + "attributes": [ + "ns", + "read_only" + ], "class_b_c": { "_root": [ "ns", @@ -2125,6 +2129,40 @@ "ns", "ns" ], + "locations": [ + "ns", + "read_only" + ], + "network_ids": { + "_root": [ + "ns", + "ns" + ], + "cluster_address": [ + "ns", + "ns" + ], + "cluster_id": [ + "ns", + "ns" + ], + "net_id": [ + "ns", + "ns" + ], + "ns_id": [ + "ns", + "ns" + ], + "tenant_address": [ + "ns", + "ns" + ], + "tenant_id": [ + "ns", + "ns" + ] + }, "priority": [ "ns", "ns" @@ -2132,7 +2170,33 @@ "session_key_id": [ "ns", "ns" - ] + ], + "version_ids": { + "_root": [ + "ns", + "read_only" + ], + "band_id": [ + "ns", + "read_only" + ], + "brand_id": [ + "ns", + "read_only" + ], + "firmware_version": [ + "ns", + "read_only" + ], + "hardware_version": [ + "ns", + "read_only" + ], + "model_id": [ + "ns", + "read_only" + ] + } }, "pending_relay_downlink": { "_root": [