From 3cea121c58c7ea95f27283fef2ca9373502d738b Mon Sep 17 00:00:00 2001 From: Yahav Itzhak Date: Thu, 22 Feb 2024 15:53:41 +0200 Subject: [PATCH] Artifactory Multi-Part Upload support (#900) --- .github/workflows/tests.yml | 4 +- README.md | 24 +- artifactory/manager.go | 2 + artifactory/services/download.go | 16 +- artifactory/services/fspatterns/utils.go | 8 +- artifactory/services/upload.go | 129 ++++-- artifactory/services/utils/multipartupload.go | 436 ++++++++++++++++++ .../services/utils/multipartupload_test.go | 367 +++++++++++++++ artifactory/services/utils/storageutils.go | 7 + go.mod | 2 +- go.sum | 4 +- http/httpclient/client.go | 5 +- tests/artifactorymultipartupload_test.go | 89 ++++ tests/jfrogclient_test.go | 2 +- tests/utils_test.go | 9 +- utils/io/fileutils/files.go | 4 +- utils/io/progress.go | 6 +- utils/retryexecutor.go | 2 +- 18 files changed, 1028 insertions(+), 88 deletions(-) create mode 100644 artifactory/services/utils/multipartupload.go create mode 100644 artifactory/services/utils/multipartupload_test.go create mode 100644 tests/artifactorymultipartupload_test.go diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index d2f95187e..dc9fb5f2d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -83,13 +83,13 @@ jobs: - name: artifactory tests run: go test -v github.com/jfrog/jfrog-client-go/tests --timeout 0 --test.${{ matrix.suite }} --ci.runId=${{ runner.os }}-${{ matrix.suite }} - JFrog-Client-Go-Ds-Xr-Tests: + JFrog-Client-Go-Ds-Xr-MPU-Tests: needs: Pretest name: ${{ matrix.suite }} ${{ matrix.os }} strategy: fail-fast: false matrix: - suite: [ distribution, xray ] + suite: [ distribution, xray, mpu ] os: [ ubuntu, windows, macos ] runs-on: ${{ matrix.os }}-latest steps: diff --git a/README.md b/README.md index 4e761e7e3..feac593cc 100644 --- a/README.md +++ b/README.md @@ -254,14 +254,15 @@ content of this repository is deleted. 
#### Test Types -| Type | Description | Prerequisites | -| -------------------- | ------------------ | ----------------------------- | -| `-test.artifactory` | Artifactory tests | Artifactory Pro | -| `-test.distribution` | Distribution tests | Artifactory with Distribution | -| `-test.xray` | Xray tests | Artifactory with Xray | -| `-test.pipelines` | Pipelines tests | JFrog Pipelines | -| `-test.access` | Access tests | Artifactory Pro | -| `-test.repositories` | Access tests | Artifactory Pro | +| Type | Description | Prerequisites | +| -------------------- | ---------------------- | ------------------------------- | +| `-test.artifactory` | Artifactory tests | Artifactory Pro | +| `-test.distribution` | Distribution tests | Artifactory with Distribution | +| `-test.xray` | Xray tests | Artifactory with Xray | +| `-test.pipelines` | Pipelines tests | JFrog Pipelines | +| `-test.access` | Access tests | Artifactory Pro | +| `-test.repositories` | Repositories tests | Artifactory Pro | +| `-test.mpu` | Multipart upload tests | Artifactory Pro with S3 storage | #### Connection Details @@ -400,6 +401,12 @@ params.Symlink = false params.Exclusions = "(.*)a.zip" // Retries default value: 3 params.Retries = 5 +// The maximum number of parts that can be concurrently uploaded per file during a multi-part upload. Set to 0 to disable multi-part upload. +// SplitCount default value: 5 +params.SplitCount = 10 +// The minimum file size in MiB required to attempt a multi-part upload. +// MinSplitSize default value: 200 +params.MinSplitSize = 100 // The min file size in bytes for "checksum deploy". 
// "Checksum deploy" is the action of calculating the file checksum locally, before // the upload, and skipping the actual file transfer if the file already @@ -2230,7 +2237,6 @@ xscVersion, err := scanService.IsXscEnabled() multiScanId, err := scanService.SendScanGitInfoContext(details) ``` - ## Pipelines APIs ### Creating Pipelines Service Manager diff --git a/artifactory/manager.go b/artifactory/manager.go index 8827abd28..9d9e627c3 100644 --- a/artifactory/manager.go +++ b/artifactory/manager.go @@ -321,6 +321,8 @@ func (sm *ArtifactoryServicesManagerImp) initUploadService() *services.UploadSer uploadService.ArtDetails = sm.config.GetServiceDetails() uploadService.DryRun = sm.config.IsDryRun() uploadService.Progress = sm.progress + httpClientDetails := uploadService.ArtDetails.CreateHttpClientDetails() + uploadService.MultipartUpload = utils.NewMultipartUpload(sm.client, &httpClientDetails, uploadService.ArtDetails.GetUrl()) return uploadService } diff --git a/artifactory/services/download.go b/artifactory/services/download.go index 8445e76ec..61bf3242c 100644 --- a/artifactory/services/download.go +++ b/artifactory/services/download.go @@ -440,19 +440,11 @@ func createLocalSymlink(localPath, localFileName, symlinkArtifact string, symlin if !fileutils.IsPathExists(symlinkArtifact, false) { return errorutils.CheckErrorf("symlink validation failed, target doesn't exist: " + symlinkArtifact) } - file, err := os.Open(symlinkArtifact) - if err = errorutils.CheckError(err); err != nil { - return err - } - defer func() { - err = errors.Join(err, errorutils.CheckError(file.Close())) - }() - checksumInfo, err := biutils.CalcChecksums(file, biutils.SHA1) - if err = errorutils.CheckError(err); err != nil { - return err + var checksums map[biutils.Algorithm]string + if checksums, err = biutils.GetFileChecksums(symlinkArtifact, biutils.SHA1); err != nil { + return errorutils.CheckError(err) } - sha1 := checksumInfo[biutils.SHA1] - if sha1 != symlinkContentChecksum { + if 
checksums[biutils.SHA1] != symlinkContentChecksum { return errorutils.CheckErrorf("symlink validation failed for target: " + symlinkArtifact) } } diff --git a/artifactory/services/fspatterns/utils.go b/artifactory/services/fspatterns/utils.go index 5cfc3d578..01427d4d0 100644 --- a/artifactory/services/fspatterns/utils.go +++ b/artifactory/services/fspatterns/utils.go @@ -178,15 +178,15 @@ func GetRootPath(pattern, target, archiveTarget string, patternType utils.Patter // When handling symlink we want to simulate the creation of empty file func CreateSymlinkFileDetails() (*fileutils.FileDetails, error) { - checksumInfo, err := biutils.CalcChecksums(bytes.NewBuffer([]byte(fileutils.SymlinkFileContent))) + checksums, err := biutils.CalcChecksums(bytes.NewBuffer([]byte(fileutils.SymlinkFileContent))) if err != nil { return nil, errorutils.CheckError(err) } details := new(fileutils.FileDetails) - details.Checksum.Md5 = checksumInfo[biutils.MD5] - details.Checksum.Sha1 = checksumInfo[biutils.SHA1] - details.Checksum.Sha256 = checksumInfo[biutils.SHA256] + details.Checksum.Md5 = checksums[biutils.MD5] + details.Checksum.Sha1 = checksums[biutils.SHA1] + details.Checksum.Sha256 = checksums[biutils.SHA256] details.Size = int64(0) return details, nil } diff --git a/artifactory/services/upload.go b/artifactory/services/upload.go index 6ac6a81c7..0fbd9a077 100644 --- a/artifactory/services/upload.go +++ b/artifactory/services/upload.go @@ -30,14 +30,24 @@ import ( "github.com/jfrog/jfrog-client-go/utils/log" ) +const ( + // 10 KiB + DefaultMinChecksumDeploy = utils.SizeKib * 10 + // The default minimum file size for attempting multi-part upload + defaultUploadMinSplit = utils.SizeMiB * 200 + // The default maximum number of parts that can be concurrently uploaded per file during a multi-part upload + defaultUploadSplitCount = 5 +) + type UploadService struct { - client *jfroghttpclient.JfrogHttpClient - Progress ioutils.ProgressMgr - ArtDetails auth.ServiceDetails - DryRun 
bool - Threads int - saveSummary bool - resultsManager *resultsManager + client *jfroghttpclient.JfrogHttpClient + Progress ioutils.ProgressMgr + ArtDetails auth.ServiceDetails + MultipartUpload *utils.MultipartUpload + DryRun bool + Threads int + saveSummary bool + resultsManager *resultsManager } const JfrogCliUploadEmptyArchiveEnv = "JFROG_CLI_UPLOAD_EMPTY_ARCHIVE" @@ -179,19 +189,12 @@ func createProperties(artifact clientutils.Artifact, uploadParams UploadParams) } // If Symlink target exists -> get SHA1 if isn't a directory } else if !fileInfo.IsDir() { - file, err := os.Open(artifact.LocalPath) - if err != nil { - return nil, errorutils.CheckError(err) - } - defer func() { - err = errors.Join(err, errorutils.CheckError(file.Close())) - }() - checksumInfo, err := biutils.CalcChecksums(file, biutils.SHA1) + var checksums map[biutils.Algorithm]string + checksums, err := biutils.GetFileChecksums(artifact.LocalPath, biutils.SHA1) if err != nil { return nil, errorutils.CheckError(err) } - sha1 := checksumInfo[biutils.SHA1] - artifactProps.AddProperty(utils.SymlinkSha1, sha1) + artifactProps.AddProperty(utils.SymlinkSha1, checksums[biutils.SHA1]) } artifactProps.AddProperty(utils.ArtifactorySymlink, artifactSymlink) } @@ -486,7 +489,7 @@ func (us *UploadService) uploadFile(artifact UploadData, uploadParams UploadPara if uploadParams.IsSymlink() && fileutils.IsFileSymlink(fileInfo) { resp, details, body, err = us.uploadSymlink(targetPathWithProps, logMsgPrefix, httpClientsDetails, uploadParams) } else { - resp, details, body, checksumDeployed, err = us.doUpload(artifact.Artifact.LocalPath, targetPathWithProps, logMsgPrefix, httpClientsDetails, fileInfo, uploadParams) + resp, details, body, checksumDeployed, err = us.doUpload(artifact, targetPathWithProps, logMsgPrefix, httpClientsDetails, fileInfo, uploadParams) } if err != nil { return nil, false, err @@ -503,6 +506,21 @@ func (us *UploadService) shouldTryChecksumDeploy(fileSize int64, uploadParams Up return 
uploadParams.ChecksumsCalcEnabled && fileSize >= uploadParams.MinChecksumDeploy && !uploadParams.IsExplodeArchive() } +func (us *UploadService) shouldDoMultipartUpload(fileSize int64, uploadParams UploadParams) (bool, error) { + if uploadParams.SplitCount == 0 || fileSize < uploadParams.MinSplitSize { + return false, nil + } + if fileSize > utils.MaxMultipartUploadFileSize { + log.Debug(fmt.Sprintf("Max file size for multipart upload exceeded: %d>%d", fileSize, utils.MaxMultipartUploadFileSize)) + return false, nil + } + if uploadParams.IsExplodeArchive() { + // Explode archives is not supported in multipart uploads + return false, nil + } + return us.MultipartUpload.IsSupported(us.ArtDetails) +} + // Reads a file from a Reader that is given from a function (getReaderFunc) and uploads it to the specified target path. // getReaderFunc is called only if checksum deploy was successful. // Returns true if the file was successfully uploaded. @@ -514,7 +532,7 @@ func (us *UploadService) uploadFileFromReader(getReaderFunc func() (io.Reader, e httpClientsDetails := us.ArtDetails.CreateHttpClientDetails() if !us.DryRun { if us.shouldTryChecksumDeploy(details.Size, uploadParams) { - resp, body, e = us.tryChecksumDeploy(details, targetUrlWithProps, httpClientsDetails, us.client) + resp, body, e = us.doChecksumDeploy(details, targetUrlWithProps, httpClientsDetails, us.client) if e != nil { return false, e } @@ -570,37 +588,50 @@ func (us *UploadService) uploadSymlink(targetPath, logMsgPrefix string, httpClie return } -func (us *UploadService) doUpload(localPath, targetUrlWithProps, logMsgPrefix string, httpClientsDetails httputils.HttpClientDetails, fileInfo os.FileInfo, uploadParams UploadParams) (*http.Response, *fileutils.FileDetails, []byte, bool, error) { - var details *fileutils.FileDetails - var checksumDeployed bool - var resp *http.Response - var body []byte - var err error - addExplodeHeader(&httpClientsDetails, uploadParams.IsExplodeArchive()) - if !us.DryRun { - if 
us.shouldTryChecksumDeploy(fileInfo.Size(), uploadParams) { - details, err = fileutils.GetFileDetails(localPath, uploadParams.ChecksumsCalcEnabled) - if err != nil { - return resp, details, body, checksumDeployed, err - } - resp, body, err = us.tryChecksumDeploy(details, targetUrlWithProps, httpClientsDetails, us.client) - if err != nil { - return resp, details, body, checksumDeployed, err - } - checksumDeployed = isSuccessfulUploadStatusCode(resp.StatusCode) +func (us *UploadService) doUpload(artifact UploadData, targetUrlWithProps, logMsgPrefix string, httpClientsDetails httputils.HttpClientDetails, fileInfo os.FileInfo, uploadParams UploadParams) ( + resp *http.Response, details *fileutils.FileDetails, body []byte, checksumDeployed bool, err error) { + // Get local file details + details, err = fileutils.GetFileDetails(artifact.Artifact.LocalPath, uploadParams.ChecksumsCalcEnabled) + if err != nil { + return + } + + // Return if dry run + if us.DryRun { + return + } + + // Try checksum deploy + if us.shouldTryChecksumDeploy(fileInfo.Size(), uploadParams) { + resp, body, err = us.doChecksumDeploy(details, targetUrlWithProps, httpClientsDetails, us.client) + if err != nil { + return resp, details, body, checksumDeployed, err } - if !checksumDeployed { - resp, body, err = utils.UploadFile(localPath, targetUrlWithProps, logMsgPrefix, &us.ArtDetails, details, - httpClientsDetails, us.client, uploadParams.ChecksumsCalcEnabled, us.Progress) - if err != nil { - return resp, details, body, checksumDeployed, err - } + if isSuccessfulUploadStatusCode(resp.StatusCode) { + checksumDeployed = true + return } } - if details == nil { - details, err = fileutils.GetFileDetails(localPath, uploadParams.ChecksumsCalcEnabled) + + // Try multipart upload + var shouldTryMultipart bool + if shouldTryMultipart, err = us.shouldDoMultipartUpload(fileInfo.Size(), uploadParams); err != nil { + return + } + if shouldTryMultipart { + if err = 
us.MultipartUpload.UploadFileConcurrently(artifact.Artifact.LocalPath, artifact.Artifact.TargetPath, fileInfo.Size(), details.Checksum.Sha1, us.Progress, uploadParams.SplitCount); err != nil { + return + } + // Once the file is uploaded to the storage, we finalize the multipart upload by performing a checksum deployment to save the file in Artifactory. + resp, body, err = us.doChecksumDeploy(details, targetUrlWithProps, httpClientsDetails, us.client) + return } - return resp, details, body, checksumDeployed, err + + // Do regular upload + addExplodeHeader(&httpClientsDetails, uploadParams.IsExplodeArchive()) + resp, body, err = utils.UploadFile(artifact.Artifact.LocalPath, targetUrlWithProps, logMsgPrefix, &us.ArtDetails, details, + httpClientsDetails, us.client, uploadParams.ChecksumsCalcEnabled, us.Progress) + return } func (us *UploadService) doUploadFromReader(fileReader io.Reader, targetUrlWithProps string, httpClientsDetails httputils.HttpClientDetails, uploadParams UploadParams, details *fileutils.FileDetails) (*http.Response, *fileutils.FileDetails, []byte, error) { @@ -643,7 +674,7 @@ func addExplodeHeader(httpClientsDetails *httputils.HttpClientDetails, isExplode } } -func (us *UploadService) tryChecksumDeploy(details *fileutils.FileDetails, targetPath string, httpClientsDetails httputils.HttpClientDetails, +func (us *UploadService) doChecksumDeploy(details *fileutils.FileDetails, targetPath string, httpClientsDetails httputils.HttpClientDetails, client *jfroghttpclient.JfrogHttpClient) (resp *http.Response, body []byte, err error) { requestClientDetails := httpClientsDetails.Clone() utils.AddHeader("X-Checksum-Deploy", "true", &requestClientDetails.Headers) @@ -676,6 +707,8 @@ type UploadParams struct { Flat bool AddVcsProps bool MinChecksumDeploy int64 + MinSplitSize int64 + SplitCount int ChecksumsCalcEnabled bool Archive string // When using the 'archive' option for upload, we can control the target path inside the uploaded archive using placeholders. 
This operation determines the TargetPathInArchive value. @@ -683,7 +716,7 @@ type UploadParams struct { } func NewUploadParams() UploadParams { - return UploadParams{CommonParams: &utils.CommonParams{}, MinChecksumDeploy: 10240, ChecksumsCalcEnabled: true} + return UploadParams{CommonParams: &utils.CommonParams{}, MinChecksumDeploy: DefaultMinChecksumDeploy, ChecksumsCalcEnabled: true, MinSplitSize: defaultUploadMinSplit, SplitCount: defaultUploadSplitCount} } func DeepCopyUploadParams(params *UploadParams) UploadParams { diff --git a/artifactory/services/utils/multipartupload.go b/artifactory/services/utils/multipartupload.go new file mode 100644 index 000000000..5ac06b3e3 --- /dev/null +++ b/artifactory/services/utils/multipartupload.go @@ -0,0 +1,436 @@ +package utils + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "os" + "strings" + "sync" + "sync/atomic" + "time" + + biUtils "github.com/jfrog/build-info-go/utils" + "github.com/jfrog/gofrog/parallel" + "github.com/jfrog/jfrog-client-go/auth" + "github.com/jfrog/jfrog-client-go/http/jfroghttpclient" + "github.com/jfrog/jfrog-client-go/utils" + "github.com/jfrog/jfrog-client-go/utils/errorutils" + ioutils "github.com/jfrog/jfrog-client-go/utils/io" + "github.com/jfrog/jfrog-client-go/utils/io/httputils" + "github.com/jfrog/jfrog-client-go/utils/log" +) + +type supportedStatus int +type completionStatus string + +const ( + // TODO - Update version + minArtifactoryVersion = "8.0.0" + + // Supported status + // Multipart upload support is not yet determined + undetermined supportedStatus = iota + // Multipart upload is supported + multipartSupported + // Multipart upload is not supported + multipartNotSupported + + // Completion status + parts completionStatus = "PARTS" + queued completionStatus = "QUEUED" + processing completionStatus = "PROCESSING" + finished completionStatus = "FINISHED" + retryableError completionStatus = "RETRYABLE_ERROR" + nonRetryableError completionStatus = 
"NON_RETRYABLE_ERROR" + aborted completionStatus = "ABORTED" + + // API constants + uploadsApi = "/api/v1/uploads/" + artifactoryNodeIdHeader = "X-Artifactory-Node-Id" + + // Sizes and limits constants + MaxMultipartUploadFileSize = SizeTiB * 5 + uploadPartSize int64 = SizeMiB * 20 + + // Retries and polling constants + retriesInterval = time.Second * 5 + // A week of retries + maxPollingRetries = time.Hour * 168 / retriesInterval + mergingLoggingInterval = time.Minute +) + +var ( + errTooManyAttempts = errors.New("too many upload attempts failed") + supportedMutex sync.Mutex +) + +type MultipartUpload struct { + client *jfroghttpclient.JfrogHttpClient + httpClientsDetails *httputils.HttpClientDetails + artifactoryUrl string + supportedStatus supportedStatus +} + +func NewMultipartUpload(client *jfroghttpclient.JfrogHttpClient, httpClientsDetails *httputils.HttpClientDetails, artifactoryUrl string) *MultipartUpload { + return &MultipartUpload{client, httpClientsDetails, strings.TrimSuffix(artifactoryUrl, "/"), undetermined} +} + +func (mu *MultipartUpload) IsSupported(serviceDetails auth.ServiceDetails) (supported bool, err error) { + supportedMutex.Lock() + defer supportedMutex.Unlock() + if mu.supportedStatus != undetermined { + // If the supported status was determined earlier, return true if multipart upload is supported or false if not + return mu.supportedStatus == multipartSupported, nil + } + + artifactoryVersion, err := serviceDetails.GetVersion() + if err != nil { + return + } + + if versionErr := utils.ValidateMinimumVersion(utils.Artifactory, artifactoryVersion, minArtifactoryVersion); versionErr != nil { + log.Debug("Multipart upload is not supported in versions below " + minArtifactoryVersion + ". 
Proceeding with regular upload...") + mu.supportedStatus = multipartNotSupported + return + } + + url := fmt.Sprintf("%s%sconfig", mu.artifactoryUrl, uploadsApi) + resp, body, _, err := mu.client.SendGet(url, true, mu.httpClientsDetails) + if err != nil { + return + } + log.Debug("Artifactory response:", string(body), resp.Status) + if err = errorutils.CheckResponseStatusWithBody(resp, body, http.StatusOK); err != nil { + return + } + + var getConfigResponse getConfigResponse + err = errorutils.CheckError(json.Unmarshal(body, &getConfigResponse)) + if getConfigResponse.Supported { + mu.supportedStatus = multipartSupported + } else { + mu.supportedStatus = multipartNotSupported + } + return getConfigResponse.Supported, err +} + +type getConfigResponse struct { + Supported bool `json:"supported,omitempty"` +} + +func (mu *MultipartUpload) UploadFileConcurrently(localPath, targetPath string, fileSize int64, sha1 string, progress ioutils.ProgressMgr, splitCount int) (err error) { + repoAndPath := strings.SplitN(targetPath, "/", 2) + repoKey := repoAndPath[0] + repoPath := repoAndPath[1] + logMsgPrefix := fmt.Sprintf("[Multipart upload %s] ", repoPath) + + token, err := mu.createMultipartUpload(repoKey, repoPath, calculatePartSize(fileSize, 0)) + if err != nil { + return + } + + multipartUploadClient := &httputils.HttpClientDetails{ + AccessToken: token, + Transport: mu.httpClientsDetails.Transport, + DialTimeout: mu.httpClientsDetails.DialTimeout, + OverallRequestTimeout: mu.httpClientsDetails.OverallRequestTimeout, + } + + var progressReader ioutils.Progress + if progress != nil { + progressReader = progress.NewProgressReader(fileSize, "Multipart upload", targetPath) + defer progress.RemoveProgress(progressReader.GetId()) + } + + defer func() { + if err == nil { + log.Info(logMsgPrefix + "Upload completed successfully!") + } else { + err = errors.Join(err, mu.abort(logMsgPrefix, multipartUploadClient)) + } + }() + + if err = mu.uploadPartsConcurrently(logMsgPrefix, 
fileSize, splitCount, localPath, progressReader, multipartUploadClient); err != nil { + return + } + + if sha1 == "" { + var checksums map[biUtils.Algorithm]string + if checksums, err = biUtils.GetFileChecksums(localPath); errorutils.CheckError(err) != nil { + return + } + sha1 = checksums[biUtils.SHA1] + } + + if progress != nil { + progressReader = progress.SetMergingState(progressReader.GetId(), false) + } + + log.Info(logMsgPrefix + "Starting parts merge...") + // The total number of attempts is determined by the number of retries + 1 + return mu.completeAndPollForStatus(logMsgPrefix, uint(mu.client.GetHttpClient().GetRetries())+1, sha1, multipartUploadClient, progressReader) +} + +func (mu *MultipartUpload) uploadPartsConcurrently(logMsgPrefix string, fileSize int64, splitCount int, localPath string, progressReader ioutils.Progress, multipartUploadClient *httputils.HttpClientDetails) (err error) { + numberOfParts := calculateNumberOfParts(fileSize) + log.Info(fmt.Sprintf("%sSplitting file to %d parts, using %d working threads for uploading...", logMsgPrefix, numberOfParts, splitCount)) + producerConsumer := parallel.NewRunner(splitCount, uint(numberOfParts), false) + + wg := new(sync.WaitGroup) + wg.Add(int(numberOfParts)) + attemptsAllowed := new(atomic.Uint64) + attemptsAllowed.Add(uint64(numberOfParts) * uint64(mu.client.GetHttpClient().GetRetries())) + go func() { + for i := 0; i < int(numberOfParts); i++ { + if err = mu.produceUploadTask(producerConsumer, logMsgPrefix, localPath, fileSize, numberOfParts, int64(i), progressReader, multipartUploadClient, attemptsAllowed, wg); err != nil { + return + } + } + }() + go func() { + defer producerConsumer.Done() + wg.Wait() + }() + producerConsumer.Run() + if attemptsAllowed.Load() == 0 { + return errorutils.CheckError(errTooManyAttempts) + } + return +} + +func (mu *MultipartUpload) produceUploadTask(producerConsumer parallel.Runner, logMsgPrefix, localPath string, fileSize, numberOfParts, partId int64, 
progressReader ioutils.Progress, multipartUploadClient *httputils.HttpClientDetails, attemptsAllowed *atomic.Uint64, wg *sync.WaitGroup) (retErr error) { + _, retErr = producerConsumer.AddTaskWithError(func(int) error { + uploadErr := mu.uploadPart(logMsgPrefix, localPath, fileSize, partId, progressReader, multipartUploadClient) + if uploadErr == nil { + log.Info(fmt.Sprintf("%sCompleted uploading part %d/%d", logMsgPrefix, partId+1, numberOfParts)) + wg.Done() + } + return uploadErr + }, func(uploadErr error) { + if attemptsAllowed.Load() == 0 { + wg.Done() + return + } + log.Warn(fmt.Sprintf("%sPart %d/%d - %s", logMsgPrefix, partId+1, numberOfParts, uploadErr.Error())) + attemptsAllowed.Add(^uint64(0)) + + // Sleep before trying again + time.Sleep(retriesInterval) + if err := mu.produceUploadTask(producerConsumer, logMsgPrefix, localPath, fileSize, numberOfParts, partId, progressReader, multipartUploadClient, attemptsAllowed, wg); err != nil { + retErr = err + } + }) + return +} + +func (mu *MultipartUpload) uploadPart(logMsgPrefix, localPath string, fileSize, partId int64, progressReader ioutils.Progress, multipartUploadClient *httputils.HttpClientDetails) (err error) { + file, err := os.Open(localPath) + if err != nil { + return errorutils.CheckError(err) + } + defer func() { + err = errors.Join(err, errorutils.CheckError(file.Close())) + }() + if _, err = file.Seek(partId*uploadPartSize, io.SeekStart); err != nil { + return errorutils.CheckError(err) + } + partSize := calculatePartSize(fileSize, partId) + + limitReader := io.LimitReader(file, partSize) + limitReader = bufio.NewReader(limitReader) + if progressReader != nil { + limitReader = progressReader.ActionWithProgress(limitReader) + } + + urlPart, err := mu.generateUrlPart(logMsgPrefix, partId, multipartUploadClient) + if err != nil { + return + } + + resp, body, err := mu.client.GetHttpClient().UploadFileFromReader(limitReader, urlPart, httputils.HttpClientDetails{}, partSize) + if err != nil { + 
return + } + log.Debug("Artifactory response:", string(body), resp.Status) + return errorutils.CheckResponseStatusWithBody(resp, body, http.StatusOK) +} + +func (mu *MultipartUpload) createMultipartUpload(repoKey, repoPath string, partSize int64) (token string, err error) { + url := fmt.Sprintf("%s%screate?repoKey=%s&repoPath=%s&partSizeMB=%d", mu.artifactoryUrl, uploadsApi, repoKey, repoPath, partSize/SizeMiB) + resp, body, err := mu.client.SendPost(url, []byte{}, mu.httpClientsDetails) + if err != nil { + return + } + // We don't log the response body because it includes credentials + + if err = errorutils.CheckResponseStatusWithBody(resp, body, http.StatusOK); err != nil { + return + } + + var createMultipartUploadResponse createMultipartUploadResponse + err = json.Unmarshal(body, &createMultipartUploadResponse) + return createMultipartUploadResponse.Token, err +} + +type createMultipartUploadResponse struct { + Token string `json:"token,omitempty"` +} + +func (mu *MultipartUpload) generateUrlPart(logMsgPrefix string, partNumber int64, multipartUploadClient *httputils.HttpClientDetails) (partUrl string, err error) { + url := fmt.Sprintf("%s%surlPart?partNumber=%d", mu.artifactoryUrl, uploadsApi, partNumber+1) + resp, body, err := mu.client.GetHttpClient().SendPost(url, []byte{}, *multipartUploadClient, logMsgPrefix) + if err != nil { + return "", err + } + // We don't log the response body because it includes credentials + + if err = errorutils.CheckResponseStatusWithBody(resp, body, http.StatusOK); err != nil { + return + } + var urlPartResponse urlPartResponse + err = json.Unmarshal(body, &urlPartResponse) + return urlPartResponse.Url, errorutils.CheckError(err) +} + +type urlPartResponse struct { + Url string `json:"url,omitempty"` +} + +func (mu *MultipartUpload) completeAndPollForStatus(logMsgPrefix string, completionAttemptsLeft uint, sha1 string, multipartUploadClient *httputils.HttpClientDetails, progressReader ioutils.Progress) (err error) { + nodeId, 
err := mu.completeMultipartUpload(logMsgPrefix, sha1, multipartUploadClient) + if err != nil { + return + } + + err = mu.pollCompletionStatus(logMsgPrefix, completionAttemptsLeft, sha1, nodeId, multipartUploadClient, progressReader) + return +} + +func (mu *MultipartUpload) pollCompletionStatus(logMsgPrefix string, completionAttemptsLeft uint, sha1, nodeId string, multipartUploadClient *httputils.HttpClientDetails, progressReader ioutils.Progress) error { + multipartUploadClientWithNodeId := multipartUploadClient.Clone() + multipartUploadClientWithNodeId.Headers = map[string]string{artifactoryNodeIdHeader: nodeId} + + lastMergeLog := time.Now() + pollingExecutor := &utils.RetryExecutor{ + MaxRetries: int(maxPollingRetries), + RetriesIntervalMilliSecs: int(retriesInterval.Milliseconds()), + LogMsgPrefix: logMsgPrefix, + ExecutionHandler: func() (shouldRetry bool, err error) { + // Get completion status + status, err := mu.status(logMsgPrefix, multipartUploadClientWithNodeId) + if err != nil { + return false, err + } + + // Parse status + shouldRetry, shouldRerunComplete, err := parseMultipartUploadStatus(status) + if err != nil { + return false, err + } + + // Rerun complete if needed + if shouldRerunComplete { + if completionAttemptsLeft == 0 { + return false, errorutils.CheckErrorf("multipart upload failed after %d attempts", mu.client.GetHttpClient().GetRetries()) + } + err = mu.completeAndPollForStatus(logMsgPrefix, completionAttemptsLeft-1, sha1, multipartUploadClient, progressReader) + } + + // Log status + if status.Progress != nil { + if progressReader != nil { + progressReader.SetProgress(int64(*status.Progress)) + } + if time.Since(lastMergeLog) > mergingLoggingInterval { + log.Info(fmt.Sprintf("%sMerging progress: %d%%", logMsgPrefix, *status.Progress)) + lastMergeLog = time.Now() + } + } + return + }, + } + return pollingExecutor.Execute() +} + +func (mu *MultipartUpload) completeMultipartUpload(logMsgPrefix, sha1 string, multipartUploadClient 
*httputils.HttpClientDetails) (string, error) { + url := fmt.Sprintf("%s%scomplete?sha1=%s", mu.artifactoryUrl, uploadsApi, sha1) + resp, body, err := mu.client.GetHttpClient().SendPost(url, []byte{}, *multipartUploadClient, logMsgPrefix) + if err != nil { + return "", err + } + log.Debug("Artifactory response:", string(body), resp.Status) + return resp.Header.Get(artifactoryNodeIdHeader), errorutils.CheckResponseStatusWithBody(resp, body, http.StatusAccepted) +} + +func (mu *MultipartUpload) status(logMsgPrefix string, multipartUploadClientWithNodeId *httputils.HttpClientDetails) (status statusResponse, err error) { + url := fmt.Sprintf("%s%sstatus", mu.artifactoryUrl, uploadsApi) + resp, body, err := mu.client.GetHttpClient().SendPost(url, []byte{}, *multipartUploadClientWithNodeId, logMsgPrefix) + if err != nil { + return + } + log.Debug("Artifactory response:", string(body), resp.Status) + if err = errorutils.CheckResponseStatusWithBody(resp, body, http.StatusOK); err != nil { + return + } + err = errorutils.CheckError(json.Unmarshal(body, &status)) + return +} + +type statusResponse struct { + Status completionStatus `json:"status,omitempty"` + Error string `json:"error,omitempty"` + Progress *int `json:"progress,omitempty"` +} + +func (mu *MultipartUpload) abort(logMsgPrefix string, multipartUploadClient *httputils.HttpClientDetails) (err error) { + log.Info("Aborting multipart upload...") + url := fmt.Sprintf("%s%sabort", mu.artifactoryUrl, uploadsApi) + resp, body, err := mu.client.GetHttpClient().SendPost(url, []byte{}, *multipartUploadClient, logMsgPrefix) + if err != nil { + return + } + log.Debug("Artifactory response:", string(body), resp.Status) + return errorutils.CheckResponseStatusWithBody(resp, body, http.StatusNoContent) +} + +// Calculates the part size based on the file size and the part number. 
+// fileSize - the file size
+// partNumber - the current part number
+func calculatePartSize(fileSize int64, partNumber int64) int64 {
+	partOffset := partNumber * uploadPartSize
+	if partOffset+uploadPartSize > fileSize {
+		return fileSize - partOffset
+	}
+	return uploadPartSize
+}
+
+// Calculates the number of parts based on the file size and the default part size.
+// fileSize - the file size
+func calculateNumberOfParts(fileSize int64) int64 {
+	return (fileSize + uploadPartSize - 1) / uploadPartSize
+}
+
+// Maps the completion status returned by the /status API to polling decisions:
+// whether to keep polling, whether to rerun the /complete API, and a terminal
+// error for non-retryable or unexpected statuses.
+func parseMultipartUploadStatus(status statusResponse) (shouldKeepPolling, shouldRerunComplete bool, err error) {
+	switch status.Status {
+	case queued, processing:
+		// File merging had not yet been completed - keep polling
+		return true, false, nil
+	case retryableError:
+		// Retryable error was received - stop polling and rerun the /complete API again.
+		// NOTE: log.Warn does not perform printf-style formatting, so the message must be
+		// pre-formatted with fmt.Sprintf, as done for the other log.Warn calls in this file.
+		log.Warn(fmt.Sprintf("received error upon multipart upload completion process: '%s', retrying...", status.Error))
+		return false, true, nil
+	case finished, aborted:
+		// Upload finished or aborted
+		return false, false, nil
+	case nonRetryableError:
+		// Fatal error occurred - stop the entire process
+		return false, false, errorutils.CheckErrorf("received non retryable error upon multipart upload completion process: '%s'", status.Error)
+	default:
+		// Unexpected status - stop the entire process
+		return false, false, errorutils.CheckErrorf("received unexpected status upon multipart upload completion process: '%s', error: '%s'", status.Status, status.Error)
+	}
+}
diff --git a/artifactory/services/utils/multipartupload_test.go b/artifactory/services/utils/multipartupload_test.go
new file mode 100644
index 000000000..76c45b3b7
--- /dev/null
+++ b/artifactory/services/utils/multipartupload_test.go
@@ -0,0 +1,367 @@
+package utils
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"testing"
+
+	"github.com/jfrog/jfrog-client-go/auth"
+	
"github.com/jfrog/jfrog-client-go/http/jfroghttpclient" + "github.com/jfrog/jfrog-client-go/utils" + "github.com/jfrog/jfrog-client-go/utils/io/fileutils" + "github.com/jfrog/jfrog-client-go/utils/io/httputils" + "github.com/jfrog/jfrog-client-go/utils/log" + "github.com/jfrog/jfrog-client-go/utils/tests" + "github.com/stretchr/testify/assert" + "golang.org/x/exp/rand" +) + +const ( + localPath = "localPath" + repoKey = "repoKey" + repoPath = "repoPath" + partSize = SizeGiB + partSizeMB = 1024 + partNumber = 2 + splitCount = 3 + token = "token" + partUrl = "http://dummy-url-part" + sha1 = "sha1" + nodeId = "nodeId" +) + +func TestIsSupported(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Check method + assert.Equal(t, http.MethodGet, r.Method) + + // Check URL + assert.Equal(t, "/api/v1/uploads/config", r.URL.Path) + + // Send response 200 OK + w.WriteHeader(http.StatusOK) + response, err := json.Marshal(getConfigResponse{Supported: true}) + assert.NoError(t, err) + _, err = w.Write(response) + assert.NoError(t, err) + }) + + // Create mock multipart upload with server + multipartUpload, cleanUp := createMockMultipartUpload(t, handler) + defer cleanUp() + + // Create Artifactory service details + rtDetails := &dummyArtifactoryServiceDetails{version: minArtifactoryVersion} + + // Execute IsSupported + supported, err := multipartUpload.IsSupported(rtDetails) + assert.NoError(t, err) + assert.True(t, supported) +} + +func TestUnsupportedVersion(t *testing.T) { + // Create Artifactory service details with unsupported Artifactory version + rtDetails := &dummyArtifactoryServiceDetails{version: "6.0.0"} + + // Create mock multipart upload with server + client, err := jfroghttpclient.JfrogClientBuilder().Build() + assert.NoError(t, err) + + // Execute IsSupported + supported, err := NewMultipartUpload(client, nil, "").IsSupported(rtDetails) + assert.NoError(t, err) + assert.False(t, supported) +} + +func 
TestCreateMultipartUpload(t *testing.T) {
+	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		// Check method
+		assert.Equal(t, http.MethodPost, r.Method)
+
+		// Check URL
+		assert.Equal(t, "/api/v1/uploads/create", r.URL.Path)
+		assert.Equal(t, fmt.Sprintf("repoKey=%s&repoPath=%s&partSizeMB=%d", repoKey, repoPath, partSizeMB), r.URL.RawQuery)
+
+		// Send response 200 OK
+		w.WriteHeader(http.StatusOK)
+		response, err := json.Marshal(createMultipartUploadResponse{Token: token})
+		assert.NoError(t, err)
+		_, err = w.Write(response)
+		assert.NoError(t, err)
+	})
+
+	// Create mock multipart upload with server
+	multipartUpload, cleanUp := createMockMultipartUpload(t, handler)
+	defer cleanUp()
+
+	// Execute CreateMultipartUpload
+	actualToken, err := multipartUpload.createMultipartUpload(repoKey, repoPath, partSize)
+	assert.NoError(t, err)
+	assert.Equal(t, token, actualToken)
+}
+
+func TestUploadPartsConcurrentlyTooManyAttempts(t *testing.T) {
+	// Create temp file
+	tempFile, cleanUp := createTempFile(t)
+	defer cleanUp()
+
+	// Write something to the file
+	buf := make([]byte, uploadPartSize*3)
+	_, err := rand.Read(buf)
+	assert.NoError(t, err)
+	_, err = tempFile.Write(buf)
+	assert.NoError(t, err)
+
+	var multipartUpload *MultipartUpload
+	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch r.Method {
+		// Generate part URL for upload
+		case http.MethodPost:
+			// Check URL
+			assert.Equal(t, "/api/v1/uploads/urlPart", r.URL.Path)
+
+			// Send response 200 OK
+			w.WriteHeader(http.StatusOK)
+			response, unmarshalErr := json.Marshal(urlPartResponse{Url: multipartUpload.artifactoryUrl})
+			assert.NoError(t, unmarshalErr)
+			_, err = w.Write(response)
+			assert.NoError(t, err)
+		// Fail the upload to trigger retry
+		case http.MethodPut:
+			assert.Equal(t, "/", r.URL.Path)
+
+			// Send response 502 Bad Gateway
+			w.WriteHeader(http.StatusBadGateway)
+		default:
+			assert.Fail(t, "unexpected method "+r.Method)
+		}
+	})
+
+	// 
Create mock multipart upload with server + multipartUpload, cleanUp = createMockMultipartUpload(t, handler) + defer cleanUp() + + // Execute uploadPartsConcurrently + fileSize := int64(len(buf)) + err = multipartUpload.uploadPartsConcurrently("", fileSize, splitCount, tempFile.Name(), nil, &httputils.HttpClientDetails{}) + assert.ErrorIs(t, err, errTooManyAttempts) +} + +func TestGenerateUrlPart(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Check method + assert.Equal(t, http.MethodPost, r.Method) + + // Check URL + assert.Equal(t, "/api/v1/uploads/urlPart", r.URL.Path) + assert.Equal(t, fmt.Sprintf("partNumber=%d", partNumber+1), r.URL.RawQuery) + + // Send response 200 OK + w.WriteHeader(http.StatusOK) + response, err := json.Marshal(urlPartResponse{Url: partUrl}) + assert.NoError(t, err) + _, err = w.Write(response) + assert.NoError(t, err) + }) + + // Create mock multipart upload with server + multipartUpload, cleanUp := createMockMultipartUpload(t, handler) + defer cleanUp() + + // Execute GenerateUrlPart + actualPartUrl, err := multipartUpload.generateUrlPart("", partNumber, &httputils.HttpClientDetails{}) + assert.NoError(t, err) + assert.Equal(t, partUrl, actualPartUrl) +} + +func TestCompleteMultipartUpload(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Check method + assert.Equal(t, http.MethodPost, r.Method) + + // Check URL + assert.Equal(t, "/api/v1/uploads/complete", r.URL.Path) + assert.Equal(t, fmt.Sprintf("sha1=%s", sha1), r.URL.RawQuery) + + // Add the "X-Artifactory-Node-Id" header to the response + w.Header().Add(artifactoryNodeIdHeader, nodeId) + + // Send response 202 Accepted + w.WriteHeader(http.StatusAccepted) + }) + + // Create mock multipart upload with server + multipartUpload, cleanUp := createMockMultipartUpload(t, handler) + defer cleanUp() + + // Execute completeMultipartUpload + actualNodeId, err := 
multipartUpload.completeMultipartUpload("", sha1, &httputils.HttpClientDetails{})
+	assert.NoError(t, err)
+	assert.Equal(t, nodeId, actualNodeId)
+}
+
+func TestStatus(t *testing.T) {
+	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		// Check method
+		assert.Equal(t, http.MethodPost, r.Method)
+
+		// Check URL
+		assert.Equal(t, "/api/v1/uploads/status", r.URL.Path)
+
+		// Check "X-Artifactory-Node-Id" header
+		assert.Equal(t, nodeId, r.Header.Get(artifactoryNodeIdHeader))
+
+		// Send response 200 OK
+		w.WriteHeader(http.StatusOK)
+		response, err := json.Marshal(statusResponse{Status: finished, Progress: utils.Pointer(100)})
+		assert.NoError(t, err)
+		_, err = w.Write(response)
+		assert.NoError(t, err)
+	})
+
+	// Create mock multipart upload with server
+	multipartUpload, cleanUp := createMockMultipartUpload(t, handler)
+	defer cleanUp()
+
+	// Execute status
+	clientDetails := &httputils.HttpClientDetails{Headers: map[string]string{artifactoryNodeIdHeader: nodeId}}
+	status, err := multipartUpload.status("", clientDetails)
+	assert.NoError(t, err)
+	assert.Equal(t, statusResponse{Status: finished, Progress: utils.Pointer(100)}, status)
+}
+
+func TestAbort(t *testing.T) {
+	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		// Check method
+		assert.Equal(t, http.MethodPost, r.Method)
+
+		// Check URL
+		assert.Equal(t, "/api/v1/uploads/abort", r.URL.Path)
+
+		// Send response 204 No Content
+		w.WriteHeader(http.StatusNoContent)
+	})
+
+	// Create mock multipart upload with server
+	multipartUpload, cleanUp := createMockMultipartUpload(t, handler)
+	defer cleanUp()
+
+	// Execute abort
+	clientDetails := &httputils.HttpClientDetails{}
+	err := multipartUpload.abort("", clientDetails)
+	assert.NoError(t, err)
+}
+
+var calculatePartSizeProvider = []struct {
+	fileSize         int64
+	partNumber       int64
+	expectedPartSize int64
+}{
+	{uploadPartSize - 1, 0, uploadPartSize - 1},
+	{uploadPartSize, 0, uploadPartSize},
+	
{uploadPartSize + 1, 0, uploadPartSize}, + + {uploadPartSize*2 - 1, 1, uploadPartSize - 1}, + {uploadPartSize * 2, 1, uploadPartSize}, + {uploadPartSize*2 + 1, 1, uploadPartSize}, +} + +func TestCalculatePartSize(t *testing.T) { + for _, testCase := range calculatePartSizeProvider { + t.Run(fmt.Sprintf("fileSize: %d partNumber: %d", testCase.fileSize, testCase.partNumber), func(t *testing.T) { + assert.Equal(t, testCase.expectedPartSize, calculatePartSize(testCase.fileSize, testCase.partNumber)) + }) + } +} + +var calculateNumberOfPartsProvider = []struct { + fileSize int64 + expectedNumberOfParts int64 +}{ + {0, 0}, + {1, 1}, + {uploadPartSize - 1, 1}, + {uploadPartSize, 1}, + {uploadPartSize + 1, 2}, + + {uploadPartSize*2 - 1, 2}, + {uploadPartSize * 2, 2}, + {uploadPartSize*2 + 1, 3}, +} + +func TestCalculateNumberOfParts(t *testing.T) { + for _, testCase := range calculateNumberOfPartsProvider { + t.Run(fmt.Sprintf("fileSize: %d", testCase.fileSize), func(t *testing.T) { + assert.Equal(t, testCase.expectedNumberOfParts, calculateNumberOfParts(testCase.fileSize)) + }) + } +} + +var parseMultipartUploadStatusProvider = []struct { + status completionStatus + shouldKeepPolling bool + shouldRerunComplete bool + expectedError string +}{ + {queued, true, false, ""}, + {processing, true, false, ""}, + {parts, false, false, "received unexpected status upon multipart upload completion process: 'PARTS', error: 'Some error'"}, + {finished, false, false, ""}, + {aborted, false, false, ""}, + {retryableError, false, true, ""}, + {nonRetryableError, false, false, "received non retryable error upon multipart upload completion process: 'Some error'"}, +} + +func TestParseMultipartUploadStatus(t *testing.T) { + previousLog := tests.RedirectLogOutputToNil() + defer func() { + log.SetLogger(previousLog) + }() + + for _, testCase := range parseMultipartUploadStatusProvider { + t.Run(string(testCase.status), func(t *testing.T) { + + shouldKeepPolling, shouldRerunComplete, err := 
parseMultipartUploadStatus(statusResponse{Status: testCase.status, Error: "Some error"}) + if testCase.expectedError != "" { + assert.EqualError(t, err, testCase.expectedError) + return + } + assert.NoError(t, err) + assert.Equal(t, testCase.shouldKeepPolling, shouldKeepPolling) + assert.Equal(t, testCase.shouldRerunComplete, shouldRerunComplete) + }) + } +} + +func createTempFile(t *testing.T) (tempFile *os.File, cleanUp func()) { + // Create a temporary file + tempFile, err := fileutils.CreateTempFile() + assert.NoError(t, err) + cleanUp = func() { + assert.NoError(t, tempFile.Close()) + assert.NoError(t, fileutils.RemovePath(localPath)) + } + return +} + +func createMockMultipartUpload(t *testing.T, handler http.Handler) (multipartUpload *MultipartUpload, cleanUp func()) { + ts := httptest.NewServer(handler) + cleanUp = ts.Close + + client, err := jfroghttpclient.JfrogClientBuilder().Build() + assert.NoError(t, err) + multipartUpload = NewMultipartUpload(client, &httputils.HttpClientDetails{}, ts.URL) + return +} + +type dummyArtifactoryServiceDetails struct { + auth.CommonConfigFields + version string +} + +func (dasd *dummyArtifactoryServiceDetails) GetVersion() (string, error) { + return dasd.version, nil +} diff --git a/artifactory/services/utils/storageutils.go b/artifactory/services/utils/storageutils.go index bb5f2504d..6b87946ce 100644 --- a/artifactory/services/utils/storageutils.go +++ b/artifactory/services/utils/storageutils.go @@ -5,6 +5,13 @@ import ( "errors" ) +const ( + SizeKib int64 = 1 << 10 + SizeMiB int64 = 1 << 20 + SizeGiB int64 = 1 << 30 + SizeTiB int64 = 1 << 40 +) + type FolderInfo struct { Uri string `json:"uri,omitempty"` Repo string `json:"repo,omitempty"` diff --git a/go.mod b/go.mod index cdf60162e..706f9d012 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,6 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -// replace github.com/jfrog/build-info-go => github.com/jfrog/build-info-go v1.8.9-0.20231220102935-c8776c613ad8 +replace 
github.com/jfrog/build-info-go => github.com/jfrog/build-info-go v1.8.9-0.20240222124058-bd9687a8666e // replace github.com/jfrog/gofrog => github.com/jfrog/gofrog dev diff --git a/go.sum b/go.sum index d1a5a201e..234252eba 100644 --- a/go.sum +++ b/go.sum @@ -52,8 +52,8 @@ github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOl github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/jfrog/archiver/v3 v3.6.0 h1:OVZ50vudkIQmKMgA8mmFF9S0gA47lcag22N13iV3F1w= github.com/jfrog/archiver/v3 v3.6.0/go.mod h1:fCAof46C3rAXgZurS8kNRNdSVMKBbZs+bNNhPYxLldI= -github.com/jfrog/build-info-go v1.9.23 h1:+TwUIBEJwRvz9skR8xBfY5ti8Vl4Z6iMCkFbkclnEN0= -github.com/jfrog/build-info-go v1.9.23/go.mod h1:QHcKuesY4MrBVBuEwwBz4uIsX6mwYuMEDV09ng4AvAU= +github.com/jfrog/build-info-go v1.8.9-0.20240222124058-bd9687a8666e h1:NzB2yvEojIhP5KIX9SeCqSljZmoiE98hBzXYvvi52D0= +github.com/jfrog/build-info-go v1.8.9-0.20240222124058-bd9687a8666e/go.mod h1:QHcKuesY4MrBVBuEwwBz4uIsX6mwYuMEDV09ng4AvAU= github.com/jfrog/gofrog v1.6.0 h1:jOwb37nHY2PnxePNFJ6e6279Pgkr3di05SbQQw47Mq8= github.com/jfrog/gofrog v1.6.0/go.mod h1:SZ1EPJUruxrVGndOzHd+LTiwWYKMlHqhKD+eu+v5Hqg= github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4= diff --git a/http/httpclient/client.go b/http/httpclient/client.go index 7c70a4532..658bd7f6e 100644 --- a/http/httpclient/client.go +++ b/http/httpclient/client.go @@ -286,6 +286,7 @@ func (jc *HttpClient) doUploadFile(localPath, url string, httpClientsDetails htt } else { reader = reqContent } + resp, body, err = jc.UploadFileFromReader(reader, url, httpClientsDetails, size) return } @@ -481,7 +482,7 @@ func (jc *HttpClient) DownloadFileConcurrently(flags ConcurrentDownloadFlags, lo var downloadProgressId int if progress != nil { - downloadProgress := progress.NewProgressReader(flags.FileSize, "Downloading", flags.RelativePath) + downloadProgress := 
progress.NewProgressReader(flags.FileSize, "Multipart download", flags.RelativePath) downloadProgressId = downloadProgress.GetId() // Aborting order matters. mergingProgress depends on the existence of downloadingProgress defer progress.RemoveProgress(downloadProgressId) @@ -511,7 +512,7 @@ func (jc *HttpClient) DownloadFileConcurrently(flags ConcurrentDownloadFlags, lo } } if progress != nil { - progress.SetProgressState(downloadProgressId, "Merging") + progress.SetMergingState(downloadProgressId, true) } err = mergeChunks(chunksPaths, flags) if errorutils.CheckError(err) != nil { diff --git a/tests/artifactorymultipartupload_test.go b/tests/artifactorymultipartupload_test.go new file mode 100644 index 000000000..78a4fe11a --- /dev/null +++ b/tests/artifactorymultipartupload_test.go @@ -0,0 +1,89 @@ +package tests + +import ( + "os" + "path/filepath" + "testing" + + "github.com/jfrog/jfrog-client-go/artifactory/services" + "github.com/jfrog/jfrog-client-go/artifactory/services/utils" + "github.com/jfrog/jfrog-client-go/utils/io/fileutils" + "github.com/stretchr/testify/assert" +) + +const ( + bigFileSize = 100 << 20 + propertyKey = "prop-key" + propertyValue = "prop-value" +) + +func initArtifactoryMultipartUploadTest(t *testing.T) { + if !*TestMultipartUpload { + t.Skip("Skipping multipart upload test. To run artifactory test add the '-test.mpu=true' option.") + } + + supported, err := testsUploadService.MultipartUpload.IsSupported(testsUploadService.ArtDetails) + assert.NoError(t, err) + if !supported { + t.Skip("Skipping multipart upload test. 
Multipart upload test is not supported in the provided Artifactory server.") + } +} + +func TestArtifactoryMultipartUpload(t *testing.T) { + initArtifactoryMultipartUploadTest(t) + t.Run("multipartUpload", multipartUpload) +} + +func multipartUpload(t *testing.T) { + bigFile, cleanup := createBigFile(t) + defer cleanup() + + // Create upload parameters + up := services.NewUploadParams() + props := utils.NewProperties() + props.AddProperty(propertyKey, propertyValue) + up.CommonParams = &utils.CommonParams{Pattern: bigFile.Name(), Target: getRtTargetRepo(), TargetProps: props} + up.Flat = true + up.MinChecksumDeploy = bigFileSize + 1 + up.MinSplitSize = bigFileSize + + // Upload file and verify success + summary, err := testsUploadService.UploadFiles(up) + assert.NoError(t, err) + assert.Equal(t, 1, summary.TotalSucceeded) + assert.Zero(t, summary.TotalFailed) + + // Search for the uploaded file in Artifactory + searchParams := services.NewSearchParams() + searchParams.Pattern = getRtTargetRepo() + reader, err := testsSearchService.Search(searchParams) + defer readerCloseAndAssert(t, reader) + assert.NoError(t, err) + length, err := reader.Length() + assert.NoError(t, err) + assert.Equal(t, 1, length) + + // Ensure existence of the uploaded file and verify properties + for item := new(utils.ResultItem); reader.NextRecord(item) == nil; item = new(utils.ResultItem) { + assert.Equal(t, filepath.Base(bigFile.Name()), item.Name) + assert.Equal(t, propertyValue, item.GetProperty(propertyKey)) + } + readerGetErrorAndAssert(t, reader) + + // Cleanup + artifactoryCleanup(t) +} + +func createBigFile(t *testing.T) (bigFile *os.File, cleanUp func()) { + bigFile, err := fileutils.CreateTempFile() + assert.NoError(t, err) + + cleanUp = func() { + assert.NoError(t, os.Remove(bigFile.Name())) + } + + data := make([]byte, int(bigFileSize)) + _, err = bigFile.Write(data) + assert.NoError(t, err) + return +} diff --git a/tests/jfrogclient_test.go b/tests/jfrogclient_test.go index 
cdba5cc94..ceda8bd8a 100644 --- a/tests/jfrogclient_test.go +++ b/tests/jfrogclient_test.go @@ -28,7 +28,7 @@ func TestMain(m *testing.M) { func setupIntegrationTests() { flag.Parse() log.SetLogger(log.NewLogger(log.DEBUG, nil)) - if *TestArtifactory || *TestDistribution || *TestXray || *TestRepositories { + if *TestArtifactory || *TestDistribution || *TestXray || *TestRepositories || *TestMultipartUpload { createArtifactoryUploadManager() createArtifactorySearchManager() createArtifactoryDeleteManager() diff --git a/tests/utils_test.go b/tests/utils_test.go index 6f38ea735..7a0ff6829 100644 --- a/tests/utils_test.go +++ b/tests/utils_test.go @@ -52,6 +52,7 @@ var ( TestPipelines *bool TestAccess *bool TestRepositories *bool + TestMultipartUpload *bool RtUrl *string DistUrl *string XrayUrl *string @@ -147,6 +148,7 @@ func init() { TestPipelines = flag.Bool("test.pipelines", false, "Test pipelines") TestAccess = flag.Bool("test.access", false, "Test access") TestRepositories = flag.Bool("test.repositories", false, "Test repositories in Artifactory") + TestMultipartUpload = flag.Bool("test.mpu", false, "Test Artifactory multipart upload") RtUrl = flag.String("rt.url", "http://localhost:8081/artifactory", "Artifactory url") DistUrl = flag.String("ds.url", "", "Distribution url") XrayUrl = flag.String("xr.url", "", "Xray url") @@ -216,6 +218,8 @@ func createArtifactoryUploadManager() { testsUploadService = services.NewUploadService(client) testsUploadService.ArtDetails = artDetails testsUploadService.Threads = 3 + httpClientDetails := testsUploadService.ArtDetails.CreateHttpClientDetails() + testsUploadService.MultipartUpload = utils.NewMultipartUpload(client, &httpClientDetails, testsUploadService.ArtDetails.GetUrl()) } func createArtifactoryUserManager() { @@ -427,6 +431,7 @@ func createArtifactoryAqlManager() { func createJfrogHttpClient(artDetailsPtr *auth.ServiceDetails) (*jfroghttpclient.JfrogHttpClient, error) { artDetails := *artDetailsPtr return 
jfroghttpclient.JfrogClientBuilder(). + SetRetries(3). SetClientCertPath(artDetails.GetClientCertPath()). SetClientCertKeyPath(artDetails.GetClientCertKeyPath()). AppendPreRequestInterceptor(artDetails.RunPreRequestFunctions). @@ -667,7 +672,7 @@ func artifactoryCleanup(t *testing.T) { } func createRepo() error { - if !(*TestArtifactory || *TestDistribution || *TestXray || *TestRepositories) { + if !(*TestArtifactory || *TestDistribution || *TestXray || *TestRepositories || *TestMultipartUpload) { return nil } var err error @@ -684,7 +689,7 @@ func createRepo() error { } func teardownIntegrationTests() { - if !(*TestArtifactory || *TestDistribution || *TestXray || *TestRepositories) { + if !(*TestArtifactory || *TestDistribution || *TestXray || *TestRepositories || *TestMultipartUpload) { return } repo := getRtTargetRepoKey() diff --git a/utils/io/fileutils/files.go b/utils/io/fileutils/files.go index 3b64a4b04..119d5b8d3 100644 --- a/utils/io/fileutils/files.go +++ b/utils/io/fileutils/files.go @@ -404,11 +404,11 @@ func GetFileDetailsFromReader(reader io.Reader, includeChecksums bool) (details } func calcChecksumDetailsFromReader(reader io.Reader) (entities.Checksum, error) { - checksumInfo, err := biutils.CalcChecksums(reader) + checksums, err := biutils.CalcChecksums(reader) if err != nil { return entities.Checksum{}, errorutils.CheckError(err) } - return entities.Checksum{Md5: checksumInfo[biutils.MD5], Sha1: checksumInfo[biutils.SHA1], Sha256: checksumInfo[biutils.SHA256]}, nil + return entities.Checksum{Md5: checksums[biutils.MD5], Sha1: checksums[biutils.SHA1], Sha256: checksums[biutils.SHA256]}, nil } type FileDetails struct { diff --git a/utils/io/progress.go b/utils/io/progress.go index e8335ec68..d67a2dd07 100644 --- a/utils/io/progress.go +++ b/utils/io/progress.go @@ -8,8 +8,8 @@ type ProgressMgr interface { // Input: 'total' - file size, 'label' - the title of the operation, 'path' - the path of the file being processed. 
// Output: progress indicator id NewProgressReader(total int64, label, path string) (progress Progress) - // Changes progress indicator state. - SetProgressState(id int, state string) + // Changes progress indicator state to merging. + SetMergingState(id int, useSpinner bool) (bar Progress) // Returns the requested progress indicator. GetProgress(id int) (progress Progress) // Aborts a progress indicator. Called on both successful and unsuccessful operations. @@ -32,6 +32,8 @@ type ProgressMgr interface { type Progress interface { // Used for updating the progress indicator progress. ActionWithProgress(reader io.Reader) (results io.Reader) + // Used for setting the progress indicator progress. + SetProgress(progress int64) // Aborts a progress indicator. Called on both successful and unsuccessful operations Abort() // Returns the Progress ID diff --git a/utils/retryexecutor.go b/utils/retryexecutor.go index e3b2faeb9..bad3dc16e 100644 --- a/utils/retryexecutor.go +++ b/utils/retryexecutor.go @@ -10,7 +10,7 @@ import ( "github.com/jfrog/jfrog-client-go/utils/log" ) -type ExecutionHandlerFunc func() (bool, error) +type ExecutionHandlerFunc func() (shouldRetry bool, err error) type RetryExecutor struct { // The context