Skip to content

Commit

Permalink
dag: enable parameters as map[string]string
Browse files Browse the repository at this point in the history
  • Loading branch information
yohamta committed Dec 30, 2024
1 parent d0e2732 commit c5adbdd
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 47 deletions.
188 changes: 142 additions & 46 deletions internal/digraph/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,72 +10,163 @@ import (
"regexp"
"strconv"
"strings"

"github.com/dagu-org/dagu/internal/cmdutil"
)

// buildParams builds the parameters for the DAG.
func buildParams(ctx BuildContext, spec *definition, dag *DAG) (err error) {
dag.DefaultParams = spec.Params
func buildParams(ctx BuildContext, spec *definition, dag *DAG) error {
var (
paramPairs []paramPair
envs []string
)

if err := parseParams(ctx, spec.Params, &paramPairs, &envs); err != nil {
return err
}

// Create default parameters string in the form of "key=value key=value ..."
var paramsToJoin []string
for _, paramPair := range paramPairs {
paramsToJoin = append(paramsToJoin, paramPair.Escaped())
}
dag.DefaultParams = strings.Join(paramsToJoin, " ")

params := dag.DefaultParams
if ctx.opts.parameters != "" {
params = ctx.opts.parameters
// Parse the parameters from the command line and override the default parameters
var (
overridePairs []paramPair
overrideEnvs []string
)
if err := parseParams(ctx, ctx.opts.parameters, &overridePairs, &overrideEnvs); err != nil {
return err
}
// Override the default parameters with the command line parameters
pairsIndex := make(map[string]int)
for i, paramPair := range paramPairs {
if paramPair.name != "" {
pairsIndex[paramPair.name] = i
}
}
for i, paramPair := range overridePairs {
if paramPair.name == "" {
// For positional parameters
if i < len(paramPairs) {
paramPairs[i] = paramPair
} else {
paramPairs = append(paramPairs, paramPair)
}
continue
}

if foundIndex, ok := pairsIndex[paramPair.name]; ok {
paramPairs[foundIndex] = paramPair
} else {
paramPairs = append(paramPairs, paramPair)
}
}

envsIndex := make(map[string]int)
for i, env := range envs {
envsIndex[env] = i
}
for _, env := range overrideEnvs {
if i, ok := envsIndex[env]; !ok {
envs = append(envs, env)
} else {
envs[i] = env
}
}
}

var envs []string
dag.Params, envs, err = parseParams(ctx, params)
if err == nil {
dag.Env = append(dag.Env, envs...)
// Convert the parameters to a string in the form of "key=value"
var paramStrs []string
for _, paramPair := range paramPairs {
paramStrs = append(paramStrs, paramPair.String())
}

return
// Set the parameters as environment variables for the command
dag.Env = append(dag.Env, envs...)
dag.Params = append(dag.Params, paramStrs...)

return nil
}

// parseParams parses and processes the parameters for the DAG.
func parseParams(ctx BuildContext, value string) (
params []string,
envs []string,
err error,
) {
var parsedParams []paramPair

parsedParams, err = parseParamValue(ctx, value)
func parseParams(ctx BuildContext, value any, params *[]paramPair, envs *[]string) error {
var parsedPairs []paramPair

parsedPairs, err := parseParamValue(ctx, value)
if err != nil {
return
return fmt.Errorf("%w: %s", errInvalidParamValue, err)
}

var ret []string
for i, p := range parsedParams {
for index, paramPair := range parsedPairs {
if !ctx.opts.noEval {
p.value = os.ExpandEnv(p.value)
paramPair.value = os.ExpandEnv(paramPair.value)
}

strParam := stringifyParam(p)
ret = append(ret, strParam)
*params = append(*params, paramPair)

if p.name == "" {
strParam = p.value
}
paramStr := paramPair.String()

if err = os.Setenv(strconv.Itoa(i+1), strParam); err != nil {
return
// Set the parameter as an environment variable for the command
// $1, $2, $3, ...
if err := os.Setenv(strconv.Itoa(index+1), paramStr); err != nil {
return fmt.Errorf("failed to set environment variable: %w", err)
}

if !ctx.opts.noEval && p.name != "" {
envs = append(envs, strParam)
err = os.Setenv(p.name, p.value)
if err != nil {
return
if !ctx.opts.noEval && paramPair.name != "" {
*envs = append(*envs, paramStr)
if err := os.Setenv(paramPair.name, paramPair.value); err != nil {
return fmt.Errorf("failed to set environment variable: %w", err)
}
}
}

return ret, envs, nil
return nil
}

// parseParamValue parses the parameters for the DAG.
func parseParamValue(
ctx BuildContext, input string,
) ([]paramPair, error) {
func parseParamValue(ctx BuildContext, input any) ([]paramPair, error) {
switch v := input.(type) {
case nil:
return nil, nil

case string:
return parseStringParams(ctx, v)

case []map[string]string:
return parseMapParams(ctx, v)

default:
return nil, fmt.Errorf("%w: %T", errInvalidParamValue, v)

}
}

func parseMapParams(ctx BuildContext, input []map[string]string) ([]paramPair, error) {
var params []paramPair

for _, m := range input {
for name, value := range m {
if !ctx.opts.noEval {
parsed, err := cmdutil.SubstituteWithEnvExpand(value)
if err != nil {
return nil, fmt.Errorf("%w: %s", errInvalidParamValue, err)
}
value = parsed
}

paramPair := paramPair{name, value}
params = append(params, paramPair)
}
}

return params, nil
}

func parseStringParams(ctx BuildContext, input string) ([]paramPair, error) {
paramRegex := regexp.MustCompile(
`(?:([^\s=]+)=)?("(?:\\"|[^"])*"|` + "`(" + `?:\\"|[^"]*)` + "`" + `|[^"\s]+)`,
)
Expand Down Expand Up @@ -125,20 +216,25 @@ func parseParamValue(
return params, nil
}

// stringifyParam converts a paramPair to a string representation.
func stringifyParam(param paramPair) string {
if param.name != "" {
return fmt.Sprintf("%s=%s", param.name, param.value)
}
return param.value
}

// paramPair represents a key-value pair for the parameters.
type paramPair struct {
name string
value string
}

func (p paramPair) String() string {
if p.name != "" {
return fmt.Sprintf("%s=%s", p.name, p.value)
}
return p.value
}

func (p paramPair) Escaped() string {
if p.name != "" {
return fmt.Sprintf("%s=%q", p.name, p.value)
}
return fmt.Sprintf("%q", p.value)
}

var (
// paramRegex is a regex to match the parameters in the command.
paramRegex = regexp.MustCompile(`\$\w+`)
Expand Down
2 changes: 1 addition & 1 deletion internal/digraph/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type definition struct {
// MaxActiveRuns is the maximum number of concurrent steps.
MaxActiveRuns int
// Params is the default parameters for the steps.
Params string
Params any
// MaxCleanUpTimeSec is the maximum time in seconds to clean up the DAG.
// It is a wait time to kill the processes when it is requested to stop.
// If the time is exceeded, the process is killed.
Expand Down

0 comments on commit c5adbdd

Please sign in to comment.