diff --git a/go.sum b/go.sum index 77e490d..77c9cd4 100644 --- a/go.sum +++ b/go.sum @@ -96,8 +96,6 @@ github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU= github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= -github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= -github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-jose/go-jose/v4 v4.0.2 h1:R3l3kkBds16bO7ZFAEEcofK0MkrAJt3jlJznWZG0nvk= github.com/go-jose/go-jose/v4 v4.0.2/go.mod h1:WVf9LFMHh/QVrmqrOfqun0C45tMe3RoiKJMPvgWwLfY= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= diff --git a/internal/s3upload/httpuploader.go b/internal/s3upload/httpuploader.go index cc7a646..f627f0c 100644 --- a/internal/s3upload/httpuploader.go +++ b/internal/s3upload/httpuploader.go @@ -7,7 +7,6 @@ import ( "encoding/base64" "fmt" "io" - "log/slog" "math" "net/http" "os" @@ -62,7 +61,7 @@ func GetPresignedUrlServer(endpoint string) *ClientWithResponses { // is initiated func getPresignedUrls(object_name string, part int, endpoint string) (string, []string, error) { - r, err := GetPresignedUrlServer(endpoint).GetPresignedUrlsWithResponse(context.Background(), PresignedUrlBody{ + response, err := GetPresignedUrlServer(endpoint).GetPresignedUrlsWithResponse(context.Background(), PresignedUrlBody{ ObjectName: object_name, Parts: part, }) @@ -70,15 +69,23 @@ func getPresignedUrls(object_name string, part int, endpoint string) (string, [] if err != nil { return "", []string{}, err } - if r.StatusCode() != http.StatusOK { - return "", []string{}, fmt.Errorf(r.Status()) + + if response.StatusCode() == http.StatusInternalServerError { + return "", []string{}, fmt.Errorf("%s: %s", response.JSON500.Message, *response.JSON500.Details) } + if response.StatusCode() == http.StatusUnprocessableEntity { + err_string := "" + for _, d := range *response.JSON422.Detail { + err_string += " " + d.Msg + } - return r.JSON200.UploadID, r.JSON200.Urls, err + return "", []string{}, fmt.Errorf("%s", err_string) + } + return response.JSON200.UploadID, response.JSON200.Urls, err } func completeMultiPartUpload(object_name string, uploadID string, endpoint string, parts []CompletePart, full_file_checksum string) error { - r, err := GetPresignedUrlServer(endpoint).CompleteUploadWithResponse(context.Background(), CompleteUploadBody{ + response, err := GetPresignedUrlServer(endpoint).CompleteUploadWithResponse(context.Background(), CompleteUploadBody{ ObjectName: object_name, UploadID: uploadID, Parts: parts, @@ -88,14 +95,22 @@ func completeMultiPartUpload(object_name string, uploadID string, endpoint strin if err != nil { return err } - if r.StatusCode() != http.StatusOK { - return fmt.Errorf("") + + if response.StatusCode() == http.StatusInternalServerError { + return fmt.Errorf("%s: %s", response.JSON500.Message, *response.JSON500.Details) + } + if response.StatusCode() == http.StatusUnprocessableEntity { + err_string := "" + for _, d := range *response.JSON422.Detail { + err_string += " " + d.Msg + } + + return fmt.Errorf("%s", err_string) } return nil } func uploadFile(ctx context.Context, filePath string, objectName string, options task.S3TransferConfig, notifier *TransferNotifier) error { - // Open the file file, err := os.Open(filePath) if err != nil { return fmt.Errorf("error opening file: %w", err) @@ -103,15 +118,12 @@ func uploadFile(ctx context.Context, filePath string, objectName string, options defer file.Close() - // Get the file size fileInfo, err := file.Stat() if err != nil { return fmt.Errorf("error getting file info: %w", err) } totalSize := fileInfo.Size() - fmt.Printf("Uploading file: %s (%d bytes)\n", filePath, totalSize) - httpClient := GetHttpUploader(options.PoolSize) if totalSize < options.ChunkSizeMB*MiB { @@ -127,6 +139,7 @@ func uploadFile(ctx context.Context, filePath string, objectName string, options if err_abort != nil { return fmt.Errorf("while aborting a multipart upload an error occured: %s. Previous error: %s", err_abort.Error(), err_upload.Error()) } + return err_upload } return err } @@ -144,7 +157,6 @@ func abortMultipartUpload(uploadID string, objectName string, endpoint string) e return fmt.Errorf("") } return nil - } func doUploadSingleFile(ctx context.Context, objectName string, file *os.File, httpClient *HttpUploader, endpoint string, notifier *TransferNotifier) error { @@ -204,7 +216,6 @@ func doUploadMultipart(ctx context.Context, totalSize int64, objectName string, parts[partNumber] = CompletePart{ETag: etag, PartNumber: partNumber + 1, ChecksumSHA256: base64hash} - fmt.Printf("Uploaded part %d\n", partNumber+1) return nil }) } @@ -218,14 +229,12 @@ func doUploadMultipart(ctx context.Context, totalSize int64, objectName string, c := strings.Join(partChecksums, "") n := sha256.Sum256([]byte(c)) base64hash := base64.StdEncoding.EncodeToString(n[:]) - slog.Info("Calculated file digest", "file", file.Name(), "sha256", base64hash) err = completeMultiPartUpload(objectName, uploadID, options.Endpoint, parts, base64hash) if err != nil { - return uploadID, fmt.Errorf("error completing multipart upload: %w", err) + return uploadID, fmt.Errorf("error completing multipart upload: %s", err.Error()) } - fmt.Println("Multipart upload completed successfully.") return uploadID, nil } diff --git a/internal/s3upload/s3upload.go b/internal/s3upload/s3upload.go index 5b3ae69..c22dca6 100644 --- a/internal/s3upload/s3upload.go +++ b/internal/s3upload/s3upload.go @@ -61,7 +61,7 @@ func UploadS3(ctx context.Context, datasetPID string, datasetSourceFolder string } func uploadFiles(ctx context.Context, s3Objects *S3Objects, options task.S3TransferConfig, transferNotifier *TransferNotifier, uploadId uuid.UUID) error { - errorGroup, ctx := errgroup.WithContext(ctx) + errorGroup, context := errgroup.WithContext(ctx) objectsChannel := make(chan int, len(s3Objects.Files)) nWorkers := max(options.ConcurrentFiles, len(s3Objects.Files)) @@ -71,11 +71,11 @@ func uploadFiles(ctx context.Context, s3Objects *S3Objects, options task.S3Trans func() error { for idx := range objectsChannel { select { - case <-ctx.Done(): + case <-context.Done(): transferNotifier.notifier.OnTaskCanceled(uploadId) - return ctx.Err() + return context.Err() default: - err := uploadFile(ctx, s3Objects.Files[idx], s3Objects.ObjectNames[idx], options, transferNotifier) + err := uploadFile(context, s3Objects.Files[idx], s3Objects.ObjectNames[idx], options, transferNotifier) if err != nil { return err }