Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
johanstokking committed Jul 11, 2024
1 parent 2281cfd commit 3f08f48
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 19 deletions.
5 changes: 4 additions & 1 deletion pkg/gatewayserver/io/ttigw/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ var (
}
)

func toUplinkMessage(fp *frequencyplans.FrequencyPlan, msg *lorav1.UplinkMessage) (*ttnpb.UplinkMessage, error) {
func toUplinkMessage(
ids *ttnpb.GatewayIdentifiers, fp *frequencyplans.FrequencyPlan, msg *lorav1.UplinkMessage,
) (*ttnpb.UplinkMessage, error) {
if msg.Board != 0 {
return nil, errInvalidBoard.WithAttributes("board", msg.Board)
}
Expand All @@ -161,6 +163,7 @@ func toUplinkMessage(fp *frequencyplans.FrequencyPlan, msg *lorav1.UplinkMessage
frequency uint64
dataRate = &ttnpb.DataRate{}
rxMetadata = &ttnpb.RxMetadata{
GatewayIds: ids,
Timestamp: msg.Timestamp,
Rssi: -msg.RssiChannelNegated,
ChannelRssi: -msg.RssiChannelNegated,
Expand Down
7 changes: 4 additions & 3 deletions pkg/gatewayserver/io/ttigw/ttigw.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (f *Frontend) handleGet(w http.ResponseWriter, r *http.Request) {
return
}

f.handleConnection(ctx, wsConn, srvConn)
f.handleConnection(wsConn, srvConn)
}

var errInvalidGatewayEUI = errors.Define("invalid_gateway_eui", "invalid gateway EUI", "common_name")
Expand Down Expand Up @@ -232,7 +232,8 @@ func sendMessages(
}
}

func (f *Frontend) handleConnection(ctx context.Context, wsConn *websocket.Conn, srvConn *io.Connection) {
func (f *Frontend) handleConnection(wsConn *websocket.Conn, srvConn *io.Connection) {
ctx := srvConn.Context()
logger := log.FromContext(ctx)

gwConfig, err := buildLoRaGatewayConfig(srvConn.PrimaryFrequencyPlan())
Expand Down Expand Up @@ -349,7 +350,7 @@ func processGatewayMessage(
logger.WithField("count", len(msg.UplinkMessagesNotification.Messages)).Debug("Received uplink messages")
uplinkMessages := make([]*ttnpb.UplinkMessage, 0, len(msg.UplinkMessagesNotification.Messages))
for _, uplinkMsg := range msg.UplinkMessagesNotification.Messages {
up, err := toUplinkMessage(srvConn.PrimaryFrequencyPlan(), uplinkMsg)
up, err := toUplinkMessage(srvConn.Gateway().Ids, srvConn.PrimaryFrequencyPlan(), uplinkMsg)
if err != nil {
logger.WithError(err).Warn("Failed to convert uplink message")
continue
Expand Down
208 changes: 193 additions & 15 deletions pkg/gatewayserver/io/ttigw/ttigw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,58 @@ package ttigw_test
import (
"context"
"crypto/tls"
"crypto/x509"
_ "embed"
"fmt"
"net/http"
"testing"

lorav1 "go.thethings.industries/pkg/api/gen/tti/gateway/data/lora/v1"
"go.thethings.network/lorawan-stack/v3/pkg/component"
"go.thethings.network/lorawan-stack/v3/pkg/config"
"go.thethings.network/lorawan-stack/v3/pkg/config/tlsconfig"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/gatewayserver"
"go.thethings.network/lorawan-stack/v3/pkg/gatewayserver/io/iotest"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
"go.thethings.network/lorawan-stack/v3/pkg/types"
"go.thethings.network/lorawan-stack/v3/pkg/util/test"
"google.golang.org/protobuf/proto"
"nhooyr.io/websocket"
)

func TestFrontend(t *testing.T) {
//go:embed testdata/serverca.pem
var serverCAPEM []byte

func writeMessage(ctx context.Context, conn *websocket.Conn, msg *lorav1.GatewayMessage) error {
data, err := proto.Marshal(msg)
if err != nil {
return err
}
return conn.Write(ctx, websocket.MessageBinary, data)
}

func readMessage(ctx context.Context, conn *websocket.Conn) (*lorav1.NetworkServerMessage, error) {
_, data, err := conn.Read(ctx)
if err != nil {
return nil, err
}
msg := &lorav1.NetworkServerMessage{}
if err := proto.Unmarshal(data, msg); err != nil {
return nil, err
}
return msg, nil
}

func TestFrontend(t *testing.T) { //nolint:gocyclo
t.Parallel()

gatewayCerts := map[types.EUI64]tls.Certificate{
{0xaa, 0xee, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}: test.Must(
tls.LoadX509KeyPair("testdata/aaee00000000000000.pem", "testdata/aaee00000000000000-key.pem"),
tls.LoadX509KeyPair("testdata/aaee000000000000.pem", "testdata/aaee000000000000-key.pem"),
),
{0xbb, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}: test.Must(
tls.LoadX509KeyPair("testdata/bbff00000000000000.pem", "testdata/bbff00000000000000-key.pem"),
tls.LoadX509KeyPair("testdata/bbff000000000000.pem", "testdata/bbff000000000000-key.pem"),
),
}

Expand All @@ -50,17 +79,13 @@ func TestFrontend(t *testing.T) {
AuthenticatesWithEUI: true,
IsAuthenticated: true,
DeduplicatesUplinks: true,
UsesGatewayToken: true,
CustomComponentConfig: func(componentConfig *component.Config) {
componentConfig.TLS = tlsconfig.Config{
ServerAuth: tlsconfig.ServerAuth{
Source: "file",
Certificate: "testdata/server.pem",
},
}
componentConfig.KeyVault = config.KeyVault{
Provider: "static",
Static: map[string][]byte{
"test": {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f},
Certificate: "testdata/servercert.pem",
Key: "testdata/serverkey.pem",
},
}
componentConfig.MTLSAuth = config.MTLSAuthConfig{
Expand All @@ -70,21 +95,23 @@ func TestFrontend(t *testing.T) {
},
CustomGatewayServerConfig: func(gsConfig *gatewayserver.Config) {
gsConfig.TheThingsIndustriesGateway.ListenTLS = ":8888"
gsConfig.GatewayTokenHashKeyID = "test"
},
Link: func(
ctx context.Context,
t *testing.T,
gs *gatewayserver.GatewayServer,
_ *testing.T,
_ *gatewayserver.GatewayServer,
ids *ttnpb.GatewayIdentifiers,
key string,
_ string,
upCh <-chan *ttnpb.GatewayUp,
downCh chan<- *ttnpb.GatewayDown,
_ chan<- *ttnpb.GatewayDown,
) error {
rootCAs := x509.NewCertPool()
rootCAs.AppendCertsFromPEM(serverCAPEM)
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = &tls.Config{
MinVersion: tls.VersionTLS12,
Certificates: []tls.Certificate{gatewayCerts[types.MustEUI64(ids.Eui).OrZero()]},
RootCAs: rootCAs,
}
conn, _, err := websocket.Dial( //nolint:bodyclose
ctx,
Expand All @@ -101,6 +128,157 @@ func TestFrontend(t *testing.T) {
}
defer conn.CloseNow() //nolint:errcheck

if err := writeMessage(ctx, conn, &lorav1.GatewayMessage{
Message: &lorav1.GatewayMessage_ClientHelloNotification{
ClientHelloNotification: &lorav1.ClientHelloNotification{
DeviceManufacturer: 0x42,
DeviceModel: "test",
HardwareVersion: "test",
RuntimeVersion: "test",
FirmwareVersion: "test",
},
},
}); err != nil {
return err
}
if serverHello, err := readMessage(ctx, conn); err != nil {
return err
} else if serverHello.GetServerHelloNotification() == nil {
return fmt.Errorf("expected server hello, got %T", serverHello.Message)
}
gwConfig, err := readMessage(ctx, conn)
if err != nil {
return err
} else if gwConfig.GetConfigureLoraGatewayRequest() == nil {
return fmt.Errorf("expected configure LoRa gateway request, got %T", gwConfig.Message)
}
multiSFIFChains := map[uint64]uint32{}
for _, b := range gwConfig.GetConfigureLoraGatewayRequest().Config.Boards {
rfChainFreqs := []int64{int64(b.RfChain0.GetFrequency()), int64(b.RfChain1.GetFrequency())}
for i, multiSF := range []*lorav1.Board_IntermediateFrequencies_MultipleSF{
b.Ifs.GetMultipleSf0(),
b.Ifs.GetMultipleSf1(),
b.Ifs.GetMultipleSf2(),
b.Ifs.GetMultipleSf3(),
b.Ifs.GetMultipleSf4(),
b.Ifs.GetMultipleSf5(),
b.Ifs.GetMultipleSf6(),
b.Ifs.GetMultipleSf7(),
} {
if multiSF == nil {
continue
}
freq := uint64(rfChainFreqs[multiSF.RfChain] + int64(multiSF.Frequency))
multiSFIFChains[freq] = uint32(i)
}
}

ctx, cancel := errorcontext.New(ctx)
replyCh := make(chan *lorav1.GatewayMessage, 1)
// Write upstream.
go func() {
for {
select {
case <-ctx.Done():
return
case msg := <-upCh:
if len(msg.UplinkMessages) > 0 {
messages := make([]*lorav1.UplinkMessage, 0, len(msg.UplinkMessages))
for _, up := range msg.UplinkMessages {
uplink := &lorav1.UplinkMessage{
Board: 0,
Timestamp: up.RxMetadata[0].Timestamp,
RssiChannelNegated: -up.RxMetadata[0].ChannelRssi,
Payload: up.RawPayload,
}
switch mod := up.Settings.DataRate.Modulation.(type) {
case *ttnpb.DataRate_Lora:
dr := &lorav1.UplinkMessage_Lora{
RssiSignalNegated: -up.RxMetadata[0].SignalRssi.GetValue(),
SpreadingFactor: mod.Lora.SpreadingFactor,
CodeRate: map[string]lorav1.CodeRate{
"4/5": lorav1.CodeRate_CODE_RATE_4_5,
"4/6": lorav1.CodeRate_CODE_RATE_4_6,
"4/7": lorav1.CodeRate_CODE_RATE_4_7,
"4/8": lorav1.CodeRate_CODE_RATE_4_8,
}[mod.Lora.CodingRate],
}
if up.RxMetadata[0].Snr < 0 {
dr.Snr = &lorav1.UplinkMessage_Lora_SnrNegative{
SnrNegative: -up.RxMetadata[0].Snr,
}
} else {
dr.Snr = &lorav1.UplinkMessage_Lora_SnrPositive{
SnrPositive: up.RxMetadata[0].Snr,
}
}
uplink.DataRate = &lorav1.UplinkMessage_Lora_{
Lora: dr,
}
if mod.Lora.Bandwidth == 125000 { // Assume multi-SF.
uplink.IfChain = multiSFIFChains[up.Settings.Frequency]
} else {
uplink.IfChain = 9 // LoRa service channel
}
case *ttnpb.DataRate_Fsk:
uplink.DataRate = &lorav1.UplinkMessage_Fsk{
Fsk: &lorav1.UplinkMessage_FSK{},
}
uplink.IfChain = 8 // FSK
}
messages = append(messages, uplink)
}
if err := writeMessage(ctx, conn, &lorav1.GatewayMessage{
Message: &lorav1.GatewayMessage_UplinkMessagesNotification{
UplinkMessagesNotification: &lorav1.UplinkMessagesNotification{
Messages: messages,
},
},
}); err != nil {
cancel(err)
return
}
}
if msg.TxAcknowledgment != nil {
}
case reply := <-replyCh:
data, err := proto.Marshal(reply)
if err != nil {
cancel(err)
return
}
if err := conn.Write(ctx, websocket.MessageBinary, data); err != nil {
cancel(err)
return
}
}
}
}()
// Read downstream.
go func() {
for {
msg, err := readMessage(ctx, conn)
if err != nil {
cancel(err)
return
}
switch msg.Message.(type) {
case *lorav1.NetworkServerMessage_ServerHelloNotification:
case *lorav1.NetworkServerMessage_ConfigureLoraGatewayRequest:
select {
case <-ctx.Done():
return
case replyCh <- &lorav1.GatewayMessage{
TransactionId: msg.TransactionId,
Message: &lorav1.GatewayMessage_ConfigureLoraGatewayResponse{
ConfigureLoraGatewayResponse: &lorav1.ConfigureLoraGatewayResponse{},
},
}:
}
case *lorav1.NetworkServerMessage_ErrorNotification:
}
}
}()
<-ctx.Done()
return ctx.Err()
},
Expand Down

0 comments on commit 3f08f48

Please sign in to comment.