Skip to content

Commit

Permalink
fix: improvements to file upload (#116)
Browse files Browse the repository at this point in the history
While there's still some networking/envoy issue causing uploads to fail
sporadically, this PR includes a bunch of improvements to the upload
process. There's a rework to the listener/broadcaster pattern that makes
it non-blocking, more efficient, and provides a buffering pattern for
slower event listeners. The daemon now handles file uploads in a
stateless manner that is also non-blocking.
  • Loading branch information
jgkawell authored Nov 26, 2024
1 parent b9ebff8 commit 5551501
Show file tree
Hide file tree
Showing 12 changed files with 221 additions and 228 deletions.
45 changes: 18 additions & 27 deletions services/platform/daemon/communicate/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,10 @@ type (
Send(*v1.DaemonMessage) error
}
client struct {
mutex sync.Mutex
logger chassis.Logger
stream *connect.BidiStreamForClient[v1.DaemonMessage, v1.ServerMessage]
mdns host.DNSPublisher
fileMetas map[string]fileMeta
chunkMetas sync.Map
}
fileMeta struct {
id string
filePath string
}
chunkMeta struct {
index uint32
fileName string
mutex sync.Mutex
logger chassis.Logger
stream *connect.BidiStreamForClient[v1.DaemonMessage, v1.ServerMessage]
mdns host.DNSPublisher
}
)

