Skip to content

Commit

Permalink
add dsName PARTITION ARRAY|RANGE|Numeric syntax to FROM context every…
Browse files Browse the repository at this point in the history
…where
  • Loading branch information
PastorGL committed Jan 20, 2025
1 parent d87dd6b commit be3f4f5
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
import io.github.pastorgl.datacooker.scripting.Function.Binary;
import io.github.pastorgl.datacooker.scripting.Function.Ternary;
import io.github.pastorgl.datacooker.scripting.Function.Unary;
import org.apache.commons.lang3.ArrayUtils;

import java.util.Arrays;
import java.util.Deque;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

@SuppressWarnings("unused")
public class ArrayFunctions {
Expand Down Expand Up @@ -105,4 +108,51 @@ public String descr() {
return "Make an ARRAY from all arguments in their given order";
}
}

public static class MakeRange extends Binary<ArrayWrap, Number, Number> {
@Override
public ArrayWrap call(Deque<Object> args) {
Number a = Evaluator.popNumeric(args);
Number b = Evaluator.popNumeric(args);

Object[] values = getRange(a, b);
return new ArrayWrap(values);
}

public static Object[] getRange(Number a, Number b) {
Object[] values;
if ((a instanceof Integer) && (b instanceof Integer)) {
int ia = a.intValue();
int ib = b.intValue();

if (ia > ib) {
values = IntStream.rangeClosed(ib, ia).boxed().toArray();
ArrayUtils.reverse(values);
}
values = IntStream.rangeClosed(ia, ib).boxed().toArray();
} else {
long la = a.longValue();
long lb = b.longValue();

if (la > lb) {
values = LongStream.rangeClosed(lb, la).boxed().toArray();
ArrayUtils.reverse(values);
}
values = LongStream.rangeClosed(la, lb).boxed().toArray();
}

return values;
}

@Override
public String name() {
return "ARR_RANGE";
}

@Override
public String descr() {
return "Make a RANGE with both boundaries included, with order preserved. If both are Integer," +
" all RANGE values are Integer, and Long otherwise";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,17 @@ public ListOrderedMap<String, DataStream> getAll(String... templates) {
return ret;
}

public DataStream getDsParts(String name, int[] partitions) {
DataStream ds = store.get(name);

if (partitions != null) {
ds = new DataStreamBuilder(ds.name, ds.attributes())
.filtered("PARTITION", ds)
.build(RetainerRDD.retain(ds.rdd, partitions));
}
return ds;
}

public void put(String name, DataStream ds) {
store.put(name, ds);
}
Expand Down Expand Up @@ -171,7 +182,7 @@ public ListOrderedMap<String, StreamInfo> createDataStreams(String adapter, Stri
}
}

public void copyDataStream(String adapter, String outputName, String path, Map<String, Object> params, int[] partitions) {
public void copyDataStream(String adapter, String outputName, int[] partitions, String path, Map<String, Object> params) {
try {
DataStream ds = store.get(outputName);

Expand Down Expand Up @@ -1009,6 +1020,7 @@ public void analyze(Map<String, DataStream> dataStreams, List<Expressions.ExprIt
.repartition(1)
.mapToPair(t -> t)
.rightOuterJoin(empties)
.sortByKey()
.mapValues(w -> w._1.orElse(w._2));
deepMetrics = deepMetrics.persist(sl);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import org.apache.spark.rdd.CoalescedRDD;
import org.apache.spark.rdd.CoalescedRDDPartition;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.TaskLocation;
import scala.Option;
import scala.Tuple2;
import scala.collection.Seq;
import scala.reflect.ClassManifestFactory$;
import scala.reflect.ClassTag;

Expand All @@ -30,7 +32,8 @@ public RetainerRDD(RDD<?> oneParent, int[] partsToRetain, ClassTag ct) {
public Partition[] getPartitions() {
Partition[] ret = new Partition[partsToRetain.length];
for (int i = 0; i < ret.length; i++) {
ret[i] = new CoalescedRDDPartition(i, prev, new int[]{partsToRetain[i]}, Option.apply(prev.context().getPreferredLocs(prev, partsToRetain[i]).head().host()));
Seq<TaskLocation> preferredLocs = prev.context().getPreferredLocs(prev, partsToRetain[i]);
ret[i] = new CoalescedRDDPartition(i, prev, new int[]{partsToRetain[i]}, Option.apply(preferredLocs.isEmpty() ? null : preferredLocs.head().host()));
}
return ret;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.github.pastorgl.datacooker.scripting;

import io.github.pastorgl.datacooker.Options;
import io.github.pastorgl.datacooker.commons.functions.ArrayFunctions;
import io.github.pastorgl.datacooker.config.Configuration;
import io.github.pastorgl.datacooker.config.InvalidConfigurationException;
import io.github.pastorgl.datacooker.data.*;
Expand All @@ -14,13 +15,12 @@
import org.antlr.v4.runtime.tree.ParseTree;
import org.antlr.v4.runtime.tree.TerminalNode;
import org.apache.commons.collections4.map.ListOrderedMap;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Function1;
import scala.Tuple2;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import static io.github.pastorgl.datacooker.Constants.CWD_VAR;
import static io.github.pastorgl.datacooker.Constants.STAR;
Expand Down Expand Up @@ -513,7 +513,7 @@ private void copy(TDL4.Copy_stmtContext ctx) {
partitions = getParts(ctx.ds_parts().expression().children, variables);
}

dataContext.copyDataStream(outVerb, dataStream, path, params, partitions);
dataContext.copyDataStream(outVerb, dataStream, partitions, path, params);

if (verbose) {
System.out.println("Lineage:");
Expand Down Expand Up @@ -794,14 +794,10 @@ private List<Expressions.ExprItem<?>> expression(List<ParseTree> exprChildren, E
if (exprItem instanceof TDL4.ArrayContext array) {
Object[] values = null;
if (array.S_RANGE() != null) {
long a = resolveNumericLiteral(array.L_NUMERIC(0)).longValue();
long b = resolveNumericLiteral(array.L_NUMERIC(1)).longValue();
Number a = resolveNumericLiteral(array.L_NUMERIC(0));
Number b = resolveNumericLiteral(array.L_NUMERIC(1));

if (a > b) {
values = LongStream.rangeClosed(b, a).boxed().toArray();
ArrayUtils.reverse(values);
}
values = LongStream.rangeClosed(a, b).boxed().toArray();
values = ArrayFunctions.MakeRange.getRange(a, b);
} else {
if ((rules == ExpressionRules.AT) || (rules == ExpressionRules.LET)) {
if (!array.L_IDENTIFIER().isEmpty()) {
Expand Down Expand Up @@ -953,12 +949,18 @@ private void select(TDL4.Select_stmtContext ctx) {
}

ListOrderedMap<String, int[]> fromList = new ListOrderedMap<>();
for (TDL4.Ds_partsContext e : from.ds_parts()) {
String s = resolveName(e.L_IDENTIFIER());
fromList.put(s, (e.K_PARTITION() != null) ? getParts(e.expression().children, variables) : null);
}
if (starFrom) {
dataContext.getNames(fromList.get(0) + STAR);
TDL4.Ds_partsContext ds0 = from.ds_parts(0);

List<String> names = dataContext.getNames(resolveName(ds0.L_IDENTIFIER()) + STAR);
int[] parts = (ds0.K_PARTITION() != null) ? getParts(ds0.expression().children, variables) : null;
for (String name : names) {
fromList.put(name, parts);
}
} else {
for (TDL4.Ds_partsContext e : from.ds_parts()) {
fromList.put(resolveName(e.L_IDENTIFIER()), (e.K_PARTITION() != null) ? getParts(e.expression().children, variables) : null);
}
}

List<SelectItem> items = new ArrayList<>();
Expand Down Expand Up @@ -1039,7 +1041,7 @@ private void select(TDL4.Select_stmtContext ctx) {
System.out.println("Duplicated DS " + fromList.get(0) + ": " + dataContext.streamInfo(fromList.get(0)).describe(ut));
}

result = dataContext.rdd(firstStream);
result = dataContext.getDsParts(fromList.get(0), fromList.getValue(0)).rdd();
resultColumns = firstStream.attributes();
} else {
if (verbose) {
Expand Down Expand Up @@ -1177,9 +1179,13 @@ private void callOperation(String opVerb, Map<String, Object> params, TDL4.Opera
}

if (fromScope.S_STAR() != null) {
String prefix = resolveName(fromScope.ds_parts(0).L_IDENTIFIER());
TDL4.Ds_partsContext dsCtx = fromScope.ds_parts(0);
String prefix = resolveName(dsCtx.L_IDENTIFIER());
prefixLen = prefix.length();
inputMap = dataContext.getAll(prefix + STAR);
inputMap = new ListOrderedMap<>();
for (String dsName : dataContext.getNames(prefix + STAR)) {
inputMap.put(dsName, dataContext.getDsParts(dsName, (dsCtx.K_PARTITION() != null) ? getParts(dsCtx.expression().children, variables) : null));
}

if (inputMap.isEmpty()) {
throw new InvalidConfigurationException("CALL " + opVerb + "() INPUT from positional wildcard reference found zero matching DataStreams");
Expand All @@ -1190,7 +1196,7 @@ private void callOperation(String opVerb, Map<String, Object> params, TDL4.Opera
String dsName = resolveName(dsCtx.L_IDENTIFIER());

if (dataContext.has(dsName)) {
inputMap.put(dsName, dataContext.get(dsName));
inputMap.put(dsName, dataContext.getDsParts(dsName, (dsCtx.K_PARTITION() != null) ? getParts(dsCtx.expression().children, variables) : null));
} else {
throw new InvalidConfigurationException("CALL " + opVerb + "() INPUT refers to unknown positional DataStream \"" + dsName + "\"");
}
Expand All @@ -1217,11 +1223,12 @@ private void callOperation(String opVerb, Map<String, Object> params, TDL4.Opera
throw new InvalidConfigurationException("CALL " + opVerb + "() INPUT requires aliased DataStream references");
}

LinkedHashMap<String, String> dsMappings = new LinkedHashMap<>();
List<TDL4.Ds_partsContext> ds_name = fromNamed.ds_parts();
for (int i = 0; i < ds_name.size(); i++) {
LinkedHashMap<String, Tuple2<String, int[]>> dsMappings = new LinkedHashMap<>();
List<TDL4.Ds_partsContext> dsParts = fromNamed.ds_parts();
for (int i = 0; i < dsParts.size(); i++) {
TDL4.Ds_partsContext dsCtx = dsParts.get(i);
dsMappings.put(resolveName(fromNamed.ds_alias(i).L_IDENTIFIER()),
resolveName(ds_name.get(i).L_IDENTIFIER()));
new Tuple2<>(resolveName(dsCtx.L_IDENTIFIER()), (dsCtx.K_PARTITION() != null) ? getParts(dsCtx.expression().children, variables) : null));
}

for (Map.Entry<String, DataStreamMeta> ns : nsm.streams.entrySet()) {
Expand All @@ -1234,22 +1241,22 @@ private void callOperation(String opVerb, Map<String, Object> params, TDL4.Opera
}

inputMap = new ListOrderedMap<>();
for (Map.Entry<String, String> dsm : dsMappings.entrySet()) {
for (Map.Entry<String, Tuple2<String, int[]>> dsm : dsMappings.entrySet()) {
String alias = dsm.getKey();
String dsName = dsm.getValue();
Tuple2<String, int[]> dsNameParts = dsm.getValue();

if (!nsm.streams.containsKey(alias)) {
throw new InvalidConfigurationException("CALL " + opVerb + "() INPUT " + alias + " FROM refers to unknown DataStream \"" + dsName + "\"");
throw new InvalidConfigurationException("CALL " + opVerb + "() INPUT " + alias + " FROM refers to unknown DataStream \"" + dsNameParts + "\"");
}

DataStream inputDs = dataContext.get(dsName);
DataStream inputDs = dataContext.get(dsNameParts._1);
if (!Arrays.asList(nsm.streams.get(alias).type).contains(inputDs.streamType)) {
throw new InvalidConfigurationException("CALL " + opVerb + "() doesn't accept INPUT " + alias + " FROM DataStream \""
+ dsName + "\" of type " + inputDs.streamType);
+ dsNameParts + "\" of type " + inputDs.streamType);
}

inputMap.put(alias, inputDs);
inputList.add(dsName);
inputMap.put(alias, dataContext.getDsParts(dsNameParts._1, dsNameParts._2));
inputList.add(dsNameParts._1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ public ListOrderedMap<String, StreamInfo> createDataStreams(String adapter, Stri
}

@Override
public void copyDataStream(String adapter, String outputName, String path, Map<String, Object> params, int[] partitions) {
public void copyDataStream(String adapter, String outputName, int[] partitions, String path, Map<String, Object> params) {
path = System.getProperty("java.io.tmpdir") + "/" + new Date().getTime() + "." + new Random().nextLong() + "/" + path;
tempDirs.add(path);

super.copyDataStream(adapter, outputName, path, params, partitions);
super.copyDataStream(adapter, outputName, partitions, path, params);
}

public void deleteTempDirs() {
Expand Down

0 comments on commit be3f4f5

Please sign in to comment.