Skip to content

Commit

Permalink
Feature: scheduled jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Dec 21, 2024
1 parent 7a01810 commit 946b583
Show file tree
Hide file tree
Showing 28 changed files with 1,666 additions and 106 deletions.
19 changes: 13 additions & 6 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
type Provider func() (Datastore, error)

var (
ErrTaskNotFound = errors.New("task not found")
ErrNodeNotFound = errors.New("node not found")
ErrJobNotFound = errors.New("job not found")
ErrUserNotFound = errors.New("user not found")
ErrRoleNotFound = errors.New("role not found")
ErrContextNotFound = errors.New("context not found")
ErrTaskNotFound = errors.New("task not found")
ErrNodeNotFound = errors.New("node not found")
ErrJobNotFound = errors.New("job not found")
ErrScheduledJobNotFound = errors.New("scheduled job not found")
ErrUserNotFound = errors.New("user not found")
ErrRoleNotFound = errors.New("role not found")
ErrContextNotFound = errors.New("context not found")
)

const (
Expand Down Expand Up @@ -43,6 +44,12 @@ type Datastore interface {
GetJobLogParts(ctx context.Context, jobID, q string, page, size int) (*Page[*tork.TaskLogPart], error)
GetJobs(ctx context.Context, currentUser, q string, page, size int) (*Page[*tork.JobSummary], error)

CreateScheduledJob(ctx context.Context, s *tork.ScheduledJob) error
GetActiveScheduledJobs(ctx context.Context) ([]*tork.ScheduledJob, error)
GetScheduledJobs(ctx context.Context, currentUser string, page, size int) (*Page[*tork.ScheduledJobSummary], error)
GetScheduledJobByID(ctx context.Context, id string) (*tork.ScheduledJob, error)
UpdateScheduledJob(ctx context.Context, id string, modify func(u *tork.ScheduledJob) error) error

CreateUser(ctx context.Context, u *tork.User) error
GetUser(ctx context.Context, username string) (*tork.User, error)

Expand Down
110 changes: 110 additions & 0 deletions datastore/inmemory/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type InMemoryDatastore struct {
usersByID *cache.Cache[*tork.User]
usersByUsername *cache.Cache[*tork.User]
roles *cache.Cache[*tork.Role]
scheduledJobs *cache.Cache[*tork.ScheduledJob]
userRoles *cache.Cache[[]*tork.UserRole]
logs *cache.Cache[[]*tork.TaskLogPart]
logsMu sync.RWMutex
Expand Down Expand Up @@ -85,6 +86,7 @@ func NewInMemoryDatastore(opts ...Option) *InMemoryDatastore {
ds.usersByUsername = cache.New[*tork.User](cache.NoExpiration, ci)
ds.roles = cache.New[*tork.Role](cache.NoExpiration, ci)
ds.userRoles = cache.New[[]*tork.UserRole](cache.NoExpiration, ci)
ds.scheduledJobs = cache.New[*tork.ScheduledJob](cache.NoExpiration, ci)
ds.jobs.OnEvicted(ds.onJobEviction)
return ds
}
Expand Down Expand Up @@ -610,6 +612,114 @@ func (ds *InMemoryDatastore) GetUserRoles(ctx context.Context, userID string) ([
return result, nil
}

func (ds *InMemoryDatastore) CreateScheduledJob(ctx context.Context, s *tork.ScheduledJob) error {
if s.CreatedBy == nil {
s.CreatedBy = guestUser
}
ds.scheduledJobs.Set(s.ID, s.Clone())
return nil
}

func (ds *InMemoryDatastore) GetActiveScheduledJobs(ctx context.Context) ([]*tork.ScheduledJob, error) {
ajobs := make([]*tork.ScheduledJob, 0)
ds.scheduledJobs.Iterate(func(_ string, sj *tork.ScheduledJob) {
if sj.State == tork.ScheduledJobStateActive {
ajobs = append(ajobs, sj.Clone())
}
})
return ajobs, nil
}

func (ds *InMemoryDatastore) GetScheduledJobs(ctx context.Context, currentUser string, page, size int) (*datastore.Page[*tork.ScheduledJobSummary], error) {
var urs []*tork.Role
var user *tork.User
if currentUser != "" {
u, err := ds.GetUser(ctx, currentUser)
if err != nil {
return nil, err
}
user = u
ur, err := ds.GetUserRoles(ctx, u.ID)
if err != nil {
return nil, err
}
urs = ur
}

offset := (page - 1) * size
filtered := make([]*tork.ScheduledJob, 0)
hasPermission := func(user *tork.User, uroles []*tork.Role, job *tork.ScheduledJob) bool {
if len(job.Permissions) == 0 {
return true
}
for _, p := range job.Permissions {
if p.User != nil && p.User.Username == user.Username {
return true
}
if p.Role != nil {
for _, ur := range uroles {
if p.Role.Slug == ur.Slug {
return true
}
}
}
}
return false
}
ds.scheduledJobs.Iterate(func(_ string, j *tork.ScheduledJob) {
if currentUser != "" && !hasPermission(user, urs, j) {
return
}

filtered = append(filtered, j)
})
sort.Slice(filtered, func(i, j int) bool {
return filtered[i].CreatedAt.After(filtered[j].CreatedAt)
})
result := make([]*tork.ScheduledJobSummary, 0)
for i := offset; i < (offset+size) && i < len(filtered); i++ {
j := filtered[i]
result = append(result, tork.NewScheduledJobSummary(j))
}
totalPages := len(filtered) / size
if len(filtered)%size != 0 {
totalPages = totalPages + 1
}
return &datastore.Page[*tork.ScheduledJobSummary]{
Items: result,
Number: page,
Size: len(result),
TotalPages: totalPages,
TotalItems: len(filtered),
}, nil
}

func (ds *InMemoryDatastore) GetScheduledJobByID(ctx context.Context, id string) (*tork.ScheduledJob, error) {
j, ok := ds.scheduledJobs.Get(id)
if !ok {
return nil, datastore.ErrJobNotFound
}
return j.Clone(), nil
}

func (ds *InMemoryDatastore) UpdateScheduledJob(ctx context.Context, id string, modify func(u *tork.ScheduledJob) error) error {
_, ok := ds.scheduledJobs.Get(id)
if !ok {
return datastore.ErrJobNotFound
}
err := ds.scheduledJobs.Modify(id, func(j *tork.ScheduledJob) (*tork.ScheduledJob, error) {
update := j.Clone()
if err := modify(update); err != nil {
return nil, errors.Wrapf(err, "error modifying scheduled job %s", id)
}
return update, nil
})
if err != nil {
return err
}
return nil
}

func (ds *InMemoryDatastore) WithTx(ctx context.Context, f func(tx datastore.Datastore) error) error {
return f(ds)
}
Expand Down
90 changes: 90 additions & 0 deletions datastore/inmemory/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,3 +780,93 @@ func TestInMemoryGetNextTask(t *testing.T) {
_, err = ds.GetNextTask(ctx, "no-such-id")
assert.Error(t, err)
}

func TestInMemoryGetScheduledJobs(t *testing.T) {
ctx := context.Background()
ds := inmemory.NewInMemoryDatastore()

// Create some scheduled jobs
for i := 0; i < 15; i++ {
sj := &tork.ScheduledJob{
ID: uuid.NewUUID(),
Name: fmt.Sprintf("ScheduledJob-%d", i),
State: tork.ScheduledJobStateActive,
}
err := ds.CreateScheduledJob(ctx, sj)
assert.NoError(t, err)
}

// Test fetching the first page
page, err := ds.GetScheduledJobs(ctx, "", 1, 10)
assert.NoError(t, err)
assert.Equal(t, 10, len(page.Items))
assert.Equal(t, 1, page.Number)
assert.Equal(t, 10, page.Size)
assert.Equal(t, 2, page.TotalPages)
assert.Equal(t, 15, page.TotalItems)

// Test fetching the second page
page, err = ds.GetScheduledJobs(ctx, "", 2, 10)
assert.NoError(t, err)
assert.Equal(t, 5, len(page.Items))
assert.Equal(t, 2, page.Number)
assert.Equal(t, 5, page.Size)
assert.Equal(t, 2, page.TotalPages)
assert.Equal(t, 15, page.TotalItems)
}

func TestInMemoryGetScheduledJobByID(t *testing.T) {
ctx := context.Background()
ds := inmemory.NewInMemoryDatastore()

// Test case: Scheduled job not found
_, err := ds.GetScheduledJobByID(ctx, "non-existent-id")
assert.Error(t, err)
assert.Equal(t, datastore.ErrJobNotFound, err)

// Test case: Scheduled job found
scheduledJob := &tork.ScheduledJob{
ID: uuid.NewUUID(),
Name: "Test Scheduled Job",
}
err = ds.CreateScheduledJob(ctx, scheduledJob)
assert.NoError(t, err)

retrievedJob, err := ds.GetScheduledJobByID(ctx, scheduledJob.ID)
assert.NoError(t, err)
assert.Equal(t, scheduledJob.ID, retrievedJob.ID)
assert.Equal(t, scheduledJob.Name, retrievedJob.Name)
}

func TestInMemoryUpdateScheduledJob(t *testing.T) {
ctx := context.Background()
ds := inmemory.NewInMemoryDatastore()

// Create a scheduled job
scheduledJob := &tork.ScheduledJob{
ID: uuid.NewUUID(),
Name: "Test Scheduled Job",
}
err := ds.CreateScheduledJob(ctx, scheduledJob)
assert.NoError(t, err)

// Update the scheduled job
err = ds.UpdateScheduledJob(ctx, scheduledJob.ID, func(u *tork.ScheduledJob) error {
u.Name = "Updated Scheduled Job"
return nil
})
assert.NoError(t, err)

// Retrieve the updated scheduled job
updatedJob, err := ds.GetScheduledJobByID(ctx, scheduledJob.ID)
assert.NoError(t, err)
assert.Equal(t, "Updated Scheduled Job", updatedJob.Name)

// Test case: Scheduled job not found
err = ds.UpdateScheduledJob(ctx, "non-existent-id", func(u *tork.ScheduledJob) error {
u.Name = "Should not update"
return nil
})
assert.Error(t, err)
assert.Equal(t, datastore.ErrJobNotFound, err)
}
Loading

0 comments on commit 946b583

Please sign in to comment.