diff --git a/datastore/inmemory/inmemory.go b/datastore/inmemory/inmemory.go
index b306ae9e..d031205b 100644
--- a/datastore/inmemory/inmemory.go
+++ b/datastore/inmemory/inmemory.go
@@ -13,6 +13,7 @@ import (
 	"github.com/runabol/tork/datastore"
 	"github.com/runabol/tork/internal/cache"
+	"github.com/runabol/tork/internal/slices"
 )
 
 const (
@@ -255,11 +256,35 @@ func (ds *InMemoryDatastore) GetActiveTasks(ctx context.Context, jobID string) (
 }
 
 func (ds *InMemoryDatastore) GetJobs(ctx context.Context, q string, page, size int) (*datastore.Page[*tork.JobSummary], error) {
+	parseQuery := func(query string) (string, []string) {
+		terms := []string{}
+		tags := []string{}
+		parts := strings.Fields(query)
+		for _, part := range parts {
+			if strings.HasPrefix(part, "tag:") {
+				tags = append(tags, strings.TrimPrefix(part, "tag:"))
+			} else if strings.HasPrefix(part, "tags:") {
+				tags = append(tags, strings.Split(strings.TrimPrefix(part, "tags:"), ",")...)
+			} else {
+				terms = append(terms, part)
+			}
+		}
+		return strings.Join(terms, " "), tags
+	}
+
+	searchTerm, tags := parseQuery(q)
 	offset := (page - 1) * size
 	filtered := make([]*tork.Job, 0)
 	ds.jobs.Iterate(func(_ string, j *tork.Job) {
-		if strings.Contains(strings.ToLower(j.Name), strings.ToLower(q)) ||
-			strings.Contains(strings.ToLower(string(j.State)), strings.ToLower(q)) {
+		matched := true
+		if searchTerm != "" {
+			matched = strings.Contains(strings.ToLower(j.Name), strings.ToLower(searchTerm)) ||
+				strings.Contains(strings.ToLower(string(j.State)), strings.ToLower(searchTerm))
+		}
+		if matched && len(tags) > 0 {
+			matched = slices.Intersect(j.Tags, tags)
+		}
+		if matched {
 			filtered = append(filtered, j)
 		}
 	})
diff --git a/datastore/inmemory/inmemory_test.go b/datastore/inmemory/inmemory_test.go
index 137a9abf..4fa4f1c3 100644
--- a/datastore/inmemory/inmemory_test.go
+++ b/datastore/inmemory/inmemory_test.go
@@ -508,3 +508,67 @@ func TestInMemoryGetJobLogParts(t *testing.T) {
assert.Equal(t, "line 2", logs.Items[0].Contents) assert.Equal(t, 1, logs.TotalPages) } + +func TestInMemorySearchJobs(t *testing.T) { + ctx := context.Background() + ds := inmemory.NewInMemoryDatastore() + for i := 0; i < 101; i++ { + j1 := tork.Job{ + ID: uuid.NewUUID(), + Name: fmt.Sprintf("Job %d", (i + 1)), + State: tork.JobStateRunning, + Tasks: []*tork.Task{ + { + Name: "some task", + }, + }, + Tags: []string{fmt.Sprintf("tag-%d", i)}, + } + err := ds.CreateJob(ctx, &j1) + assert.NoError(t, err) + + now := time.Now().UTC() + err = ds.CreateTask(ctx, &tork.Task{ + ID: uuid.NewUUID(), + JobID: j1.ID, + State: tork.TaskStateRunning, + CreatedAt: &now, + }) + assert.NoError(t, err) + } + + p1, err := ds.GetJobs(ctx, "", 1, 10) + assert.NoError(t, err) + assert.Equal(t, 10, p1.Size) + assert.Equal(t, 101, p1.TotalItems) + + p1, err = ds.GetJobs(ctx, "101", 1, 10) + assert.NoError(t, err) + assert.Equal(t, 1, p1.Size) + assert.Equal(t, 1, p1.TotalItems) + + p1, err = ds.GetJobs(ctx, "tag:tag-1", 1, 10) + assert.NoError(t, err) + assert.Equal(t, 1, p1.Size) + assert.Equal(t, 1, p1.TotalItems) + + p1, err = ds.GetJobs(ctx, "tag:not-a-tag", 1, 10) + assert.NoError(t, err) + assert.Equal(t, 0, p1.Size) + assert.Equal(t, 0, p1.TotalItems) + + p1, err = ds.GetJobs(ctx, "tags:not-a-tag,tag-1", 1, 10) + assert.NoError(t, err) + assert.Equal(t, 1, p1.Size) + assert.Equal(t, 1, p1.TotalItems) + + p1, err = ds.GetJobs(ctx, "Job", 1, 10) + assert.NoError(t, err) + assert.Equal(t, 10, p1.Size) + assert.Equal(t, 101, p1.TotalItems) + + p1, err = ds.GetJobs(ctx, "running", 1, 10) + assert.NoError(t, err) + assert.Equal(t, 10, p1.Size) + assert.Equal(t, 101, p1.TotalItems) +} diff --git a/datastore/postgres/postgres.go b/datastore/postgres/postgres.go index b542da31..d4ce8c7e 100644 --- a/datastore/postgres/postgres.go +++ b/datastore/postgres/postgres.go @@ -522,13 +522,16 @@ func (ds *PostgresDatastore) CreateJob(ctx context.Context, j *tork.Job) error { if err != nil { return 
errors.Wrapf(err, "failed to serialize job.webhooks") } + if j.Tags == nil { + j.Tags = make([]string, 0) + } q := `insert into jobs (id,name,description,state,created_at,started_at,tasks,position, - inputs,context,parent_id,task_count,output_,result,error_,defaults,webhooks,created_by) + inputs,context,parent_id,task_count,output_,result,error_,defaults,webhooks,created_by,tags) values - ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18)` + ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19)` _, err = ds.exec(q, j.ID, j.Name, j.Description, j.State, j.CreatedAt, j.StartedAt, tasks, j.Position, - inputs, c, j.ParentID, j.TaskCount, j.Output, j.Result, j.Error, defaults, webhooks, j.CreatedBy.ID) + inputs, c, j.ParentID, j.TaskCount, j.Output, j.Result, j.Error, defaults, webhooks, j.CreatedBy.ID, pq.StringArray(j.Tags)) if err != nil { return errors.Wrapf(err, "error inserting job to the db") } @@ -746,16 +749,36 @@ func (ds *PostgresDatastore) GetJobLogParts(ctx context.Context, jobID string, p } func (ds *PostgresDatastore) GetJobs(ctx context.Context, q string, page, size int) (*datastore.Page[*tork.JobSummary], error) { + parseQuery := func(query string) (string, []string) { + terms := []string{} + tags := []string{} + parts := strings.Fields(query) + for _, part := range parts { + if strings.HasPrefix(part, "tag:") { + tags = append(tags, strings.TrimPrefix(part, "tag:")) + } else if strings.HasPrefix(part, "tags:") { + tags = append(tags, strings.Split(strings.TrimPrefix(part, "tags:"), ",")...) 
+			} else {
+				terms = append(terms, part)
+			}
+		}
+		return strings.Join(terms, " "), tags
+	}
+
+	searchTerm, tags := parseQuery(q)
+	offset := (page - 1) * size
 	rs := make([]jobRecord, 0)
 	qry := fmt.Sprintf(`
 	SELECT *
 	FROM jobs
-	where
-	case when $1 != '' then ts @@ plainto_tsquery('english', $1) else true end
+	WHERE
+	CASE WHEN $1 != '' THEN ts @@ plainto_tsquery('english', $1) ELSE TRUE END
+	AND
+	CASE WHEN array_length($2::text[], 1) > 0 THEN tags && $2 ELSE TRUE END
 	ORDER BY created_at DESC
 	OFFSET %d LIMIT %d`, offset, size)
-	if err := ds.select_(&rs, qry, q); err != nil {
+	if err := ds.select_(&rs, qry, searchTerm, pq.StringArray(tags)); err != nil {
 		return nil, errors.Wrapf(err, "error getting a page of jobs")
 	}
 	result := make([]*tork.JobSummary, len(rs))
@@ -773,12 +796,12 @@ func (ds *PostgresDatastore) GetJobs(ctx context.Context, q string, page, size i
 	var count *int
 	if err := ds.get(&count, `
-	select count(*)
-	from jobs
-	where case when $1 != ''
-	then ts @@ plainto_tsquery('english', $1)
-	else true
-	end`, q); err != nil {
+	SELECT count(*)
+	FROM jobs
+	WHERE
+	CASE WHEN $1 != '' THEN ts @@ plainto_tsquery('english', $1) ELSE TRUE END
+	AND
+	CASE WHEN array_length($2::text[], 1) > 0 THEN tags && $2 ELSE TRUE END`, searchTerm, pq.StringArray(tags)); err != nil {
 		return nil, errors.Wrapf(err, "error getting the jobs count")
 	}
 
diff --git a/datastore/postgres/postgres_test.go b/datastore/postgres/postgres_test.go
index e3804479..d00d091d 100644
--- a/datastore/postgres/postgres_test.go
+++ b/datastore/postgres/postgres_test.go
@@ -83,6 +83,7 @@ func TestPostgresCreateJob(t *testing.T) {
 	j1 := tork.Job{
 		ID:        uuid.NewUUID(),
 		CreatedBy: u,
+		Tags:      []string{"tag-a", "tag-b"},
 	}
 	err = ds.CreateJob(ctx, &j1)
 	assert.NoError(t, err)
@@ -91,6 +92,7 @@ func TestPostgresCreateJob(t *testing.T) {
 	j2, err := ds.GetJobByID(ctx, j1.ID)
 	assert.NoError(t, err)
 	assert.Equal(t, u.Username, j2.CreatedBy.Username)
+	assert.Equal(t, []string{"tag-a", "tag-b"},
j2.Tags) } func TestPostgresCreateAndGetParallelTask(t *testing.T) { @@ -655,6 +657,7 @@ func TestPostgresSearchJobs(t *testing.T) { Name: "some task", }, }, + Tags: []string{fmt.Sprintf("tag-%d", i)}, } err := ds.CreateJob(ctx, &j1) assert.NoError(t, err) @@ -668,7 +671,33 @@ func TestPostgresSearchJobs(t *testing.T) { }) assert.NoError(t, err) } - p1, err := ds.GetJobs(ctx, "101", 1, 10) + + p1, err := ds.GetJobs(ctx, "", 1, 10) + assert.NoError(t, err) + assert.Equal(t, 10, p1.Size) + assert.Equal(t, 101, p1.TotalItems) + + p1, err = ds.GetJobs(ctx, "101", 1, 10) + assert.NoError(t, err) + assert.Equal(t, 1, p1.Size) + assert.Equal(t, 1, p1.TotalItems) + + p1, err = ds.GetJobs(ctx, "tag:tag-1", 1, 10) + assert.NoError(t, err) + assert.Equal(t, 1, p1.Size) + assert.Equal(t, 1, p1.TotalItems) + + p1, err = ds.GetJobs(ctx, "tag:not-a-tag", 1, 10) + assert.NoError(t, err) + assert.Equal(t, 0, p1.Size) + assert.Equal(t, 0, p1.TotalItems) + + p1, err = ds.GetJobs(ctx, "tags:not-a-tag,tag-1", 1, 10) + assert.NoError(t, err) + assert.Equal(t, 1, p1.Size) + assert.Equal(t, 1, p1.TotalItems) + + p1, err = ds.GetJobs(ctx, "tags:tag-1", 1, 10) assert.NoError(t, err) assert.Equal(t, 1, p1.Size) assert.Equal(t, 1, p1.TotalItems) diff --git a/datastore/postgres/record.go b/datastore/postgres/record.go index 74852593..315f545e 100644 --- a/datastore/postgres/record.go +++ b/datastore/postgres/record.go @@ -53,27 +53,28 @@ type taskRecord struct { } type jobRecord struct { - ID string `db:"id"` - Name string `db:"name"` - Description string `db:"description"` - State string `db:"state"` - CreatedAt time.Time `db:"created_at"` - CreatedBy string `db:"created_by"` - StartedAt *time.Time `db:"started_at"` - CompletedAt *time.Time `db:"completed_at"` - FailedAt *time.Time `db:"failed_at"` - Tasks []byte `db:"tasks"` - Position int `db:"position"` - Inputs []byte `db:"inputs"` - Context []byte `db:"context"` - ParentID string `db:"parent_id"` - TaskCount int `db:"task_count"` - 
Output string `db:"output_"` - Result string `db:"result"` - Error string `db:"error_"` - TS string `db:"ts"` - Defaults []byte `db:"defaults"` - Webhooks []byte `db:"webhooks"` + ID string `db:"id"` + Name string `db:"name"` + Description string `db:"description"` + Tags pq.StringArray `db:"tags"` + State string `db:"state"` + CreatedAt time.Time `db:"created_at"` + CreatedBy string `db:"created_by"` + StartedAt *time.Time `db:"started_at"` + CompletedAt *time.Time `db:"completed_at"` + FailedAt *time.Time `db:"failed_at"` + Tasks []byte `db:"tasks"` + Position int `db:"position"` + Inputs []byte `db:"inputs"` + Context []byte `db:"context"` + ParentID string `db:"parent_id"` + TaskCount int `db:"task_count"` + Output string `db:"output_"` + Result string `db:"result"` + Error string `db:"error_"` + TS string `db:"ts"` + Defaults []byte `db:"defaults"` + Webhooks []byte `db:"webhooks"` } type nodeRecord struct { @@ -275,6 +276,7 @@ func (r jobRecord) toJob(tasks, execution []*tork.Task, createdBy *tork.User) (* return &tork.Job{ ID: r.ID, Name: r.Name, + Tags: r.Tags, State: tork.JobState(r.State), CreatedAt: r.CreatedAt, CreatedBy: createdBy, diff --git a/db/postgres/schema.go b/db/postgres/schema.go index 47721f6b..07145b49 100644 --- a/db/postgres/schema.go +++ b/db/postgres/schema.go @@ -30,6 +30,7 @@ insert into users (SELECT REPLACE(gen_random_uuid()::text, '-', ''),'Guest','gue CREATE TABLE jobs ( id varchar(32) not null primary key, name varchar(256), + tags text[] not null default '{}', state varchar(10) not null, created_at timestamp not null, created_by varchar(32) not null references users(id), @@ -62,6 +63,7 @@ ALTER TABLE jobs ADD COLUMN ts tsvector NOT NULL CREATE INDEX jobs_ts_idx ON jobs USING GIN (ts); +create index jobs_tags_idx on jobs using gin (tags); CREATE TABLE tasks ( id varchar(32) not null primary key, diff --git a/input/job.go b/input/job.go index 6c18d6c8..b4204061 100644 --- a/input/job.go +++ b/input/job.go @@ -12,6 +12,7 @@ type 
Job struct {
 	id          string
 	Name        string            `json:"name,omitempty" yaml:"name,omitempty" validate:"required"`
 	Description string            `json:"description,omitempty" yaml:"description,omitempty"`
+	Tags        []string          `json:"tags,omitempty" yaml:"tags,omitempty"`
 	Tasks       []Task            `json:"tasks,omitempty" yaml:"tasks,omitempty" validate:"required,min=1,dive"`
 	Inputs      map[string]string `json:"inputs,omitempty" yaml:"inputs,omitempty"`
 	Output      string            `json:"output,omitempty" yaml:"output,omitempty" validate:"expr"`
@@ -46,6 +47,7 @@ func (ji *Job) ToJob() *tork.Job {
 	j.ID = ji.ID()
 	j.Description = ji.Description
 	j.Inputs = ji.Inputs
+	j.Tags = ji.Tags
 	j.Name = ji.Name
 	tasks := make([]*tork.Task, len(ji.Tasks))
 	for i, ti := range ji.Tasks {
diff --git a/internal/slices/slices.go b/internal/slices/slices.go
new file mode 100644
index 00000000..dc8f9190
--- /dev/null
+++ b/internal/slices/slices.go
@@ -0,0 +1,17 @@
+package slices
+
+func Intersect[T comparable](a []T, b []T) bool {
+	elements := make(map[T]struct{})
+
+	for _, item := range a {
+		elements[item] = struct{}{}
+	}
+
+	for _, item := range b {
+		if _, found := elements[item]; found {
+			return true
+		}
+	}
+
+	return false
+}
diff --git a/internal/slices/slices_test.go b/internal/slices/slices_test.go
new file mode 100644
index 00000000..2c69666f
--- /dev/null
+++ b/internal/slices/slices_test.go
@@ -0,0 +1,27 @@
+package slices
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestHasIntersection(t *testing.T) {
+	tests := []struct {
+		slice1 []int
+		slice2 []int
+		want   bool
+	}{
+		{slice1: []int{1, 2, 3, 4, 5}, slice2: []int{4, 5, 6, 7, 8}, want: true},
+		{slice1: []int{1, 2, 3, 4, 5}, slice2: []int{6, 7, 8, 9, 10}, want: false},
+		{slice1: []int{}, slice2: []int{1, 2, 3}, want: false},
+		{slice1: []int{1, 2, 3}, slice2: []int{}, want: false},
+		{slice1: []int{}, slice2: []int{}, want: false},
+		{slice1: []int{1, 2, 3}, slice2: []int{3, 4, 5}, want: true},
+	}
+
+	for _, tt := range tests {
+		got :=
Intersect(tt.slice1, tt.slice2) + assert.Equal(t, tt.want, got) + } +} diff --git a/job.go b/job.go index 10474e13..41789332 100644 --- a/job.go +++ b/job.go @@ -23,6 +23,7 @@ type Job struct { ParentID string `json:"parentId,omitempty"` Name string `json:"name,omitempty"` Description string `json:"description,omitempty"` + Tags []string `json:"tags,omitempty"` State JobState `json:"state,omitempty"` CreatedAt time.Time `json:"createdAt,omitempty"` CreatedBy *User `json:"createdBy,omitempty"` @@ -49,6 +50,7 @@ type JobSummary struct { Inputs map[string]string `json:"inputs,omitempty"` Name string `json:"name,omitempty"` Description string `json:"description,omitempty"` + Tags []string `json:"tags,omitempty"` State JobState `json:"state,omitempty"` CreatedAt time.Time `json:"createdAt,omitempty"` StartedAt *time.Time `json:"startedAt,omitempty"` @@ -93,6 +95,7 @@ func (j *Job) Clone() *Job { ID: j.ID, Name: j.Name, Description: j.Description, + Tags: j.Tags, State: j.State, CreatedAt: j.CreatedAt, CreatedBy: createdBy, @@ -151,6 +154,7 @@ func NewJobSummary(j *Job) *JobSummary { ParentID: j.ParentID, Name: j.Name, Description: j.Description, + Tags: j.Tags, Inputs: maps.Clone(j.Inputs), State: j.State, CreatedAt: j.CreatedAt,