Skip to content

Commit

Permalink
implement task queue waiting for resources
Browse files Browse the repository at this point in the history
  • Loading branch information
neolynx committed Jun 6, 2024
1 parent 5ff4ec7 commit 2f20f12
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 50 deletions.
123 changes: 80 additions & 43 deletions task/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type List struct {
// resources currently used by running tasks
usedResources *ResourcesSet
idCounter int

queue chan *Task

Check failure on line 21 in task/list.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofmt`-ed with `-s` (gofmt)
queueWg *sync.WaitGroup
queueDone chan bool
}

// NewList creates empty task list
Expand All @@ -27,10 +31,79 @@ func NewList() *List {
wgTasks: make(map[int]*sync.WaitGroup),
wg: &sync.WaitGroup{},
usedResources: NewResourcesSet(),
queue: make(chan *Task, 0),
queueWg: &sync.WaitGroup{},
queueDone: make(chan bool),
}
go list.consumer()
return list
}

// consumer is processing the queue
func (list *List) consumer() {
for {
select {
case task := <-list.queue:
list.Lock()
{
list.usedResources.MarkInUse(task.resources, task)

list.wg.Add(1)
task.wgTask.Add(1)
}
list.Unlock()

go func() {
list.Lock()
{
task.State = RUNNING
}
list.Unlock()

retValue, err := task.process(aptly.Progress(task.output), task.detail)

list.Lock()
{
task.processReturnValue = retValue
if err != nil {
task.output.Printf("Task failed with error: %v", err)
task.State = FAILED
} else {
task.output.Print("Task succeeded")
task.State = SUCCEEDED
}

list.usedResources.Free(task.resources)

task.wgTask.Done()
list.wg.Done()

for _, t := range list.tasks {
if t.State == IDLE {
// check resources
blockingTasks := list.usedResources.UsedBy(t.resources)
if len(blockingTasks) == 0 {
list.queue <- t
break
}
}
}
}
list.Unlock()
}()

case <-list.queueDone:
return
}
}
}

// Stop signals the consumer to stop processing tasks and waits for it to finish
func (list *List) Stop() {
close(list.queueDone)
list.queueWg.Wait()
}

// GetTasks gets complete list of tasks
func (list *List) GetTasks() []Task {
tasks := []Task{}
Expand Down Expand Up @@ -123,55 +196,19 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
list.Lock()
defer list.Unlock()

tasks := list.usedResources.UsedBy(resources)
for len(tasks) > 0 {
for _, task := range tasks {
list.Unlock()
list.wgTasks[task.ID].Wait()
list.Lock()
}
tasks = list.usedResources.UsedBy(resources)
}

list.idCounter++
wgTask := &sync.WaitGroup{}
task := NewTask(process, name, list.idCounter)
task := NewTask(process, name, list.idCounter, resources, wgTask)

list.tasks = append(list.tasks, task)
list.wgTasks[task.ID] = wgTask
list.usedResources.MarkInUse(resources, task)

list.wg.Add(1)
wgTask.Add(1)

go func() {

list.Lock()
{
task.State = RUNNING
}
list.Unlock()

retValue, err := process(aptly.Progress(task.output), task.detail)

list.Lock()
{
task.processReturnValue = retValue
if err != nil {
task.output.Printf("Task failed with error: %v", err)
task.State = FAILED
} else {
task.output.Print("Task succeeded")
task.State = SUCCEEDED
}

list.usedResources.Free(resources)

wgTask.Done()
list.wg.Done()
}
list.Unlock()
}()
// add task to queue for processing if resources are available
// if not, task will be queued by the consumer once resources are available
tasks := list.usedResources.UsedBy(resources)
if len(tasks) == 0 {
list.queue <- task
}

return *task, nil
}
Expand Down
19 changes: 12 additions & 7 deletions task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package task

import (
"sync/atomic"

Check failure on line 4 in task/task.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofmt`-ed with `-s` (gofmt)
"sync"

"github.com/aptly-dev/aptly/aptly"
)
Expand Down Expand Up @@ -49,17 +50,21 @@ type Task struct {
Name string
ID int
State State
resources []string
wgTask *sync.WaitGroup
}

// NewTask creates new task
func NewTask(process Process, name string, ID int) *Task {
func NewTask(process Process, name string, ID int, resources []string, wgTask *sync.WaitGroup) *Task {
task := &Task{
output: NewOutput(),
detail: &Detail{},
process: process,
Name: name,
ID: ID,
State: IDLE,
output: NewOutput(),
detail: &Detail{},
process: process,
Name: name,
ID: ID,
State: IDLE,
resources: resources,
wgTask: wgTask,
}
return task
}

0 comments on commit 2f20f12

Please sign in to comment.