Expand All @@ -58,11 +48,9 @@ var (

func NewClient(logger chassis.Logger, mdns host.DNSPublisher) Client {
clientSingleton = &client{
mutex: sync.Mutex{},
logger: logger,
mdns: mdns,
fileMetas: map[string]fileMeta{},
chunkMetas: sync.Map{},
mutex: sync.Mutex{},
logger: logger,
mdns: mdns,
}
return clientSingleton
}
Expand All @@ -77,7 +65,10 @@ func (c *client) Listen() {
c.logger.Fatal("exhausted retries connecting to server - exiting")
os.Exit(1)
}
client := sdConnect.NewDaemonStreamServiceClient(newInsecureClient(), config.GetString("daemon.server"))
client := sdConnect.NewDaemonStreamServiceClient(
newInsecureClient(),
config.GetString("daemon.server"),
)
c.stream = client.Communicate(ctx)

// spin off workers
Expand All @@ -93,7 +84,7 @@ func (c *client) Listen() {
})
// send the SettingsSaved event to cover the case where the daemon could be restarted while running the `nixos-rebuild switch` command
g.Go(func() error {
return c.stream.Send(&v1.DaemonMessage{
return c.Send(&v1.DaemonMessage{
Message: &v1.DaemonMessage_SettingsSaved{
SettingsSaved: &v1.SettingsSaved{},
},
Expand Down Expand Up @@ -181,7 +172,7 @@ func (c *client) listen(ctx context.Context) error {

func (c *client) heartbeat() error {
for {
err := c.stream.Send(&v1.DaemonMessage{
err := c.Send(&v1.DaemonMessage{
Message: &v1.DaemonMessage_Heartbeat{},
})
if err != nil {
Expand All @@ -201,7 +192,7 @@ func (c *client) systemStats(ctx context.Context) error {
if err != nil {
c.logger.WithError(err).Error("failed to collect system stats")
}
err = c.stream.Send(&v1.DaemonMessage{
err = c.Send(&v1.DaemonMessage{
Message: &v1.DaemonMessage_SystemStats{
SystemStats: stats,
},
Expand Down Expand Up @@ -247,7 +238,7 @@ func (c *client) osUpdateDiff(ctx context.Context) {
osUpdateDiff, err := host.GetOSVersionDiff(ctx, c.logger)
if err != nil {
c.logger.WithError(err).Error("failed to get os version diff")
c.stream.Send(&v1.DaemonMessage{
c.Send(&v1.DaemonMessage{
Message: &v1.DaemonMessage_OsUpdateDiff{
OsUpdateDiff: &v1.OSUpdateDiff{
Error: &v1.DaemonError{
Expand All @@ -258,7 +249,7 @@ func (c *client) osUpdateDiff(ctx context.Context) {
})
return
} else {
err := c.stream.Send(&v1.DaemonMessage{
err := c.Send(&v1.DaemonMessage{
Message: &v1.DaemonMessage_OsUpdateDiff{
OsUpdateDiff: &v1.OSUpdateDiff{
Description: osUpdateDiff,
Expand All @@ -277,7 +268,7 @@ func (c *client) currentDaemonVersion() {
current, err := host.GetDaemonVersion(c.logger)
if err != nil {
c.logger.WithError(err).Error("failed to get current daemon version")
c.stream.Send(&v1.DaemonMessage{
c.Send(&v1.DaemonMessage{
Message: &v1.DaemonMessage_CurrentDaemonVersion{
CurrentDaemonVersion: &v1.CurrentDaemonVersion{
Error: &v1.DaemonError{
Expand All @@ -288,7 +279,7 @@ func (c *client) currentDaemonVersion() {
})
return
} else {
err := c.stream.Send(&v1.DaemonMessage{
err := c.Send(&v1.DaemonMessage{
Message: &v1.DaemonMessage_CurrentDaemonVersion{
CurrentDaemonVersion: current,
},
Expand Down
112 changes: 54 additions & 58 deletions services/platform/daemon/communicate/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,19 @@ func (c *client) uploadFile(_ context.Context, def *v1.UploadFileRequest) {
"file_id": info.FileId,
"file_path": info.FilePath,
})
// check if existing upload exists and ignore if so (only a single info message should be sent for a given file)
if _, ok := c.fileMetas[info.FileId]; ok {
log.Warn("info message recieved for already instantiated upload buffer")
return
}

// save metadata
c.fileMetas[info.FileId] = fileMeta{
id: info.FileId,
filePath: info.FilePath,
}

// make temporary chunk upload directory
err := os.MkdirAll(filepath.Join(host.ChunkPath(), info.FileId), 0777)
if err != nil {
log.WithError(err).Error("failed to create temp directory for file upload")
return
chunkPath := filepath.Join(host.ChunkPath(), info.FileId)

// create tmp chunk path if not exists
_, err := os.Stat(chunkPath)
if os.IsNotExist(err) {
// make temporary chunk upload directory
err := os.MkdirAll(filepath.Join(host.ChunkPath(), info.FileId), 0777)
if err != nil {
log.WithError(err).Error("failed to create temp directory for file upload")
cleanupFailedFileUpload(log, info.FileId)
return
}
}

// repond to server with ready
Expand All @@ -49,41 +45,38 @@ func (c *client) uploadFile(_ context.Context, def *v1.UploadFileRequest) {
},
})
if err != nil {
log.Error("failed to alert server with ready state for file upload")
log.WithError(err).Error("failed to alert server with ready state for file upload")
cleanupFailedFileUpload(log, info.FileId)
return
}
log.Info("completed file upload setup")
case *v1.UploadFileRequest_Chunk:
chunk := def.GetChunk()
log := c.logger.WithField("file_id", chunk.FileId).WithField("chunk_index", chunk.Index)
log := c.logger.WithFields(chassis.Fields{
"file_id": chunk.FileId,
"chunk_index": chunk.Index,
})
log.Debug("processing chunk")

chunkPath := filepath.Join(host.ChunkPath(), chunk.FileId)

// check if existing upload exists and ignore if not
meta, ok := c.fileMetas[chunk.FileId]
if !ok {
_, err := os.Stat(chunkPath)
if os.IsNotExist(err) {
log.Warn("chunk message received for uninitiated file upload")
return
}
log = log.WithFields(chassis.Fields{
"file_path": meta.filePath,
})

// write chunk as temp file
fileName := filepath.Join(host.ChunkPath(), meta.id, fmt.Sprintf("chunk.%d", chunk.Index))
err := os.WriteFile(fileName, chunk.Data, 0666)
fileName := filepath.Join(host.ChunkPath(), chunk.FileId, fmt.Sprintf("chunk.%d", chunk.Index))
err = os.WriteFile(fileName, chunk.Data, 0666)
if err != nil {
log.WithError(err).Error("failed to write uploaded file chunk")
cleanupFailedFileUpload(log, chunk.FileId)
return
}
log.Debug("wrote temp file")

// store chunk meta
c.chunkMetas.Store(fmt.Sprintf("%s.%d", chunk.FileId, chunk.Index), chunkMeta{
index: chunk.Index,
fileName: fileName,
})
log.Debug("saved chunk metadata")

// repond to server with chunk completion
err = c.Send(&v1.DaemonMessage{
Message: &v1.DaemonMessage_UploadFileChunkCompleted{
Expand All @@ -95,6 +88,7 @@ func (c *client) uploadFile(_ context.Context, def *v1.UploadFileRequest) {
})
if err != nil {
log.WithError(err).Error("failed to alert server with completed state for chunk upload")
cleanupFailedFileUpload(log, chunk.FileId)
return
}

Expand All @@ -105,63 +99,65 @@ func (c *client) uploadFile(_ context.Context, def *v1.UploadFileRequest) {
log := c.logger.WithField("file_id", done.FileId)

// check if existing upload exists and ignore if not
meta, ok := c.fileMetas[done.FileId]
if !ok {
log.Warn("done message received for uninitiated file upload")
chunkPath := filepath.Join(host.ChunkPath(), done.FileId)
_, err := os.Stat(chunkPath)
if os.IsNotExist(err) {
log.Warn("chunk message received for uninitiated file upload")
return
}
log = log.WithFields(chassis.Fields{
"file_path": meta.filePath,
})

err := c.reconstructFile(log, meta)
err = reconstructFile(log, done.FileId, done.FilePath)
if err != nil {
log.WithError(err).Error("failed to reconstruct file from chunks")
cleanupFailedFileUpload(log, done.FileId)
return
}

// delete file meta
delete(c.fileMetas, done.FileId)

log.Info("completed saving file")

default:
c.logger.Error("unknown UploadFileRequest type")
}
}

func cleanupFailedFileUpload(logger chassis.Logger, fileId string) {
// remove chunk directory
err := os.RemoveAll(filepath.Join(host.ChunkPath(), fileId))
if err != nil {
logger.WithError(err).Error("failed to remove chunk directory")
}
}

// reconstructFile reconstructs a file from its chunks using the provided metadata.
// It reads each chunk file, concatenates their content, and writes it to the output file.
func (c *client) reconstructFile(logger chassis.Logger, metadata fileMeta) error {
func reconstructFile(logger chassis.Logger, fileId string, filePath string) error {
filePath = host.FilePath(filePath)

// create parent directories if not exists
err := os.MkdirAll(filepath.Dir(metadata.filePath), 0777)
err := os.MkdirAll(filepath.Dir(filePath), 0777)
if err != nil {
return err
}

// create final file
outputFile, err := os.Create(metadata.filePath)
outputFile, err := os.Create(filePath)
if err != nil {
return err
}
defer outputFile.Close()

// extract (and sort) chunks from metadata
var chunks []chunkMeta
i := 0
for {
chunk, ok := c.chunkMetas.LoadAndDelete(fmt.Sprintf("%s.%d", metadata.id, i))
if !ok {
break
}
chunks = append(chunks, chunk.(chunkMeta))
i++
chunks, err := os.ReadDir(filepath.Join(host.ChunkPath(), fileId))
if err != nil {
logger.WithError(err).Error("failed to read chunk directory")
return err
}

// iterate through the sorted chunks and concatenate their content to build the final file
for _, chunk := range chunks {
chunkPath := filepath.Join(host.ChunkPath(), fileId, chunk.Name())

// open the chunk file
chunkFile, err := os.Open(chunk.fileName)
chunkFile, err := os.Open(chunkPath)
if err != nil {
return err
}
Expand All @@ -178,14 +174,14 @@ func (c *client) reconstructFile(logger chassis.Logger, metadata fileMeta) error
}

// remove the chunk file
err = os.Remove(chunk.fileName)
err = os.Remove(chunkPath)
if err != nil {
logger.WithError(err).Error("failed to remove chunk file")
}
}

// remove the chunk file directory
err = os.Remove(filepath.Join(host.ChunkPath(), metadata.id))
err = os.Remove(filepath.Join(host.ChunkPath(), fileId))
if err != nil {
logger.WithError(err).Error("failed to remove chunk directory")
}
Expand Down
4 changes: 2 additions & 2 deletions services/platform/daemon/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ go 1.22.5

require (
connectrpc.com/connect v1.16.2
github.com/home-cloud-io/core/api v0.4.25
github.com/home-cloud-io/core/api v0.4.26
github.com/mackerelio/go-osstat v0.2.5
github.com/spf13/viper v1.18.2
github.com/steady-bytes/draft/pkg/chassis v0.3.0
github.com/steady-bytes/draft/pkg/loggers v0.2.2
github.com/steady-bytes/draft/pkg/loggers v0.2.3
golang.org/x/mod v0.13.0
golang.org/x/net v0.25.0
golang.org/x/sync v0.7.0
Expand Down
8 changes: 4 additions & 4 deletions services/platform/daemon/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ github.com/hashicorp/raft-boltdb v0.0.0-20231115180007-027066e4d245 h1:Nyeelmxya
github.com/hashicorp/raft-boltdb v0.0.0-20231115180007-027066e4d245/go.mod h1:nTakvJ4XYq45UXtn0DbwR4aU9ZdjlnIenpbs6Cd+FM0=
github.com/hashicorp/raft-boltdb/v2 v2.3.0 h1:fPpQR1iGEVYjZ2OELvUHX600VAK5qmdnDEv3eXOwZUA=
github.com/hashicorp/raft-boltdb/v2 v2.3.0/go.mod h1:YHukhB04ChJsLHLJEUD6vjFyLX2L3dsX3wPBZcX4tmc=
github.com/home-cloud-io/core/api v0.4.25 h1:TVaX8SJho7u1CikLPZafFo0DDvcKGPPhjbbByKjbgzs=
github.com/home-cloud-io/core/api v0.4.25/go.mod h1:4G9DoubvZUuMFZOM63P6AOHyQ5Ulm9X2ilJ1EshIvjc=
github.com/home-cloud-io/core/api v0.4.26 h1:4D9wX57j3wmbXow+fwon4n0iTaLm6cQBxosQxXJ/Ens=
github.com/home-cloud-io/core/api v0.4.26/go.mod h1:4G9DoubvZUuMFZOM63P6AOHyQ5Ulm9X2ilJ1EshIvjc=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
Expand Down Expand Up @@ -157,8 +157,8 @@ github.com/steady-bytes/draft/api v0.3.1 h1:dv4EoLefhqfCFDEukjZlwqVl9GffLWLCQC7Z
github.com/steady-bytes/draft/api v0.3.1/go.mod h1:mlwxjvRiqvwySGdzVmF8voFhMffWq2F7dyd+xt6kENA=
github.com/steady-bytes/draft/pkg/chassis v0.3.0 h1:9uk+RRuSec2Sl7wy5ZSNv3IcqRqbU6vYiVSLj5Bo9co=
github.com/steady-bytes/draft/pkg/chassis v0.3.0/go.mod h1:5TQfgltb/008bcuLe6vAQva0Kzrq1zEDCFoCfQjzzLo=
github.com/steady-bytes/draft/pkg/loggers v0.2.2 h1:pVTfd/ES6XGLXg8WJ1xGe3YIdQJLJ6sJ6PHxVZcc4SU=
github.com/steady-bytes/draft/pkg/loggers v0.2.2/go.mod h1:nXeOQ6lXhsVWHzRqVcJz0JIeSW75ORVN+0izJwwnH+Y=
github.com/steady-bytes/draft/pkg/loggers v0.2.3 h1:tZadHH8f9fo+tRHVp3BaJlVYvKlrlX8Hd6LxncUVgAM=
github.com/steady-bytes/draft/pkg/loggers v0.2.3/go.mod h1:nXeOQ6lXhsVWHzRqVcJz0JIeSW75ORVN+0izJwwnH+Y=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand Down
21 changes: 15 additions & 6 deletions services/platform/server/async/broadcaster.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package async

import (
"fmt"
"reflect"
"sync"
)

Expand All @@ -19,7 +21,7 @@ type (
Deregister(id string)
}
broadcaster struct {
mu sync.Mutex
mu sync.RWMutex
subscribers map[string]chan any
}
)
Expand All @@ -43,9 +45,16 @@ func (b *broadcaster) Deregister(id string) {
}

func (b *broadcaster) Send(m any) {
for _, c := range b.subscribers {
go func(c chan any) {
c <- m
}(c)
}
go func() {
b.mu.RLock()
defer b.mu.RUnlock()
for _, c := range b.subscribers {
select {
case c <- m: // attempt to send message and drop if not able
default:
// TODO: convert to logger?
fmt.Printf("dropping message: %s\n", reflect.TypeOf(m).String())
}
}
}()
}
Loading

0 comments on commit 5551501

Please sign in to comment.