Skip to content

Commit

Permalink
Artifactory Multi-Part Upload support (#900)
Browse files Browse the repository at this point in the history
  • Loading branch information
yahavi authored Feb 22, 2024
1 parent fec9a7c commit 3cea121
Show file tree
Hide file tree
Showing 18 changed files with 1,028 additions and 88 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ jobs:
- name: artifactory tests
run: go test -v github.com/jfrog/jfrog-client-go/tests --timeout 0 --test.${{ matrix.suite }} --ci.runId=${{ runner.os }}-${{ matrix.suite }}

JFrog-Client-Go-Ds-Xr-Tests:
JFrog-Client-Go-Ds-Xr-MPU-Tests:
needs: Pretest
name: ${{ matrix.suite }} ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
suite: [ distribution, xray ]
suite: [ distribution, xray, mpu ]
os: [ ubuntu, windows, macos ]
runs-on: ${{ matrix.os }}-latest
steps:
Expand Down
24 changes: 15 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,15 @@ content of this repository is deleted.

#### Test Types

| Type | Description | Prerequisites |
| -------------------- | ------------------ | ----------------------------- |
| `-test.artifactory` | Artifactory tests | Artifactory Pro |
| `-test.distribution` | Distribution tests | Artifactory with Distribution |
| `-test.xray` | Xray tests | Artifactory with Xray |
| `-test.pipelines` | Pipelines tests | JFrog Pipelines |
| `-test.access` | Access tests | Artifactory Pro |
| `-test.repositories` | Access tests | Artifactory Pro |
| Type | Description | Prerequisites |
| -------------------- | ---------------------- | ------------------------------- |
| `-test.artifactory` | Artifactory tests | Artifactory Pro |
| `-test.distribution` | Distribution tests | Artifactory with Distribution |
| `-test.xray` | Xray tests | Artifactory with Xray |
| `-test.pipelines` | Pipelines tests | JFrog Pipelines |
| `-test.access` | Access tests | Artifactory Pro |
| `-test.repositories` | Repositories tests | Artifactory Pro |
| `-test.mpu` | Multipart upload tests | Artifactory Pro with S3 storage |

#### Connection Details

Expand Down Expand Up @@ -400,6 +401,12 @@ params.Symlink = false
params.Exclusions = "(.*)a.zip"
// Retries default value: 3
params.Retries = 5
// The maximum number of parts that can be concurrently uploaded per file during a multi-part upload. Set to 0 to disable multi-part upload.
// SplitCount default value: 5
params.SplitCount = 10
// The minimum file size in MiB required to attempt a multi-part upload.
// MinSplitSize default value: 200
params.MinSplitSize = 100
// The min file size in bytes for "checksum deploy".
// "Checksum deploy" is the action of calculating the file checksum locally, before
// the upload, and skipping the actual file transfer if the file already
Expand Down Expand Up @@ -2230,7 +2237,6 @@ xscVersion, err := scanService.IsXscEnabled()
multiScanId, err := scanService.SendScanGitInfoContext(details)
```
## Pipelines APIs
### Creating Pipelines Service Manager
Expand Down
2 changes: 2 additions & 0 deletions artifactory/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ func (sm *ArtifactoryServicesManagerImp) initUploadService() *services.UploadSer
uploadService.ArtDetails = sm.config.GetServiceDetails()
uploadService.DryRun = sm.config.IsDryRun()
uploadService.Progress = sm.progress
httpClientDetails := uploadService.ArtDetails.CreateHttpClientDetails()
uploadService.MultipartUpload = utils.NewMultipartUpload(sm.client, &httpClientDetails, uploadService.ArtDetails.GetUrl())
return uploadService
}

Expand Down
16 changes: 4 additions & 12 deletions artifactory/services/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,19 +440,11 @@ func createLocalSymlink(localPath, localFileName, symlinkArtifact string, symlin
if !fileutils.IsPathExists(symlinkArtifact, false) {
return errorutils.CheckErrorf("symlink validation failed, target doesn't exist: " + symlinkArtifact)
}
file, err := os.Open(symlinkArtifact)
if err = errorutils.CheckError(err); err != nil {
return err
}
defer func() {
err = errors.Join(err, errorutils.CheckError(file.Close()))
}()
checksumInfo, err := biutils.CalcChecksums(file, biutils.SHA1)
if err = errorutils.CheckError(err); err != nil {
return err
var checksums map[biutils.Algorithm]string
if checksums, err = biutils.GetFileChecksums(symlinkArtifact, biutils.SHA1); err != nil {
return errorutils.CheckError(err)
}
sha1 := checksumInfo[biutils.SHA1]
if sha1 != symlinkContentChecksum {
if checksums[biutils.SHA1] != symlinkContentChecksum {
return errorutils.CheckErrorf("symlink validation failed for target: " + symlinkArtifact)
}
}
Expand Down
8 changes: 4 additions & 4 deletions artifactory/services/fspatterns/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,15 @@ func GetRootPath(pattern, target, archiveTarget string, patternType utils.Patter

// When handling symlink we want to simulate the creation of empty file
func CreateSymlinkFileDetails() (*fileutils.FileDetails, error) {
checksumInfo, err := biutils.CalcChecksums(bytes.NewBuffer([]byte(fileutils.SymlinkFileContent)))
checksums, err := biutils.CalcChecksums(bytes.NewBuffer([]byte(fileutils.SymlinkFileContent)))
if err != nil {
return nil, errorutils.CheckError(err)
}

details := new(fileutils.FileDetails)
details.Checksum.Md5 = checksumInfo[biutils.MD5]
details.Checksum.Sha1 = checksumInfo[biutils.SHA1]
details.Checksum.Sha256 = checksumInfo[biutils.SHA256]
details.Checksum.Md5 = checksums[biutils.MD5]
details.Checksum.Sha1 = checksums[biutils.SHA1]
details.Checksum.Sha256 = checksums[biutils.SHA256]
details.Size = int64(0)
return details, nil
}
129 changes: 81 additions & 48 deletions artifactory/services/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,24 @@ import (
"github.com/jfrog/jfrog-client-go/utils/log"
)

const (
// 10 KiB
DefaultMinChecksumDeploy = utils.SizeKib * 10
// The default minimum file size for attempting multi-part upload
defaultUploadMinSplit = utils.SizeMiB * 200
// The default maximum number of parts that can be concurrently uploaded per file during a multi-part upload
defaultUploadSplitCount = 5
)

type UploadService struct {
client *jfroghttpclient.JfrogHttpClient
Progress ioutils.ProgressMgr
ArtDetails auth.ServiceDetails
DryRun bool
Threads int
saveSummary bool
resultsManager *resultsManager
client *jfroghttpclient.JfrogHttpClient
Progress ioutils.ProgressMgr
ArtDetails auth.ServiceDetails
MultipartUpload *utils.MultipartUpload
DryRun bool
Threads int
saveSummary bool
resultsManager *resultsManager
}

const JfrogCliUploadEmptyArchiveEnv = "JFROG_CLI_UPLOAD_EMPTY_ARCHIVE"
Expand Down Expand Up @@ -179,19 +189,12 @@ func createProperties(artifact clientutils.Artifact, uploadParams UploadParams)
}
// If Symlink target exists -> get SHA1 if isn't a directory
} else if !fileInfo.IsDir() {
file, err := os.Open(artifact.LocalPath)
if err != nil {
return nil, errorutils.CheckError(err)
}
defer func() {
err = errors.Join(err, errorutils.CheckError(file.Close()))
}()
checksumInfo, err := biutils.CalcChecksums(file, biutils.SHA1)
var checksums map[biutils.Algorithm]string
checksums, err := biutils.GetFileChecksums(artifact.LocalPath, biutils.SHA1)
if err != nil {
return nil, errorutils.CheckError(err)
}
sha1 := checksumInfo[biutils.SHA1]
artifactProps.AddProperty(utils.SymlinkSha1, sha1)
artifactProps.AddProperty(utils.SymlinkSha1, checksums[biutils.SHA1])
}
artifactProps.AddProperty(utils.ArtifactorySymlink, artifactSymlink)
}
Expand Down Expand Up @@ -486,7 +489,7 @@ func (us *UploadService) uploadFile(artifact UploadData, uploadParams UploadPara
if uploadParams.IsSymlink() && fileutils.IsFileSymlink(fileInfo) {
resp, details, body, err = us.uploadSymlink(targetPathWithProps, logMsgPrefix, httpClientsDetails, uploadParams)
} else {
resp, details, body, checksumDeployed, err = us.doUpload(artifact.Artifact.LocalPath, targetPathWithProps, logMsgPrefix, httpClientsDetails, fileInfo, uploadParams)
resp, details, body, checksumDeployed, err = us.doUpload(artifact, targetPathWithProps, logMsgPrefix, httpClientsDetails, fileInfo, uploadParams)
}
if err != nil {
return nil, false, err
Expand All @@ -503,6 +506,21 @@ func (us *UploadService) shouldTryChecksumDeploy(fileSize int64, uploadParams Up
return uploadParams.ChecksumsCalcEnabled && fileSize >= uploadParams.MinChecksumDeploy && !uploadParams.IsExplodeArchive()
}

func (us *UploadService) shouldDoMultipartUpload(fileSize int64, uploadParams UploadParams) (bool, error) {
if uploadParams.SplitCount == 0 || fileSize < uploadParams.MinSplitSize {
return false, nil
}
if fileSize > utils.MaxMultipartUploadFileSize {
log.Debug(fmt.Sprintf("Max file size for multipart upload exceeded: %d>%d", fileSize, utils.MaxMultipartUploadFileSize))
return false, nil
}
if uploadParams.IsExplodeArchive() {
// Explode archives is not supported in multipart uploads
return false, nil
}
return us.MultipartUpload.IsSupported(us.ArtDetails)
}

// Reads a file from a Reader that is given from a function (getReaderFunc) and uploads it to the specified target path.
// getReaderFunc is called only if checksum deploy was successful.
// Returns true if the file was successfully uploaded.
Expand All @@ -514,7 +532,7 @@ func (us *UploadService) uploadFileFromReader(getReaderFunc func() (io.Reader, e
httpClientsDetails := us.ArtDetails.CreateHttpClientDetails()
if !us.DryRun {
if us.shouldTryChecksumDeploy(details.Size, uploadParams) {
resp, body, e = us.tryChecksumDeploy(details, targetUrlWithProps, httpClientsDetails, us.client)
resp, body, e = us.doChecksumDeploy(details, targetUrlWithProps, httpClientsDetails, us.client)
if e != nil {
return false, e
}
Expand Down Expand Up @@ -570,37 +588,50 @@ func (us *UploadService) uploadSymlink(targetPath, logMsgPrefix string, httpClie
return
}

func (us *UploadService) doUpload(localPath, targetUrlWithProps, logMsgPrefix string, httpClientsDetails httputils.HttpClientDetails, fileInfo os.FileInfo, uploadParams UploadParams) (*http.Response, *fileutils.FileDetails, []byte, bool, error) {
var details *fileutils.FileDetails
var checksumDeployed bool
var resp *http.Response
var body []byte
var err error
addExplodeHeader(&httpClientsDetails, uploadParams.IsExplodeArchive())
if !us.DryRun {
if us.shouldTryChecksumDeploy(fileInfo.Size(), uploadParams) {
details, err = fileutils.GetFileDetails(localPath, uploadParams.ChecksumsCalcEnabled)
if err != nil {
return resp, details, body, checksumDeployed, err
}
resp, body, err = us.tryChecksumDeploy(details, targetUrlWithProps, httpClientsDetails, us.client)
if err != nil {
return resp, details, body, checksumDeployed, err
}
checksumDeployed = isSuccessfulUploadStatusCode(resp.StatusCode)
func (us *UploadService) doUpload(artifact UploadData, targetUrlWithProps, logMsgPrefix string, httpClientsDetails httputils.HttpClientDetails, fileInfo os.FileInfo, uploadParams UploadParams) (
resp *http.Response, details *fileutils.FileDetails, body []byte, checksumDeployed bool, err error) {
// Get local file details
details, err = fileutils.GetFileDetails(artifact.Artifact.LocalPath, uploadParams.ChecksumsCalcEnabled)
if err != nil {
return
}

// Return if dry run
if us.DryRun {
return
}

// Try checksum deploy
if us.shouldTryChecksumDeploy(fileInfo.Size(), uploadParams) {
resp, body, err = us.doChecksumDeploy(details, targetUrlWithProps, httpClientsDetails, us.client)
if err != nil {
return resp, details, body, checksumDeployed, err
}
if !checksumDeployed {
resp, body, err = utils.UploadFile(localPath, targetUrlWithProps, logMsgPrefix, &us.ArtDetails, details,
httpClientsDetails, us.client, uploadParams.ChecksumsCalcEnabled, us.Progress)
if err != nil {
return resp, details, body, checksumDeployed, err
}
if isSuccessfulUploadStatusCode(resp.StatusCode) {
checksumDeployed = true
return
}
}
if details == nil {
details, err = fileutils.GetFileDetails(localPath, uploadParams.ChecksumsCalcEnabled)

// Try multipart upload
var shouldTryMultipart bool
if shouldTryMultipart, err = us.shouldDoMultipartUpload(fileInfo.Size(), uploadParams); err != nil {
return
}
if shouldTryMultipart {
if err = us.MultipartUpload.UploadFileConcurrently(artifact.Artifact.LocalPath, artifact.Artifact.TargetPath, fileInfo.Size(), details.Checksum.Sha1, us.Progress, uploadParams.SplitCount); err != nil {
return
}
// Once the file is uploaded to the storage, we finalize the multipart upload by performing a checksum deployment to save the file in Artifactory.
resp, body, err = us.doChecksumDeploy(details, targetUrlWithProps, httpClientsDetails, us.client)
return
}
return resp, details, body, checksumDeployed, err

// Do regular upload
addExplodeHeader(&httpClientsDetails, uploadParams.IsExplodeArchive())
resp, body, err = utils.UploadFile(artifact.Artifact.LocalPath, targetUrlWithProps, logMsgPrefix, &us.ArtDetails, details,
httpClientsDetails, us.client, uploadParams.ChecksumsCalcEnabled, us.Progress)
return
}

func (us *UploadService) doUploadFromReader(fileReader io.Reader, targetUrlWithProps string, httpClientsDetails httputils.HttpClientDetails, uploadParams UploadParams, details *fileutils.FileDetails) (*http.Response, *fileutils.FileDetails, []byte, error) {
Expand Down Expand Up @@ -643,7 +674,7 @@ func addExplodeHeader(httpClientsDetails *httputils.HttpClientDetails, isExplode
}
}

func (us *UploadService) tryChecksumDeploy(details *fileutils.FileDetails, targetPath string, httpClientsDetails httputils.HttpClientDetails,
func (us *UploadService) doChecksumDeploy(details *fileutils.FileDetails, targetPath string, httpClientsDetails httputils.HttpClientDetails,
client *jfroghttpclient.JfrogHttpClient) (resp *http.Response, body []byte, err error) {
requestClientDetails := httpClientsDetails.Clone()
utils.AddHeader("X-Checksum-Deploy", "true", &requestClientDetails.Headers)
Expand Down Expand Up @@ -676,14 +707,16 @@ type UploadParams struct {
Flat bool
AddVcsProps bool
MinChecksumDeploy int64
MinSplitSize int64
SplitCount int
ChecksumsCalcEnabled bool
Archive string
// When using the 'archive' option for upload, we can control the target path inside the uploaded archive using placeholders. This operation determines the TargetPathInArchive value.
TargetPathInArchive string
}

func NewUploadParams() UploadParams {
return UploadParams{CommonParams: &utils.CommonParams{}, MinChecksumDeploy: 10240, ChecksumsCalcEnabled: true}
return UploadParams{CommonParams: &utils.CommonParams{}, MinChecksumDeploy: DefaultMinChecksumDeploy, ChecksumsCalcEnabled: true, MinSplitSize: defaultUploadMinSplit, SplitCount: defaultUploadSplitCount}
}

func DeepCopyUploadParams(params *UploadParams) UploadParams {
Expand Down
Loading

0 comments on commit 3cea121

Please sign in to comment.