Skip to content

Commit

Permalink
Feature: default task timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Dec 12, 2023
1 parent b5ce902 commit 19fa840
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 2 deletions.
5 changes: 3 additions & 2 deletions configs/sample.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ default = 1 # numbers of concurrent subscribers

# default task limits
[worker.limits]
cpus = "" # supports fractions
memory = "" # e.g. 100m
cpus = "" # supports fractions
memory = "" # e.g. 100m
timeout = "" # e.g. 3h


[mounts.bind]
Expand Down
1 change: 1 addition & 0 deletions engine/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (e *Engine) initWorker() error {
Limits: worker.Limits{
DefaultCPUsLimit: conf.String("worker.limits.cpus"),
DefaultMemoryLimit: conf.String("worker.limits.memory"),
DefaultTimeout: conf.String("worker.limits.timeout"),
},
Address: conf.String("worker.address"),
Middleware: e.cfg.Middleware.Task,
Expand Down
4 changes: 4 additions & 0 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Config struct {
type Limits struct {
DefaultCPUsLimit string
DefaultMemoryLimit string
DefaultTimeout string
}

type runningTask struct {
Expand Down Expand Up @@ -177,6 +178,9 @@ func (w *Worker) doRunTask(ctx context.Context, t *tork.Task) error {
if t.Limits != nil && t.Limits.Memory == "" {
t.Limits.Memory = w.limits.DefaultMemoryLimit
}
if t.Timeout == "" {
t.Timeout = w.limits.DefaultTimeout
}
// create timeout context -- if timeout is defined
rctx := ctx
if t.Timeout != "" {
Expand Down
113 changes: 113 additions & 0 deletions internal/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,3 +374,116 @@ func Test_sendHeartbeat(t *testing.T) {
<-heartbeats
assert.NoError(t, w.Stop())
}

func Test_handleTaskRunDefaultLimitExceeded(t *testing.T) {
rt, err := docker.NewDockerRuntime()
assert.NoError(t, err)

b := mq.NewInMemoryBroker()

w, err := NewWorker(Config{
Broker: b,
Runtime: rt,
Limits: Limits{
DefaultTimeout: "1s",
},
})
assert.NoError(t, err)
assert.NotNil(t, w)

errors := make(chan any)
err = b.SubscribeForTasks(mq.QUEUE_ERROR, func(tk *tork.Task) error {
assert.Contains(t, tk.Error, "context deadline exceeded")
close(errors)
return nil
})
assert.NoError(t, err)

starts := make(chan any)
err = b.SubscribeForTasks(mq.QUEUE_STARTED, func(tk *tork.Task) error {
assert.Equal(t, int32(1), atomic.LoadInt32(&w.taskCount))
close(starts)
return nil
})
assert.NoError(t, err)

err = w.Start()
assert.NoError(t, err)

t1 := &tork.Task{
ID: uuid.NewUUID(),
State: tork.TaskStateRunning,
Image: "ubuntu:mantic",
CMD: []string{"sleep", "5"},
Mounts: []tork.Mount{
{
Type: tork.MountTypeVolume,
Target: "/somevolume",
},
},
}

err = w.handleTask(t1)

<-starts
<-errors

assert.NoError(t, err)
assert.Equal(t, "/somevolume", t1.Mounts[0].Target)
}

func Test_handleTaskRunDefaultLimitOK(t *testing.T) {
rt, err := docker.NewDockerRuntime()
assert.NoError(t, err)

b := mq.NewInMemoryBroker()

w, err := NewWorker(Config{
Broker: b,
Runtime: rt,
Limits: Limits{
DefaultTimeout: "5s",
},
})
assert.NoError(t, err)
assert.NotNil(t, w)

completions := make(chan any)
err = b.SubscribeForTasks(mq.QUEUE_COMPLETED, func(tk *tork.Task) error {
close(completions)
return nil
})
assert.NoError(t, err)

starts := make(chan any)
err = b.SubscribeForTasks(mq.QUEUE_STARTED, func(tk *tork.Task) error {
assert.Equal(t, int32(1), atomic.LoadInt32(&w.taskCount))
close(starts)
return nil
})
assert.NoError(t, err)

err = w.Start()
assert.NoError(t, err)

t1 := &tork.Task{
ID: uuid.NewUUID(),
State: tork.TaskStateRunning,
Image: "ubuntu:mantic",
CMD: []string{"sleep", "1"},
Mounts: []tork.Mount{
{
Type: tork.MountTypeVolume,
Target: "/somevolume",
},
},
}

err = w.handleTask(t1)

<-starts
<-completions

assert.NoError(t, err)
assert.Equal(t, "/somevolume", t1.Mounts[0].Target)
}

0 comments on commit 19fa840

Please sign in to comment.