From 50b796c9b7ea1b7952c80e946d1af8c95fe34ac8 Mon Sep 17 00:00:00 2001 From: Alexey Evdokimov Date: Mon, 13 Jan 2025 15:37:18 +0400 Subject: [PATCH] fix defs to handle config ARRAYs as Object[] --- .../operations/SplitByAttrsOperation.java | 8 ++++---- .../hadoop/input/ParquetColumnarInput.java | 2 +- .../storage/hadoop/input/TextColumnarInput.java | 17 +++++++++++------ .../hadoop/output/ColumnarParquetOutput.java | 9 +++++++-- .../storage/hadoop/output/HadoopTextOutput.java | 8 ++++++-- .../math/operations/KeyedMathOperation.java | 5 +++-- .../populations/CountUniquesOperation.java | 7 ++++--- 7 files changed, 36 insertions(+), 20 deletions(-) diff --git a/commons/src/main/java/io/github/pastorgl/datacooker/commons/operations/SplitByAttrsOperation.java b/commons/src/main/java/io/github/pastorgl/datacooker/commons/operations/SplitByAttrsOperation.java index ec23f53..841cd67 100644 --- a/commons/src/main/java/io/github/pastorgl/datacooker/commons/operations/SplitByAttrsOperation.java +++ b/commons/src/main/java/io/github/pastorgl/datacooker/commons/operations/SplitByAttrsOperation.java @@ -32,7 +32,7 @@ public class SplitByAttrsOperation extends Operation { private String outputNameTemplate; private String outputDistinctSplits; - private String[] splitAttrs; + private Object[] splitAttrs; @Override public OperationMeta meta() { @@ -47,7 +47,7 @@ public OperationMeta meta() { .build(), new DefinitionMetaBuilder() - .def(SPLIT_ATTRS, "Attributes to split the DataStream by their unique value combinations", String[].class) + .def(SPLIT_ATTRS, "Attributes to split the DataStream by their unique value combinations", Object[].class) .def(SPLIT_TEMPLATE, "Format string for output names' wildcard part. Must contain all split attributes in form of '\\{split_attr\\}'") .build(), @@ -76,7 +76,7 @@ public void configure(Configuration params) throws InvalidConfigurationException splitAttrs = params.get(SPLIT_ATTRS); - for (String attr : splitAttrs) { + for (Object attr : splitAttrs) { if (!outputNameTemplate.contains("{" + attr + "}")) { throw new InvalidConfigurationException("Split output name template '" + outputNameTemplate + "' must include split attribute reference {" + attr + "} for the Operation '" + meta.verb + "'"); @@ -92,7 +92,7 @@ public ListOrderedMap execute() { DataStream input = inputStreams.getValue(0); input.surpassUsages(); - final List _splitColumnNames = Arrays.stream(splitAttrs).collect(Collectors.toList()); + final List _splitColumnNames = Arrays.stream(splitAttrs).map(String::valueOf).collect(Collectors.toList()); JavaPairRDD> distinctSplits = input.rdd .mapPartitionsToPair(it -> { diff --git a/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/input/ParquetColumnarInput.java b/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/input/ParquetColumnarInput.java index fd39db6..a28a589 100644 --- a/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/input/ParquetColumnarInput.java +++ b/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/input/ParquetColumnarInput.java @@ -36,7 +36,7 @@ public InputAdapterMeta meta() { " will be split to different streams", Boolean.class, false, "By default, don't split") .def(COLUMNS, "Columns to select from the built-in schema", - String[].class, null, "By default, don't select columns from the schema") + Object[].class, null, "By default, don't select columns from the schema") .build() ); } diff --git a/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/input/TextColumnarInput.java b/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/input/TextColumnarInput.java index 79c1322..957a4a6 100644 --- a/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/input/TextColumnarInput.java +++ b/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/input/TextColumnarInput.java @@ -19,8 +19,8 @@ import java.util.*; import java.util.stream.Collectors; -import static io.github.pastorgl.datacooker.data.ObjLvl.VALUE; import static io.github.pastorgl.datacooker.Constants.UNDERSCORE; +import static io.github.pastorgl.datacooker.data.ObjLvl.VALUE; import static io.github.pastorgl.datacooker.storage.hadoop.HadoopStorage.COLUMNS; import static io.github.pastorgl.datacooker.storage.hadoop.HadoopStorage.DELIMITER; @@ -51,11 +51,11 @@ public InputAdapterMeta meta() { .def(SCHEMA_DEFAULT, "Loose schema for delimited text (just column names," + " optionally with placeholders to skip some, denoted by underscores _)." + " Required if " + SCHEMA_FROM_FILE + " is set to false", - String[].class, null, "By default, don't set the schema") + Object[].class, null, "By default, don't set the schema") .def(DELIMITER, "Column delimiter for delimited text", String.class, "\t", "By default, tabulation character") .def(COLUMNS, "Columns to select from the schema", - String[].class, null, "By default, don't select columns from the schema") + Object[].class, null, "By default, don't select columns from the schema") .build() ); } @@ -68,15 +68,20 @@ protected void configure(Configuration params) throws InvalidConfigurationExcept schemaFromFile = params.get(SCHEMA_FROM_FILE); if (!schemaFromFile) { - schemaDefault = params.get(SCHEMA_DEFAULT); + Object[] schDef = params.get(SCHEMA_DEFAULT); - if (schemaDefault == null) { + if (schDef == null) { throw new InvalidConfigurationException("Neither '" + SCHEMA_FROM_FILE + "' is true nor '" + SCHEMA_DEFAULT + "' is specified for Input Adapter '" + meta.verb + "'"); + } else { + schemaDefault = Arrays.stream(schDef).map(String::valueOf).toArray(String[]::new); } } - dsColumns = params.get(COLUMNS); + Object[] cols = params.get(COLUMNS); + if (cols != null) { + dsColumns = Arrays.stream(cols).map(String::valueOf).toArray(String[]::new); + } } @Override diff --git a/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/output/ColumnarParquetOutput.java b/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/output/ColumnarParquetOutput.java index 1fdda1b..e2d32da 100644 --- a/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/output/ColumnarParquetOutput.java +++ b/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/output/ColumnarParquetOutput.java @@ -14,6 +14,8 @@ import java.io.IOException; import java.io.StringWriter; +import java.util.Arrays; +import java.util.stream.Collectors; import static io.github.pastorgl.datacooker.storage.hadoop.HadoopStorage.*; @@ -33,7 +35,7 @@ public OutputAdapterMeta meta() { .def(CODEC, "Codec to compress the output", Codec.class, Codec.NONE, "By default, use no compression") .def(COLUMNS, "Columns to write", - String[].class, null, "By default, select all columns") + Object[].class, null, "By default, select all columns") .build() ); } @@ -41,7 +43,10 @@ public OutputAdapterMeta meta() { protected void configure(Configuration params) throws InvalidConfigurationException { super.configure(params); - columns = params.get(COLUMNS); + Object[] cols = params.get(COLUMNS); + if (cols != null) { + columns = Arrays.stream(cols).map(String::valueOf).toArray(String[]::new); + } } @Override diff --git a/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/output/HadoopTextOutput.java b/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/output/HadoopTextOutput.java index 190139d..70408b1 100644 --- a/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/output/HadoopTextOutput.java +++ b/commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/output/HadoopTextOutput.java @@ -14,6 +14,7 @@ import java.io.IOException; import java.io.StringWriter; +import java.util.Arrays; import static io.github.pastorgl.datacooker.storage.hadoop.HadoopStorage.*; @@ -33,7 +34,7 @@ public OutputAdapterMeta meta() { .def(CODEC, "Codec to compress the output", Codec.class, Codec.NONE, "By default, use no compression") .def(COLUMNS, "Columns to write", - String[].class, null, "By default, select all columns") + Object[].class, null, "By default, select all columns") .def(DELIMITER, "Record column delimiter", String.class, "\t", "By default, tabulation character") .build() @@ -43,7 +44,10 @@ public OutputAdapterMeta meta() { protected void configure(Configuration params) throws InvalidConfigurationException { super.configure(params); - columns = params.get(COLUMNS); + Object[] cols = params.get(COLUMNS); + if (cols != null) { + columns = Arrays.stream(cols).map(String::valueOf).toArray(String[]::new); + } delimiter = params.get(DELIMITER); } diff --git a/math/src/main/java/io/github/pastorgl/datacooker/math/operations/KeyedMathOperation.java b/math/src/main/java/io/github/pastorgl/datacooker/math/operations/KeyedMathOperation.java index c8cd360..e2777bc 100644 --- a/math/src/main/java/io/github/pastorgl/datacooker/math/operations/KeyedMathOperation.java +++ b/math/src/main/java/io/github/pastorgl/datacooker/math/operations/KeyedMathOperation.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import static io.github.pastorgl.datacooker.data.ObjLvl.VALUE; @@ -48,7 +49,7 @@ public OperationMeta meta() { .build(), new DefinitionMetaBuilder() - .def(CALC_RESULTS, "List of resulting column names", String[].class) + .def(CALC_RESULTS, "List of resulting column names", Object[].class) .dynDef(SOURCE_ATTR_PREFIX, "Column with Double values to use as series source", String.class) .dynDef(CALC_FUNCTION_PREFIX, "The mathematical function to perform over the series", KeyedMath.class) .dynDef(CALC_CONST_PREFIX, "An optional constant value for the selected function", Double.class) @@ -65,7 +66,7 @@ public OperationMeta meta() { @Override public void configure(Configuration params) throws InvalidConfigurationException { - resultingColumns = params.get(CALC_RESULTS); + resultingColumns = Arrays.stream((Object[]) params.get(CALC_RESULTS)).map(String::valueOf).toArray(String[]::new); sourceAttrs = new String[resultingColumns.length]; keyedFunctions = new KeyedFunction[resultingColumns.length]; diff --git a/populations/src/main/java/io/github/pastorgl/datacooker/populations/CountUniquesOperation.java b/populations/src/main/java/io/github/pastorgl/datacooker/populations/CountUniquesOperation.java index e4238f5..b259d19 100644 --- a/populations/src/main/java/io/github/pastorgl/datacooker/populations/CountUniquesOperation.java +++ b/populations/src/main/java/io/github/pastorgl/datacooker/populations/CountUniquesOperation.java @@ -16,6 +16,7 @@ import scala.Tuple2; import java.util.*; +import java.util.stream.Collectors; import static io.github.pastorgl.datacooker.data.ObjLvl.VALUE; @@ -23,7 +24,7 @@ public class CountUniquesOperation extends Operation { static final String COUNT_ATTRS = "count_attrs"; - protected String[] countAttrs; + protected Object[] countAttrs; @Override public OperationMeta meta() { @@ -37,7 +38,7 @@ public OperationMeta meta() { .build(), new DefinitionMetaBuilder() - .def(COUNT_ATTRS, "Attributes to count unique values under same keys", String[].class) + .def(COUNT_ATTRS, "Attributes to count unique values under same keys", Object[].class) .build(), new PositionalStreamsMetaBuilder() @@ -60,7 +61,7 @@ public ListOrderedMap execute() { throw new InvalidConfigurationException("Operation '" + meta.verb + "' requires same amount of INPUT and OUTPUT streams"); } - final List outputColumns = Arrays.asList(countAttrs); + final List outputColumns = Arrays.stream(countAttrs).map(String::valueOf).collect(Collectors.toList()); final int l = countAttrs.length; ListOrderedMap outputs = new ListOrderedMap<>();