Skip to content

Commit

Permalink
persistence: refactor: Avoid persisting unnecessary data of DAG steps (
Browse files Browse the repository at this point in the history
  • Loading branch information
yohamta authored Dec 27, 2024
1 parent bb2c8ac commit ff64bef
Show file tree
Hide file tree
Showing 26 changed files with 292 additions and 233 deletions.
2 changes: 1 addition & 1 deletion cmd/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func executeRetry(ctx context.Context, execCtx *executionContext, cfg *config.Co
logFile.Name(),
cli,
execCtx.dataStore,
&agent.Options{RetryTarget: execCtx.originalState.Status},
&agent.Options{RetryTarget: &execCtx.originalState.Status},
)

listenSignals(ctx, agt)
Expand Down
47 changes: 17 additions & 30 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package agent

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -222,7 +223,7 @@ func (a *Agent) PrintSummary(ctx context.Context) {
}

// Status collects the current running status of the DAG and returns it.
func (a *Agent) Status() *model.Status {
func (a *Agent) Status() model.Status {
// Lock to avoid race condition.
a.lock.RLock()
defer a.lock.RUnlock()
Expand All @@ -234,34 +235,20 @@ func (a *Agent) Status() *model.Status {
}

// Create the status object to record the current status.
status := &model.Status{
RequestID: a.requestID,
Name: a.dag.Name,
Status: schedulerStatus,
StatusText: schedulerStatus.String(),
PID: model.PID(os.Getpid()),
Nodes: model.FromNodesOrSteps(a.graph.NodeData(), a.dag.Steps),
StartedAt: model.FormatTime(a.graph.StartAt()),
FinishedAt: model.FormatTime(a.graph.FinishAt()),
Log: a.logFile,
Params: model.Params(a.dag.Params),
}

// Collect the handler nodes.
if node := a.scheduler.HandlerNode(digraph.HandlerOnExit); node != nil {
status.OnExit = model.FromNode(node.Data())
}
if node := a.scheduler.HandlerNode(digraph.HandlerOnSuccess); node != nil {
status.OnSuccess = model.FromNode(node.Data())
}
if node := a.scheduler.HandlerNode(digraph.HandlerOnFailure); node != nil {
status.OnFailure = model.FromNode(node.Data())
}
if node := a.scheduler.HandlerNode(digraph.HandlerOnCancel); node != nil {
status.OnCancel = model.FromNode(node.Data())
}

return status
return model.NewStatusFactory(a.dag).
Create(
a.requestID,
schedulerStatus,
os.Getpid(),
a.graph.StartAt(),
model.WithFinishedAt(a.graph.FinishAt()),
model.WithNodes(a.graph.NodeData()),
model.WithLogFilePath(a.logFile),
model.WithOnExitNode(a.scheduler.HandlerNode(digraph.HandlerOnExit)),
model.WithOnSuccessNode(a.scheduler.HandlerNode(digraph.HandlerOnSuccess)),
model.WithOnFailureNode(a.scheduler.HandlerNode(digraph.HandlerOnFailure)),
model.WithOnCancelNode(a.scheduler.HandlerNode(digraph.HandlerOnCancel)),
)
}

// Signal sends the signal to the processes running
Expand All @@ -284,7 +271,7 @@ func (a *Agent) HandleHTTP(ctx context.Context) sock.HTTPHandlerFunc {
// Return the current status of the execution.
status := a.Status()
status.Status = scheduler.StatusRunning
statusJSON, err := status.ToJSON()
statusJSON, err := json.Marshal(status)
if err != nil {
encodeError(w, err)
return
Expand Down
2 changes: 1 addition & 1 deletion internal/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestAgent_Retry(t *testing.T) {

// Retry the DAG and check if it is successful
dagAgent = dag.Agent(test.WithAgentOptions(&agent.Options{
RetryTarget: status,
RetryTarget: &status,
}))
dagAgent.RunSuccess(t)

Expand Down
8 changes: 4 additions & 4 deletions internal/agent/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func newReporter(sender Sender) *reporter {

// reportStep is a function that reports the status of a step.
func (r *reporter) reportStep(
ctx context.Context, dag *digraph.DAG, status *model.Status, node *scheduler.Node,
ctx context.Context, dag *digraph.DAG, status model.Status, node *scheduler.Node,
) error {
nodeStatus := node.State().Status
if nodeStatus != scheduler.NodeStatusNone {
Expand All @@ -49,7 +49,7 @@ func (r *reporter) reportStep(
}

// report is a function that reports the status of the scheduler.
func (r *reporter) getSummary(_ context.Context, status *model.Status, err error) string {
func (r *reporter) getSummary(_ context.Context, status model.Status, err error) string {
var buf bytes.Buffer
_, _ = buf.Write([]byte("\n"))
_, _ = buf.Write([]byte("Summary ->\n"))
Expand All @@ -61,7 +61,7 @@ func (r *reporter) getSummary(_ context.Context, status *model.Status, err error
}

// send is a function that sends a report mail.
func (r *reporter) send(ctx context.Context, dag *digraph.DAG, status *model.Status, err error) error {
func (r *reporter) send(ctx context.Context, dag *digraph.DAG, status model.Status, err error) error {
if err != nil || status.Status == scheduler.StatusError {
if dag.MailOn != nil && dag.MailOn.Failure {
fromAddress := dag.ErrorMail.From
Expand Down Expand Up @@ -94,7 +94,7 @@ var dagHeader = table.Row{
"Error",
}

func renderDAGSummary(status *model.Status, err error) string {
func renderDAGSummary(status model.Status, err error) string {
dataRow := table.Row{
status.RequestID,
status.Name,
Expand Down
12 changes: 4 additions & 8 deletions internal/agent/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func testErrorMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*model.
dag.MailOn.Failure = true
dag.MailOn.Success = false

_ = rp.send(context.Background(), dag, &model.Status{
_ = rp.send(context.Background(), dag, model.Status{
Status: scheduler.StatusError,
Nodes: nodes,
}, fmt.Errorf("Error"))
Expand All @@ -92,7 +92,7 @@ func testNoErrorMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*mode
dag.MailOn.Failure = false
dag.MailOn.Success = true

err := rp.send(context.Background(), dag, &model.Status{
err := rp.send(context.Background(), dag, model.Status{
Status: scheduler.StatusError,
Nodes: nodes,
}, nil)
Expand All @@ -107,7 +107,7 @@ func testSuccessMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*mode
dag.MailOn.Failure = true
dag.MailOn.Success = true

err := rp.send(context.Background(), dag, &model.Status{
err := rp.send(context.Background(), dag, model.Status{
Status: scheduler.StatusSuccess,
Nodes: nodes,
}, nil)
Expand All @@ -121,11 +121,7 @@ func testSuccessMail(t *testing.T, rp *reporter, dag *digraph.DAG, nodes []*mode
}

func testRenderSummary(t *testing.T, _ *reporter, dag *digraph.DAG, nodes []*model.Node) {
status := &model.Status{
Name: dag.Name,
Status: scheduler.StatusError,
Nodes: nodes,
}
status := model.NewStatusFactory(dag).Create("request-id", scheduler.StatusError, 0, time.Now())
summary := renderDAGSummary(status, errors.New("test error"))
require.Contains(t, summary, "test error")
require.Contains(t, summary, dag.Name)
Expand Down
47 changes: 28 additions & 19 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func New(
}
}

var _ Client = (*client)(nil)

type client struct {
dataStore persistence.DataStores
executable string
Expand Down Expand Up @@ -174,7 +176,9 @@ func (*client) GetCurrentStatus(_ context.Context, dag *digraph.DAG) (*model.Sta
if errors.Is(err, sock.ErrTimeout) {
return nil, err
}
return model.NewStatusDefault(dag), nil
// The DAG is not running so return the default status
status := model.NewStatusFactory(dag).CreateDefault()
return &status, nil
}
return model.StatusFromJSON(ret)
}
Expand All @@ -191,7 +195,7 @@ func (e *client) GetStatusByRequestID(ctx context.Context, dag *digraph.DAG, req
// if the request id is not matched then correct the status
ret.Status.CorrectRunningStatus()
}
return ret.Status, err
return &ret.Status, err
}

func (*client) currentStatus(_ context.Context, dag *digraph.DAG) (*model.Status, error) {
Expand All @@ -203,28 +207,30 @@ func (*client) currentStatus(_ context.Context, dag *digraph.DAG) (*model.Status
return model.StatusFromJSON(ret)
}

func (e *client) GetLatestStatus(ctx context.Context, dag *digraph.DAG) (*model.Status, error) {
func (e *client) GetLatestStatus(ctx context.Context, dag *digraph.DAG) (model.Status, error) {
currStatus, _ := e.currentStatus(ctx, dag)
if currStatus != nil {
return currStatus, nil
return *currStatus, nil
}
status, err := e.dataStore.HistoryStore().ReadStatusToday(ctx, dag.Location)
if errors.Is(err, persistence.ErrNoStatusDataToday) ||
errors.Is(err, persistence.ErrNoStatusData) {
return model.NewStatusDefault(dag), nil
}
if err != nil {
return model.NewStatusDefault(dag), err
status := model.NewStatusFactory(dag).CreateDefault()
if errors.Is(err, persistence.ErrNoStatusDataToday) ||
errors.Is(err, persistence.ErrNoStatusData) {
// No status for today
return status, nil
}
return status, err
}
status.CorrectRunningStatus()
return status, nil
return *status, nil
}

func (e *client) GetRecentHistory(ctx context.Context, dag *digraph.DAG, n int) []*model.StatusFile {
func (e *client) GetRecentHistory(ctx context.Context, dag *digraph.DAG, n int) []model.StatusFile {
return e.dataStore.HistoryStore().ReadStatusRecent(ctx, dag.Location, n)
}

func (e *client) UpdateStatus(ctx context.Context, dag *digraph.DAG, status *model.Status) error {
func (e *client) UpdateStatus(ctx context.Context, dag *digraph.DAG, status model.Status) error {
client := sock.NewClient(dag.SockAddr())
res, err := client.Request("GET", "/status")
if err != nil {
Expand Down Expand Up @@ -256,12 +262,12 @@ func (e *client) DeleteDAG(ctx context.Context, name, loc string) error {
}

func (e *client) GetAllStatus(ctx context.Context) (
statuses []*DAGStatus, errs []string, err error,
statuses []DAGStatus, errs []string, err error,
) {
dagStore := e.dataStore.DAGStore()
dagList, errs, err := dagStore.List(ctx)

var ret []*DAGStatus
var ret []DAGStatus
for _, d := range dagList {
status, err := e.readStatus(ctx, d)
if err != nil {
Expand All @@ -277,13 +283,12 @@ func (e *client) getPageCount(total int, limit int) int {
return (total-1)/(limit) + 1
}

func (e *client) GetAllStatusPagination(ctx context.Context, params dags.ListDagsParams) ([]*DAGStatus, *DagListPaginationSummaryResult, error) {
func (e *client) GetAllStatusPagination(ctx context.Context, params dags.ListDagsParams) ([]DAGStatus, *DagListPaginationSummaryResult, error) {
var (
dagListPaginationResult *persistence.DagListPaginationResult
err error
dagStore = e.dataStore.DAGStore()
dagStatusList = make([]*DAGStatus, 0)
currentStatus *DAGStatus
dagStatusList = make([]DAGStatus, 0)
)

page := 1
Expand All @@ -305,6 +310,10 @@ func (e *client) GetAllStatusPagination(ctx context.Context, params dags.ListDag
}

for _, currentDag := range dagListPaginationResult.DagList {
var (
currentStatus DAGStatus
err error
)
if currentStatus, err = e.readStatus(ctx, currentDag); err != nil {
dagListPaginationResult.ErrorList = append(dagListPaginationResult.ErrorList, err.Error())
}
Expand All @@ -323,7 +332,7 @@ func (e *client) getDAG(ctx context.Context, name string) (*digraph.DAG, error)
return e.emptyDAGIfNil(dagDetail, name), err
}

func (e *client) GetStatus(ctx context.Context, id string) (*DAGStatus, error) {
func (e *client) GetStatus(ctx context.Context, id string) (DAGStatus, error) {
dag, err := e.getDAG(ctx, id)
if dag == nil {
// TODO: fix not to use location
Expand All @@ -344,7 +353,7 @@ func (e *client) ToggleSuspend(_ context.Context, id string, suspend bool) error
return flagStore.ToggleSuspend(id, suspend)
}

func (e *client) readStatus(ctx context.Context, dag *digraph.DAG) (*DAGStatus, error) {
func (e *client) readStatus(ctx context.Context, dag *digraph.DAG) (DAGStatus, error) {
latestStatus, err := e.GetLatestStatus(ctx, dag)
id := strings.TrimSuffix(
filepath.Base(dag.Location),
Expand Down
31 changes: 18 additions & 13 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package client_test

import (
"encoding/json"
"fmt"
"net/http"
"path/filepath"
Expand All @@ -29,14 +30,20 @@ func TestClient_GetStatus(t *testing.T) {
dag := th.LoadDAGFile(t, "valid.yaml")
ctx := th.Context

requestID := fmt.Sprintf("request-id-%d", time.Now().Unix())
socketServer, _ := sock.NewServer(
dag.SockAddr(),
func(w http.ResponseWriter, _ *http.Request) {
status := model.NewStatus(dag.DAG, nil,
scheduler.StatusRunning, 0, nil, nil)
status := model.NewStatusFactory(dag.DAG).Create(
requestID, scheduler.StatusRunning, 0, time.Now(),
)
w.WriteHeader(http.StatusOK)
b, _ := status.ToJSON()
_, _ = w.Write(b)
jsonData, err := json.Marshal(status)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
_, _ = w.Write(jsonData)
},
)

Expand Down Expand Up @@ -83,9 +90,9 @@ func TestClient_GetStatus(t *testing.T) {
_ = historyStore.Close(ctx)

// Get the status and check if it is the same as the one we wrote.
status, err = cli.GetStatusByRequestID(ctx, dag.DAG, requestID)
statusToCheck, err := cli.GetStatusByRequestID(ctx, dag.DAG, requestID)
require.NoError(t, err)
require.Equal(t, scheduler.NodeStatusSuccess, status.Nodes[0].Status)
require.Equal(t, scheduler.NodeStatusSuccess, statusToCheck.Nodes[0].Status)

// Update the status.
newStatus := scheduler.NodeStatusError
Expand Down Expand Up @@ -305,14 +312,12 @@ func TestClient_ReadHistory(t *testing.T) {
})
}

func testNewStatus(dag *digraph.DAG, requestID string, status scheduler.Status, nodeStatus scheduler.NodeStatus) *model.Status {
nodeData := scheduler.NodeData{
State: scheduler.NodeState{Status: nodeStatus},
}
func testNewStatus(dag *digraph.DAG, requestID string, status scheduler.Status, nodeStatus scheduler.NodeStatus) model.Status {
nodes := []scheduler.NodeData{{State: scheduler.NodeState{Status: nodeStatus}}}
startedAt := model.Time(time.Now())
statusModel := model.NewStatus(dag, []scheduler.NodeData{nodeData}, status, 0, startedAt, nil)
statusModel.RequestID = requestID
return statusModel
return model.NewStatusFactory(dag).Create(
requestID, status, 0, *startedAt, model.WithNodes(nodes),
)
}

func TestClient_GetTagList(t *testing.T) {
Expand Down
Loading

0 comments on commit ff64bef

Please sign in to comment.