From 3f08f4847b4ed965e8f9a35597b0fb684236d186 Mon Sep 17 00:00:00 2001 From: Johan Stokking Date: Thu, 11 Jul 2024 16:34:17 +0200 Subject: [PATCH] wip --- pkg/gatewayserver/io/ttigw/mapping.go | 5 +- pkg/gatewayserver/io/ttigw/ttigw.go | 7 +- pkg/gatewayserver/io/ttigw/ttigw_test.go | 208 +++++++++++++++++++++-- 3 files changed, 201 insertions(+), 19 deletions(-) diff --git a/pkg/gatewayserver/io/ttigw/mapping.go b/pkg/gatewayserver/io/ttigw/mapping.go index 541cb4f478..ab8769b439 100644 --- a/pkg/gatewayserver/io/ttigw/mapping.go +++ b/pkg/gatewayserver/io/ttigw/mapping.go @@ -149,7 +149,9 @@ var ( } ) -func toUplinkMessage(fp *frequencyplans.FrequencyPlan, msg *lorav1.UplinkMessage) (*ttnpb.UplinkMessage, error) { +func toUplinkMessage( + ids *ttnpb.GatewayIdentifiers, fp *frequencyplans.FrequencyPlan, msg *lorav1.UplinkMessage, +) (*ttnpb.UplinkMessage, error) { if msg.Board != 0 { return nil, errInvalidBoard.WithAttributes("board", msg.Board) } @@ -161,6 +163,7 @@ func toUplinkMessage(fp *frequencyplans.FrequencyPlan, msg *lorav1.UplinkMessage frequency uint64 dataRate = &ttnpb.DataRate{} rxMetadata = &ttnpb.RxMetadata{ + GatewayIds: ids, Timestamp: msg.Timestamp, Rssi: -msg.RssiChannelNegated, ChannelRssi: -msg.RssiChannelNegated, diff --git a/pkg/gatewayserver/io/ttigw/ttigw.go b/pkg/gatewayserver/io/ttigw/ttigw.go index 0991b3e808..5eaaccb10b 100644 --- a/pkg/gatewayserver/io/ttigw/ttigw.go +++ b/pkg/gatewayserver/io/ttigw/ttigw.go @@ -140,7 +140,7 @@ func (f *Frontend) handleGet(w http.ResponseWriter, r *http.Request) { return } - f.handleConnection(ctx, wsConn, srvConn) + f.handleConnection(wsConn, srvConn) } var errInvalidGatewayEUI = errors.Define("invalid_gateway_eui", "invalid gateway EUI", "common_name") @@ -232,7 +232,8 @@ func sendMessages( } } -func (f *Frontend) handleConnection(ctx context.Context, wsConn *websocket.Conn, srvConn *io.Connection) { +func (f *Frontend) handleConnection(wsConn *websocket.Conn, srvConn *io.Connection) { + ctx := srvConn.Context() logger := log.FromContext(ctx) gwConfig, err := buildLoRaGatewayConfig(srvConn.PrimaryFrequencyPlan()) @@ -349,7 +350,7 @@ func processGatewayMessage( logger.WithField("count", len(msg.UplinkMessagesNotification.Messages)).Debug("Received uplink messages") uplinkMessages := make([]*ttnpb.UplinkMessage, 0, len(msg.UplinkMessagesNotification.Messages)) for _, uplinkMsg := range msg.UplinkMessagesNotification.Messages { - up, err := toUplinkMessage(srvConn.PrimaryFrequencyPlan(), uplinkMsg) + up, err := toUplinkMessage(srvConn.Gateway().Ids, srvConn.PrimaryFrequencyPlan(), uplinkMsg) if err != nil { logger.WithError(err).Warn("Failed to convert uplink message") continue diff --git a/pkg/gatewayserver/io/ttigw/ttigw_test.go b/pkg/gatewayserver/io/ttigw/ttigw_test.go index 5d0872d4ab..f92801e569 100644 --- a/pkg/gatewayserver/io/ttigw/ttigw_test.go +++ b/pkg/gatewayserver/io/ttigw/ttigw_test.go @@ -17,29 +17,58 @@ package ttigw_test import ( "context" "crypto/tls" + "crypto/x509" + _ "embed" + "fmt" "net/http" "testing" + lorav1 "go.thethings.industries/pkg/api/gen/tti/gateway/data/lora/v1" "go.thethings.network/lorawan-stack/v3/pkg/component" "go.thethings.network/lorawan-stack/v3/pkg/config" "go.thethings.network/lorawan-stack/v3/pkg/config/tlsconfig" + "go.thethings.network/lorawan-stack/v3/pkg/errorcontext" "go.thethings.network/lorawan-stack/v3/pkg/gatewayserver" "go.thethings.network/lorawan-stack/v3/pkg/gatewayserver/io/iotest" "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" "go.thethings.network/lorawan-stack/v3/pkg/types" "go.thethings.network/lorawan-stack/v3/pkg/util/test" + "google.golang.org/protobuf/proto" "nhooyr.io/websocket" ) -func TestFrontend(t *testing.T) { +//go:embed testdata/serverca.pem +var serverCAPEM []byte + +func writeMessage(ctx context.Context, conn *websocket.Conn, msg *lorav1.GatewayMessage) error { + data, err := proto.Marshal(msg) + if err != nil { + return err + } + return conn.Write(ctx, websocket.MessageBinary, data) +} + +func readMessage(ctx context.Context, conn *websocket.Conn) (*lorav1.NetworkServerMessage, error) { + _, data, err := conn.Read(ctx) + if err != nil { + return nil, err + } + msg := &lorav1.NetworkServerMessage{} + if err := proto.Unmarshal(data, msg); err != nil { + return nil, err + } + return msg, nil +} + +func TestFrontend(t *testing.T) { //nolint:gocyclo t.Parallel() gatewayCerts := map[types.EUI64]tls.Certificate{ {0xaa, 0xee, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}: test.Must( - tls.LoadX509KeyPair("testdata/aaee00000000000000.pem", "testdata/aaee00000000000000-key.pem"), + tls.LoadX509KeyPair("testdata/aaee000000000000.pem", "testdata/aaee000000000000-key.pem"), ), {0xbb, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}: test.Must( - tls.LoadX509KeyPair("testdata/bbff00000000000000.pem", "testdata/bbff00000000000000-key.pem"), + tls.LoadX509KeyPair("testdata/bbff000000000000.pem", "testdata/bbff000000000000-key.pem"), ), } @@ -50,17 +79,13 @@ func TestFrontend(t *testing.T) { AuthenticatesWithEUI: true, IsAuthenticated: true, DeduplicatesUplinks: true, + UsesGatewayToken: true, CustomComponentConfig: func(componentConfig *component.Config) { componentConfig.TLS = tlsconfig.Config{ ServerAuth: tlsconfig.ServerAuth{ Source: "file", - Certificate: "testdata/server.pem", - }, - } - componentConfig.KeyVault = config.KeyVault{ - Provider: "static", - Static: map[string][]byte{ - "test": {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}, + Certificate: "testdata/servercert.pem", + Key: "testdata/serverkey.pem", }, } componentConfig.MTLSAuth = config.MTLSAuthConfig{ @@ -70,21 +95,23 @@ func TestFrontend(t *testing.T) { }, CustomGatewayServerConfig: func(gsConfig *gatewayserver.Config) { gsConfig.TheThingsIndustriesGateway.ListenTLS = ":8888" - gsConfig.GatewayTokenHashKeyID = "test" }, Link: func( ctx context.Context, - t *testing.T, - gs *gatewayserver.GatewayServer, + _ *testing.T, + _ *gatewayserver.GatewayServer, ids *ttnpb.GatewayIdentifiers, - key string, + _ string, upCh <-chan *ttnpb.GatewayUp, - downCh chan<- *ttnpb.GatewayDown, + _ chan<- *ttnpb.GatewayDown, ) error { + rootCAs := x509.NewCertPool() + rootCAs.AppendCertsFromPEM(serverCAPEM) transport := http.DefaultTransport.(*http.Transport).Clone() transport.TLSClientConfig = &tls.Config{ MinVersion: tls.VersionTLS12, Certificates: []tls.Certificate{gatewayCerts[types.MustEUI64(ids.Eui).OrZero()]}, + RootCAs: rootCAs, } conn, _, err := websocket.Dial( //nolint:bodyclose ctx, @@ -101,6 +128,157 @@ func TestFrontend(t *testing.T) { } defer conn.CloseNow() //nolint:errcheck + if err := writeMessage(ctx, conn, &lorav1.GatewayMessage{ + Message: &lorav1.GatewayMessage_ClientHelloNotification{ + ClientHelloNotification: &lorav1.ClientHelloNotification{ + DeviceManufacturer: 0x42, + DeviceModel: "test", + HardwareVersion: "test", + RuntimeVersion: "test", + FirmwareVersion: "test", + }, + }, + }); err != nil { + return err + } + if serverHello, err := readMessage(ctx, conn); err != nil { + return err + } else if serverHello.GetServerHelloNotification() == nil { + return fmt.Errorf("expected server hello, got %T", serverHello.Message) + } + gwConfig, err := readMessage(ctx, conn) + if err != nil { + return err + } else if gwConfig.GetConfigureLoraGatewayRequest() == nil { + return fmt.Errorf("expected configure LoRa gateway request, got %T", gwConfig.Message) + } + multiSFIFChains := map[uint64]uint32{} + for _, b := range gwConfig.GetConfigureLoraGatewayRequest().Config.Boards { + rfChainFreqs := []int64{int64(b.RfChain0.GetFrequency()), int64(b.RfChain1.GetFrequency())} + for i, multiSF := range []*lorav1.Board_IntermediateFrequencies_MultipleSF{ + b.Ifs.GetMultipleSf0(), + b.Ifs.GetMultipleSf1(), + b.Ifs.GetMultipleSf2(), + b.Ifs.GetMultipleSf3(), + b.Ifs.GetMultipleSf4(), + b.Ifs.GetMultipleSf5(), + b.Ifs.GetMultipleSf6(), + b.Ifs.GetMultipleSf7(), + } { + if multiSF == nil { + continue + } + freq := uint64(rfChainFreqs[multiSF.RfChain] + int64(multiSF.Frequency)) + multiSFIFChains[freq] = uint32(i) + } + } + + ctx, cancel := errorcontext.New(ctx) + replyCh := make(chan *lorav1.GatewayMessage, 1) + // Write upstream. + go func() { + for { + select { + case <-ctx.Done(): + return + case msg := <-upCh: + if len(msg.UplinkMessages) > 0 { + messages := make([]*lorav1.UplinkMessage, 0, len(msg.UplinkMessages)) + for _, up := range msg.UplinkMessages { + uplink := &lorav1.UplinkMessage{ + Board: 0, + Timestamp: up.RxMetadata[0].Timestamp, + RssiChannelNegated: -up.RxMetadata[0].ChannelRssi, + Payload: up.RawPayload, + } + switch mod := up.Settings.DataRate.Modulation.(type) { + case *ttnpb.DataRate_Lora: + dr := &lorav1.UplinkMessage_Lora{ + RssiSignalNegated: -up.RxMetadata[0].SignalRssi.GetValue(), + SpreadingFactor: mod.Lora.SpreadingFactor, + CodeRate: map[string]lorav1.CodeRate{ + "4/5": lorav1.CodeRate_CODE_RATE_4_5, + "4/6": lorav1.CodeRate_CODE_RATE_4_6, + "4/7": lorav1.CodeRate_CODE_RATE_4_7, + "4/8": lorav1.CodeRate_CODE_RATE_4_8, + }[mod.Lora.CodingRate], + } + if up.RxMetadata[0].Snr < 0 { + dr.Snr = &lorav1.UplinkMessage_Lora_SnrNegative{ + SnrNegative: -up.RxMetadata[0].Snr, + } + } else { + dr.Snr = &lorav1.UplinkMessage_Lora_SnrPositive{ + SnrPositive: up.RxMetadata[0].Snr, + } + } + uplink.DataRate = &lorav1.UplinkMessage_Lora_{ + Lora: dr, + } + if mod.Lora.Bandwidth == 125000 { // Assume multi-SF. + uplink.IfChain = multiSFIFChains[up.Settings.Frequency] + } else { + uplink.IfChain = 9 // LoRa service channel + } + case *ttnpb.DataRate_Fsk: + uplink.DataRate = &lorav1.UplinkMessage_Fsk{ + Fsk: &lorav1.UplinkMessage_FSK{}, + } + uplink.IfChain = 8 // FSK + } + messages = append(messages, uplink) + } + if err := writeMessage(ctx, conn, &lorav1.GatewayMessage{ + Message: &lorav1.GatewayMessage_UplinkMessagesNotification{ + UplinkMessagesNotification: &lorav1.UplinkMessagesNotification{ + Messages: messages, + }, + }, + }); err != nil { + cancel(err) + return + } + } + if msg.TxAcknowledgment != nil { + } + case reply := <-replyCh: + data, err := proto.Marshal(reply) + if err != nil { + cancel(err) + return + } + if err := conn.Write(ctx, websocket.MessageBinary, data); err != nil { + cancel(err) + return + } + } + } + }() + // Read downstream. + go func() { + for { + msg, err := readMessage(ctx, conn) + if err != nil { + cancel(err) + return + } + switch msg.Message.(type) { + case *lorav1.NetworkServerMessage_ServerHelloNotification: + case *lorav1.NetworkServerMessage_ConfigureLoraGatewayRequest: + select { + case <-ctx.Done(): + return + case replyCh <- &lorav1.GatewayMessage{ + TransactionId: msg.TransactionId, + Message: &lorav1.GatewayMessage_ConfigureLoraGatewayResponse{ + ConfigureLoraGatewayResponse: &lorav1.ConfigureLoraGatewayResponse{}, + }, + }: + } + case *lorav1.NetworkServerMessage_ErrorNotification: + } + } + }() <-ctx.Done() return ctx.Err() },