Skip to content

Commit

Permalink
fix(s3): improve error handling and logging
Browse files: browse the repository at this point in the history
  • Loading branch information
phwissmann committed Dec 19, 2024
1 parent 647da44 commit 3110222
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 23 deletions.
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU=
github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-jose/go-jose/v4 v4.0.2 h1:R3l3kkBds16bO7ZFAEEcofK0MkrAJt3jlJznWZG0nvk=
github.com/go-jose/go-jose/v4 v4.0.2/go.mod h1:WVf9LFMHh/QVrmqrOfqun0C45tMe3RoiKJMPvgWwLfY=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
Expand Down
43 changes: 26 additions & 17 deletions internal/s3upload/httpuploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"encoding/base64"
"fmt"
"io"
"log/slog"
"math"
"net/http"
"os"
Expand Down Expand Up @@ -62,23 +61,31 @@ func GetPresignedUrlServer(endpoint string) *ClientWithResponses {
// is initiated
func getPresignedUrls(object_name string, part int, endpoint string) (string, []string, error) {

r, err := GetPresignedUrlServer(endpoint).GetPresignedUrlsWithResponse(context.Background(), PresignedUrlBody{
response, err := GetPresignedUrlServer(endpoint).GetPresignedUrlsWithResponse(context.Background(), PresignedUrlBody{
ObjectName: object_name,
Parts: part,
})

if err != nil {
return "", []string{}, err
}
if r.StatusCode() != http.StatusOK {
return "", []string{}, fmt.Errorf(r.Status())

if response.StatusCode() == http.StatusInternalServerError {
return "", []string{}, fmt.Errorf("%s: %s", response.JSON500.Message, *response.JSON500.Details)
}
if response.StatusCode() == http.StatusUnprocessableEntity {
err_string := ""
for _, d := range *response.JSON422.Detail {
err_string += " " + d.Msg
}

return r.JSON200.UploadID, r.JSON200.Urls, err
return "", []string{}, fmt.Errorf("%s", err_string)
}
return response.JSON200.UploadID, response.JSON200.Urls, err
}

func completeMultiPartUpload(object_name string, uploadID string, endpoint string, parts []CompletePart, full_file_checksum string) error {
r, err := GetPresignedUrlServer(endpoint).CompleteUploadWithResponse(context.Background(), CompleteUploadBody{
response, err := GetPresignedUrlServer(endpoint).CompleteUploadWithResponse(context.Background(), CompleteUploadBody{
ObjectName: object_name,
UploadID: uploadID,
Parts: parts,
Expand All @@ -88,30 +95,35 @@ func completeMultiPartUpload(object_name string, uploadID string, endpoint strin
if err != nil {
return err
}
if r.StatusCode() != http.StatusOK {
return fmt.Errorf("")

if response.StatusCode() == http.StatusInternalServerError {
return fmt.Errorf("%s: %s", response.JSON500.Message, *response.JSON500.Details)
}
if response.StatusCode() == http.StatusUnprocessableEntity {
err_string := ""
for _, d := range *response.JSON422.Detail {
err_string += " " + d.Msg
}

return fmt.Errorf("%s", err_string)
}
return nil
}

func uploadFile(ctx context.Context, filePath string, objectName string, options task.S3TransferConfig, notifier *TransferNotifier) error {
// Open the file
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("error opening file: %w", err)
}

defer file.Close()

// Get the file size
fileInfo, err := file.Stat()
if err != nil {
return fmt.Errorf("error getting file info: %w", err)
}

totalSize := fileInfo.Size()
fmt.Printf("Uploading file: %s (%d bytes)\n", filePath, totalSize)

httpClient := GetHttpUploader(options.PoolSize)

if totalSize < options.ChunkSizeMB*MiB {
Expand All @@ -127,6 +139,7 @@ func uploadFile(ctx context.Context, filePath string, objectName string, options
if err_abort != nil {
return fmt.Errorf("while aborting a multipart upload an error occured: %s. Previous error: %s", err_abort.Error(), err_upload.Error())
}
return err_upload
}
return err
}
Expand All @@ -144,7 +157,6 @@ func abortMultipartUpload(uploadID string, objectName string, endpoint string) e
return fmt.Errorf("")
}
return nil

}

func doUploadSingleFile(ctx context.Context, objectName string, file *os.File, httpClient *HttpUploader, endpoint string, notifier *TransferNotifier) error {
Expand Down Expand Up @@ -204,7 +216,6 @@ func doUploadMultipart(ctx context.Context, totalSize int64, objectName string,

parts[partNumber] = CompletePart{ETag: etag, PartNumber: partNumber + 1, ChecksumSHA256: base64hash}

fmt.Printf("Uploaded part %d\n", partNumber+1)
return nil
})
}
Expand All @@ -218,14 +229,12 @@ func doUploadMultipart(ctx context.Context, totalSize int64, objectName string,
c := strings.Join(partChecksums, "")
n := sha256.Sum256([]byte(c))
base64hash := base64.StdEncoding.EncodeToString(n[:])
slog.Info("Calculated file digest", "file", file.Name(), "sha256", base64hash)

err = completeMultiPartUpload(objectName, uploadID, options.Endpoint, parts, base64hash)
if err != nil {
return uploadID, fmt.Errorf("error completing multipart upload: %w", err)
return uploadID, fmt.Errorf("error completing multipart upload: %s", err.Error())
}

fmt.Println("Multipart upload completed successfully.")
return uploadID, nil
}

Expand Down
8 changes: 4 additions & 4 deletions internal/s3upload/s3upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func UploadS3(ctx context.Context, datasetPID string, datasetSourceFolder string
}

func uploadFiles(ctx context.Context, s3Objects *S3Objects, options task.S3TransferConfig, transferNotifier *TransferNotifier, uploadId uuid.UUID) error {
errorGroup, ctx := errgroup.WithContext(ctx)
errorGroup, context := errgroup.WithContext(ctx)
objectsChannel := make(chan int, len(s3Objects.Files))

nWorkers := max(options.ConcurrentFiles, len(s3Objects.Files))
Expand All @@ -71,11 +71,11 @@ func uploadFiles(ctx context.Context, s3Objects *S3Objects, options task.S3Trans
func() error {
for idx := range objectsChannel {
select {
case <-ctx.Done():
case <-context.Done():
transferNotifier.notifier.OnTaskCanceled(uploadId)
return ctx.Err()
return context.Err()
default:
err := uploadFile(ctx, s3Objects.Files[idx], s3Objects.ObjectNames[idx], options, transferNotifier)
err := uploadFile(context, s3Objects.Files[idx], s3Objects.ObjectNames[idx], options, transferNotifier)
if err != nil {
return err
}
Expand Down

0 comments on commit 3110222

Please sign in to comment.