Skip to content

Commit

Permalink
as: Add end device registry cache implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
vlasebian committed Jan 22, 2025
1 parent 5858f4b commit 83a788d
Show file tree
Hide file tree
Showing 33 changed files with 1,830 additions and 701 deletions.
7 changes: 3 additions & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
run:
skip-dirs:
- node_modules

linters:

Check warning on line 1 in .golangci.yml

View workflow job for this annotation

GitHub Actions / Check Mergeability

.golangci.yml has a conflict when merging TheThingsIndustries/lorawan-stack:v3.33.
disable-all: true
enable:
Expand Down Expand Up @@ -113,3 +109,6 @@ issues:
- linters:
- paralleltest
text: 'does not use range value in test Run'
exclude-dirs:
- node_modules

6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@ For details about compatibility between different releases, see the **Commitment

### Added

- Add the attributes field to all ApplicationUp messages.
- Add the locations, version_ids, network_ids fields to ApplicationUp messages that were missing them, i.e. ApplicationJoinAccept, ApplicationDownlink, ApplicationDownlinkFailed, ApplicationInvalidatedDownlinks, ApplicationServiceData.
- Add Timeout and Cache fields in the EndDeviceMetadataStorageConfig of the AS.

### Changed

### Deprecated

- Deprecate the Location field (and its subfields) in the EndDeviceMetadataStorageConfig of AS.

### Removed

