Skip to content

Commit

Permalink
Showing 6 changed files with 85 additions and 23 deletions.
6 changes: 3 additions & 3 deletions artifactory/commands/transferfiles/manager.go
Original file line number Diff line number Diff line change
@@ -205,8 +205,8 @@ type producerConsumerWrapper struct {
}

func newProducerConsumerWrapper() producerConsumerWrapper {
chunkUploaderProducerConsumer := parallel.NewRunner(GetThreads(), tasksMaxCapacity, false)
chunkBuilderProducerConsumer := parallel.NewRunner(GetThreads(), tasksMaxCapacity, false)
chunkUploaderProducerConsumer := parallel.NewRunner(GetChunkUploaderThreads(), tasksMaxCapacity, false)
chunkBuilderProducerConsumer := parallel.NewRunner(GetChunkBuilderThreads(), tasksMaxCapacity, false)
chunkUploaderProducerConsumer.SetFinishedNotification(true)
chunkBuilderProducerConsumer.SetFinishedNotification(true)
errorsQueue := clientUtils.NewErrorsQueue(1)
@@ -310,7 +310,7 @@ func pollUploads(phaseBase *phaseBase, srcUpService *srcUserPluginService, uploa

// Fill chunk data batch till full. Return if no new chunk data is available.
func fillChunkDataBatch(chunksLifeCycleManager *ChunksLifeCycleManager, uploadChunkChan chan UploadedChunk) {
for chunksLifeCycleManager.totalChunks < GetThreads() {
for chunksLifeCycleManager.totalChunks < GetChunkUploaderThreads() {
select {
case data := <-uploadChunkChan:
currentNodeId := nodeId(data.NodeId)
9 changes: 5 additions & 4 deletions artifactory/commands/transferfiles/transfer.go
Original file line number Diff line number Diff line change
@@ -588,19 +588,20 @@ func (tdc *TransferFilesCommand) getAllLocalRepos(serverDetails *config.ServerDe

func (tdc *TransferFilesCommand) initCurThreads(buildInfoRepo bool) error {
// Use default threads if settings file doesn't exist or an error occurred.
curThreads = utils.DefaultThreads
curChunkUploaderThreads = utils.DefaultThreads
curChunkBuilderThreads = utils.DefaultThreads
settings, err := utils.LoadTransferSettings()
if err != nil {
return err
}
if settings != nil {
curThreads = settings.CalcNumberOfThreads(buildInfoRepo)
if buildInfoRepo && curThreads < settings.ThreadsNumber {
curChunkBuilderThreads, curChunkUploaderThreads = settings.CalcNumberOfThreads(buildInfoRepo)
if buildInfoRepo && curChunkUploaderThreads < settings.ThreadsNumber {
log.Info("Build info transferring - using reduced number of threads")
}
}

log.Info("Running with maximum", strconv.Itoa(curThreads), "working threads...")
log.Info("Running with maximum", strconv.Itoa(curChunkUploaderThreads), "working threads...")
return nil
}

3 changes: 2 additions & 1 deletion artifactory/commands/transferfiles/transfer_test.go
Original file line number Diff line number Diff line change
@@ -203,7 +203,8 @@ func TestUploadChunkAndPollUploads(t *testing.T) {

// Sends chunk to upload, polls on chunk three times - once when it is still in progress, once after done received and once to notify back to the source.
func uploadChunkAndPollTwice(t *testing.T, phaseBase *phaseBase, fileSample api.FileRepresentation) {
curThreads = 8
curChunkUploaderThreads = coreUtils.DefaultThreads
curChunkBuilderThreads = coreUtils.DefaultThreads
uploadChunksChan := make(chan UploadedChunk, 3)
doneChan := make(chan bool, 1)
var runWaitGroup sync.WaitGroup
31 changes: 20 additions & 11 deletions artifactory/commands/transferfiles/utils.go
Original file line number Diff line number Diff line change
@@ -48,7 +48,8 @@ type (
)

var AqlPaginationLimit = DefaultAqlPaginationLimit
var curThreads int
var curChunkBuilderThreads int
var curChunkUploaderThreads int

type UploadedChunk struct {
api.UploadChunkResponse
@@ -190,7 +191,7 @@ var processedUploadChunksMutex sync.Mutex
func incrCurProcessedChunksWhenPossible() bool {
processedUploadChunksMutex.Lock()
defer processedUploadChunksMutex.Unlock()
if curProcessedUploadChunks < GetThreads() {
if curProcessedUploadChunks < GetChunkUploaderThreads() {
curProcessedUploadChunks++
return true
}
@@ -318,8 +319,12 @@ func newUploadedChunkStruct(uploadChunkResponse api.UploadChunkResponse, chunk a
}
}

func GetThreads() int {
return curThreads
func GetChunkBuilderThreads() int {
return curChunkBuilderThreads
}

func GetChunkUploaderThreads() int {
return curChunkUploaderThreads
}

// Periodically reads settings file and updates the number of threads.
@@ -349,16 +354,20 @@ func updateThreads(pcWrapper *producerConsumerWrapper, buildInfoRepo bool) error
if err != nil || settings == nil {
return err
}
calculatedNumberOfThreads := settings.CalcNumberOfThreads(buildInfoRepo)
if curThreads != calculatedNumberOfThreads {
calculatedChunkBuilderThreads, calculatedChunkUploaderThreads := settings.CalcNumberOfThreads(buildInfoRepo)
if curChunkUploaderThreads != calculatedChunkUploaderThreads {
if pcWrapper != nil {
updateProducerConsumerMaxParallel(pcWrapper.chunkBuilderProducerConsumer, calculatedNumberOfThreads)
updateProducerConsumerMaxParallel(pcWrapper.chunkUploaderProducerConsumer, calculatedNumberOfThreads)
if curChunkBuilderThreads != calculatedChunkBuilderThreads {
updateProducerConsumerMaxParallel(pcWrapper.chunkBuilderProducerConsumer, calculatedChunkBuilderThreads)
}
updateProducerConsumerMaxParallel(pcWrapper.chunkUploaderProducerConsumer, calculatedChunkUploaderThreads)
}
log.Info(fmt.Sprintf("Number of threads have been updated to %s (was %s).", strconv.Itoa(calculatedNumberOfThreads), strconv.Itoa(curThreads)))
curThreads = calculatedNumberOfThreads
log.Info(fmt.Sprintf("Number of threads has been updated to %s (was %s).", strconv.Itoa(calculatedChunkUploaderThreads), strconv.Itoa(curChunkUploaderThreads)))
curChunkBuilderThreads = calculatedChunkBuilderThreads
curChunkUploaderThreads = calculatedChunkUploaderThreads
} else {
log.Debug("No change to the number of threads have been detected.")
log.Debug(fmt.Sprintf("No change to the number of threads has been detected. Max chunks builder threads: %d. Max chunks uploader threads: %d.",
calculatedChunkBuilderThreads, calculatedChunkUploaderThreads))
}
return nil
}
42 changes: 42 additions & 0 deletions artifactory/commands/transferfiles/utils_test.go
Original file line number Diff line number Diff line change
@@ -15,10 +15,13 @@ import (

"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api"
"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/state"
artifactoryutils "github.com/jfrog/jfrog-cli-core/v2/artifactory/utils"
"github.com/jfrog/jfrog-cli-core/v2/utils/config"
"github.com/jfrog/jfrog-cli-core/v2/utils/tests"
"github.com/jfrog/jfrog-client-go/artifactory/services"
"github.com/jfrog/jfrog-client-go/artifactory/services/utils"
"github.com/jfrog/jfrog-client-go/utils/log"
clientutilstests "github.com/jfrog/jfrog-client-go/utils/tests"
"github.com/stretchr/testify/assert"
)

@@ -391,3 +394,42 @@ func createUniqueFileAndAssertCounter(t *testing.T, tmpDir, prefix string, expec
assert.NoError(t, os.WriteFile(filePath, nil, 0644))
assert.True(t, strings.HasSuffix(filePath, strconv.Itoa(expectedCounter)+".json"))
}

var updateThreadsProvider = []struct {
threadsNumber int
expectedChunkBuilderThreads int
expectedChunkUploaderThreads int
buildInfo bool
}{
{artifactoryutils.DefaultThreads - 1, artifactoryutils.DefaultThreads - 1, artifactoryutils.DefaultThreads - 1, false},
{artifactoryutils.DefaultThreads, artifactoryutils.DefaultThreads, artifactoryutils.DefaultThreads, false},
{artifactoryutils.MaxBuildInfoThreads + 1, artifactoryutils.MaxBuildInfoThreads + 1, artifactoryutils.MaxBuildInfoThreads + 1, false},
{artifactoryutils.MaxChunkBuilderThreads + 1, artifactoryutils.MaxChunkBuilderThreads, artifactoryutils.MaxChunkBuilderThreads + 1, false},

{artifactoryutils.DefaultThreads - 1, artifactoryutils.DefaultThreads - 1, artifactoryutils.DefaultThreads - 1, true},
{artifactoryutils.DefaultThreads, artifactoryutils.DefaultThreads, artifactoryutils.DefaultThreads, true},
{artifactoryutils.MaxBuildInfoThreads + 1, artifactoryutils.MaxBuildInfoThreads, artifactoryutils.MaxBuildInfoThreads, true},
{artifactoryutils.MaxChunkBuilderThreads + 1, artifactoryutils.MaxBuildInfoThreads, artifactoryutils.MaxBuildInfoThreads, true},
}

func TestUpdateThreads(t *testing.T) {
cleanUpJfrogHome, err := tests.SetJfrogHome()
assert.NoError(t, err)
defer cleanUpJfrogHome()

previousLog := clientutilstests.RedirectLogOutputToNil()
defer func() {
log.SetLogger(previousLog)
}()

for _, testCase := range updateThreadsProvider {
t.Run(strconv.Itoa(testCase.threadsNumber)+" Build Info: "+strconv.FormatBool(testCase.buildInfo), func(t *testing.T) {
transferSettings := &artifactoryutils.TransferSettings{ThreadsNumber: testCase.threadsNumber}
assert.NoError(t, artifactoryutils.SaveTransferSettings(transferSettings))

assert.NoError(t, updateThreads(nil, testCase.buildInfo))
assert.Equal(t, testCase.expectedChunkBuilderThreads, curChunkBuilderThreads)
assert.Equal(t, testCase.expectedChunkUploaderThreads, curChunkUploaderThreads)
})
}
}
17 changes: 13 additions & 4 deletions artifactory/utils/transfersettings.go
Original file line number Diff line number Diff line change
@@ -14,8 +14,11 @@ import (

const (
// DefaultThreads is the default number of threads working while transferring Artifactory's data
DefaultThreads = 8
DefaultThreads = 8
// Maximum working threads allowed to execute the AQL queries and upload chunks for build-info repositories
MaxBuildInfoThreads = 8
// Maximum working threads allowed to execute the AQL queries
MaxChunkBuilderThreads = 16

transferSettingsFile = "transfer.conf"
transferSettingsLockFile = "transfer-settings"
@@ -25,11 +28,17 @@ type TransferSettings struct {
ThreadsNumber int `json:"threadsNumber,omitempty"`
}

func (ts *TransferSettings) CalcNumberOfThreads(buildInfoRepo bool) int {
func (ts *TransferSettings) CalcNumberOfThreads(buildInfoRepo bool) (chunkBuilderThreads, chunkUploaderThreads int) {
chunkBuilderThreads = ts.ThreadsNumber
chunkUploaderThreads = ts.ThreadsNumber
if buildInfoRepo && MaxBuildInfoThreads < ts.ThreadsNumber {
return MaxBuildInfoThreads
chunkBuilderThreads = MaxBuildInfoThreads
chunkUploaderThreads = MaxBuildInfoThreads
}
return ts.ThreadsNumber
if MaxChunkBuilderThreads < chunkBuilderThreads {
chunkBuilderThreads = MaxChunkBuilderThreads
}
return
}

func LoadTransferSettings() (settings *TransferSettings, err error) {

0 comments on commit 6325ab6

Please sign in to comment.