diff --git a/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/commons/functions/ArrayFunctions.java b/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/commons/functions/ArrayFunctions.java index b3ce81d..86104bd 100644 --- a/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/commons/functions/ArrayFunctions.java +++ b/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/commons/functions/ArrayFunctions.java @@ -10,10 +10,13 @@ import io.github.pastorgl.datacooker.scripting.Function.Binary; import io.github.pastorgl.datacooker.scripting.Function.Ternary; import io.github.pastorgl.datacooker.scripting.Function.Unary; +import org.apache.commons.lang3.ArrayUtils; import java.util.Arrays; import java.util.Deque; import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; @SuppressWarnings("unused") public class ArrayFunctions { @@ -105,4 +108,51 @@ public String descr() { return "Make an ARRAY from all arguments in their given order"; } } + + public static class MakeRange extends Binary { + @Override + public ArrayWrap call(Deque args) { + Number a = Evaluator.popNumeric(args); + Number b = Evaluator.popNumeric(args); + + Object[] values = getRange(a, b); + return new ArrayWrap(values); + } + + public static Object[] getRange(Number a, Number b) { + Object[] values; + if ((a instanceof Integer) && (b instanceof Integer)) { + int ia = a.intValue(); + int ib = b.intValue(); + + if (ia > ib) { + values = IntStream.rangeClosed(ib, ia).boxed().toArray(); + ArrayUtils.reverse(values); + } + values = IntStream.rangeClosed(ia, ib).boxed().toArray(); + } else { + long la = a.longValue(); + long lb = b.longValue(); + + if (la > lb) { + values = LongStream.rangeClosed(lb, la).boxed().toArray(); + ArrayUtils.reverse(values); + } + values = LongStream.rangeClosed(la, lb).boxed().toArray(); + } + + return values; + } + + @Override + public String name() { + return "ARR_RANGE"; + } + + @Override + public String descr() { + return "Make a RANGE with both boundaries included, with order preserved. If both are Integer," + + " all RANGE values are Integer, and Long otherwise"; + } + } } diff --git a/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/data/DataContext.java b/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/data/DataContext.java index 8a82f17..8b4149c 100644 --- a/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/data/DataContext.java +++ b/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/data/DataContext.java @@ -136,6 +136,17 @@ public ListOrderedMap getAll(String... templates) { return ret; } + public DataStream getDsParts(String name, int[] partitions) { + DataStream ds = store.get(name); + + if (partitions != null) { + ds = new DataStreamBuilder(ds.name, ds.attributes()) + .filtered("PARTITION", ds) + .build(RetainerRDD.retain(ds.rdd, partitions)); + } + return ds; + } + public void put(String name, DataStream ds) { store.put(name, ds); } @@ -171,7 +182,7 @@ public ListOrderedMap createDataStreams(String adapter, Stri } } - public void copyDataStream(String adapter, String outputName, String path, Map params, int[] partitions) { + public void copyDataStream(String adapter, String outputName, int[] partitions, String path, Map params) { try { DataStream ds = store.get(outputName); @@ -1009,6 +1020,7 @@ public void analyze(Map dataStreams, List t) .rightOuterJoin(empties) + .sortByKey() .mapValues(w -> w._1.orElse(w._2)); deepMetrics = deepMetrics.persist(sl); diff --git a/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/data/RetainerRDD.java b/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/data/RetainerRDD.java index 6c243e0..7529f93 100644 --- a/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/data/RetainerRDD.java +++ b/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/data/RetainerRDD.java @@ -9,8 +9,10 @@ import org.apache.spark.rdd.CoalescedRDD; import org.apache.spark.rdd.CoalescedRDDPartition; import org.apache.spark.rdd.RDD; +import org.apache.spark.scheduler.TaskLocation; import scala.Option; import scala.Tuple2; +import scala.collection.Seq; import scala.reflect.ClassManifestFactory$; import scala.reflect.ClassTag; @@ -30,7 +32,8 @@ public RetainerRDD(RDD oneParent, int[] partsToRetain, ClassTag ct) { public Partition[] getPartitions() { Partition[] ret = new Partition[partsToRetain.length]; for (int i = 0; i < ret.length; i++) { - ret[i] = new CoalescedRDDPartition(i, prev, new int[]{partsToRetain[i]}, Option.apply(prev.context().getPreferredLocs(prev, partsToRetain[i]).head().host())); + Seq preferredLocs = prev.context().getPreferredLocs(prev, partsToRetain[i]); + ret[i] = new CoalescedRDDPartition(i, prev, new int[]{partsToRetain[i]}, Option.apply(preferredLocs.isEmpty() ? null : preferredLocs.head().host())); } return ret; } diff --git a/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/scripting/TDL4Interpreter.java b/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/scripting/TDL4Interpreter.java index 6502744..44b7e52 100644 --- a/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/scripting/TDL4Interpreter.java +++ b/datacooker-commons/src/main/java/io/github/pastorgl/datacooker/scripting/TDL4Interpreter.java @@ -5,6 +5,7 @@ package io.github.pastorgl.datacooker.scripting; import io.github.pastorgl.datacooker.Options; +import io.github.pastorgl.datacooker.commons.functions.ArrayFunctions; import io.github.pastorgl.datacooker.config.Configuration; import io.github.pastorgl.datacooker.config.InvalidConfigurationException; import io.github.pastorgl.datacooker.data.*; @@ -14,13 +15,12 @@ import org.antlr.v4.runtime.tree.ParseTree; import org.antlr.v4.runtime.tree.TerminalNode; import org.apache.commons.collections4.map.ListOrderedMap; -import org.apache.commons.lang3.ArrayUtils; import org.apache.spark.api.java.JavaPairRDD; import scala.Function1; +import scala.Tuple2; import java.util.*; import java.util.stream.Collectors; -import java.util.stream.LongStream; import static io.github.pastorgl.datacooker.Constants.CWD_VAR; import static io.github.pastorgl.datacooker.Constants.STAR; @@ -513,7 +513,7 @@ private void copy(TDL4.Copy_stmtContext ctx) { partitions = getParts(ctx.ds_parts().expression().children, variables); } - dataContext.copyDataStream(outVerb, dataStream, path, params, partitions); + dataContext.copyDataStream(outVerb, dataStream, partitions, path, params); if (verbose) { System.out.println("Lineage:"); @@ -794,14 +794,10 @@ private List> expression(List exprChildren, E if (exprItem instanceof TDL4.ArrayContext array) { Object[] values = null; if (array.S_RANGE() != null) { - long a = resolveNumericLiteral(array.L_NUMERIC(0)).longValue(); - long b = resolveNumericLiteral(array.L_NUMERIC(1)).longValue(); + Number a = resolveNumericLiteral(array.L_NUMERIC(0)); + Number b = resolveNumericLiteral(array.L_NUMERIC(1)); - if (a > b) { - values = LongStream.rangeClosed(b, a).boxed().toArray(); - ArrayUtils.reverse(values); - } - values = LongStream.rangeClosed(a, b).boxed().toArray(); + values = ArrayFunctions.MakeRange.getRange(a, b); } else { if ((rules == ExpressionRules.AT) || (rules == ExpressionRules.LET)) { if (!array.L_IDENTIFIER().isEmpty()) { @@ -953,12 +949,18 @@ private void select(TDL4.Select_stmtContext ctx) { } ListOrderedMap fromList = new ListOrderedMap<>(); - for (TDL4.Ds_partsContext e : from.ds_parts()) { - String s = resolveName(e.L_IDENTIFIER()); - fromList.put(s, (e.K_PARTITION() != null) ? getParts(e.expression().children, variables) : null); - } if (starFrom) { - dataContext.getNames(fromList.get(0) + STAR); + TDL4.Ds_partsContext ds0 = from.ds_parts(0); + + List names = dataContext.getNames(resolveName(ds0.L_IDENTIFIER()) + STAR); + int[] parts = (ds0.K_PARTITION() != null) ? getParts(ds0.expression().children, variables) : null; + for (String name : names) { + fromList.put(name, parts); + } + } else { + for (TDL4.Ds_partsContext e : from.ds_parts()) { + fromList.put(resolveName(e.L_IDENTIFIER()), (e.K_PARTITION() != null) ? getParts(e.expression().children, variables) : null); + } } List items = new ArrayList<>(); @@ -1039,7 +1041,7 @@ private void select(TDL4.Select_stmtContext ctx) { System.out.println("Duplicated DS " + fromList.get(0) + ": " + dataContext.streamInfo(fromList.get(0)).describe(ut)); } - result = dataContext.rdd(firstStream); + result = dataContext.getDsParts(fromList.get(0), fromList.getValue(0)).rdd(); resultColumns = firstStream.attributes(); } else { if (verbose) { @@ -1177,9 +1179,13 @@ private void callOperation(String opVerb, Map params, TDL4.Opera } if (fromScope.S_STAR() != null) { - String prefix = resolveName(fromScope.ds_parts(0).L_IDENTIFIER()); + TDL4.Ds_partsContext dsCtx = fromScope.ds_parts(0); + String prefix = resolveName(dsCtx.L_IDENTIFIER()); prefixLen = prefix.length(); - inputMap = dataContext.getAll(prefix + STAR); + inputMap = new ListOrderedMap<>(); + for (String dsName : dataContext.getNames(prefix + STAR)) { + inputMap.put(dsName, dataContext.getDsParts(dsName, (dsCtx.K_PARTITION() != null) ? getParts(dsCtx.expression().children, variables) : null)); + } if (inputMap.isEmpty()) { throw new InvalidConfigurationException("CALL " + opVerb + "() INPUT from positional wildcard reference found zero matching DataStreams"); @@ -1190,7 +1196,7 @@ private void callOperation(String opVerb, Map params, TDL4.Opera String dsName = resolveName(dsCtx.L_IDENTIFIER()); if (dataContext.has(dsName)) { - inputMap.put(dsName, dataContext.get(dsName)); + inputMap.put(dsName, dataContext.getDsParts(dsName, (dsCtx.K_PARTITION() != null) ? getParts(dsCtx.expression().children, variables) : null)); } else { throw new InvalidConfigurationException("CALL " + opVerb + "() INPUT refers to unknown positional DataStream \"" + dsName + "\""); } @@ -1217,11 +1223,12 @@ private void callOperation(String opVerb, Map params, TDL4.Opera throw new InvalidConfigurationException("CALL " + opVerb + "() INPUT requires aliased DataStream references"); } - LinkedHashMap dsMappings = new LinkedHashMap<>(); - List ds_name = fromNamed.ds_parts(); - for (int i = 0; i < ds_name.size(); i++) { + LinkedHashMap> dsMappings = new LinkedHashMap<>(); + List dsParts = fromNamed.ds_parts(); + for (int i = 0; i < dsParts.size(); i++) { + TDL4.Ds_partsContext dsCtx = dsParts.get(i); dsMappings.put(resolveName(fromNamed.ds_alias(i).L_IDENTIFIER()), - resolveName(ds_name.get(i).L_IDENTIFIER())); + new Tuple2<>(resolveName(dsCtx.L_IDENTIFIER()), (dsCtx.K_PARTITION() != null) ? getParts(dsCtx.expression().children, variables) : null)); } for (Map.Entry ns : nsm.streams.entrySet()) { @@ -1234,22 +1241,22 @@ private void callOperation(String opVerb, Map params, TDL4.Opera } inputMap = new ListOrderedMap<>(); - for (Map.Entry dsm : dsMappings.entrySet()) { + for (Map.Entry> dsm : dsMappings.entrySet()) { String alias = dsm.getKey(); - String dsName = dsm.getValue(); + Tuple2 dsNameParts = dsm.getValue(); if (!nsm.streams.containsKey(alias)) { - throw new InvalidConfigurationException("CALL " + opVerb + "() INPUT " + alias + " FROM refers to unknown DataStream \"" + dsName + "\""); + throw new InvalidConfigurationException("CALL " + opVerb + "() INPUT " + alias + " FROM refers to unknown DataStream \"" + dsNameParts + "\""); } - DataStream inputDs = dataContext.get(dsName); + DataStream inputDs = dataContext.get(dsNameParts._1); if (!Arrays.asList(nsm.streams.get(alias).type).contains(inputDs.streamType)) { throw new InvalidConfigurationException("CALL " + opVerb + "() doesn't accept INPUT " + alias + " FROM DataStream \"" - + dsName + "\" of type " + inputDs.streamType); + + dsNameParts + "\" of type " + inputDs.streamType); } - inputMap.put(alias, inputDs); - inputList.add(dsName); + inputMap.put(alias, dataContext.getDsParts(dsNameParts._1, dsNameParts._2)); + inputList.add(dsNameParts._1); } } diff --git a/datacooker-commons/src/test/java/io/github/pastorgl/datacooker/scripting/TestDataContext.java b/datacooker-commons/src/test/java/io/github/pastorgl/datacooker/scripting/TestDataContext.java index c3c29c9..88193d1 100644 --- a/datacooker-commons/src/test/java/io/github/pastorgl/datacooker/scripting/TestDataContext.java +++ b/datacooker-commons/src/test/java/io/github/pastorgl/datacooker/scripting/TestDataContext.java @@ -30,11 +30,11 @@ public ListOrderedMap createDataStreams(String adapter, Stri } @Override - public void copyDataStream(String adapter, String outputName, String path, Map params, int[] partitions) { + public void copyDataStream(String adapter, String outputName, int[] partitions, String path, Map params) { path = System.getProperty("java.io.tmpdir") + "/" + new Date().getTime() + "." + new Random().nextLong() + "/" + path; tempDirs.add(path); - super.copyDataStream(adapter, outputName, path, params, partitions); + super.copyDataStream(adapter, outputName, partitions, path, params); } public void deleteTempDirs() {