Skip to content

Commit

Permalink
Multipart upload - Set the the checksum deploy token (#989)
Browse files Browse the repository at this point in the history
  • Loading branch information
yahavi authored Jul 31, 2024
1 parent 235dd0e commit 904807c
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 23 deletions.
4 changes: 3 additions & 1 deletion artifactory/services/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,11 +624,13 @@ func (us *UploadService) doUpload(artifact UploadData, targetUrlWithProps, logMs
return
}
if shouldTryMultipart {
if err = us.MultipartUpload.UploadFileConcurrently(artifact.Artifact.LocalPath, artifact.Artifact.TargetPath,
var checksumToken string
if checksumToken, err = us.MultipartUpload.UploadFileConcurrently(artifact.Artifact.LocalPath, artifact.Artifact.TargetPath,
fileInfo.Size(), details.Checksum.Sha1, us.Progress, uploadParams.SplitCount, uploadParams.ChunkSize); err != nil {
return
}
// Once the file is uploaded to the storage, we finalize the multipart upload by performing a checksum deployment to save the file in Artifactory.
utils.AddChecksumTokenHeader(httpClientsDetails.Headers, checksumToken)
resp, body, err = us.doChecksumDeploy(details, targetUrlWithProps, httpClientsDetails, us.client)
return
}
Expand Down
6 changes: 6 additions & 0 deletions artifactory/services/utils/artifactoryutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ func AddChecksumHeaders(headers map[string]string, fileDetails *fileutils.FileDe
}
}

// Add the checksum token header to the headers map.
// This header enables Artifactory to accept the Checksum Deployment of a file uploaded via multipart upload.
func AddChecksumTokenHeader(headers map[string]string, checksumToken string) {
AddHeader("X-Checksum-Deploy-Token", checksumToken, &headers)
}

func AddAuthHeaders(headers map[string]string, artifactoryDetails auth.ServiceDetails) {
if headers == nil {
headers = make(map[string]string)
Expand Down
20 changes: 11 additions & 9 deletions artifactory/services/utils/multipartupload.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ type getConfigResponse struct {
}

func (mu *MultipartUpload) UploadFileConcurrently(localPath, targetPath string, fileSize int64, sha1 string,
progress ioutils.ProgressMgr, splitCount int, chunkSize int64) (err error) {
progress ioutils.ProgressMgr, splitCount int, chunkSize int64) (checksumToken string, err error) {
repoAndPath := strings.SplitN(targetPath, "/", 2)
repoKey := repoAndPath[0]
repoPath := repoAndPath[1]
Expand Down Expand Up @@ -302,17 +302,17 @@ type urlPartResponse struct {
Url string `json:"url,omitempty"`
}

func (mu *MultipartUpload) completeAndPollForStatus(logMsgPrefix string, completionAttemptsLeft uint, sha1 string, multipartUploadClient *httputils.HttpClientDetails, progressReader ioutils.Progress) (err error) {
func (mu *MultipartUpload) completeAndPollForStatus(logMsgPrefix string, completionAttemptsLeft uint, sha1 string, multipartUploadClient *httputils.HttpClientDetails, progressReader ioutils.Progress) (checksumToken string, err error) {
err = mu.completeMultipartUpload(logMsgPrefix, sha1, multipartUploadClient)
if err != nil {
return
}

err = mu.pollCompletionStatus(logMsgPrefix, completionAttemptsLeft, sha1, multipartUploadClient, progressReader)
checksumToken, err = mu.pollCompletionStatus(logMsgPrefix, completionAttemptsLeft, sha1, multipartUploadClient, progressReader)
return
}

func (mu *MultipartUpload) pollCompletionStatus(logMsgPrefix string, completionAttemptsLeft uint, sha1 string, multipartUploadClient *httputils.HttpClientDetails, progressReader ioutils.Progress) error {
func (mu *MultipartUpload) pollCompletionStatus(logMsgPrefix string, completionAttemptsLeft uint, sha1 string, multipartUploadClient *httputils.HttpClientDetails, progressReader ioutils.Progress) (checksumToken string, err error) {
multipartUploadClientWithNodeId := multipartUploadClient.Clone()

lastMergeLog := time.Now()
Expand All @@ -338,7 +338,7 @@ func (mu *MultipartUpload) pollCompletionStatus(logMsgPrefix string, completionA
if completionAttemptsLeft == 0 {
return false, errorutils.CheckErrorf("multipart upload failed after %d attempts", mu.client.GetHttpClient().GetRetries())
}
err = mu.completeAndPollForStatus(logMsgPrefix, completionAttemptsLeft-1, sha1, multipartUploadClient, progressReader)
checksumToken, err = mu.completeAndPollForStatus(logMsgPrefix, completionAttemptsLeft-1, sha1, multipartUploadClient, progressReader)
}

// Log status
Expand All @@ -351,10 +351,11 @@ func (mu *MultipartUpload) pollCompletionStatus(logMsgPrefix string, completionA
lastMergeLog = time.Now()
}
}
checksumToken = status.ChecksumToken
return
},
}
return pollingExecutor.Execute()
return checksumToken, pollingExecutor.Execute()
}

func (mu *MultipartUpload) completeMultipartUpload(logMsgPrefix, sha1 string, multipartUploadClient *httputils.HttpClientDetails) error {
Expand Down Expand Up @@ -387,9 +388,10 @@ func (mu *MultipartUpload) status(logMsgPrefix string, multipartUploadClientWith
}

type statusResponse struct {
Status completionStatus `json:"status,omitempty"`
Error string `json:"error,omitempty"`
Progress *int `json:"progress,omitempty"`
Status completionStatus `json:"status,omitempty"`
Error string `json:"error,omitempty"`
Progress *int `json:"progress,omitempty"`
ChecksumToken string `json:"checksumToken,omitempty"`
}

func (mu *MultipartUpload) abort(logMsgPrefix string, multipartUploadClient *httputils.HttpClientDetails) (err error) {
Expand Down
27 changes: 14 additions & 13 deletions artifactory/services/utils/multipartupload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ import (
)

const (
localPath = "localPath"
repoKey = "repoKey"
repoPath = "repoPath"
partSize = SizeGiB
partSizeMB = 1024
partNumber = 2
splitCount = 3
token = "token"
partUrl = "http://dummy-url-part"
sha1 = "sha1"
nodeId = "nodeId"
localPath = "localPath"
repoKey = "repoKey"
repoPath = "repoPath"
partSize = SizeGiB
partSizeMB = 1024
partNumber = 2
splitCount = 3
token = "token"
partUrl = "http://dummy-url-part"
sha1 = "sha1"
nodeId = "nodeId"
checksumToken = "checksumToken"
)

func TestIsSupported(t *testing.T) {
Expand Down Expand Up @@ -209,7 +210,7 @@ func TestStatus(t *testing.T) {

// Send response 200 OK
w.WriteHeader(http.StatusOK)
response, err := json.Marshal(statusResponse{Status: finished, Progress: utils.Pointer(100)})
response, err := json.Marshal(statusResponse{Status: finished, Progress: utils.Pointer(100), ChecksumToken: checksumToken})
assert.NoError(t, err)
_, err = w.Write(response)
assert.NoError(t, err)
Expand All @@ -222,7 +223,7 @@ func TestStatus(t *testing.T) {
// Execute status
status, err := multipartUpload.status("", &httputils.HttpClientDetails{})
assert.NoError(t, err)
assert.Equal(t, statusResponse{Status: finished, Progress: utils.Pointer(100)}, status)
assert.Equal(t, statusResponse{Status: finished, Progress: utils.Pointer(100), ChecksumToken: checksumToken}, status)
}

func TestStatusServiceUnavailable(t *testing.T) {
Expand Down

0 comments on commit 904807c

Please sign in to comment.