Skip to content

Commit

Permalink
feat: STUN implementation (#126)
Browse files Browse the repository at this point in the history
Several changes to implement STUN and general NAT navigation:
- Added a STUN server (from pion/turn) to `locator`
- Implemented a STUN client (from pion/stun) within the `daemon`
- Added logic to `server` to relay API requests to the `daemon` to
configure the STUN and `locator` clients
  • Loading branch information
jgkawell authored Jan 7, 2025
1 parent a00c9a7 commit 54859a7
Show file tree
Hide file tree
Showing 27 changed files with 1,294 additions and 391 deletions.
140 changes: 109 additions & 31 deletions services/platform/daemon/communicate/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import (
type (
Client interface {
Listen()
Send(*v1.DaemonMessage) error
Send(*v1.DaemonMessage)
}
client struct {
mutex sync.Mutex
logger chassis.Logger
stream *connect.BidiStreamForClient[v1.DaemonMessage, v1.ServerMessage]
mdns host.DNSPublisher
mutex sync.Mutex
logger chassis.Logger
stream *connect.BidiStreamForClient[v1.DaemonMessage, v1.ServerMessage]
mdns host.DNSPublisher
stun host.STUNClient
locatorController host.LocatorController
}
)

Expand All @@ -46,11 +48,13 @@ var (
ErrNoStream = fmt.Errorf("no stream")
)

func NewClient(logger chassis.Logger, mdns host.DNSPublisher) Client {
func NewClient(logger chassis.Logger, mdns host.DNSPublisher, stun host.STUNClient, locatorController host.LocatorController) Client {
clientSingleton = &client{
mutex: sync.Mutex{},
logger: logger,
mdns: mdns,
mutex: sync.Mutex{},
logger: logger,
mdns: mdns,
stun: stun,
locatorController: locatorController,
}
return clientSingleton
}
Expand Down Expand Up @@ -84,11 +88,12 @@ func (c *client) Listen() {
})
// send the SettingsSaved event to cover the case where the daemon could be restarted while running the `nixos-rebuild switch` command
g.Go(func() error {
return c.Send(&v1.DaemonMessage{
c.Send(&v1.DaemonMessage{
Message: &v1.DaemonMessage_SettingsSaved{
SettingsSaved: &v1.SettingsSaved{},
},
})
return nil
})

// wait on errors
Expand Down Expand Up @@ -116,7 +121,21 @@ func newInsecureClient() *http.Client {
}
}

func (c *client) Send(message *v1.DaemonMessage) error {
func (c *client) Send(message *v1.DaemonMessage) {
if c.stream == nil {
c.logger.WithError(ErrNoStream).Error("failed to send message to server")
return
}
c.mutex.Lock()
defer c.mutex.Unlock()
err := c.stream.Send(message)
if err != nil {
c.logger.WithError(ErrNoStream).Error("failed to send message to server")
return
}
}

func (c *client) SendWithError(message *v1.DaemonMessage) error {
if c.stream == nil {
return ErrNoStream
}
Expand Down Expand Up @@ -164,6 +183,14 @@ func (c *client) listen(ctx context.Context) error {
go c.addWireguardInterface(ctx, message.GetAddWireguardInterface())
case *v1.ServerMessage_RemoveWireguardInterface:
go c.removeWireguardInterface(ctx, message.GetRemoveWireguardInterface())
case *v1.ServerMessage_SetStunServerCommand:
go c.setSTUNServer(ctx, message.GetSetStunServerCommand())
case *v1.ServerMessage_AddLocatorServerCommand:
go c.addLocatorServer(ctx, message.GetAddLocatorServerCommand())
case *v1.ServerMessage_RemoveLocatorServerCommand:
go c.removeLocatorServer(ctx, message.GetRemoveLocatorServerCommand())
case *v1.ServerMessage_DisableAllLocatorsCommand:
go c.disableAllLocators(ctx, message.GetDisableAllLocatorsCommand())
case *v1.ServerMessage_AddWireguardPeer:
go c.addWireguardPeer(ctx, message.GetAddWireguardPeer().GetPeer())
default:
Expand All @@ -174,7 +201,7 @@ func (c *client) listen(ctx context.Context) error {

func (c *client) heartbeat() error {
for {
err := c.Send(&v1.DaemonMessage{
err := c.SendWithError(&v1.DaemonMessage{
Message: &v1.DaemonMessage_Heartbeat{},
})
if err != nil {
Expand All @@ -194,14 +221,11 @@ func (c *client) systemStats(ctx context.Context) error {
if err != nil {
c.logger.WithError(err).Error("failed to collect system stats")
}
err = c.Send(&v1.DaemonMessage{
c.Send(&v1.DaemonMessage{
Message: &v1.DaemonMessage_SystemStats{
SystemStats: stats,
},
})
if err != nil {
c.logger.WithError(err).Error("failed to send system stats message")
}
}()
time.Sleep(host.ComputeMeasurementDuration)
}
Expand Down Expand Up @@ -251,16 +275,13 @@ func (c *client) osUpdateDiff(ctx context.Context) {
})
return
} else {
err := c.Send(&v1.DaemonMessage{
c.Send(&v1.DaemonMessage{
Message: &v1.DaemonMessage_OsUpdateDiff{
OsUpdateDiff: &v1.OSUpdateDiff{
Description: osUpdateDiff,
},
},
})
if err != nil {
c.logger.WithError(err).Error("failed to send os update diff to server")
}
}
c.logger.Info("finished generating os version diff successfully")
}
Expand All @@ -281,14 +302,11 @@ func (c *client) currentDaemonVersion() {
})
return
} else {
err := c.Send(&v1.DaemonMessage{
c.Send(&v1.DaemonMessage{
Message: &v1.DaemonMessage_CurrentDaemonVersion{
CurrentDaemonVersion: current,
},
})
if err != nil {
c.logger.WithError(err).Error("failed to send current daemon version to server")
}
}
c.logger.Info("finished getting current daemon version successfully")
}
Expand Down Expand Up @@ -346,7 +364,7 @@ func (c *client) removeMdnsHost(_ context.Context, def *v1.RemoveMdnsHostCommand
func (c *client) saveSettings(ctx context.Context, def *v1.SaveSettingsCommand) {
err := host.SaveSettings(ctx, c.logger, def)
if err != nil {
err = c.Send(&v1.DaemonMessage{
c.Send(&v1.DaemonMessage{
Message: &v1.DaemonMessage_SettingsSaved{
SettingsSaved: &v1.SettingsSaved{
Error: &v1.DaemonError{
Expand All @@ -355,19 +373,79 @@ func (c *client) saveSettings(ctx context.Context, def *v1.SaveSettingsCommand)
},
},
})
if err != nil {
c.logger.WithError(err).Error("failed to send error")
}
return
}

err = c.Send(&v1.DaemonMessage{
c.Send(&v1.DaemonMessage{
Message: &v1.DaemonMessage_SettingsSaved{
SettingsSaved: &v1.SettingsSaved{},
},
})
}

func (c *client) setSTUNServer(_ context.Context, def *v1.SetSTUNServerCommand) {
resp := &v1.DaemonMessage{
Message: &v1.DaemonMessage_StunServerSet{
StunServerSet: &v1.STUNServerSet{},
},
}

_, err := c.stun.Bind(def.Server)
if err != nil {
c.logger.WithError(err).Error("failed to send success")
return
c.logger.WithError(err).Error("failed to bind to new stun server")
resp.GetStunServerSet().Error = &v1.DaemonError{Error: err.Error()}
}

c.Send(resp)
}

func (c *client) addLocatorServer(ctx context.Context, cmd *v1.AddLocatorServerCommand) {
resp := &v1.DaemonMessage{
Message: &v1.DaemonMessage_LocatorServerAdded{
LocatorServerAdded: &v1.LocatorServerAdded{},
},
}

locator, err := c.locatorController.AddLocator(ctx, cmd.LocatorAddress)
if err != nil {
c.logger.WithError(err).Error("failed to add locator server")
resp.GetLocatorServerAdded().Error = &v1.DaemonError{Error: err.Error()}
}
resp.GetLocatorServerAdded().Locator = locator

c.Send(resp)
}

func (c *client) removeLocatorServer(ctx context.Context, cmd *v1.RemoveLocatorServerCommand) {
resp := &v1.DaemonMessage{
Message: &v1.DaemonMessage_LocatorServerRemoved{
LocatorServerRemoved: &v1.LocatorServerRemoved{
Address: cmd.LocatorAddress,
},
},
}

err := c.locatorController.RemoveLocator(ctx, cmd.LocatorAddress)
if err != nil {
c.logger.WithError(err).Error("failed to remove locator server")
resp.GetLocatorServerRemoved().Error = &v1.DaemonError{Error: err.Error()}
}

c.Send(resp)
}

func (c *client) disableAllLocators(ctx context.Context, cmd *v1.DisableAllLocatorsCommand) {
resp := &v1.DaemonMessage{
Message: &v1.DaemonMessage_AllLocatorsDisabled{
AllLocatorsDisabled: &v1.AllLocatorsDisabled{},
},
}

err := c.locatorController.Disable(ctx)
if err != nil {
c.logger.WithError(err).Error("failed to disable all locators")
resp.GetAllLocatorsDisabled().Error = &v1.DaemonError{Error: err.Error()}
}

c.Send(resp)
}
4 changes: 2 additions & 2 deletions services/platform/daemon/communicate/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (c *client) uploadFile(_ context.Context, def *v1.UploadFileRequest) {
}

// repond to server with ready
err = c.Send(&v1.DaemonMessage{
err = c.SendWithError(&v1.DaemonMessage{
Message: &v1.DaemonMessage_UploadFileReady{
UploadFileReady: &v1.UploadFileReady{
Id: info.FileId,
Expand Down Expand Up @@ -78,7 +78,7 @@ func (c *client) uploadFile(_ context.Context, def *v1.UploadFileRequest) {
log.Debug("wrote temp file")

// repond to server with chunk completion
err = c.Send(&v1.DaemonMessage{
err = c.SendWithError(&v1.DaemonMessage{
Message: &v1.DaemonMessage_UploadFileChunkCompleted{
UploadFileChunkCompleted: &v1.UploadFileChunkCompleted{
FileId: chunk.FileId,
Expand Down
3 changes: 3 additions & 0 deletions services/platform/daemon/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ service:
port: 9000
bind_address: localhost
env: test
logging:
level: debug

daemon:
# server: http://home-cloud.local
server: http://localhost:8000
domain: local
hostnames: []
63 changes: 51 additions & 12 deletions services/platform/daemon/go.mod
Original file line number Diff line number Diff line change
@@ -1,61 +1,100 @@
module github.com/home-cloud-io/core/services/platform/daemon

go 1.22.5
go 1.23.0

toolchain go1.23.4

// replace github.com/home-cloud-io/core/api => ../../../api

// replace github.com/steady-bytes/draft/pkg/chassis => ../../../../../steady-bytes/draft/pkg/chassis

require (
connectrpc.com/connect v1.16.2
github.com/google/uuid v1.6.0
github.com/home-cloud-io/core/api v0.8.1
github.com/mackerelio/go-osstat v0.2.5
github.com/spf13/viper v1.18.2
github.com/steady-bytes/draft/pkg/chassis v0.3.0
github.com/netbirdio/netbird v0.34.1
github.com/pion/stun/v2 v2.0.0
github.com/steady-bytes/draft/pkg/chassis v0.3.5
github.com/steady-bytes/draft/pkg/loggers v0.2.3
golang.org/x/mod v0.13.0
golang.org/x/net v0.25.0
golang.org/x/sync v0.7.0
golang.org/x/mod v0.18.0
golang.org/x/net v0.30.0
golang.org/x/sync v0.8.0
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20230429144221-925a1e7659e6
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
)

require (
connectrpc.com/grpcreflect v1.2.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/aws/aws-sdk-go-v2 v1.30.3 // indirect
github.com/aws/aws-sdk-go-v2/config v1.27.27 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 // indirect
github.com/aws/aws-sdk-go-v2/service/route53 v1.42.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 // indirect
github.com/aws/smithy-go v1.20.3 // indirect
github.com/boltdb/bolt v1.3.1 // indirect
github.com/caddyserver/certmagic v0.21.3 // indirect
github.com/caddyserver/zerossl v0.1.3 // indirect
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.0 // indirect
github.com/envoyproxy/go-control-plane v0.12.0 // indirect
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/raft v1.6.0 // indirect
github.com/hashicorp/raft-boltdb/v2 v2.3.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/libdns/libdns v0.2.2 // indirect
github.com/libdns/route53 v1.5.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mholt/acmez/v2 v2.0.1 // indirect
github.com/miekg/dns v1.1.59 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pion/dtls/v2 v2.2.10 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/transport/v2 v2.2.4 // indirect
github.com/pion/transport/v3 v3.0.1 // indirect
github.com/rs/cors v1.10.1 // indirect
github.com/rs/zerolog v1.32.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/steady-bytes/draft/api v0.3.1 // indirect
github.com/spf13/viper v1.18.2 // indirect
github.com/steady-bytes/draft/api v0.6.1 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.etcd.io/bbolt v1.3.8 // indirect
github.com/zeebo/blake3 v0.2.3 // indirect
go.etcd.io/bbolt v1.3.10 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/tools v0.22.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.65.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
Loading

0 comments on commit 54859a7

Please sign in to comment.