### Fixed
Expand Down
14 changes: 6 additions & 8 deletions cmd/internal/shared/applicationserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,12 @@ var DefaultApplicationServerConfig = applicationserver.Config{
Downlinks: web.DownlinksConfig{PublicAddress: shared.DefaultPublicURL + "/api/v3"},
},
EndDeviceMetadataStorage: applicationserver.EndDeviceMetadataStorageConfig{
Location: applicationserver.EndDeviceLocationStorageConfig{
Timeout: 5 * time.Second,
Cache: applicationserver.EndDeviceLocationStorageCacheConfig{
Enable: true,
MinRefreshInterval: 15 * time.Minute,
MaxRefreshInterval: 4 * time.Hour,
TTL: 14 * 24 * time.Hour,
},
Timeout: 5 * time.Second,
Cache: applicationserver.EndDeviceRegistryStorageCacheConfig{
Enable: true,
MinRefreshInterval: 15 * time.Minute,
MaxRefreshInterval: 4 * time.Hour,
TTL: 14 * 24 * time.Hour,
},
},
Distribution: applicationserver.DistributionConfig{
Expand Down
10 changes: 5 additions & 5 deletions cmd/ttn-lw-stack/commands/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,21 +445,21 @@ var startCommand = &cobra.Command{
}
config.AS.Webhooks.Registry = webhookRegistry
}
if cache := &config.AS.EndDeviceMetadataStorage.Location.Cache; cache.Enable {
if cache := &config.AS.EndDeviceMetadataStorage.Cache; cache.Enable {
switch config.Cache.Service {
case "redis":
cache.Cache = &asmetaredis.EndDeviceLocationCache{
Redis: redis.New(config.Cache.Redis.WithNamespace("as", "metadata", "locations")),
cache.Cache = &asmetaredis.EndDeviceCache{
Redis: redis.New(config.Cache.Redis.WithNamespace("as", "metadata", "end-devices")),
}
default:
cache.Enable = false
}
}
locationRegistry, err := config.AS.EndDeviceMetadataStorage.Location.NewRegistry(ctx, c)
endDeviceRegistry, err := config.AS.EndDeviceMetadataStorage.NewRegistry(ctx, c)
if err != nil {
return shared.ErrInitializeApplicationServer.WithCause(err)
}
config.AS.EndDeviceMetadataStorage.Location.Registry = locationRegistry
config.AS.EndDeviceMetadataStorage.Registry = endDeviceRegistry
as, err := applicationserver.New(c, &config.AS)
if err != nil {
return shared.ErrInitializeApplicationServer.WithCause(err)
Expand Down
29 changes: 28 additions & 1 deletion config/messages.json
Original file line number Diff line number Diff line change
Expand Up @@ -2861,13 +2861,40 @@
"file": "io.go"
}
},
"error:pkg/applicationserver/metadata/redis:cache_entry_malformed": {
"translations": {
"en": "cache entry malformed"
},
"description": {
"package": "pkg/applicationserver/metadata/redis",
"file": "redis.go"
}
},
"error:pkg/applicationserver/metadata/redis:cache_miss": {
"translations": {
"en": "cache miss"
},
"description": {
"package": "pkg/applicationserver/metadata/redis",
"file": "location_cache.go"
"file": "redis.go"
}
},
"error:pkg/applicationserver/metadata:end_device_not_found": {
"translations": {
"en": "end device not found"
},
"description": {
"package": "pkg/applicationserver/metadata",
"file": "metadata.go"
}
},
"error:pkg/applicationserver/metadata:field_mask_path_not_supported": {
"translations": {
"en": "field mask path `{path}` is not supported"
},
"description": {
"package": "pkg/applicationserver/metadata",
"file": "metadata.go"
}
},
"error:pkg/applicationserver/redis:application_uid": {
Expand Down
2 changes: 1 addition & 1 deletion data/lorawan-devices
Submodule lorawan-devices updated 75 files
+1 −1 lib/payload.json
+4 −3 schema.json
+21 −44 vendor/accuwatch/3chbatteryvoltagesensor.yaml
+0 −14 vendor/accuwatch/index.yaml
+97 −0 vendor/decentlab/dl-cws2-codec.yaml
+109 −0 vendor/decentlab/dl-cws2.js
+ vendor/decentlab/dl-cws2.png
+93 −0 vendor/decentlab/dl-cws2.yaml
+1 −1 vendor/decentlab/dl-dws.yaml
+69 −0 vendor/decentlab/dl-ifd-codec.yaml
+80 −0 vendor/decentlab/dl-ifd.js
+ vendor/decentlab/dl-ifd.png
+88 −0 vendor/decentlab/dl-ifd.yaml
+69 −0 vendor/decentlab/dl-ilt-codec.yaml
+80 −0 vendor/decentlab/dl-ilt.js
+ vendor/decentlab/dl-ilt.png
+88 −0 vendor/decentlab/dl-ilt.yaml
+69 −0 vendor/decentlab/dl-isd-codec.yaml
+80 −0 vendor/decentlab/dl-isd.js
+ vendor/decentlab/dl-isd.png
+88 −0 vendor/decentlab/dl-isd.yaml
+1 −1 vendor/decentlab/dl-pr36.yaml
+1 −1 vendor/decentlab/dl-zn1.yaml
+1 −1 vendor/decentlab/dl-zn2.yaml
+4 −0 vendor/decentlab/index.yaml
+25 −17 vendor/dragino/lht65.js
+33 −13 vendor/dragino/lse01.js
+1 −1 vendor/dragino/lsn50-v2-codec.yaml
+29 −12 vendor/dragino/lsn50-v2.js
+1 −1 vendor/dragino/lsn50v2-s31-codec.yaml
+29 −12 vendor/dragino/lsn50v2-s31.js
+16 −0 vendor/index.yaml
+61 −0 vendor/makerfabs/ath20-codec.yaml
+ vendor/makerfabs/ath20.jpg
+54 −0 vendor/makerfabs/ath20.js
+109 −0 vendor/makerfabs/ath20.yaml
+47 −0 vendor/makerfabs/eu868-profile.yaml
+2 −0 vendor/makerfabs/index.yaml
+47 −0 vendor/makerfabs/us915-profile.yaml
+4 −4 vendor/mclimate/16aspm-codec.yaml
+1 −0 vendor/mclimate/16aspm-profile.yaml
+1 −4 vendor/mclimate/16aspm.yaml
+1 −1 vendor/mclimate/co2-display-lite.yaml
+1 −1 vendor/mclimate/co2-display.yaml
+1 −0 vendor/mclimate/fan-coil-thermostat-profile.yaml
+1 −1 vendor/mclimate/fan-coil-thermostat.yaml
+2 −2 vendor/mclimate/ht-sensor.js
+4 −0 vendor/mclimate/index.yaml
+4 −0 vendor/micropelt/mlr003-codec.yaml
+1 −0 vendor/micropelt/mlr003.js
+19 −9 vendor/milesight-iot/em300-th.js
+1 −1 vendor/milesight-iot/ws301-codec.yaml
+13 −8 vendor/milesight-iot/ws301.js
+1 −0 vendor/netvox/index.yaml
+184 −0 vendor/netvox/payload/r315la.js
+1 −1 vendor/netvox/payload/r603.js
+ vendor/netvox/photos/r315la.jpg
+140 −0 vendor/netvox/r315la-codec.yaml
+63 −0 vendor/netvox/r315la.yaml
+1 −1 vendor/netvox/r603-codec.yaml
+83 −1 vendor/plenom/busylight.js
+1 −1 vendor/quandify/cubicmeter-1-1-copper.yaml
+1 −1 vendor/quandify/cubicmeter-1-1-plastic.yaml
+0 −20 vendor/quandify/cubicmeter-1-1-uplink.js
+12 −9 vendor/sensative/strips.js
+1 −1 vendor/stmicroelectronics/nucleo-wl55jc-codec.yaml
+1 −1 vendor/stmicroelectronics/nucleo-wl55jc.js
+14 −0 vendor/yobiiq/index.yaml
+20 −0 vendor/yobiiq/sd-1001-codec.yaml
+ vendor/yobiiq/sd-1001-package.png
+47 −0 vendor/yobiiq/sd-1001-profile.yaml
+458 −0 vendor/yobiiq/sd-1001.js
+ vendor/yobiiq/sd-1001.png
+62 −0 vendor/yobiiq/sd-1001.yaml
+1 −0 vendor/yobiiq/yobiiq_logo.svg
2 changes: 1 addition & 1 deletion data/lorawan-frequency-plans
116 changes: 92 additions & 24 deletions pkg/applicationserver/applicationserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type ApplicationServer struct {

linkRegistry LinkRegistry
deviceRegistry DeviceRegistry
locationRegistry metadata.EndDeviceLocationRegistry
endDeviceRegistry metadata.EndDeviceRegistry
formatters messageprocessors.MapPayloadProcessor
webhooks ioweb.Webhooks
webhookTemplates ioweb.TemplateStore
Expand Down Expand Up @@ -156,14 +156,14 @@ func New(c *component.Component, conf *Config) (as *ApplicationServer, err error
}

as = &ApplicationServer{
Component: c,
ctx: ctx,
config: conf,
linkRegistry: conf.Links,
deviceRegistry: wrapEndDeviceRegistryWithReplacedFields(conf.Devices, replacedEndDeviceFields...),
appPkgRegistry: conf.Packages.Registry,
locationRegistry: conf.EndDeviceMetadataStorage.Location.Registry,
formatters: make(messageprocessors.MapPayloadProcessor),
Component: c,
ctx: ctx,
config: conf,
linkRegistry: conf.Links,
deviceRegistry: wrapEndDeviceRegistryWithReplacedFields(conf.Devices, replacedEndDeviceFields...),
appPkgRegistry: conf.Packages.Registry,
endDeviceRegistry: conf.EndDeviceMetadataStorage.Registry,
formatters: make(messageprocessors.MapPayloadProcessor),
clusterDistributor: distribution.NewPubSubDistributor(
ctx,
c,
Expand Down Expand Up @@ -1049,6 +1049,14 @@ func (as *ApplicationServer) handleJoinAccept(
return err
}

if entity, err := as.endDeviceRegistry.Get(ctx, ids, []string{"attributes"}); err != nil {
log.FromContext(ctx).WithError(err).Warn(
"Failed to retrieve end device attributes on join-accept",
)
} else {
joinAccept.Attributes = entity.Attributes
}

// Publish last seen event.
if err := as.deviceLastSeenPool.Publish(ctx, lastSeenAtInfo{
ids: ids,
Expand Down Expand Up @@ -1159,6 +1167,7 @@ func (as *ApplicationServer) publishNormalizedUplink(ctx context.Context, info u
Locations: info.uplink.Locations,
VersionIds: info.uplink.VersionIds,
NetworkIds: info.uplink.NetworkIds,
Attributes: info.uplink.Attributes,
},
},
Simulated: info.simulated,
Expand Down Expand Up @@ -1208,6 +1217,15 @@ func (as *ApplicationServer) handleUplink(ctx context.Context, info uplinkInfo)
return err
}

if entity, err := as.endDeviceRegistry.Get(ctx, info.ids, []string{"attributes", "locations"}); err != nil {
log.FromContext(ctx).WithError(err).Warn(
"Failed to retrieve the end device attributes and locations on uplink",
)
} else {
info.uplink.Attributes = entity.Attributes
info.uplink.Locations = entity.Locations
}

if !as.skipPayloadCrypto(ctx, info.link, dev, dev.Session) {
if err := as.decryptAndDecodeUplink(ctx, dev, info.uplink, info.link.DefaultFormatters); err != nil {
return err
Expand All @@ -1227,11 +1245,6 @@ func (as *ApplicationServer) handleUplink(ctx context.Context, info uplinkInfo)
}

// Set location in message and publish location solved if the payload contains location information.
if locations, err := as.locationRegistry.Get(ctx, info.ids); err != nil {
log.FromContext(ctx).WithError(err).Warn("Failed to retrieve end device locations")
} else {
info.uplink.Locations = locations
}
loc := as.locationFromPayload(info.uplink)
if loc != nil {
if info.uplink.Locations == nil {
Expand Down Expand Up @@ -1285,10 +1298,14 @@ func (as *ApplicationServer) handleSimulatedUplink(ctx context.Context, info upl
return err
}

if locations, err := as.locationRegistry.Get(ctx, info.ids); err != nil {
log.FromContext(ctx).WithError(err).Warn("Failed to retrieve end device locations")
if entity, err := as.endDeviceRegistry.Get(ctx, info.ids, []string{"attributes", "locations"}); err != nil {
log.FromContext(ctx).WithError(err).Warn(
"Failed to retrieve the end device from entity registry on simulated uplink",
)
} else {
info.uplink.Locations = locations
info.uplink.Attributes = entity.Attributes
info.uplink.Locations = entity.Locations

}

if err := as.decodeUplink(ctx, dev, info.uplink, info.link.DefaultFormatters); err != nil {
Expand Down Expand Up @@ -1370,7 +1387,19 @@ func (as *ApplicationServer) handleDownlinkQueueInvalidated(
return dev, mask, nil
},
)
return pass, err
if err != nil {
return pass, err
}

if entity, err := as.endDeviceRegistry.Get(ctx, ids, []string{"attributes"}); err != nil {
log.FromContext(ctx).WithError(err).Warn(
"Failed to retrieve end device attributes on downlink queue invalidated",
)
} else {
invalid.Attributes = entity.Attributes
}

return pass, nil
}

func (as *ApplicationServer) handleDownlinkNack(
Expand Down Expand Up @@ -1455,17 +1484,47 @@ func (as *ApplicationServer) handleDownlinkNack(
return dev, mask, nil
},
)
return err
if err != nil {
return err
}

if _, err = as.endDeviceRegistry.Set(ctx, ids, []string{"attributes"},
func(entity *ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error) {
if entity == nil {
return nil, nil, errDeviceNotFound.WithAttributes("device_uid", unique.ID(ctx, ids))
}
msg.Attributes = entity.Attributes
return entity, []string{""}, nil
},
); err != nil {
log.FromContext(ctx).WithError(err).Warn(
"Failed to retrieve end device attributes on downlink nack",
)
}

return nil
}

// handleLocationSolved saves the provided *ttnpb.ApplicationLocation in the Entity Registry as part of the device locations.
// Locations provided by other services will be maintained.
// handleLocationSolved saves the provided *ttnpb.ApplicationLocation in the Entity Registry as part of the device
// locations. Locations provided by other services will be maintained.
func (as *ApplicationServer) handleLocationSolved(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, msg *ttnpb.ApplicationLocation, link *ttnpb.ApplicationLink) error {
defer trace.StartRegion(ctx, "handle location solved").End()

if _, err := as.locationRegistry.Merge(ctx, ids, map[string]*ttnpb.Location{
msg.Service: msg.Location,
}); err != nil {
if _, err := as.endDeviceRegistry.Set(ctx, ids, []string{"locations"},
func(stored *ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error) {
if stored == nil {
return nil, nil, errDeviceNotFound.WithAttributes("device_uid", unique.ID(ctx, ids))
}

if len(stored.Locations) == 0 {
stored.Locations = make(map[string]*ttnpb.Location)
}

stored.Locations[msg.Service] = msg.Location

return stored, []string{"locations"}, nil
},
); err != nil {
log.FromContext(ctx).WithError(err).Warn("Failed to merge end device locations")
}
return nil
Expand All @@ -1486,6 +1545,15 @@ func (as *ApplicationServer) decryptDownlinkMessage(ctx context.Context, ids *tt
if err != nil {
return err
}

if entity, err := as.endDeviceRegistry.Get(ctx, ids, []string{"attributes"}); err != nil {
log.FromContext(ctx).WithError(err).Warn(
"Failed to retrieve end device attributes on downlink message",
)
} else {
msg.Attributes = entity.Attributes
}

var session *ttnpb.Session
switch {
case dev.Session != nil && bytes.Equal(dev.Session.Keys.SessionKeyId, msg.SessionKeyId):
Expand Down
16 changes: 4 additions & 12 deletions pkg/applicationserver/applicationserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,7 @@ func TestApplicationServer(t *testing.T) {
},
},
EndDeviceMetadataStorage: applicationserver.EndDeviceMetadataStorageConfig{
Location: applicationserver.EndDeviceLocationStorageConfig{
Registry: metadata.NewNoopEndDeviceLocationRegistry(),
},
Registry: metadata.NewNoopEndDeviceRegistry(),
},
Downlinks: applicationserver.DownlinksConfig{
ConfirmationConfig: applicationserver.ConfirmationConfig{
Expand Down Expand Up @@ -2414,9 +2412,7 @@ func TestSkipPayloadCrypto(t *testing.T) {
},
},
EndDeviceMetadataStorage: applicationserver.EndDeviceMetadataStorageConfig{
Location: applicationserver.EndDeviceLocationStorageConfig{
Registry: metadata.NewNoopEndDeviceLocationRegistry(),
},
Registry: metadata.NewNoopEndDeviceRegistry(),
},
Downlinks: applicationserver.DownlinksConfig{
ConfirmationConfig: applicationserver.ConfirmationConfig{
Expand Down Expand Up @@ -2916,9 +2912,7 @@ func TestLocationFromPayload(t *testing.T) {
},
},
EndDeviceMetadataStorage: applicationserver.EndDeviceMetadataStorageConfig{
Location: applicationserver.EndDeviceLocationStorageConfig{
Registry: metadata.NewClusterEndDeviceLocationRegistry(c, (1<<4)*Timeout),
},
Registry: metadata.NewClusterEndDeviceRegistry(c, (1<<4)*Timeout),
},
Downlinks: applicationserver.DownlinksConfig{
ConfirmationConfig: applicationserver.ConfirmationConfig{
Expand Down Expand Up @@ -3101,9 +3095,7 @@ func TestUplinkNormalized(t *testing.T) {
},
},
EndDeviceMetadataStorage: applicationserver.EndDeviceMetadataStorageConfig{
Location: applicationserver.EndDeviceLocationStorageConfig{
Registry: metadata.NewClusterEndDeviceLocationRegistry(c, (1<<4)*Timeout),
},
Registry: metadata.NewClusterEndDeviceRegistry(c, (1<<4)*Timeout),
},
Downlinks: applicationserver.DownlinksConfig{
ConfirmationConfig: applicationserver.ConfirmationConfig{
Expand Down
Loading

0 comments on commit 83a788d

Please sign in to comment.