diff --git a/README.md b/README.md index 93e6295..41139f3 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,47 @@ _export: +show3: echo>: "hoge: ${typeof(hoge) == 'undefined' ? 'None' : hoge}, a.b: ${typeof(a) == 'undefined' ? 'None' : typeof(a.b) == 'undefined' ? 'None' : a.b}" ++eval: + _export: + a: aaa + b: bbb + c: ccc + d: ${a}-${b}-${c} + + +a: + _export: + e: ${d} + f: + g: ${d} + h: + - ${d} + +b: + echo>: ${d} + +c: + echo>: ${e} + +d: + echo>: ${f.g} + +e: + param_eval>: f.g + +f: + echo>: ${f.g} + +g: + +h: + for_each>: {i: "${f.h}"} + _do: + echo>: ${i} + +i: + echo>: ${f.h} + +j: + param_eval>: f.h + +k: + +l: + for_each>: {i: "${f.h}"} + _do: + echo>: ${i} + +m: + echo>: ${f.h} + ``` # Configuration @@ -56,6 +97,13 @@ _export: - **param_reset>**: Param name to reset. (string, required) +## Configuration for `param_eval>` operator + +### Options + +- **param_eval>**: Param name to eval. (string, required) + - **NOTE**: This operator is a workaround for the issue: [Exported vars are not evaluated recursively in the context of nested params](https://github.com/treasure-data/digdag/issues/862) + # Development ## Run an Example diff --git a/example/example.dig b/example/example.dig index 102590b..0fee7af 100644 --- a/example/example.dig +++ b/example/example.dig @@ -27,3 +27,44 @@ _export: +show3: echo>: "hoge: ${typeof(hoge) == 'undefined' ? 'None' : hoge}, a.b: ${typeof(a) == 'undefined' ? 'None' : typeof(a.b) == 'undefined' ? 'None' : a.b}" ++eval: + _export: + a: aaa + b: bbb + c: ccc + d: ${a}-${b}-${c} + + +a: + _export: + e: ${d} + f: + g: ${d} + h: + - ${d} + +b: + echo>: ${d} + +c: + echo>: ${e} + +d: + echo>: ${f.g} + +e: + param_eval>: f.g + +f: + echo>: ${f.g} + +g: + +h: + for_each>: {i: "${f.h}"} + _do: + echo>: ${i} + +i: + echo>: ${f.h} + +j: + param_eval>: f.h + +k: + +l: + for_each>: {i: "${f.h}"} + _do: + echo>: ${i} + +m: + echo>: ${f.h} + diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/param/ParamPlugin.scala b/src/main/scala/pro/civitaspo/digdag/plugin/param/ParamPlugin.scala index bd0f275..3297486 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/param/ParamPlugin.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/param/ParamPlugin.scala @@ -3,23 +3,30 @@ package pro.civitaspo.digdag.plugin.param import java.lang.reflect.Constructor import java.util.{Arrays => JArrays, List => JList} -import io.digdag.spi.{Operator, OperatorContext, OperatorFactory, OperatorProvider, Plugin} -import pro.civitaspo.digdag.plugin.param.operator.{AbstractParamOperator, ParamResetOperator, ParamStoreOperator} +import io.digdag.spi.{Operator, OperatorContext, OperatorFactory, OperatorProvider, Plugin, TemplateEngine} +import javax.inject.Inject +import pro.civitaspo.digdag.plugin.param.operator.{AbstractParamOperator, ParamEvalOperator, ParamResetOperator, ParamStoreOperator} object ParamPlugin { class AthenaOperatorProvider extends OperatorProvider { + @Inject protected var templateEngine: TemplateEngine = null + override def get(): JList[OperatorFactory] = { - JArrays.asList(operatorFactory("param_reset", classOf[ParamResetOperator]), operatorFactory("param_store", classOf[ParamStoreOperator])) + JArrays.asList( + operatorFactory("param_reset", classOf[ParamResetOperator]), + operatorFactory("param_store", classOf[ParamStoreOperator]), + operatorFactory("param_eval", classOf[ParamEvalOperator]) + ) } private def operatorFactory[T <: AbstractParamOperator](operatorName: String, klass: Class[T]): OperatorFactory = { new OperatorFactory { override def getType: String = operatorName override def newOperator(context: OperatorContext): Operator = { - val constructor: Constructor[T] = klass.getConstructor(classOf[String], classOf[OperatorContext]) - constructor.newInstance(operatorName, context) + val constructor: Constructor[T] = klass.getConstructor(classOf[String], classOf[OperatorContext], classOf[TemplateEngine]) + constructor.newInstance(operatorName, context, templateEngine) } } } diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/param/operator/AbstractParamOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/param/operator/AbstractParamOperator.scala index c6ba46a..eb9f12b 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/param/operator/AbstractParamOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/param/operator/AbstractParamOperator.scala @@ -1,13 +1,14 @@ package pro.civitaspo.digdag.plugin.param.operator -import io.digdag.client.config.Config -import io.digdag.spi.OperatorContext +import io.digdag.client.config.{Config, ConfigFactory} +import io.digdag.spi.{OperatorContext, TemplateEngine} import io.digdag.util.BaseOperator import org.slf4j.{Logger, LoggerFactory} -abstract class AbstractParamOperator(operatorName: String, context: OperatorContext) extends BaseOperator(context) { +abstract class AbstractParamOperator(operatorName: String, context: OperatorContext, templateEngine: TemplateEngine) extends BaseOperator(context) { protected val logger: Logger = LoggerFactory.getLogger(operatorName) + protected val cf: ConfigFactory = request.getConfig.getFactory protected val params: Config = request.getConfig } diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/param/operator/ParamEvalOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/param/operator/ParamEvalOperator.scala new file mode 100644 index 0000000..056b74f --- /dev/null +++ b/src/main/scala/pro/civitaspo/digdag/plugin/param/operator/ParamEvalOperator.scala @@ -0,0 +1,48 @@ +package pro.civitaspo.digdag.plugin.param.operator +import java.nio.charset.StandardCharsets.UTF_8 + +import com.google.common.collect.ImmutableList +import io.digdag.client.config.{Config, ConfigKey} +import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} + +class ParamEvalOperator(operatorName: String, context: OperatorContext, templateEngine: TemplateEngine) + extends AbstractParamOperator(operatorName, context, templateEngine) { + + protected val key: String = params.get("_command", classOf[String]) + + override def runTask(): TaskResult = { + val evaluated: Config = eval(params) + val paramsToStore: Config = cf.create() + + val elems: Seq[String] = key.split("\\.") + val parents: Seq[String] = elems.reverse.tail.reverse + val child: String = elems.last + + if (parents.isEmpty) paramsToStore.set(child, evaluated.get(child, classOf[Object])) + else { + val getter = parents.foldLeft(evaluated) { (nested: Config, k: String) => + nested.getNested(k) + } + val setter = parents.foldLeft(paramsToStore) { (nested: Config, k: String) => + nested.getNestedOrSetEmpty(k) + } + setter.set(child, getter.get(child, classOf[Object])) + } + + val builder = TaskResult.defaultBuilder(cf) + builder.resetStoreParams(ImmutableList.of(ConfigKey.parse(key))) + builder.storeParams(paramsToStore) + builder.build() + } + + protected def eval(params: Config): Config = { + val tmpFile: String = workspace.createTempFile("param_eval", ".json") + + val writer = workspace.newBufferedWriter(tmpFile, UTF_8) + try writer.write(params.toString) + finally writer.close() + + val content = workspace.templateFile(templateEngine, tmpFile, UTF_8, params) + cf.fromJsonString(content) + } +} diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/param/operator/ParamResetOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/param/operator/ParamResetOperator.scala index b4f494f..42fa291 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/param/operator/ParamResetOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/param/operator/ParamResetOperator.scala @@ -2,9 +2,10 @@ package pro.civitaspo.digdag.plugin.param.operator import com.google.common.collect.ImmutableList import io.digdag.client.config.ConfigKey -import io.digdag.spi.{OperatorContext, TaskResult} +import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} -class ParamResetOperator(operatorName: String, context: OperatorContext) extends AbstractParamOperator(operatorName, context) { +class ParamResetOperator(operatorName: String, context: OperatorContext, templateEngine: TemplateEngine) + extends AbstractParamOperator(operatorName, context, templateEngine) { protected val resetKey: String = params.get("_command", classOf[String]) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/param/operator/ParamStoreOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/param/operator/ParamStoreOperator.scala index c84294b..a77d042 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/param/operator/ParamStoreOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/param/operator/ParamStoreOperator.scala @@ -1,9 +1,10 @@ package pro.civitaspo.digdag.plugin.param.operator import io.digdag.client.config.Config -import io.digdag.spi.{OperatorContext, TaskResult} +import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} -class ParamStoreOperator(operatorName: String, context: OperatorContext) extends AbstractParamOperator(operatorName, context) { +class ParamStoreOperator(operatorName: String, context: OperatorContext, templateEngine: TemplateEngine) + extends AbstractParamOperator(operatorName, context, templateEngine) { protected val newParams: Config = params.get("_command", classOf[Config])