Skip to content

Commit

Permalink
fix defs to handle config ARRAYs as Object[]
Browse files Browse the repository at this point in the history
  • Loading branch information
PastorGL committed Jan 13, 2025
1 parent a8e1f8a commit 50b796c
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class SplitByAttrsOperation extends Operation {
private String outputNameTemplate;
private String outputDistinctSplits;

private String[] splitAttrs;
private Object[] splitAttrs;

@Override
public OperationMeta meta() {
Expand All @@ -47,7 +47,7 @@ public OperationMeta meta() {
.build(),

new DefinitionMetaBuilder()
.def(SPLIT_ATTRS, "Attributes to split the DataStream by their unique value combinations", String[].class)
.def(SPLIT_ATTRS, "Attributes to split the DataStream by their unique value combinations", Object[].class)
.def(SPLIT_TEMPLATE, "Format string for output names' wildcard part. Must contain all split attributes in form of '\\{split_attr\\}'")
.build(),

Expand Down Expand Up @@ -76,7 +76,7 @@ public void configure(Configuration params) throws InvalidConfigurationException

splitAttrs = params.get(SPLIT_ATTRS);

for (String attr : splitAttrs) {
for (Object attr : splitAttrs) {
if (!outputNameTemplate.contains("{" + attr + "}")) {
throw new InvalidConfigurationException("Split output name template '" + outputNameTemplate + "' must include split attribute reference {"
+ attr + "} for the Operation '" + meta.verb + "'");
Expand All @@ -92,7 +92,7 @@ public ListOrderedMap<String, DataStream> execute() {
DataStream input = inputStreams.getValue(0);
input.surpassUsages();

final List<String> _splitColumnNames = Arrays.stream(splitAttrs).collect(Collectors.toList());
final List<String> _splitColumnNames = Arrays.stream(splitAttrs).map(String::valueOf).collect(Collectors.toList());

JavaPairRDD<Object, DataRecord<?>> distinctSplits = input.rdd
.mapPartitionsToPair(it -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public InputAdapterMeta meta() {
" will be split to different streams", Boolean.class, false,
"By default, don't split")
.def(COLUMNS, "Columns to select from the built-in schema",
String[].class, null, "By default, don't select columns from the schema")
Object[].class, null, "By default, don't select columns from the schema")
.build()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import java.util.*;
import java.util.stream.Collectors;

import static io.github.pastorgl.datacooker.data.ObjLvl.VALUE;
import static io.github.pastorgl.datacooker.Constants.UNDERSCORE;
import static io.github.pastorgl.datacooker.data.ObjLvl.VALUE;
import static io.github.pastorgl.datacooker.storage.hadoop.HadoopStorage.COLUMNS;
import static io.github.pastorgl.datacooker.storage.hadoop.HadoopStorage.DELIMITER;

Expand Down Expand Up @@ -51,11 +51,11 @@ public InputAdapterMeta meta() {
.def(SCHEMA_DEFAULT, "Loose schema for delimited text (just column names," +
" optionally with placeholders to skip some, denoted by underscores _)." +
" Required if " + SCHEMA_FROM_FILE + " is set to false",
String[].class, null, "By default, don't set the schema")
Object[].class, null, "By default, don't set the schema")
.def(DELIMITER, "Column delimiter for delimited text",
String.class, "\t", "By default, tabulation character")
.def(COLUMNS, "Columns to select from the schema",
String[].class, null, "By default, don't select columns from the schema")
Object[].class, null, "By default, don't select columns from the schema")
.build()
);
}
Expand All @@ -68,15 +68,20 @@ protected void configure(Configuration params) throws InvalidConfigurationExcept

schemaFromFile = params.get(SCHEMA_FROM_FILE);
if (!schemaFromFile) {
schemaDefault = params.get(SCHEMA_DEFAULT);
Object[] schDef = params.get(SCHEMA_DEFAULT);

if (schemaDefault == null) {
if (schDef == null) {
throw new InvalidConfigurationException("Neither '" + SCHEMA_FROM_FILE + "' is true nor '"
+ SCHEMA_DEFAULT + "' is specified for Input Adapter '" + meta.verb + "'");
} else {
schemaDefault = Arrays.stream(schDef).map(String::valueOf).toArray(String[]::new);
}
}

dsColumns = params.get(COLUMNS);
Object[] cols = params.get(COLUMNS);
if (cols != null) {
dsColumns = Arrays.stream(cols).map(String::valueOf).toArray(String[]::new);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.stream.Collectors;

import static io.github.pastorgl.datacooker.storage.hadoop.HadoopStorage.*;

Expand All @@ -33,15 +35,18 @@ public OutputAdapterMeta meta() {
.def(CODEC, "Codec to compress the output", Codec.class, Codec.NONE,
"By default, use no compression")
.def(COLUMNS, "Columns to write",
String[].class, null, "By default, select all columns")
Object[].class, null, "By default, select all columns")
.build()
);
}

protected void configure(Configuration params) throws InvalidConfigurationException {
super.configure(params);

columns = params.get(COLUMNS);
Object[] cols = params.get(COLUMNS);
if (cols != null) {
columns = Arrays.stream(cols).map(String::valueOf).toArray(String[]::new);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;

import static io.github.pastorgl.datacooker.storage.hadoop.HadoopStorage.*;

Expand All @@ -33,7 +34,7 @@ public OutputAdapterMeta meta() {
.def(CODEC, "Codec to compress the output", Codec.class, Codec.NONE,
"By default, use no compression")
.def(COLUMNS, "Columns to write",
String[].class, null, "By default, select all columns")
Object[].class, null, "By default, select all columns")
.def(DELIMITER, "Record column delimiter",
String.class, "\t", "By default, tabulation character")
.build()
Expand All @@ -43,7 +44,10 @@ public OutputAdapterMeta meta() {
protected void configure(Configuration params) throws InvalidConfigurationException {
super.configure(params);

columns = params.get(COLUMNS);
Object[] cols = params.get(COLUMNS);
if (cols != null) {
columns = Arrays.stream(cols).map(String::valueOf).toArray(String[]::new);
}
delimiter = params.get(DELIMITER);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static io.github.pastorgl.datacooker.data.ObjLvl.VALUE;

Expand Down Expand Up @@ -48,7 +49,7 @@ public OperationMeta meta() {
.build(),

new DefinitionMetaBuilder()
.def(CALC_RESULTS, "List of resulting column names", String[].class)
.def(CALC_RESULTS, "List of resulting column names", Object[].class)
.dynDef(SOURCE_ATTR_PREFIX, "Column with Double values to use as series source", String.class)
.dynDef(CALC_FUNCTION_PREFIX, "The mathematical function to perform over the series", KeyedMath.class)
.dynDef(CALC_CONST_PREFIX, "An optional constant value for the selected function", Double.class)
Expand All @@ -65,7 +66,7 @@ public OperationMeta meta() {

@Override
public void configure(Configuration params) throws InvalidConfigurationException {
resultingColumns = params.get(CALC_RESULTS);
resultingColumns = Arrays.stream((Object[]) params.get(CALC_RESULTS)).map(String::valueOf).toArray(String[]::new);

sourceAttrs = new String[resultingColumns.length];
keyedFunctions = new KeyedFunction[resultingColumns.length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
import scala.Tuple2;

import java.util.*;
import java.util.stream.Collectors;

import static io.github.pastorgl.datacooker.data.ObjLvl.VALUE;

@SuppressWarnings("unused")
public class CountUniquesOperation extends Operation {
static final String COUNT_ATTRS = "count_attrs";

protected String[] countAttrs;
protected Object[] countAttrs;

@Override
public OperationMeta meta() {
Expand All @@ -37,7 +38,7 @@ public OperationMeta meta() {
.build(),

new DefinitionMetaBuilder()
.def(COUNT_ATTRS, "Attributes to count unique values under same keys", String[].class)
.def(COUNT_ATTRS, "Attributes to count unique values under same keys", Object[].class)
.build(),

new PositionalStreamsMetaBuilder()
Expand All @@ -60,7 +61,7 @@ public ListOrderedMap<String, DataStream> execute() {
throw new InvalidConfigurationException("Operation '" + meta.verb + "' requires same amount of INPUT and OUTPUT streams");
}

final List<String> outputColumns = Arrays.asList(countAttrs);
final List<String> outputColumns = Arrays.stream(countAttrs).map(String::valueOf).collect(Collectors.toList());
final int l = countAttrs.length;

ListOrderedMap<String, DataStream> outputs = new ListOrderedMap<>();
Expand Down

0 comments on commit 50b796c

Please sign in to comment.