Skip to content

Commit

Permalink
executor: Enable command substitution and environment variables for `…
Browse files Browse the repository at this point in the history
…ssh` and `subworkflow` executor config (#748)
  • Loading branch information
yohamta authored Dec 26, 2024
1 parent 6ca7f2a commit bb2c8ac
Show file tree
Hide file tree
Showing 39 changed files with 1,769 additions and 1,786 deletions.
4 changes: 2 additions & 2 deletions cmd/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (td *testDAG) AssertCurrentStatus(t *testing.T, expected scheduler.Status)
dag, err := digraph.Load(td.Context, td.Config.Paths.BaseConfig, td.Path, "")
require.NoError(t, err)

cli := td.Client()
cli := td.Client
require.Eventually(t, func() bool {
status, err := cli.GetCurrentStatus(td.Context, dag)
require.NoError(t, err)
Expand All @@ -59,7 +59,7 @@ func (td *testDAG) AssertCurrentStatus(t *testing.T, expected scheduler.Status)
func (th *testDAG) AssertLastStatus(t *testing.T, expected scheduler.Status) {
t.Helper()

hs := th.DataStore().HistoryStore()
hs := th.DataStores.HistoryStore()
require.Eventually(t, func() bool {
status := hs.ReadStatusRecent(th.Context, th.Path, 1)
if len(status) < 1 {
Expand Down
2 changes: 1 addition & 1 deletion cmd/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestRetryCommand(t *testing.T) {
th.RunCommand(t, startCmd(), cmdTest{args: args})

// Find the request ID.
cli := th.Client()
cli := th.Client
ctx := context.Background()
status, err := cli.GetStatus(ctx, dagFile.Path)
require.NoError(t, err)
Expand Down
3 changes: 3 additions & 0 deletions cmd/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (
)

func TestStartCommand(t *testing.T) {
t.Parallel()

th := testSetup(t)

tests := []cmdTest{
{
name: "StartDAG",
Expand Down
2 changes: 1 addition & 1 deletion cmd/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestStatusCommand(t *testing.T) {
close(done)
}()

hs := th.DataStore().HistoryStore()
hs := th.DataStores.HistoryStore()
require.Eventually(t, func() bool {
status := hs.ReadStatusRecent(th.Context, dagFile.Path, 1)
if len(status) < 1 {
Expand Down
20 changes: 10 additions & 10 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func (a *Agent) Run(ctx context.Context) error {

// Check if the DAG is already running.
if err := a.checkIsAlreadyRunning(ctx); err != nil {
a.scheduler.Cancel(ctx, a.graph)
return err
}

Expand Down Expand Up @@ -203,7 +204,7 @@ func (a *Agent) Run(ctx context.Context) error {

// Send the execution report if necessary.
a.lastErr = lastErr
if err := a.reporter.send(a.dag, finishedStatus, lastErr); err != nil {
if err := a.reporter.send(ctx, a.dag, finishedStatus, lastErr); err != nil {
logger.Error(ctx, "Mail notification failed", "err", err)
}

Expand Down Expand Up @@ -314,14 +315,13 @@ func (a *Agent) setup(ctx context.Context) error {
defer a.lock.Unlock()

a.scheduler = a.newScheduler()
a.reporter = newReporter(
mailer.New(mailer.Config{
Host: a.dag.SMTP.Host,
Port: a.dag.SMTP.Port,
Username: a.dag.SMTP.Username,
Password: a.dag.SMTP.Password,
}),
)
mailer := mailer.New(mailer.Config{
Host: a.dag.SMTP.Host,
Port: a.dag.SMTP.Port,
Username: a.dag.SMTP.Username,
Password: a.dag.SMTP.Password,
})
a.reporter = newReporter(mailer)

return a.setupGraph(ctx)
}
Expand Down Expand Up @@ -482,7 +482,7 @@ func (a *Agent) checkPreconditions(ctx context.Context) error {
// If one of the conditions does not met, cancel the execution.
if err := digraph.EvalConditions(a.dag.Preconditions); err != nil {
logger.Error(ctx, "Preconditions are not met", "err", err)
a.scheduler.Cancel(a.graph)
a.scheduler.Cancel(ctx, a.graph)
return err
}
return nil
Expand Down
Loading

0 comments on commit bb2c8ac

Please sign in to comment.