From ff64bef02c23d6ffd399afece2fc9982c9b1fc68 Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Fri, 27 Dec 2024 12:59:28 +0900 Subject: [PATCH] persistence: refactor: Avoid persisting unnecessary data of DAG steps (#750) --- cmd/retry.go | 2 +- internal/agent/agent.go | 47 +++--- internal/agent/agent_test.go | 2 +- internal/agent/reporter.go | 8 +- internal/agent/reporter_test.go | 12 +- internal/client/client.go | 47 +++--- internal/client/client_test.go | 31 ++-- internal/client/interface.go | 22 +-- internal/digraph/builder.go | 13 +- internal/digraph/context.go | 8 +- internal/digraph/executor/command.go | 4 +- internal/digraph/executor/sub.go | 2 +- internal/digraph/scheduler/node.go | 9 +- internal/digraph/scheduler/scheduler.go | 2 +- internal/digraph/scheduler/scheduler_test.go | 4 +- internal/digraph/step.go | 2 - internal/frontend/dag/convert.go | 5 +- internal/frontend/dag/handler.go | 26 ++-- internal/persistence/interface.go | 6 +- internal/persistence/jsondb/jsondb.go | 20 +-- internal/persistence/jsondb/jsondb_test.go | 54 ++++--- internal/persistence/jsondb/setup_test.go | 2 +- internal/persistence/jsondb/writer.go | 2 +- internal/persistence/jsondb/writer_test.go | 19 ++- internal/persistence/model/status.go | 153 ++++++++++++------- internal/persistence/model/status_test.go | 23 +-- 26 files changed, 292 insertions(+), 233 deletions(-) diff --git a/cmd/retry.go b/cmd/retry.go index 511455da4..3ccef57d6 100644 --- a/cmd/retry.go +++ b/cmd/retry.go @@ -145,7 +145,7 @@ func executeRetry(ctx context.Context, execCtx *executionContext, cfg *config.Co logFile.Name(), cli, execCtx.dataStore, - &agent.Options{RetryTarget: execCtx.originalState.Status}, + &agent.Options{RetryTarget: &execCtx.originalState.Status}, ) listenSignals(ctx, agt) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index ef583c955..61000696d 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -5,6 +5,7 @@ package agent import ( "context" + "encoding/json" "errors" "fmt" "net/http" @@ -222,7 +223,7 @@ func (a *Agent) PrintSummary(ctx context.Context) { } // Status collects the current running status of the DAG and returns it. -func (a *Agent) Status() *model.Status { +func (a *Agent) Status() model.Status { // Lock to avoid race condition. a.lock.RLock() defer a.lock.RUnlock() @@ -234,34 +235,20 @@ func (a *Agent) Status() *model.Status { } // Create the status object to record the current status. - status := &model.Status{ - RequestID: a.requestID, - Name: a.dag.Name, - Status: schedulerStatus, - StatusText: schedulerStatus.String(), - PID: model.PID(os.Getpid()), - Nodes: model.FromNodesOrSteps(a.graph.NodeData(), a.dag.Steps), - StartedAt: model.FormatTime(a.graph.StartAt()), - FinishedAt: model.FormatTime(a.graph.FinishAt()), - Log: a.logFile, - Params: model.Params(a.dag.Params), - } - - // Collect the handler nodes. - if node := a.scheduler.HandlerNode(digraph.HandlerOnExit); node != nil { - status.OnExit = model.FromNode(node.Data()) - } - if node := a.scheduler.HandlerNode(digraph.HandlerOnSuccess); node != nil { - status.OnSuccess = model.FromNode(node.Data()) - } - if node := a.scheduler.HandlerNode(digraph.HandlerOnFailure); node != nil { - status.OnFailure = model.FromNode(node.Data()) - } - if node := a.scheduler.HandlerNode(digraph.HandlerOnCancel); node != nil { - status.OnCancel = model.FromNode(node.Data()) - } - - return status + return model.NewStatusFactory(a.dag). + Create( + a.requestID, + schedulerStatus, + os.Getpid(), + a.graph.StartAt(), + model.WithFinishedAt(a.graph.FinishAt()), + model.WithNodes(a.graph.NodeData()), + model.WithLogFilePath(a.logFile), + model.WithOnExitNode(a.scheduler.HandlerNode(digraph.HandlerOnExit)), + model.WithOnSuccessNode(a.scheduler.HandlerNode(digraph.HandlerOnSuccess)), + model.WithOnFailureNode(a.scheduler.HandlerNode(digraph.HandlerOnFailure)), + model.WithOnCancelNode(a.scheduler.HandlerNode(digraph.HandlerOnCancel)), + ) } // Signal sends the signal to the processes running @@ -284,7 +271,7 @@ func (a *Agent) HandleHTTP(ctx context.Context) sock.HTTPHandlerFunc { // Return the current status of the execution. status := a.Status() status.Status = scheduler.StatusRunning - statusJSON, err := status.ToJSON() + statusJSON, err := json.Marshal(status) if err != nil { encodeError(w, err) return diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go index 912f3b432..cbd68bc51 100644 --- a/internal/agent/agent_test.go +++ b/internal/agent/agent_test.go @@ -176,7 +176,7 @@ func TestAgent_Retry(t *testing.T) { // Retry the DAG and check if it is successful dagAgent = dag.Agent(test.WithAgentOptions(&agent.Options{ - RetryTarget: status, + RetryTarget: &status, })) dagAgent.RunSuccess(t) diff --git a/internal/agent/reporter.go b/internal/agent/reporter.go index 59af29703..e584f556c 100644 --- a/internal/agent/reporter.go +++ b/internal/agent/reporter.go @@ -31,7 +31,7 @@ func newReporter(sender Sender) *reporter { // reportStep is a function that reports the status of a step. func (r *reporter) reportStep( - ctx context.Context, dag *digraph.DAG, status *model.Status, node *scheduler.Node, + ctx context.Context, dag *digraph.DAG, status model.Status, node *scheduler.Node, ) error { nodeStatus := node.State().Status if nodeStatus != scheduler.NodeStatusNone { @@ -49,7 +49,7 @@ func (r *reporter) reportStep( } // report is a function that reports the status of the scheduler. -func (r *reporter) getSummary(_ context.Context, status *model.Status, err error) string { +func (r *reporter) getSummary(_ context.Context, status model.Status, err error) string { var buf bytes.Buffer _, _ = buf.Write([]byte("\n")) _, _ = buf.Write([]byte("Summary ->\n")) @@ -61,7 +61,7 @@ func (r *reporter) getSummary(_ context.Context, status *model.Status, err error } // send is a function that sends a report mail. -func (r *reporter) send(ctx context.Context, dag *digraph.DAG, status *model.Status, err error) error { +func (r *reporter) send(ctx context.Context, dag *digraph.DAG, status model.Status, err error) error { if err != nil || status.Status == scheduler.StatusError { if dag.MailOn != nil && dag.MailOn.Failure { fromAddress := dag.ErrorMail.From @@ -94,7 +94,7 @@ var dagHeader = table.Row{ "Error", } -func renderDAGSummary(status *model.Status, err error) string { +func renderDAGSummary(status model.Status, err error) string { dataRow := table.Row{ status.RequestID, status.Name, diff --git a/internal/agent/reporter_test.go b/internal/agent/reporter_test.go index c412528aa..d164569e6 100644 --- a/internal/agent/reporter_test.go +++ b/internal/agent/reporter_test.go @@ -76,7 +76,7 @@ func testErrorMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*model. dag.MailOn.Failure = true dag.MailOn.Success = false - _ = rp.send(context.Background(), dag, &model.Status{ + _ = rp.send(context.Background(), dag, model.Status{ Status: scheduler.StatusError, Nodes: nodes, }, fmt.Errorf("Error")) @@ -92,7 +92,7 @@ func testNoErrorMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*mode dag.MailOn.Failure = false dag.MailOn.Success = true - err := rp.send(context.Background(), dag, &model.Status{ + err := rp.send(context.Background(), dag, model.Status{ Status: scheduler.StatusError, Nodes: nodes, }, nil) @@ -107,7 +107,7 @@ func testSuccessMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*mode dag.MailOn.Failure = true dag.MailOn.Success = true - err := rp.send(context.Background(), dag, &model.Status{ + err := rp.send(context.Background(), dag, model.Status{ Status: scheduler.StatusSuccess, Nodes: nodes, }, nil) @@ -121,11 +121,7 @@ func testSuccessMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*mode } func testRenderSummary(t *testing.T, _ *reporter, dag *digraph.DAG, nodes []*model.Node) { - status := &model.Status{ - Name: dag.Name, - Status: scheduler.StatusError, - Nodes: nodes, - } + status := model.NewStatusFactory(dag).Create("request-id", scheduler.StatusError, 0, time.Now()) summary := renderDAGSummary(status, errors.New("test error")) require.Contains(t, summary, "test error") require.Contains(t, summary, dag.Name) diff --git a/internal/client/client.go b/internal/client/client.go index cdfa19a05..52966af71 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -36,6 +36,8 @@ func New( } } +var _ Client = (*client)(nil) + type client struct { dataStore persistence.DataStores executable string @@ -174,7 +176,9 @@ func (*client) GetCurrentStatus(_ context.Context, dag *digraph.DAG) (*model.Sta if errors.Is(err, sock.ErrTimeout) { return nil, err } - return model.NewStatusDefault(dag), nil + // The DAG is not running so return the default status + status := model.NewStatusFactory(dag).CreateDefault() + return &status, nil } return model.StatusFromJSON(ret) } @@ -191,7 +195,7 @@ func (e *client) GetStatusByRequestID(ctx context.Context, dag *digraph.DAG, req // if the request id is not matched then correct the status ret.Status.CorrectRunningStatus() } - return ret.Status, err + return &ret.Status, err } func (*client) currentStatus(_ context.Context, dag *digraph.DAG) (*model.Status, error) { @@ -203,28 +207,30 @@ func (*client) currentStatus(_ context.Context, dag *digraph.DAG) (*model.Status return model.StatusFromJSON(ret) } -func (e *client) GetLatestStatus(ctx context.Context, dag *digraph.DAG) (*model.Status, error) { +func (e *client) GetLatestStatus(ctx context.Context, dag *digraph.DAG) (model.Status, error) { currStatus, _ := e.currentStatus(ctx, dag) if currStatus != nil { - return currStatus, nil + return *currStatus, nil } status, err := e.dataStore.HistoryStore().ReadStatusToday(ctx, dag.Location) - if errors.Is(err, persistence.ErrNoStatusDataToday) || - errors.Is(err, persistence.ErrNoStatusData) { - return model.NewStatusDefault(dag), nil - } if err != nil { - return model.NewStatusDefault(dag), err + status := model.NewStatusFactory(dag).CreateDefault() + if errors.Is(err, persistence.ErrNoStatusDataToday) || + errors.Is(err, persistence.ErrNoStatusData) { + // No status for today + return status, nil + } + return status, err } status.CorrectRunningStatus() - return status, nil + return *status, nil } -func (e *client) GetRecentHistory(ctx context.Context, dag *digraph.DAG, n int) []*model.StatusFile { +func (e *client) GetRecentHistory(ctx context.Context, dag *digraph.DAG, n int) []model.StatusFile { return e.dataStore.HistoryStore().ReadStatusRecent(ctx, dag.Location, n) } -func (e *client) UpdateStatus(ctx context.Context, dag *digraph.DAG, status *model.Status) error { +func (e *client) UpdateStatus(ctx context.Context, dag *digraph.DAG, status model.Status) error { client := sock.NewClient(dag.SockAddr()) res, err := client.Request("GET", "/status") if err != nil { @@ -256,12 +262,12 @@ func (e *client) DeleteDAG(ctx context.Context, name, loc string) error { } func (e *client) GetAllStatus(ctx context.Context) ( - statuses []*DAGStatus, errs []string, err error, + statuses []DAGStatus, errs []string, err error, ) { dagStore := e.dataStore.DAGStore() dagList, errs, err := dagStore.List(ctx) - var ret []*DAGStatus + var ret []DAGStatus for _, d := range dagList { status, err := e.readStatus(ctx, d) if err != nil { @@ -277,13 +283,12 @@ func (e *client) getPageCount(total int, limit int) int { return (total-1)/(limit) + 1 } -func (e *client) GetAllStatusPagination(ctx context.Context, params dags.ListDagsParams) ([]*DAGStatus, *DagListPaginationSummaryResult, error) { +func (e *client) GetAllStatusPagination(ctx context.Context, params dags.ListDagsParams) ([]DAGStatus, *DagListPaginationSummaryResult, error) { var ( dagListPaginationResult *persistence.DagListPaginationResult err error dagStore = e.dataStore.DAGStore() - dagStatusList = make([]*DAGStatus, 0) - currentStatus *DAGStatus + dagStatusList = make([]DAGStatus, 0) ) page := 1 @@ -305,6 +310,10 @@ func (e *client) GetAllStatusPagination(ctx context.Context, params dags.ListDag } for _, currentDag := range dagListPaginationResult.DagList { + var ( + currentStatus DAGStatus + err error + ) if currentStatus, err = e.readStatus(ctx, currentDag); err != nil { dagListPaginationResult.ErrorList = append(dagListPaginationResult.ErrorList, err.Error()) } @@ -323,7 +332,7 @@ func (e *client) getDAG(ctx context.Context, name string) (*digraph.DAG, error) return e.emptyDAGIfNil(dagDetail, name), err } -func (e *client) GetStatus(ctx context.Context, id string) (*DAGStatus, error) { +func (e *client) GetStatus(ctx context.Context, id string) (DAGStatus, error) { dag, err := e.getDAG(ctx, id) if dag == nil { // TODO: fix not to use location @@ -344,7 +353,7 @@ func (e *client) ToggleSuspend(_ context.Context, id string, suspend bool) error return flagStore.ToggleSuspend(id, suspend) } -func (e *client) readStatus(ctx context.Context, dag *digraph.DAG) (*DAGStatus, error) { +func (e *client) readStatus(ctx context.Context, dag *digraph.DAG) (DAGStatus, error) { latestStatus, err := e.GetLatestStatus(ctx, dag) id := strings.TrimSuffix( filepath.Base(dag.Location), diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 034b35775..df037ee0a 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -4,6 +4,7 @@ package client_test import ( + "encoding/json" "fmt" "net/http" "path/filepath" @@ -29,14 +30,20 @@ func TestClient_GetStatus(t *testing.T) { dag := th.LoadDAGFile(t, "valid.yaml") ctx := th.Context + requestID := fmt.Sprintf("request-id-%d", time.Now().Unix()) socketServer, _ := sock.NewServer( dag.SockAddr(), func(w http.ResponseWriter, _ *http.Request) { - status := model.NewStatus(dag.DAG, nil, - scheduler.StatusRunning, 0, nil, nil) + status := model.NewStatusFactory(dag.DAG).Create( + requestID, scheduler.StatusRunning, 0, time.Now(), + ) w.WriteHeader(http.StatusOK) - b, _ := status.ToJSON() - _, _ = w.Write(b) + jsonData, err := json.Marshal(status) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + _, _ = w.Write(jsonData) }, ) @@ -83,9 +90,9 @@ func TestClient_GetStatus(t *testing.T) { _ = historyStore.Close(ctx) // Get the status and check if it is the same as the one we wrote. - status, err = cli.GetStatusByRequestID(ctx, dag.DAG, requestID) + statusToCheck, err := cli.GetStatusByRequestID(ctx, dag.DAG, requestID) require.NoError(t, err) - require.Equal(t, scheduler.NodeStatusSuccess, status.Nodes[0].Status) + require.Equal(t, scheduler.NodeStatusSuccess, statusToCheck.Nodes[0].Status) // Update the status. newStatus := scheduler.NodeStatusError @@ -305,14 +312,12 @@ func TestClient_ReadHistory(t *testing.T) { }) } -func testNewStatus(dag *digraph.DAG, requestID string, status scheduler.Status, nodeStatus scheduler.NodeStatus) *model.Status { - nodeData := scheduler.NodeData{ - State: scheduler.NodeState{Status: nodeStatus}, - } +func testNewStatus(dag *digraph.DAG, requestID string, status scheduler.Status, nodeStatus scheduler.NodeStatus) model.Status { + nodes := []scheduler.NodeData{{State: scheduler.NodeState{Status: nodeStatus}}} startedAt := model.Time(time.Now()) - statusModel := model.NewStatus(dag, []scheduler.NodeData{nodeData}, status, 0, startedAt, nil) - statusModel.RequestID = requestID - return statusModel + return model.NewStatusFactory(dag).Create( + requestID, status, 0, *startedAt, model.WithNodes(nodes), + ) } func TestClient_GetTagList(t *testing.T) { diff --git a/internal/client/interface.go b/internal/client/interface.go index ee1ea26d8..ece57e5e0 100644 --- a/internal/client/interface.go +++ b/internal/client/interface.go @@ -25,14 +25,14 @@ type Client interface { Retry(ctx context.Context, dag *digraph.DAG, requestID string) error GetCurrentStatus(ctx context.Context, dag *digraph.DAG) (*model.Status, error) GetStatusByRequestID(ctx context.Context, dag *digraph.DAG, requestID string) (*model.Status, error) - GetLatestStatus(ctx context.Context, dag *digraph.DAG) (*model.Status, error) - GetRecentHistory(ctx context.Context, dag *digraph.DAG, n int) []*model.StatusFile - UpdateStatus(ctx context.Context, dag *digraph.DAG, status *model.Status) error + GetLatestStatus(ctx context.Context, dag *digraph.DAG) (model.Status, error) + GetRecentHistory(ctx context.Context, dag *digraph.DAG, n int) []model.StatusFile + UpdateStatus(ctx context.Context, dag *digraph.DAG, status model.Status) error UpdateDAG(ctx context.Context, id string, spec string) error DeleteDAG(ctx context.Context, id, loc string) error - GetAllStatus(ctx context.Context) (statuses []*DAGStatus, errs []string, err error) - GetAllStatusPagination(ctx context.Context, params dags.ListDagsParams) ([]*DAGStatus, *DagListPaginationSummaryResult, error) - GetStatus(ctx context.Context, dagLocation string) (*DAGStatus, error) + GetAllStatus(ctx context.Context) (statuses []DAGStatus, errs []string, err error) + GetAllStatusPagination(ctx context.Context, params dags.ListDagsParams) ([]DAGStatus, *DagListPaginationSummaryResult, error) + GetStatus(ctx context.Context, dagLocation string) (DAGStatus, error) IsSuspended(ctx context.Context, id string) bool ToggleSuspend(ctx context.Context, id string, suspend bool) error GetTagList(ctx context.Context) ([]string, []string, error) @@ -51,7 +51,7 @@ type DAGStatus struct { File string Dir string DAG *digraph.DAG - Status *model.Status + Status model.Status Suspended bool Error error ErrorT *string @@ -63,13 +63,13 @@ type DagListPaginationSummaryResult struct { } func newDAGStatus( - dag *digraph.DAG, s *model.Status, suspended bool, err error, -) *DAGStatus { - ret := &DAGStatus{ + dag *digraph.DAG, status model.Status, suspended bool, err error, +) DAGStatus { + ret := DAGStatus{ File: filepath.Base(dag.Location), Dir: filepath.Dir(dag.Location), DAG: dag, - Status: s, + Status: status, Suspended: suspended, Error: err, } diff --git a/internal/digraph/builder.go b/internal/digraph/builder.go index 5de44643a..71c3b71e8 100644 --- a/internal/digraph/builder.go +++ b/internal/digraph/builder.go @@ -299,28 +299,28 @@ func buildLogDir(_ BuildContext, spec *definition, dag *DAG) (err error) { func buildHandlers(ctx BuildContext, spec *definition, dag *DAG) (err error) { if spec.HandlerOn.Exit != nil { spec.HandlerOn.Exit.Name = HandlerOnExit.String() - if dag.HandlerOn.Exit, err = buildStep(ctx, dag.Env, *spec.HandlerOn.Exit, spec.Functions); err != nil { + if dag.HandlerOn.Exit, err = buildStep(ctx, *spec.HandlerOn.Exit, spec.Functions); err != nil { return err } } if spec.HandlerOn.Success != nil { spec.HandlerOn.Success.Name = HandlerOnSuccess.String() - if dag.HandlerOn.Success, err = buildStep(ctx, dag.Env, *spec.HandlerOn.Success, spec.Functions); err != nil { + if dag.HandlerOn.Success, err = buildStep(ctx, *spec.HandlerOn.Success, spec.Functions); err != nil { return } } if spec.HandlerOn.Failure != nil { spec.HandlerOn.Failure.Name = HandlerOnFailure.String() - if dag.HandlerOn.Failure, err = buildStep(ctx, dag.Env, *spec.HandlerOn.Failure, spec.Functions); err != nil { + if dag.HandlerOn.Failure, err = buildStep(ctx, *spec.HandlerOn.Failure, spec.Functions); err != nil { return } } if spec.HandlerOn.Cancel != nil { spec.HandlerOn.Cancel.Name = HandlerOnCancel.String() - if dag.HandlerOn.Cancel, err = buildStep(ctx, dag.Env, *spec.HandlerOn.Cancel, spec.Functions); err != nil { + if dag.HandlerOn.Cancel, err = buildStep(ctx, *spec.HandlerOn.Cancel, spec.Functions); err != nil { return } } @@ -357,7 +357,7 @@ func skipIfSuccessful(_ BuildContext, spec *definition, dag *DAG) error { func buildSteps(ctx BuildContext, spec *definition, dag *DAG) error { var steps []Step for _, stepDef := range spec.Steps { - step, err := buildStep(ctx, dag.Env, stepDef, spec.Functions) + step, err := buildStep(ctx, stepDef, spec.Functions) if err != nil { return err } @@ -404,7 +404,7 @@ func buildMailConfig(def mailConfigDef) (*MailConfig, error) { } // buildStep builds a step from the step definition. -func buildStep(ctx BuildContext, variables []string, def stepDef, fns []*funcDef) (*Step, error) { +func buildStep(ctx BuildContext, def stepDef, fns []*funcDef) (*Step, error) { if err := assertStepDef(def, fns); err != nil { return nil, err } @@ -418,7 +418,6 @@ func buildStep(ctx BuildContext, variables []string, def stepDef, fns []*funcDef Stderr: def.Stderr, Output: def.Output, Dir: def.Dir, - Variables: variables, Depends: def.Depends, MailOnError: def.MailOnError, Preconditions: buildConditions(def.Preconditions), diff --git a/internal/digraph/context.go b/internal/digraph/context.go index 70eedb425..6eb8cfbe2 100644 --- a/internal/digraph/context.go +++ b/internal/digraph/context.go @@ -38,7 +38,7 @@ type Context struct { DAG *DAG Finder Finder ResultCollector ResultCollector - Envs Envs + AdditionalEnvs Envs } // Envs is a list of environment variables. @@ -73,15 +73,15 @@ func NewContext(ctx context.Context, dag *DAG, finder Finder, resultCollector Re DAG: dag, Finder: finder, ResultCollector: resultCollector, - Envs: []Env{ + AdditionalEnvs: []Env{ {Key: EnvKeySchedulerLogPath, Value: logFile}, {Key: EnvKeyRequestID, Value: requestID}, }, }) } -func (c Context) WithEnv(env Env) Context { - c.Envs = append([]Env{env}, c.Envs...) +func (c Context) WithAdditionalEnv(env Env) Context { + c.AdditionalEnvs = append([]Env{env}, c.AdditionalEnvs...) return c } diff --git a/internal/digraph/executor/command.go b/internal/digraph/executor/command.go index 66d054c11..cbc2fb4c0 100644 --- a/internal/digraph/executor/command.go +++ b/internal/digraph/executor/command.go @@ -36,8 +36,8 @@ func newCommand(ctx context.Context, step digraph.Step) (Executor, error) { cmd := createCommand(ctx, step) cmd.Dir = step.Dir cmd.Env = append(cmd.Env, os.Environ()...) - cmd.Env = append(cmd.Env, step.Variables...) - cmd.Env = append(cmd.Env, dagContext.Envs.All()...) + cmd.Env = append(cmd.Env, dagContext.DAG.Env...) + cmd.Env = append(cmd.Env, dagContext.AdditionalEnvs.All()...) // Get output variables from the step context and set them as environment stepCtx := digraph.GetStepContext(ctx) diff --git a/internal/digraph/executor/sub.go b/internal/digraph/executor/sub.go index 4c0652626..72f21c1d2 100644 --- a/internal/digraph/executor/sub.go +++ b/internal/digraph/executor/sub.go @@ -79,7 +79,7 @@ func newSubWorkflow( } cmd.Dir = step.Dir cmd.Env = append(cmd.Env, os.Environ()...) - cmd.Env = append(cmd.Env, step.Variables...) + cmd.Env = append(cmd.Env, dagCtx.DAG.Env...) // Get output variables from the step context and set them as environment stepCtx := digraph.GetStepContext(ctx) diff --git a/internal/digraph/scheduler/node.go b/internal/digraph/scheduler/node.go index 8c794350b..59ad9811e 100644 --- a/internal/digraph/scheduler/node.go +++ b/internal/digraph/scheduler/node.go @@ -160,7 +160,7 @@ func (n *Node) Execute(ctx context.Context) error { } // Add the log path to the environment - dagCtx = dagCtx.WithEnv(digraph.Env{ + dagCtx = dagCtx.WithAdditionalEnv(digraph.Env{ Key: digraph.EnvKeyLogPath, Value: n.data.State.Log, }) @@ -529,13 +529,6 @@ func (n *Node) Init() { return } n.id = getNextNodeID() - - if n.data.Step.Variables == nil { - n.data.Step.Variables = []string{} - } - if n.data.Step.Variables == nil { - n.data.Step.Variables = []string{} - } if n.data.Step.Preconditions == nil { n.data.Step.Preconditions = []digraph.Condition{} } diff --git a/internal/digraph/scheduler/scheduler.go b/internal/digraph/scheduler/scheduler.go index 4f02bfda8..7fe1aa7e7 100644 --- a/internal/digraph/scheduler/scheduler.go +++ b/internal/digraph/scheduler/scheduler.go @@ -522,7 +522,7 @@ func (sc *Scheduler) setup(ctx context.Context) (err error) { if err != nil { return err } - for _, env := range dagCtx.Envs { + for _, env := range dagCtx.AdditionalEnvs { os.Setenv(env.Key, env.Value) } diff --git a/internal/digraph/scheduler/scheduler_test.go b/internal/digraph/scheduler/scheduler_test.go index 786def5b3..118c33a43 100644 --- a/internal/digraph/scheduler/scheduler_test.go +++ b/internal/digraph/scheduler/scheduler_test.go @@ -202,7 +202,9 @@ func TestScheduler(t *testing.T) { require.Equal(t, 2, node.State().RetryCount) // 2 retry }) t.Run("RetryPolicySuccess", func(t *testing.T) { - const file = "flag_test_retry_success" + file := filepath.Join( + os.TempDir(), fmt.Sprintf("flag_test_retry_success_%s", uuid.Must(uuid.NewRandom()).String()), + ) sc := setup(t) diff --git a/internal/digraph/step.go b/internal/digraph/step.go index 0e235ef56..6c9e6a321 100644 --- a/internal/digraph/step.go +++ b/internal/digraph/step.go @@ -19,8 +19,6 @@ type Step struct { Description string `json:"Description,omitempty"` // Shell is the shell program to execute the command. optional. Shell string `json:"Shell,omitempty"` - // Variables contains the list of variables to be set. - Variables []string `json:"Variables,omitempty"` // OutputVariables is a structure to store the output variables for the // following steps. It only contains the local output variables. OutputVariables *SyncMap `json:"OutputVariables,omitempty"` diff --git a/internal/frontend/dag/convert.go b/internal/frontend/dag/convert.go index 5d5a60960..d6c8c64f1 100644 --- a/internal/frontend/dag/convert.go +++ b/internal/frontend/dag/convert.go @@ -29,7 +29,7 @@ func convertToDAG(dag *digraph.DAG) *models.Dag { } } -func convertToStatusDetail(s *model.Status) *models.DagStatusDetail { +func convertToStatusDetail(s model.Status) *models.DagStatusDetail { status := &models.DagStatusDetail{ Log: swag.String(s.Log), Name: swag.String(s.Name), @@ -100,7 +100,8 @@ func convertToStepObject(step digraph.Step) *models.StepObject { Preconditions: conditions, RepeatPolicy: repeatPolicy, Script: swag.String(step.Script), - Variables: step.Variables, + // Deprecated: Removed field but keeping for backward compatibility. + Variables: []string{}, } if step.SubWorkflow != nil { so.Run = step.SubWorkflow.Name diff --git a/internal/frontend/dag/handler.go b/internal/frontend/dag/handler.go index 1ecf9cd80..ed809c493 100644 --- a/internal/frontend/dag/handler.go +++ b/internal/frontend/dag/handler.go @@ -434,14 +434,9 @@ func (h *Handler) getDetail( } dagStatus, err := h.client.GetStatus(ctx, dagID) - if dagStatus == nil { - return nil, newNotFoundError(err) - } - - dag := dagStatus.DAG var steps []*models.StepObject - for _, step := range dag.Steps { + for _, step := range dagStatus.DAG.Steps { steps = append(steps, convertToStepObject(step)) } @@ -462,20 +457,21 @@ func (h *Handler) getDetail( } var schedules []*models.Schedule - for _, s := range dag.Schedule { + for _, s := range dagStatus.DAG.Schedule { schedules = append(schedules, &models.Schedule{ Expression: swag.String(s.Expression), }) } var preconditions []*models.Condition - for _, p := range dag.Preconditions { + for _, p := range dagStatus.DAG.Preconditions { preconditions = append(preconditions, &models.Condition{ Condition: p.Condition, Expected: p.Expected, }) } + dag := dagStatus.DAG dagDetail := &models.DagDetail{ DefaultParams: swag.String(dag.DefaultParams), Delay: swag.Int64(int64(dag.Delay)), @@ -592,19 +588,19 @@ func (h *Handler) processStepLogRequest( } if params.File != nil { - s, err := jsondb.ParseStatusFile(*params.File) + parsedStatus, err := jsondb.ParseStatusFile(*params.File) if err != nil { return nil, newBadRequestError(err) } - status = s + status = parsedStatus } if status == nil { - s, err := h.client.GetLatestStatus(ctx, dag) + latestStatus, err := h.client.GetLatestStatus(ctx, dag) if err != nil { return nil, newInternalError(err) } - status = s + status = &latestStatus } // Find the step in the status to get the log file. @@ -777,7 +773,7 @@ func (h *Handler) postAction( return nil, newBadRequestError(errInvalidArgs) } - var dagStatus *client.DAGStatus + var dagStatus client.DAGStatus if *params.Body.Action != "save" { s, err := h.client.GetStatus(ctx, params.DagID) @@ -861,7 +857,7 @@ func (h *Handler) postAction( func (h *Handler) processUpdateStatus( ctx context.Context, params dags.PostDagActionParams, - dagStatus *client.DAGStatus, to scheduler.NodeStatus, + dagStatus client.DAGStatus, to scheduler.NodeStatus, ) (*models.PostDagActionResponse, *codedError) { if params.Body.RequestID == "" { return nil, newBadRequestError(fmt.Errorf("request-id is required: %w", errInvalidArgs)) @@ -901,7 +897,7 @@ func (h *Handler) processUpdateStatus( status.Nodes[idxToUpdate].Status = to status.Nodes[idxToUpdate].StatusText = to.String() - if err := h.client.UpdateStatus(ctx, dagStatus.DAG, status); err != nil { + if err := h.client.UpdateStatus(ctx, dagStatus.DAG, *status); err != nil { return nil, newInternalError(err) } diff --git a/internal/persistence/interface.go b/internal/persistence/interface.go index 71c4e38b0..567b9384a 100644 --- a/internal/persistence/interface.go +++ b/internal/persistence/interface.go @@ -27,10 +27,10 @@ type DataStores interface { type HistoryStore interface { Open(ctx context.Context, key string, timestamp time.Time, requestID string) error - Write(ctx context.Context, status *model.Status) error + Write(ctx context.Context, status model.Status) error Close(ctx context.Context) error - Update(ctx context.Context, key, requestID string, status *model.Status) error - ReadStatusRecent(ctx context.Context, key string, itemLimit int) []*model.StatusFile + Update(ctx context.Context, key, requestID string, status model.Status) error + ReadStatusRecent(ctx context.Context, key string, itemLimit int) []model.StatusFile ReadStatusToday(ctx context.Context, key string) (*model.Status, error) FindByRequestID(ctx context.Context, key string, requestID string) (*model.StatusFile, error) RemoveAll(ctx context.Context, key string) error diff --git a/internal/persistence/jsondb/jsondb.go b/internal/persistence/jsondb/jsondb.go index 1567f33af..01444385c 100644 --- a/internal/persistence/jsondb/jsondb.go +++ b/internal/persistence/jsondb/jsondb.go @@ -29,8 +29,6 @@ import ( ) var ( - _ persistence.HistoryStore = (*JSONDB)(nil) - errRequestIDNotFound = errors.New("request ID not found") errCreateNewDirectory = errors.New("failed to create new directory") errKeyEmpty = errors.New("dagFile is empty") @@ -63,6 +61,8 @@ func DefaultConfig() Config { } } +var _ persistence.HistoryStore = (*JSONDB)(nil) + // JSONDB manages DAGs status files in local storage. type JSONDB struct { location string @@ -89,7 +89,7 @@ func New(location string, cfg Config) *JSONDB { return db } -func (db *JSONDB) Update(ctx context.Context, key, requestID string, status *model.Status) error { +func (db *JSONDB) Update(ctx context.Context, key, requestID string, status model.Status) error { statusFile, err := db.FindByRequestID(ctx, key, requestID) if err != nil { return err @@ -122,7 +122,7 @@ func (db *JSONDB) Open(_ context.Context, key string, timestamp time.Time, reque return nil } -func (db *JSONDB) Write(_ context.Context, status *model.Status) error { +func (db *JSONDB) Write(_ context.Context, status model.Status) error { return db.writer.write(status) } @@ -144,8 +144,8 @@ func (db *JSONDB) Close(ctx context.Context) error { return db.writer.close() } -func (db *JSONDB) ReadStatusRecent(_ context.Context, key string, itemLimit int) []*model.StatusFile { - var ret []*model.StatusFile +func (db *JSONDB) ReadStatusRecent(_ context.Context, key string, itemLimit int) []model.StatusFile { + var ret []model.StatusFile files := db.getLatestMatches(db.globPattern(key), itemLimit) for _, file := range files { @@ -155,9 +155,9 @@ func (db *JSONDB) ReadStatusRecent(_ context.Context, key string, itemLimit int) if err != nil { continue } - ret = append(ret, &model.StatusFile{ + ret = append(ret, model.StatusFile{ File: file, - Status: status, + Status: *status, }) } @@ -195,7 +195,7 @@ func (db *JSONDB) FindByRequestID(_ context.Context, key string, requestID strin if status != nil && status.RequestID == requestID { return &model.StatusFile{ File: match, - Status: status, + Status: *status, }, nil } } @@ -251,7 +251,7 @@ func (db *JSONDB) Compact(_ context.Context, targetFilePath string) error { } defer writer.close() - if err := writer.write(status); err != nil { + if err := writer.write(*status); err != nil { if removeErr := os.Remove(tempFilePath); removeErr != nil { return fmt.Errorf("%w: %s", err, removeErr) } diff --git a/internal/persistence/jsondb/jsondb_test.go b/internal/persistence/jsondb/jsondb_test.go index 0fb8d8487..c8bc04809 100644 --- a/internal/persistence/jsondb/jsondb_test.go +++ b/internal/persistence/jsondb/jsondb_test.go @@ -17,6 +17,8 @@ import ( "github.com/stretchr/testify/require" ) +const testPID = 12345 + func TestJSONDB_Basic(t *testing.T) { th := testSetup(t) @@ -28,8 +30,9 @@ func TestJSONDB_Basic(t *testing.T) { err := th.DB.Open(th.Context, dag.Location, now, requestID) require.NoError(t, err) - status := model.NewStatus(dag.DAG, nil, scheduler.StatusRunning, 10000, nil, nil) - status.RequestID = requestID + status := model.NewStatusFactory(dag.DAG).Create( + requestID, scheduler.StatusRunning, testPID, time.Now(), + ) err = th.DB.Write(th.Context, status) require.NoError(t, err) @@ -46,8 +49,9 @@ func TestJSONDB_Basic(t *testing.T) { err := th.DB.Open(th.Context, dag.Location, now, requestID) require.NoError(t, err) - status := model.NewStatus(dag.DAG, nil, scheduler.StatusRunning, 10000, nil, nil) - status.RequestID = requestID + status := model.NewStatusFactory(dag.DAG).Create( + requestID, scheduler.StatusRunning, testPID, time.Now(), + ) err = th.DB.Write(th.Context, status) require.NoError(t, err) err = th.DB.Close(th.Context) @@ -79,7 +83,9 @@ func TestJSONDB_ReadStatus(t *testing.T) { err := th.DB.Open(th.Context, dag.Location, now, requestID) require.NoError(t, err) - status := model.NewStatus(dag.DAG, nil, scheduler.StatusRunning, 10000, nil, nil) + status := model.NewStatusFactory(dag.DAG).Create( + requestID, scheduler.StatusRunning, testPID, time.Now(), + ) status.RequestID = requestID err = th.DB.Write(th.Context, status) require.NoError(t, err) @@ -101,7 +107,9 @@ func TestJSONDB_ReadStatus(t *testing.T) { err := th.DB.Open(th.Context, dag.Location, now, requestID) require.NoError(t, err) - status := model.NewStatus(dag.DAG, nil, scheduler.StatusRunning, 10000, nil, nil) + status := model.NewStatusFactory(dag.DAG).Create( + requestID, scheduler.StatusRunning, testPID, time.Now(), + ) status.RequestID = requestID err = th.DB.Write(th.Context, status) require.NoError(t, err) @@ -134,8 +142,9 @@ func TestJSONDB_ReadStatusRecent_EdgeCases(t *testing.T) { err := th.DB.Open(th.Context, dag.Location, now, requestID) require.NoError(t, err) - status := model.NewStatus(dag.DAG, nil, scheduler.StatusRunning, 10000, nil, nil) - status.RequestID = requestID + status := model.NewStatusFactory(dag.DAG).Create( + requestID, scheduler.StatusRunning, testPID, time.Now(), + ) err = th.DB.Write(th.Context, status) require.NoError(t, err) err = th.DB.Close(th.Context) @@ -160,7 +169,9 @@ func TestJSONDB_ReadStatusToday_EdgeCases(t *testing.T) { err := th.DB.Open(th.Context, dag.Location, yesterdayTime, requestID) require.NoError(t, err) - status := model.NewStatus(dag.DAG, nil, scheduler.StatusSuccess, 10000, nil, nil) + status := model.NewStatusFactory(dag.DAG).Create( + requestID, scheduler.StatusSuccess, testPID, time.Now(), + ) status.RequestID = requestID err = th.DB.Write(th.Context, status) require.NoError(t, err) @@ -192,8 +203,9 @@ func TestJSONDB_RemoveAll(t *testing.T) { err := th.DB.Open(th.Context, dag.Location, now, requestID) require.NoError(t, err) - status := model.NewStatus(dag.DAG, nil, scheduler.StatusRunning, 10000, nil, nil) - status.RequestID = requestID + status := model.NewStatusFactory(dag.DAG).Create( + requestID, scheduler.StatusRunning, testPID, time.Now(), + ) err = th.DB.Write(th.Context, status) require.NoError(t, err) err = th.DB.Close(th.Context) @@ -227,14 +239,20 @@ func TestJSONDB_Update_EdgeCases(t *testing.T) { t.Run("UpdateNonExistentStatus", func(t *testing.T) { dag := th.DAG("test_update_nonexistent") - status := model.NewStatus(dag.DAG, nil, scheduler.StatusSuccess, 10000, nil, nil) + requestID := "request-id-nonexistent" + status := model.NewStatusFactory(dag.DAG).Create( + requestID, scheduler.StatusSuccess, testPID, time.Now(), + ) err := th.DB.Update(th.Context, dag.Location, "nonexistent-id", status) assert.ErrorIs(t, err, persistence.ErrRequestIDNotFound) }) t.Run("UpdateWithEmptyRequestID", func(t *testing.T) { dag := th.DAG("test_update_empty_id") - status := model.NewStatus(dag.DAG, nil, scheduler.StatusSuccess, 10000, nil, nil) + requestID := "" + status := model.NewStatusFactory(dag.DAG).Create( + requestID, scheduler.StatusSuccess, testPID, time.Now(), + ) err := th.DB.Update(th.Context, dag.Location, "", status) assert.ErrorIs(t, err, errRequestIDNotFound) }) @@ -275,8 +293,9 @@ func TestJSONDB_FileManagement(t *testing.T) { err := th.DB.Open(th.Context, dag.Location, oldTime, requestID) require.NoError(t, err) - status := model.NewStatus(dag.DAG, nil, scheduler.StatusSuccess, 10000, nil, nil) - status.RequestID = requestID + status := model.NewStatusFactory(dag.DAG).Create( + requestID, scheduler.StatusSuccess, testPID, time.Now(), + ) err = th.DB.Write(th.Context, status) require.NoError(t, err) @@ -309,8 +328,9 @@ func TestJSONDB_FileManagement(t *testing.T) { require.NoError(t, err) for i := 0; i < 3; i++ { - status := model.NewStatus(dag.DAG, nil, scheduler.StatusRunning, 10000, nil, nil) - status.RequestID = requestID + status := model.NewStatusFactory(dag.DAG).Create( + requestID, scheduler.StatusRunning, testPID, time.Now(), + ) err = th.DB.Write(th.Context, status) require.NoError(t, err) } diff --git a/internal/persistence/jsondb/setup_test.go b/internal/persistence/jsondb/setup_test.go index 7e9ec5437..445b01458 100644 --- a/internal/persistence/jsondb/setup_test.go +++ b/internal/persistence/jsondb/setup_test.go @@ -76,7 +76,7 @@ func (d dagTestHelper) Writer(t *testing.T, requestID string, startedAt time.Tim } } -func (w writerTestHelper) Write(t *testing.T, status *model.Status) { +func (w writerTestHelper) Write(t *testing.T, status model.Status) { t.Helper() err := w.Writer.write(status) diff --git a/internal/persistence/jsondb/writer.go b/internal/persistence/jsondb/writer.go index f3599f75b..cf8162d9c 100644 --- a/internal/persistence/jsondb/writer.go +++ b/internal/persistence/jsondb/writer.go @@ -57,7 +57,7 @@ func (w *writer) open() error { } // write appends the status to the local file. -func (w *writer) write(st *model.Status) error { +func (w *writer) write(st model.Status) error { w.mu.Lock() defer w.mu.Unlock() diff --git a/internal/persistence/jsondb/writer_test.go b/internal/persistence/jsondb/writer_test.go index 0bd165276..4822e80dd 100644 --- a/internal/persistence/jsondb/writer_test.go +++ b/internal/persistence/jsondb/writer_test.go @@ -21,10 +21,10 @@ func TestWriter(t *testing.T) { t.Run("WriteStatusToNewFile", func(t *testing.T) { dag := th.DAG("test_write_status") - status := model.NewStatus(dag.DAG, nil, scheduler.StatusRunning, 10000, nil, nil) requestID := fmt.Sprintf("request-id-%d", time.Now().Unix()) - status.RequestID = requestID - + status := model.NewStatusFactory(dag.DAG).Create( + requestID, scheduler.StatusRunning, testPID, time.Now(), + ) writer := dag.Writer(t, requestID, time.Now()) writer.Write(t, status) @@ -38,8 +38,9 @@ func TestWriter(t *testing.T) { writer := dag.Writer(t, requestID, startedAt) - status := model.NewStatus(dag.DAG, nil, scheduler.StatusCancel, 10000, nil, nil) - status.RequestID = requestID + status := model.NewStatusFactory(dag.DAG).Create( + requestID, scheduler.StatusCancel, testPID, time.Now(), + ) // Write initial status writer.Write(t, status) @@ -72,7 +73,8 @@ func TestWriterErrorHandling(t *testing.T) { require.NoError(t, writer.close()) dag := th.DAG("test_write_to_closed_writer") - status := model.NewStatus(dag.DAG, nil, scheduler.StatusRunning, 10000, nil, nil) + requestID := fmt.Sprintf("request-id-%d", time.Now().Unix()) + status := model.NewStatusFactory(dag.DAG).Create(requestID, scheduler.StatusRunning, testPID, time.Now()) assert.Error(t, writer.write(status)) }) @@ -90,7 +92,10 @@ func TestWriterRename(t *testing.T) { // Create a status file with old path dag := th.DAG("test_rename_old") writer := dag.Writer(t, "request-id-1", time.Now()) - status := model.NewStatus(dag.DAG, nil, scheduler.StatusRunning, 10000, nil, nil) + requestID := fmt.Sprintf("request-id-%d", time.Now().Unix()) + status := model.NewStatusFactory(dag.DAG).Create( + requestID, scheduler.StatusRunning, testPID, time.Now(), + ) writer.Write(t, status) writer.Close(t) require.FileExists(t, writer.FilePath) diff --git a/internal/persistence/model/status.go b/internal/persistence/model/status.go index c4fe22700..c470d0eaf 100644 --- a/internal/persistence/model/status.go +++ b/internal/persistence/model/status.go @@ -7,7 +7,6 @@ import ( "encoding/json" "fmt" "strings" - "sync" "time" "github.com/dagu-org/dagu/internal/digraph" @@ -15,6 +14,104 @@ import ( "github.com/dagu-org/dagu/internal/stringutil" ) +type StatusFactory struct { + dag *digraph.DAG +} + +func NewStatusFactory(dag *digraph.DAG) *StatusFactory { + return &StatusFactory{dag: dag} +} + +func (f *StatusFactory) CreateDefault() Status { + return Status{ + Name: f.dag.Name, + Status: scheduler.StatusNone, + StatusText: scheduler.StatusNone.String(), + PID: PID(pidNotRunning), + Nodes: FromSteps(f.dag.Steps), + OnExit: nodeOrNil(f.dag.HandlerOn.Exit), + OnSuccess: nodeOrNil(f.dag.HandlerOn.Success), + OnFailure: nodeOrNil(f.dag.HandlerOn.Failure), + OnCancel: nodeOrNil(f.dag.HandlerOn.Cancel), + Params: Params(f.dag.Params), + StartedAt: stringutil.FormatTime(time.Time{}), + FinishedAt: stringutil.FormatTime(time.Time{}), + } +} + +type StatusOption func(*Status) + +func WithNodes(nodes []scheduler.NodeData) StatusOption { + return func(s *Status) { + s.Nodes = FromNodes(nodes) + } +} + +func WithFinishedAt(t time.Time) StatusOption { + return func(s *Status) { + s.FinishedAt = FormatTime(t) + } +} + +func WithOnExitNode(node *scheduler.Node) StatusOption { + return func(s *Status) { + if node != nil { + s.OnExit = FromNode(node.Data()) + } + } +} + +func WithOnSuccessNode(node *scheduler.Node) StatusOption { + return func(s *Status) { + if node != nil { + s.OnSuccess = FromNode(node.Data()) + } + } +} + +func WithOnFailureNode(node *scheduler.Node) StatusOption { + return func(s *Status) { + if node != nil { + s.OnFailure = FromNode(node.Data()) + } + } +} + +func WithOnCancelNode(node *scheduler.Node) StatusOption { + return func(s *Status) { + if node != nil { + s.OnCancel = FromNode(node.Data()) + } + } +} + +func WithLogFilePath(logFilePath string) StatusOption { + return func(s *Status) { + s.Log = logFilePath + } +} + +func (f *StatusFactory) Create( + requestID string, + status scheduler.Status, + pid int, + startedAt time.Time, + opts ...StatusOption, +) Status { + statusObj := f.CreateDefault() + statusObj.RequestID = requestID + statusObj.Status = status + statusObj.StatusText = status.String() + statusObj.PID = PID(pid) + statusObj.StartedAt = FormatTime(startedAt) + + for _, opt := range opts { + opt(&statusObj) + } + + return statusObj +} + func StatusFromJSON(s string) (*Status, error) { status := new(Status) err := json.Unmarshal([]byte(s), status) @@ -24,16 +121,9 @@ func StatusFromJSON(s string) (*Status, error) { return status, err } -func FromNodesOrSteps(nodes []scheduler.NodeData, steps []digraph.Step) []*Node { - if len(nodes) != 0 { - return FromNodes(nodes) - } - return FromSteps(steps) -} - type StatusFile struct { File string - Status *Status + Status Status } type StatusResponse struct { @@ -55,41 +145,6 @@ type Status struct { FinishedAt string `json:"FinishedAt"` Log string `json:"Log"` Params string `json:"Params"` - mu sync.RWMutex -} - -func NewStatusDefault(dag *digraph.DAG) *Status { - return NewStatus( - dag, nil, scheduler.StatusNone, int(pidNotRunning), nil, nil, - ) -} - -func NewStatus( - dag *digraph.DAG, - nodes []scheduler.NodeData, - status scheduler.Status, - pid int, - startTime, endTime *time.Time, -) *Status { - statusObj := &Status{ - Name: dag.Name, - Status: status, - StatusText: status.String(), - PID: PID(pid), - Nodes: FromNodesOrSteps(nodes, dag.Steps), - OnExit: nodeOrNil(dag.HandlerOn.Exit), - OnSuccess: nodeOrNil(dag.HandlerOn.Success), - OnFailure: nodeOrNil(dag.HandlerOn.Failure), - OnCancel: nodeOrNil(dag.HandlerOn.Cancel), - Params: Params(dag.Params), - } - if startTime != nil { - statusObj.StartedAt = stringutil.FormatTime(*startTime) - } - if endTime != nil { - statusObj.FinishedAt = stringutil.FormatTime(*endTime) - } - return statusObj } func (st *Status) CorrectRunningStatus() { @@ -99,16 +154,6 @@ func (st *Status) CorrectRunningStatus() { } } -func (st *Status) ToJSON() ([]byte, error) { - st.mu.RLock() - defer st.mu.RUnlock() - js, err := json.Marshal(st) - if err != nil { - return []byte{}, err - } - return js, nil -} - func FormatTime(val time.Time) string { if val.IsZero() { return "" diff --git a/internal/persistence/model/status_test.go b/internal/persistence/model/status_test.go index 71532594c..abded8f65 100644 --- a/internal/persistence/model/status_test.go +++ b/internal/persistence/model/status_test.go @@ -26,12 +26,12 @@ func TestPID(t *testing.T) { } func TestStatusSerialization(t *testing.T) { - start, end := time.Now(), time.Now().Add(time.Second*1) + startedAt, finishedAt := time.Now(), time.Now().Add(time.Second*1) dag := &digraph.DAG{ HandlerOn: digraph.HandlerOn{}, Steps: []digraph.Step{ { - Name: "1", Description: "", Variables: []string{}, + Name: "1", Description: "", Dir: "dir", Command: "echo 1", Args: []string{}, Depends: []string{}, ContinueOn: digraph.ContinueOn{}, RetryPolicy: digraph.RetryPolicy{}, MailOnError: false, @@ -43,23 +43,26 @@ func TestStatusSerialization(t *testing.T) { InfoMail: &digraph.MailConfig{}, SMTP: &digraph.SMTPConfig{}, } - status := NewStatus(dag, nil, scheduler.StatusSuccess, 10000, &start, &end) + requestID := "request-id-testI" + statusToPersist := NewStatusFactory(dag).Create( + requestID, scheduler.StatusSuccess, 0, startedAt, WithFinishedAt(finishedAt), + ) - rawJSON, err := status.ToJSON() + rawJSON, err := json.Marshal(statusToPersist) require.NoError(t, err) - unmarshalled, err := StatusFromJSON(string(rawJSON)) + statusObject, err := StatusFromJSON(string(rawJSON)) require.NoError(t, err) - require.Equal(t, status.Name, unmarshalled.Name) - require.Equal(t, 1, len(unmarshalled.Nodes)) - require.Equal(t, dag.Steps[0].Name, unmarshalled.Nodes[0].Step.Name) + require.Equal(t, statusToPersist.Name, statusObject.Name) + require.Equal(t, 1, len(statusObject.Nodes)) + require.Equal(t, dag.Steps[0].Name, statusObject.Nodes[0].Step.Name) } func TestCorrectRunningStatus(t *testing.T) { dag := &digraph.DAG{Name: "test"} - status := NewStatus(dag, nil, scheduler.StatusRunning, - 10000, nil, nil) + requestID := "request-id-testII" + status := NewStatusFactory(dag).Create(requestID, scheduler.StatusRunning, 0, time.Now()) status.CorrectRunningStatus() require.Equal(t, scheduler.StatusError, status.Status) }