Skip to content

Commit

Permalink
Convert states to alias
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Dec 24, 2024
1 parent 559c6eb commit e3e272a
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 14 deletions.
2 changes: 1 addition & 1 deletion datastore/inmemory/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (ds *InMemoryDatastore) GetJobByID(ctx context.Context, id string) (*tork.J
func (ds *InMemoryDatastore) GetActiveTasks(ctx context.Context, jobID string) ([]*tork.Task, error) {
result := make([]*tork.Task, 0)
ds.tasks.Iterate(func(_ string, t *tork.Task) {
if t.JobID == jobID && t.State.IsActive() {
if t.JobID == jobID && t.IsActive() {
result = append(result, t.Clone())
}
})
Expand Down
2 changes: 1 addition & 1 deletion internal/coordinator/handlers/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (h *errorHandler) handle(ctx context.Context, et task.EventType, t *tork.Ta

// mark the task as FAILED
if err := h.ds.UpdateTask(ctx, t.ID, func(u *tork.Task) error {
if u.State.IsActive() {
if u.IsActive() {
u.State = tork.TaskStateFailed
u.FailedAt = t.FailedAt
u.Error = t.Error
Expand Down
2 changes: 1 addition & 1 deletion job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"golang.org/x/exp/maps"
)

type JobState string
type JobState = string

const (
JobStatePending JobState = "PENDING"
Expand Down
2 changes: 1 addition & 1 deletion middleware/job/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func Webhook(next HandlerFunc) HandlerFunc {
}
if wh.If != "" {
val, err := eval.EvaluateExpr(wh.If, map[string]any{
"job": j,
"job": tork.NewJobSummary(j),
})
if err != nil {
log.Error().Err(err).Msgf("[Webhook] error evaluating if expression %s", wh.If)
Expand Down
18 changes: 18 additions & 0 deletions middleware/job/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,21 @@ func TestWebhookIfFalse(t *testing.T) {
case <-time.After(500 * time.Millisecond):
}
}

func TestWebhookIfJobStatus(t *testing.T) {
hm := ApplyMiddleware(NoOpHandlerFunc, []MiddlewareFunc{Webhook})
received := make(chan any)
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
close(received)
}))
j := &tork.Job{
ID: "1234",
State: tork.JobStateCompleted,
Webhooks: []*tork.Webhook{{
URL: svr.URL,
If: "{{ job.State == 'COMPLETED' }}",
}},
}
assert.NoError(t, hm(context.Background(), StateChange, j))
<-received
}
4 changes: 2 additions & 2 deletions middleware/task/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func Webhook(ds datastore.Datastore) MiddlewareFunc {
}
if wh.If != "" {
val, err := eval.EvaluateExpr(wh.If, map[string]any{
"task": t,
"job": job,
"task": tork.NewTaskSummary(t),
"job": tork.NewJobSummary(job),
})
if err != nil {
log.Error().Err(err).Msgf("[Webhook] error evaluating if expression %s", wh.If)
Expand Down
28 changes: 28 additions & 0 deletions middleware/task/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,31 @@ func TestWebhookIfFalse(t *testing.T) {
case <-time.After(500 * time.Millisecond):
}
}

func TestWebhookState(t *testing.T) {
ds := inmemory.NewInMemoryDatastore()
hm := ApplyMiddleware(NoOpHandlerFunc, []MiddlewareFunc{Webhook(ds)})
received := make(chan any)
callbackState := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
close(received)
}))
j := &tork.Job{
ID: "1",
State: tork.JobStateCompleted,
Context: tork.JobContext{},
Webhooks: []*tork.Webhook{{
URL: callbackState.URL,
Event: webhook.EventTaskStateChange,
If: "{{ task.State == 'COMPLETED' && job.State == 'COMPLETED' }}",
}},
}
err := ds.CreateJob(context.Background(), j)
assert.NoError(t, err)
tk := &tork.Task{
ID: "2",
JobID: j.ID,
State: tork.TaskStateCompleted,
}
assert.NoError(t, hm(context.Background(), StateChange, tk))
<-received
}
6 changes: 3 additions & 3 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

// State defines the list of states that a
// task can be in, at any given moment.
type TaskState string
type TaskState = string

const (
TaskStateCreated TaskState = "CREATED"
Expand Down Expand Up @@ -150,8 +150,8 @@ type Port struct {
HostPort string `json:"-"`
}

func (s TaskState) IsActive() bool {
return slices.Contains(TaskStateActive, s)
func (t *Task) IsActive() bool {
return slices.Contains(TaskStateActive, t.State)
}

func (t *Task) Clone() *Task {
Expand Down
10 changes: 5 additions & 5 deletions task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,21 @@ func TestIsActive(t *testing.T) {
t1 := &tork.Task{
State: tork.TaskStateCancelled,
}
assert.False(t, t1.State.IsActive())
assert.False(t, t1.IsActive())
t2 := &tork.Task{
State: tork.TaskStateCreated,
}
assert.True(t, t2.State.IsActive())
assert.True(t, t2.IsActive())
t3 := &tork.Task{
State: tork.TaskStatePending,
}
assert.True(t, t3.State.IsActive())
assert.True(t, t3.IsActive())
t4 := &tork.Task{
State: tork.TaskStateRunning,
}
assert.True(t, t4.State.IsActive())
assert.True(t, t4.IsActive())
t5 := &tork.Task{
State: tork.TaskStateCompleted,
}
assert.False(t, t5.State.IsActive())
assert.False(t, t5.IsActive())
}

0 comments on commit e3e272a

Please sign in to comment.