Skip to content

Commit

Permalink
a whole lot of fixes regarding data context (TRANSFORM PARTITION / AN…
Browse files Browse the repository at this point in the history
…ALYZE) and ARRAY handling
  • Loading branch information
PastorGL committed Jan 14, 2025
1 parent 50b796c commit 2d03f95
Show file tree
Hide file tree
Showing 61 changed files with 331 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ public StreamInfo get(String dsName) {

@Override
public Stream<String> sample(String dsName, int limit) {
return dataContext.get(dsName).rdd.takeSample(false, limit).stream()
return dataContext.rdd(dsName).takeSample(false, limit).stream()
.map(t -> t._1 + " => " + t._2);
}

@Override
public Stream<String> part(String dsName, final int part, final int limit) {
return DataHelper.takeFromPart(dataContext.get(dsName).rdd, part, limit);
return DataHelper.takeFromPart(dataContext.rdd(dsName), part, limit);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public StreamInfo info(@QueryParam("name") @NotEmpty String name) {
@Produces(MediaType.APPLICATION_JSON)
public List<String> sample(@QueryParam("name") @NotEmpty String name,
@QueryParam("limit") @PositiveOrZero @NotNull Integer limit) {
return dc.get(name).rdd.takeSample(false, limit).stream()
return dc.rdd(name).takeSample(false, limit).stream()
.map(r -> r._1 + " => " + r._2)
.collect(Collectors.toList());
}
Expand All @@ -63,7 +63,7 @@ public List<String> sample(@QueryParam("name") @NotEmpty String name,
public List<String> part(@QueryParam("name") @NotEmpty String name,
@QueryParam("part") @PositiveOrZero @NotNull Integer part,
@QueryParam("limit") @PositiveOrZero @NotNull Integer limit) {
return DataHelper.takeFromPart(dc.get(name).rdd, part, limit).collect(Collectors.toList());
return DataHelper.takeFromPart(dc.rdd(name), part, limit).collect(Collectors.toList());
}

@POST
Expand Down
2 changes: 1 addition & 1 deletion cli/src/test/resources/function.vm
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Record Key (implicit). See description for any additional arguments
#elseif( $op.arity == -3 )
Record Object of type <code>${op.argTypes[0]}</code> (implicit). See description for any additional arguments
#elseif( $op.arity == -4 )
Record Key (implicit), Record Object of type <code>${op.argTypes[0]}</code> (implicit). See description for any additional arguments
Record Key (implicit), Record Object (implicit). See description for any additional arguments
#end
</p>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@ statement
;

create_stmt
: K_CREATE K_DS? ds_name func_expr K_FROM expression partition? ( K_BY ( S_HASHCODE | K_SOURCE | S_RANDOM ) )?
;

partition
: K_PARTITION expression
: K_CREATE K_DS? ds_name func_expr K_FROM expression ( K_PARTITION expression )? ( K_BY ( S_HASHCODE | K_SOURCE | S_RANDOM ) )?
;

transform_stmt
: K_TRANSFORM K_DS? ds_name S_STAR? func_expr columns_item* key_item? partition?
: K_TRANSFORM K_DS? ds_name S_STAR? func_expr columns_item* key_item? ( K_PARTITION expression? )?
;

columns_item
Expand Down Expand Up @@ -183,7 +179,7 @@ if_stmt
;

analyze_stmt
: K_ANALYZE K_DS? ds_name S_STAR? ( K_KEY attr )? K_PARTITION?
: K_ANALYZE K_DS? ds_name S_STAR? key_item? K_PARTITION?
;

options_stmt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

@SuppressWarnings("unused")
public class ArrayFunctions {
public static class Slice extends Ternary<Object, Object, Integer, Integer> {
public static class Slice extends Ternary<ArrayWrap, Object, Integer, Integer> {
@Override
public ArrayWrap call(Deque<Object> args) {
ArrayWrap a = Evaluator.popArray(args);
Expand Down Expand Up @@ -89,7 +89,7 @@ public String descr() {
}
}

public static class Make extends ArbitrAry<Object, Object> {
public static class Make extends ArbitrAry<ArrayWrap, Object> {
@Override
public ArrayWrap call(Deque<Object> args) {
return new ArrayWrap(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.github.pastorgl.datacooker.data.DataRecord;
import io.github.pastorgl.datacooker.scripting.Evaluator;
import io.github.pastorgl.datacooker.scripting.Function;
import io.github.pastorgl.datacooker.scripting.Function.RecordKey;
import io.github.pastorgl.datacooker.scripting.Function.RecordObject;

Expand Down Expand Up @@ -81,4 +82,25 @@ public String descr() {
return "Returns the specified Attribute of DS Record as is";
}
}

public static class PIVOT extends Function.WholeRecord<Object[]> {
@Override
public Object[] call(Deque<Object> args) {
//discard bot record and key; this should work only in SELECT context for whole records
args.pop();
args.pop();
return Evaluator.popArray(args).data();
}

@Override
public String name() {
return "PIVOT";
}

@Override
public String descr() {
return "Creates top-level Attributes of DS Record from ARRAY passed as an argument suffixed with ARRAY" +
" element indices";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package io.github.pastorgl.datacooker.commons.functions;

import io.github.pastorgl.datacooker.data.ArrayWrap;
import io.github.pastorgl.datacooker.scripting.Evaluator;
import io.github.pastorgl.datacooker.scripting.Function.ArbitrAry;
import io.github.pastorgl.datacooker.scripting.Function.Binary;
Expand Down Expand Up @@ -53,11 +54,11 @@ public String descr() {
}
}

public static class Split extends Ternary<String[], String, String, Integer> {
public static class Split extends Ternary<ArrayWrap, String, String, Integer> {
@Override
public String[] call(Deque<Object> args) {
public ArrayWrap call(Deque<Object> args) {
String subject = Evaluator.popString(args);
return subject.split(Evaluator.popString(args), Evaluator.popInt(args));
return new ArrayWrap(subject.split(Evaluator.popString(args), Evaluator.popInt(args)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public ListOrderedMap<String, DataStream> execute() {
ListOrderedMap<String, DataStream> outputs = new ListOrderedMap<>();
for (int i = 0, len = inputStreams.size(); i < len; i++) {
DataStream input = inputStreams.getValue(i);
JavaPairRDD<Object, DataRecord<?>> count = input.rdd
JavaPairRDD<Object, DataRecord<?>> count = input.rdd()
.mapToPair(t -> new Tuple2<>(t._1, 1L))
.reduceByKey(Long::sum)
.mapToPair(t -> new Tuple2<>(t._1, new Columnar(indices, new Object[]{t._2})));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public ListOrderedMap<String, DataStream> execute() {

final List<String> _splitColumnNames = Arrays.stream(splitAttrs).map(String::valueOf).collect(Collectors.toList());

JavaPairRDD<Object, DataRecord<?>> distinctSplits = input.rdd
JavaPairRDD<Object, DataRecord<?>> distinctSplits = input.rdd()
.mapPartitionsToPair(it -> {
Set<Tuple2<Object, DataRecord<?>>> ret = new HashSet<>();

Expand Down Expand Up @@ -131,7 +131,7 @@ public ListOrderedMap<String, DataStream> execute() {
}

int hash = (Integer) u.getKey();
JavaPairRDD<Object, DataRecord<?>> split = input.rdd.mapPartitionsToPair(it -> {
JavaPairRDD<Object, DataRecord<?>> split = input.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

while (it.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public StreamConverter converter() {

return new DataStreamBuilder(ds.name, Collections.singletonMap(POINT, _outputColumns))
.transformed(meta.verb, StreamType.Point, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

while (it.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public StreamConverter converter() {
final List<String> _outputColumns = valueColumns;
return new DataStreamBuilder(ds.name, Collections.singletonMap(VALUE, _outputColumns))
.transformed(meta.verb, StreamType.Structured, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

ObjectMapper om = new ObjectMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public StreamConverter converter() {

return new DataStreamBuilder(ds.name, null)
.transformed(meta.verb, StreamType.PlainText, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

while (it.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public StreamConverter converter() {
}
final List<String> _pointColumns = pointColumns;

JavaPairRDD<Object, DataRecord<?>> signalsInput = ds.rdd;
JavaPairRDD<Object, DataRecord<?>> signalsInput = ds.rdd();
int _numPartitions = signalsInput.getNumPartitions();

final boolean isSegmented = (_trackColumn != null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public StreamConverter converter() {

return new DataStreamBuilder(ds.name, newColumns)
.transformed(meta.verb, StreamType.Point, ds)
.build(ds.rdd.flatMapToPair(line -> {
.build(ds.rdd().flatMapToPair(line -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

GeoJSONReader reader = new GeoJSONReader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public StreamConverter converter() {

return new DataStreamBuilder(ds.name, newColumns)
.transformed(meta.verb, StreamType.Polygon, ds)
.build(ds.rdd.flatMapToPair(line -> {
.build(ds.rdd().flatMapToPair(line -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

GeoJSONReader reader = new GeoJSONReader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public StreamConverter converter() {

return new DataStreamBuilder(ds.name, newColumns)
.transformed(meta.verb, StreamType.Track, ds)
.build(ds.rdd.flatMapToPair(line -> {
.build(ds.rdd().flatMapToPair(line -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

String l = String.valueOf(line._2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public TransformMeta meta() {
public StreamConverter converter() {
return (ds, newColumns, params) -> new DataStreamBuilder(ds.name, newColumns)
.transformed(meta.verb, StreamType.Structured, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

ObjectMapper om = new ObjectMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public StreamConverter converter() {

return new DataStreamBuilder(ds.name, newColumns)
.filtered(meta.verb, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

while (it.hasNext()) {
Expand Down Expand Up @@ -68,7 +68,7 @@ public StreamConverter converter() {

return new DataStreamBuilder(ds.name, newColumns)
.filtered(meta().verb, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

while (it.hasNext()) {
Expand Down Expand Up @@ -99,7 +99,7 @@ public StreamConverter converter() {

return new DataStreamBuilder(ds.name, newColumns)
.filtered(meta().verb, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

while (it.hasNext()) {
Expand Down Expand Up @@ -156,7 +156,7 @@ public StreamConverter converter() {
default: {
return new DataStreamBuilder(ds.name, ds.attributes())
.passedthru(meta.verb, ds)
.build(ds.rdd);
.build(ds.rdd());
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public StreamConverter converter() {

return new DataStreamBuilder(ds.name, Collections.singletonMap(VALUE, _outputColumns))
.transformed(meta.verb, StreamType.Columnar, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

while (it.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public StreamConverter converter() {

return new DataStreamBuilder(ds.name, null)
.transformed(meta.verb, StreamType.PlainText, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

while (it.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public StreamConverter converter() {

return new DataStreamBuilder(ds.name, null)
.transformed(meta.verb, StreamType.PlainText, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

Function<Coordinate[], double[][]> convert = (Coordinate[] coordinates) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public StreamConverter converter() {

return new DataStreamBuilder(ds.name, newColumns)
.transformed(meta.verb, StreamType.Columnar, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

while (it.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public TransformMeta meta() {
public StreamConverter converter() {
return (ds, newColumns, params) -> new DataStreamBuilder(ds.name, null)
.transformed(meta.verb, StreamType.PlainText, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

ObjectMapper om = new ObjectMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public StreamConverter converter() {

return new DataStreamBuilder(ds.name, Collections.singletonMap(VALUE, outputColumns))
.transformed(meta.verb, StreamType.Columnar, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

CSVParser parser = new CSVParserBuilder().withSeparator(_inputDelimiter).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public StreamConverter converter() {

return new DataStreamBuilder(ds.name, null)
.transformed(meta.verb, StreamType.PlainText, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

GPX.Writer writer = GPX.writer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public StreamConverter converter() {

return new DataStreamBuilder(ds.name, Collections.singletonMap(ObjLvl.POINT, outColumns))
.transformed(meta.verb, StreamType.Point, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

while (it.hasNext()) {
Expand Down
Loading

0 comments on commit 2d03f95

Please sign in to comment.