Skip to content

Commit

Permalink
Feature: job tags
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Jun 9, 2024
1 parent 2020071 commit e237183
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 36 deletions.
30 changes: 28 additions & 2 deletions datastore/inmemory/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/runabol/tork/datastore"
"github.com/runabol/tork/internal/cache"
"github.com/runabol/tork/internal/slices"
)

const (
Expand Down Expand Up @@ -255,11 +256,36 @@ func (ds *InMemoryDatastore) GetActiveTasks(ctx context.Context, jobID string) (
}

func (ds *InMemoryDatastore) GetJobs(ctx context.Context, q string, page, size int) (*datastore.Page[*tork.JobSummary], error) {
parseQuery := func(query string) (string, []string) {
terms := []string{}
tags := []string{}
parts := strings.Fields(query)
for _, part := range parts {
if strings.HasPrefix(part, "tag:") {
tags = append(tags, strings.TrimPrefix(part, "tag:"))
} else if strings.HasPrefix(part, "tags:") {
tags = append(tags, strings.Split(strings.TrimPrefix(part, "tag:"), ",")...)
} else {
terms = append(terms, part)
}
}
return strings.Join(terms, " "), tags
}

searchTerm, tags := parseQuery(q)
offset := (page - 1) * size
filtered := make([]*tork.Job, 0)
ds.jobs.Iterate(func(_ string, j *tork.Job) {
if strings.Contains(strings.ToLower(j.Name), strings.ToLower(q)) ||
strings.Contains(strings.ToLower(string(j.State)), strings.ToLower(q)) {
if searchTerm != "" {
if strings.Contains(strings.ToLower(j.Name), strings.ToLower(searchTerm)) ||
strings.Contains(strings.ToLower(string(j.State)), strings.ToLower(searchTerm)) {
filtered = append(filtered, j)
}
} else if len(tags) > 0 {
if slices.Intersect(j.Tags, tags) {
filtered = append(filtered, j)
}
} else {
filtered = append(filtered, j)
}
})
Expand Down
64 changes: 64 additions & 0 deletions datastore/inmemory/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,67 @@ func TestInMemoryGetJobLogParts(t *testing.T) {
assert.Equal(t, "line 2", logs.Items[0].Contents)
assert.Equal(t, 1, logs.TotalPages)
}

func TestInMemorySearchJobs(t *testing.T) {
ctx := context.Background()
ds := inmemory.NewInMemoryDatastore()
for i := 0; i < 101; i++ {
j1 := tork.Job{
ID: uuid.NewUUID(),
Name: fmt.Sprintf("Job %d", (i + 1)),
State: tork.JobStateRunning,
Tasks: []*tork.Task{
{
Name: "some task",
},
},
Tags: []string{fmt.Sprintf("tag-%d", i)},
}
err := ds.CreateJob(ctx, &j1)
assert.NoError(t, err)

now := time.Now().UTC()
err = ds.CreateTask(ctx, &tork.Task{
ID: uuid.NewUUID(),
JobID: j1.ID,
State: tork.TaskStateRunning,
CreatedAt: &now,
})
assert.NoError(t, err)
}

p1, err := ds.GetJobs(ctx, "", 1, 10)
assert.NoError(t, err)
assert.Equal(t, 10, p1.Size)
assert.Equal(t, 101, p1.TotalItems)

p1, err = ds.GetJobs(ctx, "101", 1, 10)
assert.NoError(t, err)
assert.Equal(t, 1, p1.Size)
assert.Equal(t, 1, p1.TotalItems)

p1, err = ds.GetJobs(ctx, "tag:tag-1", 1, 10)
assert.NoError(t, err)
assert.Equal(t, 1, p1.Size)
assert.Equal(t, 1, p1.TotalItems)

p1, err = ds.GetJobs(ctx, "tag:not-a-tag", 1, 10)
assert.NoError(t, err)
assert.Equal(t, 0, p1.Size)
assert.Equal(t, 0, p1.TotalItems)

p1, err = ds.GetJobs(ctx, "tags:not-a-tag,tag-1", 1, 10)
assert.NoError(t, err)
assert.Equal(t, 1, p1.Size)
assert.Equal(t, 1, p1.TotalItems)

p1, err = ds.GetJobs(ctx, "Job", 1, 10)
assert.NoError(t, err)
assert.Equal(t, 10, p1.Size)
assert.Equal(t, 101, p1.TotalItems)

p1, err = ds.GetJobs(ctx, "running", 1, 10)
assert.NoError(t, err)
assert.Equal(t, 10, p1.Size)
assert.Equal(t, 101, p1.TotalItems)
}
48 changes: 36 additions & 12 deletions datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,16 @@ func (ds *PostgresDatastore) CreateJob(ctx context.Context, j *tork.Job) error {
if err != nil {
return errors.Wrapf(err, "failed to serialize job.webhooks")
}
if j.Tags == nil {
j.Tags = make([]string, 0)
}
q := `insert into jobs
(id,name,description,state,created_at,started_at,tasks,position,
inputs,context,parent_id,task_count,output_,result,error_,defaults,webhooks,created_by)
inputs,context,parent_id,task_count,output_,result,error_,defaults,webhooks,created_by,tags)
values
($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18)`
($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19)`
_, err = ds.exec(q, j.ID, j.Name, j.Description, j.State, j.CreatedAt, j.StartedAt, tasks, j.Position,
inputs, c, j.ParentID, j.TaskCount, j.Output, j.Result, j.Error, defaults, webhooks, j.CreatedBy.ID)
inputs, c, j.ParentID, j.TaskCount, j.Output, j.Result, j.Error, defaults, webhooks, j.CreatedBy.ID, pq.StringArray(j.Tags))
if err != nil {
return errors.Wrapf(err, "error inserting job to the db")
}
Expand Down Expand Up @@ -746,16 +749,36 @@ func (ds *PostgresDatastore) GetJobLogParts(ctx context.Context, jobID string, p
}

func (ds *PostgresDatastore) GetJobs(ctx context.Context, q string, page, size int) (*datastore.Page[*tork.JobSummary], error) {
parseQuery := func(query string) (string, []string) {
terms := []string{}
tags := []string{}
parts := strings.Fields(query)
for _, part := range parts {
if strings.HasPrefix(part, "tag:") {
tags = append(tags, strings.TrimPrefix(part, "tag:"))
} else if strings.HasPrefix(part, "tags:") {
tags = append(tags, strings.Split(strings.TrimPrefix(part, "tags:"), ",")...)
} else {
terms = append(terms, part)
}
}
return strings.Join(terms, " "), tags
}

searchTerm, tags := parseQuery(q)

offset := (page - 1) * size
rs := make([]jobRecord, 0)
qry := fmt.Sprintf(`
SELECT *
FROM jobs
where
case when $1 != '' then ts @@ plainto_tsquery('english', $1) else true end
WHERE
CASE WHEN $1 != '' THEN ts @@ plainto_tsquery('english', $1) ELSE TRUE END
AND
CASE WHEN array_length($2::text[], 1) > 0 THEN tags && $2 ELSE TRUE END
ORDER BY created_at DESC
OFFSET %d LIMIT %d`, offset, size)
if err := ds.select_(&rs, qry, q); err != nil {
if err := ds.select_(&rs, qry, searchTerm, pq.StringArray(tags)); err != nil {
return nil, errors.Wrapf(err, "error getting a page of jobs")
}
result := make([]*tork.JobSummary, len(rs))
Expand All @@ -773,12 +796,13 @@ func (ds *PostgresDatastore) GetJobs(ctx context.Context, q string, page, size i

var count *int
if err := ds.get(&count, `
select count(*)
from jobs
where case when $1 != ''
then ts @@ plainto_tsquery('english', $1)
else true
end`, q); err != nil {
SELECT count(*)
FROM jobs
WHERE
CASE WHEN $1 != '' THEN ts @@ plainto_tsquery('english', $1) ELSE TRUE
AND
CASE WHEN array_length($2::text[], 1) > 0 THEN tags && $2 ELSE TRUE END
END`, searchTerm, pq.StringArray(tags)); err != nil {
return nil, errors.Wrapf(err, "error getting the jobs count")
}

Expand Down
31 changes: 30 additions & 1 deletion datastore/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestPostgresCreateJob(t *testing.T) {
j1 := tork.Job{
ID: uuid.NewUUID(),
CreatedBy: u,
Tags: []string{"tag-a", "tag-b"},
}
err = ds.CreateJob(ctx, &j1)
assert.NoError(t, err)
Expand All @@ -91,6 +92,7 @@ func TestPostgresCreateJob(t *testing.T) {
j2, err := ds.GetJobByID(ctx, j1.ID)
assert.NoError(t, err)
assert.Equal(t, u.Username, j2.CreatedBy.Username)
assert.Equal(t, []string{"tag-a", "tag-b"}, j2.Tags)
}

func TestPostgresCreateAndGetParallelTask(t *testing.T) {
Expand Down Expand Up @@ -655,6 +657,7 @@ func TestPostgresSearchJobs(t *testing.T) {
Name: "some task",
},
},
Tags: []string{fmt.Sprintf("tag-%d", i)},
}
err := ds.CreateJob(ctx, &j1)
assert.NoError(t, err)
Expand All @@ -668,7 +671,33 @@ func TestPostgresSearchJobs(t *testing.T) {
})
assert.NoError(t, err)
}
p1, err := ds.GetJobs(ctx, "101", 1, 10)

p1, err := ds.GetJobs(ctx, "", 1, 10)
assert.NoError(t, err)
assert.Equal(t, 10, p1.Size)
assert.Equal(t, 101, p1.TotalItems)

p1, err = ds.GetJobs(ctx, "101", 1, 10)
assert.NoError(t, err)
assert.Equal(t, 1, p1.Size)
assert.Equal(t, 1, p1.TotalItems)

p1, err = ds.GetJobs(ctx, "tag:tag-1", 1, 10)
assert.NoError(t, err)
assert.Equal(t, 1, p1.Size)
assert.Equal(t, 1, p1.TotalItems)

p1, err = ds.GetJobs(ctx, "tag:not-a-tag", 1, 10)
assert.NoError(t, err)
assert.Equal(t, 0, p1.Size)
assert.Equal(t, 0, p1.TotalItems)

p1, err = ds.GetJobs(ctx, "tags:not-a-tag,tag-1", 1, 10)
assert.NoError(t, err)
assert.Equal(t, 1, p1.Size)
assert.Equal(t, 1, p1.TotalItems)

p1, err = ds.GetJobs(ctx, "tags:tag-1", 1, 10)
assert.NoError(t, err)
assert.Equal(t, 1, p1.Size)
assert.Equal(t, 1, p1.TotalItems)
Expand Down
44 changes: 23 additions & 21 deletions datastore/postgres/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,28 @@ type taskRecord struct {
}

type jobRecord struct {
ID string `db:"id"`
Name string `db:"name"`
Description string `db:"description"`
State string `db:"state"`
CreatedAt time.Time `db:"created_at"`
CreatedBy string `db:"created_by"`
StartedAt *time.Time `db:"started_at"`
CompletedAt *time.Time `db:"completed_at"`
FailedAt *time.Time `db:"failed_at"`
Tasks []byte `db:"tasks"`
Position int `db:"position"`
Inputs []byte `db:"inputs"`
Context []byte `db:"context"`
ParentID string `db:"parent_id"`
TaskCount int `db:"task_count"`
Output string `db:"output_"`
Result string `db:"result"`
Error string `db:"error_"`
TS string `db:"ts"`
Defaults []byte `db:"defaults"`
Webhooks []byte `db:"webhooks"`
ID string `db:"id"`
Name string `db:"name"`
Description string `db:"description"`
Tags pq.StringArray `db:"tags"`
State string `db:"state"`
CreatedAt time.Time `db:"created_at"`
CreatedBy string `db:"created_by"`
StartedAt *time.Time `db:"started_at"`
CompletedAt *time.Time `db:"completed_at"`
FailedAt *time.Time `db:"failed_at"`
Tasks []byte `db:"tasks"`
Position int `db:"position"`
Inputs []byte `db:"inputs"`
Context []byte `db:"context"`
ParentID string `db:"parent_id"`
TaskCount int `db:"task_count"`
Output string `db:"output_"`
Result string `db:"result"`
Error string `db:"error_"`
TS string `db:"ts"`
Defaults []byte `db:"defaults"`
Webhooks []byte `db:"webhooks"`
}

type nodeRecord struct {
Expand Down Expand Up @@ -275,6 +276,7 @@ func (r jobRecord) toJob(tasks, execution []*tork.Task, createdBy *tork.User) (*
return &tork.Job{
ID: r.ID,
Name: r.Name,
Tags: r.Tags,
State: tork.JobState(r.State),
CreatedAt: r.CreatedAt,
CreatedBy: createdBy,
Expand Down
2 changes: 2 additions & 0 deletions db/postgres/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ insert into users (SELECT REPLACE(gen_random_uuid()::text, '-', ''),'Guest','gue
CREATE TABLE jobs (
id varchar(32) not null primary key,
name varchar(256),
tags text[] not null default '{}',
state varchar(10) not null,
created_at timestamp not null,
created_by varchar(32) not null references users(id),
Expand Down Expand Up @@ -62,6 +63,7 @@ ALTER TABLE jobs ADD COLUMN ts tsvector NOT NULL
CREATE INDEX jobs_ts_idx ON jobs USING GIN (ts);
create index jobs_tags_idx on jobs using gin (tags);
CREATE TABLE tasks (
id varchar(32) not null primary key,
Expand Down
2 changes: 2 additions & 0 deletions input/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Job struct {
id string
Name string `json:"name,omitempty" yaml:"name,omitempty" validate:"required"`
Description string `json:"description,omitempty" yaml:"description,omitempty"`
Tags []string `json:"tags,omitempty" yaml:"tags,omitempty"`
Tasks []Task `json:"tasks,omitempty" yaml:"tasks,omitempty" validate:"required,min=1,dive"`
Inputs map[string]string `json:"inputs,omitempty" yaml:"inputs,omitempty"`
Output string `json:"output,omitempty" yaml:"output,omitempty" validate:"expr"`
Expand Down Expand Up @@ -46,6 +47,7 @@ func (ji *Job) ToJob() *tork.Job {
j.ID = ji.ID()
j.Description = ji.Description
j.Inputs = ji.Inputs
j.Tags = ji.Tags
j.Name = ji.Name
tasks := make([]*tork.Task, len(ji.Tasks))
for i, ti := range ji.Tasks {
Expand Down
17 changes: 17 additions & 0 deletions internal/slices/slices.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package slices

func Intersect[T comparable](a []T, b []T) bool {
elements := make(map[T]struct{})

for _, item := range a {
elements[item] = struct{}{}
}

for _, item := range b {
if _, found := elements[item]; found {
return true
}
}

return false
}
27 changes: 27 additions & 0 deletions internal/slices/slirces_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package slices

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestHasIntersection(t *testing.T) {
tests := []struct {
slice1 []int
slice2 []int
want bool
}{
{slice1: []int{1, 2, 3, 4, 5}, slice2: []int{4, 5, 6, 7, 8}, want: true},
{slice1: []int{1, 2, 3, 4, 5}, slice2: []int{6, 7, 8, 9, 10}, want: false},
{slice1: []int{}, slice2: []int{1, 2, 3}, want: false},
{slice1: []int{1, 2, 3}, slice2: []int{}, want: false},
{slice1: []int{}, slice2: []int{}, want: false},
{slice1: []int{1, 2, 3}, slice2: []int{3, 4, 5}, want: true},
}

for _, tt := range tests {
got := Intersect(tt.slice1, tt.slice2)
assert.Equal(t, tt.want, got)
}
}
Loading

0 comments on commit e237183

Please sign in to comment.