From f52d6b20e262ab9a3ba34ccc3018dea1573a21bc Mon Sep 17 00:00:00 2001 From: Arik Cohen Date: Wed, 10 Jan 2024 13:17:40 -0500 Subject: [PATCH] Feature: task logs pruning --- configs/sample.config.toml | 1 + datastore/postgres/postgres.go | 364 +++++++--------------------- datastore/postgres/postgres_test.go | 96 ++++++++ datastore/postgres/record.go | 282 +++++++++++++++++++++ db/postgres/schema.go | 1 + engine/datastore.go | 4 +- 6 files changed, 476 insertions(+), 272 deletions(-) create mode 100644 datastore/postgres/record.go diff --git a/configs/sample.config.toml b/configs/sample.config.toml index 482d240f..14388e7f 100644 --- a/configs/sample.config.toml +++ b/configs/sample.config.toml @@ -21,6 +21,7 @@ type = "inmemory" # inmemory | postgres [datastore.postgres] dsn = "host=localhost user=tork password=tork dbname=tork port=5432 sslmode=disable" +task.logs.interval = "168h" [coordinator] address = "localhost:8000" diff --git a/datastore/postgres/postgres.go b/datastore/postgres/postgres.go index aeaa84c7..f7c2359f 100644 --- a/datastore/postgres/postgres.go +++ b/datastore/postgres/postgres.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/json" "fmt" + "math/rand" "strings" "time" @@ -18,290 +19,93 @@ import ( ) type PostgresDatastore struct { - db *sqlx.DB - tx *sqlx.Tx -} - -type taskRecord struct { - ID string `db:"id"` - JobID string `db:"job_id"` - Position int `db:"position"` - Name string `db:"name"` - Description string `db:"description"` - State string `db:"state"` - CreatedAt time.Time `db:"created_at"` - ScheduledAt *time.Time `db:"scheduled_at"` - StartedAt *time.Time `db:"started_at"` - CompletedAt *time.Time `db:"completed_at"` - FailedAt *time.Time `db:"failed_at"` - CMD pq.StringArray `db:"cmd"` - Entrypoint pq.StringArray `db:"entrypoint"` - Run string `db:"run_script"` - Image string `db:"image"` - Registry []byte `db:"registry"` - Env []byte `db:"env"` - Files []byte `db:"files_"` - Queue string `db:"queue"` - Error string `db:"error_"` - Pre []byte `db:"pre_tasks"` - Post []byte `db:"post_tasks"` - Mounts []byte `db:"mounts"` - Networks pq.StringArray `db:"networks"` - NodeID string `db:"node_id"` - Retry []byte `db:"retry"` - Limits []byte `db:"limits"` - Timeout string `db:"timeout"` - Var string `db:"var"` - Result string `db:"result"` - Parallel []byte `db:"parallel"` - ParentID string `db:"parent_id"` - Each []byte `db:"each_"` - SubJob []byte `db:"subjob"` - SubJobID string `db:"subjob_id"` - GPUs string `db:"gpus"` - IF string `db:"if_"` - Tags pq.StringArray `db:"tags"` -} - -type jobRecord struct { - ID string `db:"id"` - Name string `db:"name"` - Description string `db:"description"` - State string `db:"state"` - CreatedAt time.Time `db:"created_at"` - StartedAt *time.Time `db:"started_at"` - CompletedAt *time.Time `db:"completed_at"` - FailedAt *time.Time `db:"failed_at"` - Tasks []byte `db:"tasks"` - Position int `db:"position"` - Inputs []byte `db:"inputs"` - Context []byte `db:"context"` - ParentID string `db:"parent_id"` - TaskCount int `db:"task_count"` - Output string `db:"output_"` - Result string `db:"result"` - Error string `db:"error_"` - TS string `db:"ts"` - Defaults []byte `db:"defaults"` - Webhooks []byte `db:"webhooks"` -} + db *sqlx.DB + tx *sqlx.Tx + taskLogsRetentionPeriod *time.Duration + cleanupInterval *time.Duration + rand *rand.Rand + disableCleanup bool +} + +var ( + initialCleanupInterval = minCleanupInterval + minCleanupInterval = time.Minute + maxCleanupInterval = time.Hour + DefaultTaskLogsRetentionPeriod = time.Hour * 24 * 7 +) -type nodeRecord struct { - ID string `db:"id"` - Name string `db:"name"` - StartedAt time.Time `db:"started_at"` - LastHeartbeatAt time.Time `db:"last_heartbeat_at"` - CPUPercent float64 `db:"cpu_percent"` - Queue string `db:"queue"` - Status string `db:"status"` - Hostname string `db:"hostname"` - TaskCount int `db:"task_count"` - Version string `db:"version_"` -} -type taskLogPartRecord struct { - ID string `db:"id"` - Number int `db:"number_"` - TaskID string `db:"task_id"` - CreateAt time.Time `db:"created_at"` - Contents string `db:"contents"` -} +type Option = func(ds *PostgresDatastore) -func (r taskRecord) toTask() (*tork.Task, error) { - var env map[string]string - if r.Env != nil { - if err := json.Unmarshal(r.Env, &env); err != nil { - return nil, errors.Wrapf(err, "error deserializing task.env") - } - } - var files map[string]string - if r.Files != nil { - if err := json.Unmarshal(r.Files, &files); err != nil { - return nil, errors.Wrapf(err, "error deserializing task.files") - } - } - var pre []*tork.Task - if r.Pre != nil { - if err := json.Unmarshal(r.Pre, &pre); err != nil { - return nil, errors.Wrapf(err, "error deserializing task.pre") - } - } - var post []*tork.Task - if r.Post != nil { - if err := json.Unmarshal(r.Post, &post); err != nil { - return nil, errors.Wrapf(err, "error deserializing task.post") - } - } - var retry *tork.TaskRetry - if r.Retry != nil { - retry = &tork.TaskRetry{} - if err := json.Unmarshal(r.Retry, retry); err != nil { - return nil, errors.Wrapf(err, "error deserializing task.retry") - } - } - var limits *tork.TaskLimits - if r.Limits != nil { - limits = &tork.TaskLimits{} - if err := json.Unmarshal(r.Limits, limits); err != nil { - return nil, errors.Wrapf(err, "error deserializing task.limits") - } - } - var parallel *tork.ParallelTask - if r.Parallel != nil { - parallel = &tork.ParallelTask{} - if err := json.Unmarshal(r.Parallel, parallel); err != nil { - return nil, errors.Wrapf(err, "error deserializing task.parallel") - } - } - var each *tork.EachTask - if r.Each != nil { - each = &tork.EachTask{} - if err := json.Unmarshal(r.Each, each); err != nil { - return nil, errors.Wrapf(err, "error deserializing task.each") - } - } - var subjob *tork.SubJobTask - if r.SubJob != nil { - subjob = &tork.SubJobTask{} - if err := json.Unmarshal(r.SubJob, subjob); err != nil { - return nil, errors.Wrapf(err, "error deserializing task.subjob") - } - } - var registry *tork.Registry - if r.Registry != nil { - registry = &tork.Registry{} - if err := json.Unmarshal(r.Registry, registry); err != nil { - return nil, errors.Wrapf(err, "error deserializing task.registry") - } - } - var mounts []tork.Mount - if r.Mounts != nil { - if err := json.Unmarshal(r.Mounts, &mounts); err != nil { - return nil, errors.Wrapf(err, "error deserializing task.registry") - } - } - return &tork.Task{ - ID: r.ID, - JobID: r.JobID, - Position: r.Position, - Name: r.Name, - State: tork.TaskState(r.State), - CreatedAt: &r.CreatedAt, - ScheduledAt: r.ScheduledAt, - StartedAt: r.StartedAt, - CompletedAt: r.CompletedAt, - FailedAt: r.FailedAt, - CMD: r.CMD, - Entrypoint: r.Entrypoint, - Run: r.Run, - Image: r.Image, - Registry: registry, - Env: env, - Files: files, - Queue: r.Queue, - Error: r.Error, - Pre: pre, - Post: post, - Mounts: mounts, - Networks: r.Networks, - NodeID: r.NodeID, - Retry: retry, - Limits: limits, - Timeout: r.Timeout, - Var: r.Var, - Result: r.Result, - Parallel: parallel, - ParentID: r.ParentID, - Each: each, - Description: r.Description, - SubJob: subjob, - GPUs: r.GPUs, - If: r.IF, - Tags: r.Tags, - }, nil +func WithTaskLogRetentionPeriod(dur time.Duration) Option { + return func(ds *PostgresDatastore) { + ds.taskLogsRetentionPeriod = &dur + } } -func (r nodeRecord) toNode() *tork.Node { - n := tork.Node{ - ID: r.ID, - Name: r.Name, - StartedAt: r.StartedAt, - CPUPercent: r.CPUPercent, - LastHeartbeatAt: r.LastHeartbeatAt, - Queue: r.Queue, - Status: tork.NodeStatus(r.Status), - Hostname: r.Hostname, - TaskCount: r.TaskCount, - Version: r.Version, - } - // if we hadn't seen an heartbeat for two or more - // consecutive periods we consider the node as offline - if n.LastHeartbeatAt.Before(time.Now().UTC().Add(-tork.HEARTBEAT_RATE * 2)) { - n.Status = tork.NodeStatusOffline - } - return &n +func WithDisableCleanup(val bool) Option { + return func(ds *PostgresDatastore) { + ds.disableCleanup = val + } } -func (r taskLogPartRecord) toTaskLogPart() *tork.TaskLogPart { - return &tork.TaskLogPart{ - Number: r.Number, - TaskID: r.TaskID, - Contents: r.Contents, - CreatedAt: &r.CreateAt, +func NewPostgresDataStore(dsn string, opts ...Option) (*PostgresDatastore, error) { + db, err := sqlx.Connect("postgres", dsn) + if err != nil { + return nil, errors.Wrapf(err, "unable to connect to postgres") + } + ds := &PostgresDatastore{ + db: db, + rand: rand.New(rand.NewSource(time.Now().UnixNano())), + } + for _, opt := range opts { + opt(ds) + } + ds.cleanupInterval = &initialCleanupInterval + if ds.taskLogsRetentionPeriod == nil { + ds.taskLogsRetentionPeriod = &DefaultTaskLogsRetentionPeriod + } + if *ds.cleanupInterval < time.Minute { + return nil, errors.Errorf("cleanup interval can not be under 1 minute") + } + if *ds.taskLogsRetentionPeriod < time.Minute { + return nil, errors.Errorf("task logs retention period can not be under 1 minute") } + if !ds.disableCleanup { + go ds.cleanupProcess() + } + return ds, nil } -func (r jobRecord) toJob(tasks, execution []*tork.Task) (*tork.Job, error) { - var c tork.JobContext - if err := json.Unmarshal(r.Context, &c); err != nil { - return nil, errors.Wrapf(err, "error deserializing job.context") - } - var inputs map[string]string - if err := json.Unmarshal(r.Inputs, &inputs); err != nil { - return nil, errors.Wrapf(err, "error deserializing job.inputs") - } - var defaults *tork.JobDefaults - if r.Defaults != nil { - defaults = &tork.JobDefaults{} - if err := json.Unmarshal(r.Defaults, defaults); err != nil { - return nil, errors.Wrapf(err, "error deserializing job.defaults") - } - } - var webhooks []*tork.Webhook - if err := json.Unmarshal(r.Webhooks, &webhooks); err != nil { - return nil, errors.Wrapf(err, "error deserializing job.webhook") - } - return &tork.Job{ - ID: r.ID, - Name: r.Name, - State: tork.JobState(r.State), - CreatedAt: r.CreatedAt, - StartedAt: r.StartedAt, - CompletedAt: r.CompletedAt, - FailedAt: r.FailedAt, - Tasks: tasks, - Execution: execution, - Position: r.Position, - Context: c, - Inputs: inputs, - Description: r.Description, - ParentID: r.ParentID, - TaskCount: r.TaskCount, - Output: r.Output, - Result: r.Result, - Error: r.Error, - Defaults: defaults, - Webhooks: webhooks, - }, nil +func (ds *PostgresDatastore) cleanupProcess() { + for { + jitter := time.Second * (time.Duration(ds.rand.Intn(60) + 1)) + time.Sleep(*ds.cleanupInterval + jitter) + if err := ds.cleanup(); err != nil { + log.Error().Err(err).Msg("error expunging task logs") + } + } } -func NewPostgresDataStore(dsn string) (*PostgresDatastore, error) { - db, err := sqlx.Connect("postgres", dsn) +func (ds *PostgresDatastore) cleanup() error { + n, err := ds.expungeExpiredTaskLogPart() if err != nil { - return nil, errors.Wrapf(err, "unable to connect to postgres") + return err } - return &PostgresDatastore{ - db: db, - }, nil + if n > 0 { + log.Debug().Msgf("Expunged %d expired task log parts from the DB", n) + newCleanupInterval := (*ds.cleanupInterval) / 2 + if newCleanupInterval < minCleanupInterval { + newCleanupInterval = minCleanupInterval + } + ds.cleanupInterval = &newCleanupInterval + } else { + newCleanupInterval := (*ds.cleanupInterval) * 2 + if newCleanupInterval > maxCleanupInterval { + newCleanupInterval = maxCleanupInterval + } + ds.cleanupInterval = &newCleanupInterval + } + return nil } func (ds *PostgresDatastore) ExecScript(script string) error { @@ -829,6 +633,24 @@ func (ds *PostgresDatastore) CreateTaskLogPart(ctx context.Context, p *tork.Task return nil } +func (ds *PostgresDatastore) expungeExpiredTaskLogPart() (int, error) { + q := `delete from tasks_log_parts where id in ( + select id + from tasks_log_parts + where created_at < $1 + limit 1000 + )` + res, err := ds.exec(q, time.Now().UTC().Add(-*ds.taskLogsRetentionPeriod)) + if err != nil { + return 0, errors.Wrapf(err, "error deleting expired task log parts from the db") + } + rows, err := res.RowsAffected() + if err != nil { + return 0, errors.Wrapf(err, "error getting the number of deleted log parts") + } + return int(rows), nil +} + func (ds *PostgresDatastore) GetTaskLogParts(ctx context.Context, taskID string, page, size int) (*datastore.Page[*tork.TaskLogPart], error) { offset := (page - 1) * size rs := []taskLogPartRecord{} diff --git a/datastore/postgres/postgres_test.go b/datastore/postgres/postgres_test.go index efed9e1b..adde8cc6 100644 --- a/datastore/postgres/postgres_test.go +++ b/datastore/postgres/postgres_test.go @@ -912,3 +912,99 @@ func TestPostgresCreateAndGetTaskLogsLarge(t *testing.T) { assert.Equal(t, 10, logs.Size) assert.Equal(t, 10, logs.TotalPages) } + +func TestPostgresCreateAndExpungeTaskLogs(t *testing.T) { + ctx := context.Background() + dsn := "host=localhost user=tork password=tork dbname=tork port=5432 sslmode=disable" + ds, err := NewPostgresDataStore(dsn) + assert.NoError(t, err) + now := time.Now().UTC() + j1 := tork.Job{ + ID: uuid.NewUUID(), + } + err = ds.CreateJob(ctx, &j1) + assert.NoError(t, err) + t1 := tork.Task{ + ID: uuid.NewUUID(), + CreatedAt: &now, + JobID: j1.ID, + } + err = ds.CreateTask(ctx, &t1) + assert.NoError(t, err) + + for i := 1; i <= 100; i++ { + err := ds.CreateTaskLogPart(ctx, &tork.TaskLogPart{ + Number: i, + TaskID: t1.ID, + Contents: fmt.Sprintf("line %d", i), + }) + assert.NoError(t, err) + } + + n, err := ds.expungeExpiredTaskLogPart() + assert.NoError(t, err) + assert.Equal(t, 0, n) + + logs, err := ds.GetTaskLogParts(ctx, t1.ID, 1, 1) + assert.NoError(t, err) + assert.Equal(t, 100, logs.TotalItems) + + retentionPeriod := time.Microsecond + ds.taskLogsRetentionPeriod = &retentionPeriod + + n, err = ds.expungeExpiredTaskLogPart() + assert.NoError(t, err) + assert.GreaterOrEqual(t, n, 100) + + logs, err = ds.GetTaskLogParts(ctx, t1.ID, 1, 1) + assert.NoError(t, err) + assert.Equal(t, 0, logs.TotalItems) +} + +func Test_cleanup(t *testing.T) { + ctx := context.Background() + dsn := "host=localhost user=tork password=tork dbname=tork port=5432 sslmode=disable" + ds, err := NewPostgresDataStore(dsn, WithDisableCleanup(true)) + assert.NoError(t, err) + now := time.Now().UTC() + j1 := tork.Job{ + ID: uuid.NewUUID(), + } + err = ds.CreateJob(ctx, &j1) + assert.NoError(t, err) + t1 := tork.Task{ + ID: uuid.NewUUID(), + CreatedAt: &now, + JobID: j1.ID, + } + err = ds.CreateTask(ctx, &t1) + assert.NoError(t, err) + + for i := 1; i <= 100; i++ { + err := ds.CreateTaskLogPart(ctx, &tork.TaskLogPart{ + Number: i, + TaskID: t1.ID, + Contents: fmt.Sprintf("line %d", i), + }) + assert.NoError(t, err) + } + + err = ds.cleanup() + assert.NoError(t, err) + assert.Equal(t, time.Minute*2, *ds.cleanupInterval) + + logs, err := ds.GetTaskLogParts(ctx, t1.ID, 1, 1) + assert.NoError(t, err) + assert.Equal(t, 100, logs.TotalItems) + + retentionPeriod := time.Microsecond + ds.taskLogsRetentionPeriod = &retentionPeriod + + err = ds.cleanup() + assert.NoError(t, err) + assert.Equal(t, time.Minute, *ds.cleanupInterval) + + logs, err = ds.GetTaskLogParts(ctx, t1.ID, 1, 1) + assert.NoError(t, err) + assert.Equal(t, 0, logs.TotalItems) +} diff --git a/datastore/postgres/record.go b/datastore/postgres/record.go new file mode 100644 index 00000000..1ba03ed7 --- /dev/null +++ b/datastore/postgres/record.go @@ -0,0 +1,282 @@ +package postgres + +import ( + "encoding/json" + "time" + + "github.com/lib/pq" + "github.com/pkg/errors" + "github.com/runabol/tork" +) + +type taskRecord struct { + ID string `db:"id"` + JobID string `db:"job_id"` + Position int `db:"position"` + Name string `db:"name"` + Description string `db:"description"` + State string `db:"state"` + CreatedAt time.Time `db:"created_at"` + ScheduledAt *time.Time `db:"scheduled_at"` + StartedAt *time.Time `db:"started_at"` + CompletedAt *time.Time `db:"completed_at"` + FailedAt *time.Time `db:"failed_at"` + CMD pq.StringArray `db:"cmd"` + Entrypoint pq.StringArray `db:"entrypoint"` + Run string `db:"run_script"` + Image string `db:"image"` + Registry []byte `db:"registry"` + Env []byte `db:"env"` + Files []byte `db:"files_"` + Queue string `db:"queue"` + Error string `db:"error_"` + Pre []byte `db:"pre_tasks"` + Post []byte `db:"post_tasks"` + Mounts []byte `db:"mounts"` + Networks pq.StringArray `db:"networks"` + NodeID string `db:"node_id"` + Retry []byte `db:"retry"` + Limits []byte `db:"limits"` + Timeout string `db:"timeout"` + Var string `db:"var"` + Result string `db:"result"` + Parallel []byte `db:"parallel"` + ParentID string `db:"parent_id"` + Each []byte `db:"each_"` + SubJob []byte `db:"subjob"` + SubJobID string `db:"subjob_id"` + GPUs string `db:"gpus"` + IF string `db:"if_"` + Tags pq.StringArray `db:"tags"` +} + +type jobRecord struct { + ID string `db:"id"` + Name string `db:"name"` + Description string `db:"description"` + State string `db:"state"` + CreatedAt time.Time `db:"created_at"` + StartedAt *time.Time `db:"started_at"` + CompletedAt *time.Time `db:"completed_at"` + FailedAt *time.Time `db:"failed_at"` + Tasks []byte `db:"tasks"` + Position int `db:"position"` + Inputs []byte `db:"inputs"` + Context []byte `db:"context"` + ParentID string `db:"parent_id"` + TaskCount int `db:"task_count"` + Output string `db:"output_"` + Result string `db:"result"` + Error string `db:"error_"` + TS string `db:"ts"` + Defaults []byte `db:"defaults"` + Webhooks []byte `db:"webhooks"` +} + +type nodeRecord struct { + ID string `db:"id"` + Name string `db:"name"` + StartedAt time.Time `db:"started_at"` + LastHeartbeatAt time.Time `db:"last_heartbeat_at"` + CPUPercent float64 `db:"cpu_percent"` + Queue string `db:"queue"` + Status string `db:"status"` + Hostname string `db:"hostname"` + TaskCount int `db:"task_count"` + Version string `db:"version_"` +} +type taskLogPartRecord struct { + ID string `db:"id"` + Number int `db:"number_"` + TaskID string `db:"task_id"` + CreateAt time.Time `db:"created_at"` + Contents string `db:"contents"` +} + +func (r taskRecord) toTask() (*tork.Task, error) { + var env map[string]string + if r.Env != nil { + if err := json.Unmarshal(r.Env, &env); err != nil { + return nil, errors.Wrapf(err, "error deserializing task.env") + } + } + var files map[string]string + if r.Files != nil { + if err := json.Unmarshal(r.Files, &files); err != nil { + return nil, errors.Wrapf(err, "error deserializing task.files") + } + } + var pre []*tork.Task + if r.Pre != nil { + if err := json.Unmarshal(r.Pre, &pre); err != nil { + return nil, errors.Wrapf(err, "error deserializing task.pre") + } + } + var post []*tork.Task + if r.Post != nil { + if err := json.Unmarshal(r.Post, &post); err != nil { + return nil, errors.Wrapf(err, "error deserializing task.post") + } + } + var retry *tork.TaskRetry + if r.Retry != nil { + retry = &tork.TaskRetry{} + if err := json.Unmarshal(r.Retry, retry); err != nil { + return nil, errors.Wrapf(err, "error deserializing task.retry") + } + } + var limits *tork.TaskLimits + if r.Limits != nil { + limits = &tork.TaskLimits{} + if err := json.Unmarshal(r.Limits, limits); err != nil { + return nil, errors.Wrapf(err, "error deserializing task.limits") + } + } + var parallel *tork.ParallelTask + if r.Parallel != nil { + parallel = &tork.ParallelTask{} + if err := json.Unmarshal(r.Parallel, parallel); err != nil { + return nil, errors.Wrapf(err, "error deserializing task.parallel") + } + } + var each *tork.EachTask + if r.Each != nil { + each = &tork.EachTask{} + if err := json.Unmarshal(r.Each, each); err != nil { + return nil, errors.Wrapf(err, "error deserializing task.each") + } + } + var subjob *tork.SubJobTask + if r.SubJob != nil { + subjob = &tork.SubJobTask{} + if err := json.Unmarshal(r.SubJob, subjob); err != nil { + return nil, errors.Wrapf(err, "error deserializing task.subjob") + } + } + var registry *tork.Registry + if r.Registry != nil { + registry = &tork.Registry{} + if err := json.Unmarshal(r.Registry, registry); err != nil { + return nil, errors.Wrapf(err, "error deserializing task.registry") + } + } + var mounts []tork.Mount + if r.Mounts != nil { + if err := json.Unmarshal(r.Mounts, &mounts); err != nil { + return nil, errors.Wrapf(err, "error deserializing task.registry") + } + } + return &tork.Task{ + ID: r.ID, + JobID: r.JobID, + Position: r.Position, + Name: r.Name, + State: tork.TaskState(r.State), + CreatedAt: &r.CreatedAt, + ScheduledAt: r.ScheduledAt, + StartedAt: r.StartedAt, + CompletedAt: r.CompletedAt, + FailedAt: r.FailedAt, + CMD: r.CMD, + Entrypoint: r.Entrypoint, + Run: r.Run, + Image: r.Image, + Registry: registry, + Env: env, + Files: files, + Queue: r.Queue, + Error: r.Error, + Pre: pre, + Post: post, + Mounts: mounts, + Networks: r.Networks, + NodeID: r.NodeID, + Retry: retry, + Limits: limits, + Timeout: r.Timeout, + Var: r.Var, + Result: r.Result, + Parallel: parallel, + ParentID: r.ParentID, + Each: each, + Description: r.Description, + SubJob: subjob, + GPUs: r.GPUs, + If: r.IF, + Tags: r.Tags, + }, nil +} + +func (r nodeRecord) toNode() *tork.Node { + n := tork.Node{ + ID: r.ID, + Name: r.Name, + StartedAt: r.StartedAt, + CPUPercent: r.CPUPercent, + LastHeartbeatAt: r.LastHeartbeatAt, + Queue: r.Queue, + Status: tork.NodeStatus(r.Status), + Hostname: r.Hostname, + TaskCount: r.TaskCount, + Version: r.Version, + } + // if we hadn't seen an heartbeat for two or more + // consecutive periods we consider the node as offline + if n.LastHeartbeatAt.Before(time.Now().UTC().Add(-tork.HEARTBEAT_RATE * 2)) { + n.Status = tork.NodeStatusOffline + } + return &n +} + +func (r taskLogPartRecord) toTaskLogPart() *tork.TaskLogPart { + return &tork.TaskLogPart{ + Number: r.Number, + TaskID: r.TaskID, + Contents: r.Contents, + CreatedAt: &r.CreateAt, + } +} + +func (r jobRecord) toJob(tasks, execution []*tork.Task) (*tork.Job, error) { + var c tork.JobContext + if err := json.Unmarshal(r.Context, &c); err != nil { + return nil, errors.Wrapf(err, "error deserializing job.context") + } + var inputs map[string]string + if err := json.Unmarshal(r.Inputs, &inputs); err != nil { + return nil, errors.Wrapf(err, "error deserializing job.inputs") + } + var defaults *tork.JobDefaults + if r.Defaults != nil { + defaults = &tork.JobDefaults{} + if err := json.Unmarshal(r.Defaults, defaults); err != nil { + return nil, errors.Wrapf(err, "error deserializing job.defaults") + } + } + var webhooks []*tork.Webhook + if err := json.Unmarshal(r.Webhooks, &webhooks); err != nil { + return nil, errors.Wrapf(err, "error deserializing job.webhook") + } + return &tork.Job{ + ID: r.ID, + Name: r.Name, + State: tork.JobState(r.State), + CreatedAt: r.CreatedAt, + StartedAt: r.StartedAt, + CompletedAt: r.CompletedAt, + FailedAt: r.FailedAt, + Tasks: tasks, + Execution: execution, + Position: r.Position, + Context: c, + Inputs: inputs, + Description: r.Description, + ParentID: r.ParentID, + TaskCount: r.TaskCount, + Output: r.Output, + Result: r.Result, + Error: r.Error, + Defaults: defaults, + Webhooks: webhooks, + }, nil +} diff --git a/db/postgres/schema.go b/db/postgres/schema.go index cc7ec644..3ff44ae3 100644 --- a/db/postgres/schema.go +++ b/db/postgres/schema.go @@ -103,4 +103,5 @@ CREATE TABLE tasks_log_parts ( ); CREATE INDEX idx_tasks_log_parts_task_id ON tasks_log_parts (task_id); +CREATE INDEX idx_tasks_log_parts_created_at ON tasks_log_parts (created_at); ` diff --git a/engine/datastore.go b/engine/datastore.go index 60165e12..4d72d7ef 100644 --- a/engine/datastore.go +++ b/engine/datastore.go @@ -31,7 +31,9 @@ func (e *Engine) createDatastore(dstype string) (datastore.Datastore, error) { "datastore.postgres.dsn", "host=localhost user=tork password=tork dbname=tork port=5432 sslmode=disable", ) - return postgres.NewPostgresDataStore(dsn) + return postgres.NewPostgresDataStore(dsn, + postgres.WithTaskLogRetentionPeriod(conf.DurationDefault("datastore.postgres.task.logs.interval", postgres.DefaultTaskLogsRetentionPeriod)), + ) default: return nil, errors.Errorf("unknown datastore type: %s", dstype) }