get rid of some really bad legacy in DS type system and enhance ANALYZE
PastorGL committed Jan 10, 2025
1 parent f0890ba commit a8e1f8a
Showing 71 changed files with 763 additions and 679 deletions.
@@ -49,46 +49,16 @@ public String highlight() {
text.append(token.getText());
} else {
switch (highlight) {
- case OPERATOR: {
- cls = "o";
- break;
- }
- case KEYWORD: {
- cls = "s";
- break;
- }
- case NULL: {
- cls = "u";
- break;
- }
- case BOOLEAN: {
- cls = "b";
- break;
- }
- case TYPE: {
- cls = "c";
- break;
- }
- case IDENTIFIER: {
- cls = "i";
- break;
- }
- case SIGIL: {
- cls = "g";
- break;
- }
- case NUMERIC: {
- cls = "n";
- break;
- }
- case STRING: {
- cls = "t";
- break;
- }
- case COMMENT: {
- cls = "m";
- break;
- }
+ case OPERATOR -> cls = "o";
+ case KEYWORD -> cls = "s";
+ case NULL -> cls = "u";
+ case BOOLEAN -> cls = "b";
+ case TYPE -> cls = "c";
+ case IDENTIFIER -> cls = "i";
+ case SIGIL -> cls = "g";
+ case NUMERIC -> cls = "n";
+ case STRING -> cls = "t";
+ case COMMENT -> cls = "m";
}

text.append("<c c=").append(cls).append(">").append(token.getText()).append("</c>");
@@ -20,7 +20,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

- import static io.github.pastorgl.datacooker.Constants.OBJLVL_VALUE;
+ import static io.github.pastorgl.datacooker.data.ObjLvl.VALUE;
import static io.github.pastorgl.datacooker.scripting.TDL4.*;

public class ReplCompleter implements Completer {
@@ -321,7 +321,7 @@ private void completeTDL4(LineReader reader, ReplParsedLine rpl, List<Candidate>
case K_KEY: {
StreamInfo ds = dsFromTokens(stmtToks);
if (ds != null) {
- ds.attrs.get(OBJLVL_VALUE).forEach(s -> candidates.add(new Candidate("KEY " + escapeId(s))));
+ ds.attrs.get(VALUE).forEach(s -> candidates.add(new Candidate("KEY " + escapeId(s))));
}

break;
@@ -376,7 +376,7 @@ private void completeTDL4(LineReader reader, ReplParsedLine rpl, List<Candidate>
case S_EQ: {
StreamInfo ds = dsFromTokens(stmtToks);
if (ds != null) {
- ds.attrs.get(OBJLVL_VALUE).forEach(s -> candidates.add(new Candidate("= " + escapeId(s))));
+ ds.attrs.get(VALUE).forEach(s -> candidates.add(new Candidate("= " + escapeId(s))));
}
vp.getAll().forEach(s -> candidates.add(new Candidate("= $" + escapeId(s))));

@@ -450,7 +450,7 @@ private void completeTDL4(LineReader reader, ReplParsedLine rpl, List<Candidate>
case S_EQ: {
StreamInfo ds = dsFromTokens(stmtToks);
if (ds != null) {
- ds.attrs.get(OBJLVL_VALUE).forEach(s -> candidates.add(new Candidate(escapeId(s))));
+ ds.attrs.get(VALUE).forEach(s -> candidates.add(new Candidate(escapeId(s))));
}
vp.getAll().forEach(s -> candidates.add(new Candidate("$" + escapeId(s))));

@@ -464,7 +464,7 @@ private void completeTDL4(LineReader reader, ReplParsedLine rpl, List<Candidate>
case K_KEY: {
StreamInfo ds = dsFromTokens(stmtToks);
if (ds != null) {
- ds.attrs.get(OBJLVL_VALUE).forEach(s -> candidates.add(new Candidate(escapeId(s))));
+ ds.attrs.get(VALUE).forEach(s -> candidates.add(new Candidate(escapeId(s))));
}

break;
@@ -883,7 +883,7 @@ private void completeTDL4(LineReader reader, ReplParsedLine rpl, List<Candidate>
case K_KEY: {
StreamInfo ds = dsFromTokens(stmtToks.subList(stmtIndex, tokPos));
if (ds != null) {
- ds.attrs.get(OBJLVL_VALUE).forEach(s -> candidates.add(new Candidate("KEY " + escapeId(s) + ";")));
+ ds.attrs.get(VALUE).forEach(s -> candidates.add(new Candidate("KEY " + escapeId(s) + ";")));
}

break;
@@ -894,7 +894,7 @@ private void completeTDL4(LineReader reader, ReplParsedLine rpl, List<Candidate>
} else {
StreamInfo ds = dsFromTokens(stmtToks.subList(stmtIndex, tokPos));
if (ds != null) {
- ds.attrs.get(OBJLVL_VALUE).forEach(s -> candidates.add(new Candidate(escapeId(s))));
+ ds.attrs.get(VALUE).forEach(s -> candidates.add(new Candidate(escapeId(s))));
}
}
break;
@@ -183,7 +183,7 @@ if_stmt
;

analyze_stmt
- : K_ANALYZE K_DS? ds_name S_STAR? ( K_KEY attr )?
+ : K_ANALYZE K_DS? ds_name S_STAR? ( K_KEY attr )? K_PARTITION?
;
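With the optional K_PARTITION token appended to the rule, an ANALYZE statement can now also carry a trailing PARTITION keyword, e.g. `ANALYZE DS signals KEY uid PARTITION` (hypothetical stream and attribute names; the keyword's semantics are defined outside this hunk).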

options_stmt
@@ -9,9 +9,4 @@ public class Constants {
public static final String UNDERSCORE = "_";
public static final String METRICS_DS = "_metrics";
public static final String CWD_VAR = "CWD";
- public static final String OBJLVL_VALUE = "value";
- public static final String OBJLVL_POINT = "point";
- public static final String OBJLVL_TRACK = "track";
- public static final String OBJLVL_SEGMENT = "segment";
- public static final String OBJLVL_POLYGON = "polygon";
}
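The removed OBJLVL_* string constants give way to the ObjLvl members imported elsewhere in this commit (VALUE, POINT, TRACK, SEGMENT, POLYGON). The type's own source is not part of the shown hunks; a minimal sketch of how such an enum might look, offered purely as an assumption:

```java
// Hypothetical sketch only -- the real io.github.pastorgl.datacooker.data.ObjLvl
// may carry additional members or helpers. The member names below are the ones
// visible in the new imports of this commit.
public enum ObjLvl {
    VALUE, POINT, TRACK, SEGMENT, POLYGON
}
```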
@@ -16,7 +16,7 @@
import java.util.Collections;
import java.util.List;

- import static io.github.pastorgl.datacooker.Constants.OBJLVL_VALUE;
+ import static io.github.pastorgl.datacooker.data.ObjLvl.VALUE;

@SuppressWarnings("unused")
public class CountByKeyOperation extends Operation {
@@ -60,8 +60,8 @@ public ListOrderedMap<String, DataStream> execute() {
.reduceByKey(Long::sum)
.mapToPair(t -> new Tuple2<>(t._1, new Columnar(indices, new Object[]{t._2})));

- outputs.put(outputStreams.get(i), new DataStreamBuilder(outputStreams.get(i), StreamType.Columnar, Collections.singletonMap(OBJLVL_VALUE, indices))
- .generated(meta.verb, input)
+ outputs.put(outputStreams.get(i), new DataStreamBuilder(outputStreams.get(i), Collections.singletonMap(VALUE, indices))
+ .generated(meta.verb, StreamType.Columnar, input)
.build(count)
);
}
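The call sites above show the recurring migration in this commit: the StreamType argument leaves the DataStreamBuilder constructor and moves into the lineage call (generated/transformed/filtered), while the attribute map is keyed by ObjLvl rather than by plain strings. The sketch below mirrors that call shape with stand-in types; the names and signatures are inferred from the hunks, not taken from the builder's actual source:

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Stand-ins declared here only to keep the sketch self-contained; they are not
// the real DataCooker types.
enum ObjLvlStub { VALUE }
enum StreamTypeStub { Columnar }

final class DataStreamBuilderSketch {
    private final String name;
    private final Map<ObjLvlStub, List<String>> attrs;
    private StreamTypeStub type;
    private String lineage;

    // New-style constructor: name plus ObjLvl-keyed attributes, no stream type here.
    DataStreamBuilderSketch(String name, Map<ObjLvlStub, List<String>> attrs) {
        this.name = name;
        this.attrs = (attrs == null) ? Collections.emptyMap() : attrs;
    }

    // The stream type now travels together with the lineage verb.
    DataStreamBuilderSketch generated(String verb, StreamTypeStub type) {
        this.type = type;
        this.lineage = "generated by " + verb;
        return this;
    }

    String build(Object payload) {
        return name + " [" + type + ", " + lineage + "] " + attrs + " <- " + payload;
    }
}

class CallSiteDemo {
    static String example(List<String> indices) {
        // Mirrors the shape of the new CountByKeyOperation call site above.
        return new DataStreamBuilderSketch("counts_by_key", Collections.singletonMap(ObjLvlStub.VALUE, indices))
                .generated("countByKey", StreamTypeStub.Columnar)
                .build(indices.size());
    }
}
```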
@@ -19,7 +19,7 @@
import java.util.*;
import java.util.stream.Collectors;

- import static io.github.pastorgl.datacooker.Constants.OBJLVL_VALUE;
+ import static io.github.pastorgl.datacooker.data.ObjLvl.VALUE;

@SuppressWarnings("unused")
public class SplitByAttrsOperation extends Operation {
@@ -114,8 +114,8 @@ public ListOrderedMap<String, DataStream> execute() {
.distinct();

if (outputStreams.containsKey(OUTPUT_SPLITS)) {
- outputs.put(outputStreams.get(OUTPUT_SPLITS), new DataStreamBuilder(outputStreams.get(OUTPUT_SPLITS), StreamType.Columnar, Collections.singletonMap(OBJLVL_VALUE, _splitColumnNames))
- .generated(meta.verb, input)
+ outputs.put(outputStreams.get(OUTPUT_SPLITS), new DataStreamBuilder(outputStreams.get(OUTPUT_SPLITS), Collections.singletonMap(VALUE, _splitColumnNames))
+ .generated(meta.verb, StreamType.Columnar, input)
.build(distinctSplits)
);
}
@@ -150,7 +150,7 @@ public ListOrderedMap<String, DataStream> execute() {
return ret.iterator();
});

- outputs.put(splitName, new DataStreamBuilder(splitName, input.streamType, input.accessor.attributes())
+ outputs.put(splitName, new DataStreamBuilder(splitName, input.attributes())
.filtered(meta.verb, input)
.build(split)
);
@@ -16,8 +16,8 @@

import java.util.*;

- import static io.github.pastorgl.datacooker.Constants.OBJLVL_POINT;
- import static io.github.pastorgl.datacooker.Constants.OBJLVL_VALUE;
+ import static io.github.pastorgl.datacooker.data.ObjLvl.POINT;
+ import static io.github.pastorgl.datacooker.data.ObjLvl.VALUE;

@SuppressWarnings("unused")
public class ColumnarToPointTransform extends Transform {
@@ -46,9 +46,9 @@ public TransformMeta meta() {
@Override
public StreamConverter converter() {
return (ds, newColumns, params) -> {
- List<String> valueColumns = newColumns.get(OBJLVL_POINT);
+ List<String> valueColumns = newColumns.get(POINT);
if (valueColumns == null) {
- valueColumns = ds.accessor.attributes(OBJLVL_VALUE);
+ valueColumns = ds.attributes(VALUE);
}

final List<String> _outputColumns = valueColumns;
@@ -65,8 +65,8 @@ public StreamConverter converter() {

final CoordinateSequenceFactory csFactory = SpatialRecord.FACTORY.getCoordinateSequenceFactory();

- return new DataStreamBuilder(ds.name, StreamType.Point, Collections.singletonMap(OBJLVL_POINT, _outputColumns))
- .transformed(meta.verb, ds)
+ return new DataStreamBuilder(ds.name, Collections.singletonMap(POINT, _outputColumns))
+ .transformed(meta.verb, StreamType.Point, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

@@ -15,7 +15,7 @@
import java.util.Collections;
import java.util.List;

- import static io.github.pastorgl.datacooker.Constants.OBJLVL_VALUE;
+ import static io.github.pastorgl.datacooker.data.ObjLvl.VALUE;

@SuppressWarnings("unused")
public class ColumnarToStructuredTransform extends Transform {
@@ -38,14 +38,14 @@ public StreamConverter converter() {
return (ds, newColumns, params) -> {
final String template = params.get(TEMPLATE);

- List<String> valueColumns = newColumns.get(OBJLVL_VALUE);
+ List<String> valueColumns = newColumns.get(VALUE);
if (valueColumns == null) {
- valueColumns = ds.accessor.attributes(OBJLVL_VALUE);
+ valueColumns = ds.attributes(VALUE);
}

final List<String> _outputColumns = valueColumns;
- return new DataStreamBuilder(ds.name, StreamType.Structured, Collections.singletonMap(OBJLVL_VALUE, _outputColumns))
- .transformed(meta.verb, ds)
+ return new DataStreamBuilder(ds.name, Collections.singletonMap(VALUE, _outputColumns))
+ .transformed(meta.verb, StreamType.Structured, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

@@ -17,7 +17,7 @@
import java.util.List;
import java.util.stream.Collectors;

- import static io.github.pastorgl.datacooker.Constants.OBJLVL_VALUE;
+ import static io.github.pastorgl.datacooker.data.ObjLvl.VALUE;

@SuppressWarnings("unused")
public class ColumnarToTextTransform extends Transform {
@@ -45,16 +45,16 @@ public StreamConverter converter() {
final String delimiter = params.get(DELIMITER);
final char _delimiter = delimiter.charAt(0);

- List<String> valueColumns = newColumns.get(OBJLVL_VALUE);
+ List<String> valueColumns = newColumns.get(VALUE);
if (valueColumns == null) {
- valueColumns = ds.accessor.attributes(OBJLVL_VALUE);
+ valueColumns = ds.attributes(VALUE);
}

final List<String> _outputColumns = valueColumns;
final int len = _outputColumns.size();

- return new DataStreamBuilder(ds.name, StreamType.PlainText, null)
- .transformed(meta.verb, ds)
+ return new DataStreamBuilder(ds.name, null)
+ .transformed(meta.verb, StreamType.PlainText, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

@@ -24,8 +24,6 @@

import java.util.*;

- import static io.github.pastorgl.datacooker.Constants.*;
-
@SuppressWarnings("unused")
public class ColumnarToTrackTransform extends Transform {
static final String LAT_COLUMN = "lat_column";
@@ -68,9 +66,9 @@ public StreamConverter converter() {
final String _useridColumn = params.get(USERID_COLUMN);
final String _trackColumn = params.get(TRACKID_COLUMN);

- List<String> pointColumns = newColumns.get(OBJLVL_POINT);
+ List<String> pointColumns = newColumns.get(ObjLvl.POINT);
if (pointColumns == null) {
- pointColumns = ds.accessor.attributes(OBJLVL_VALUE);
+ pointColumns = ds.attributes(ObjLvl.VALUE);
}
final List<String> _pointColumns = pointColumns;

@@ -231,19 +229,21 @@ public StreamConverter converter() {
}, true)
.mapToPair(t -> t);

- Map<String, List<String>> outputColumns = new HashMap<>();
- outputColumns.put(OBJLVL_TRACK, Collections.singletonList(GEN_USERID));
+ Map<ObjLvl, List<String>> outputColumns = new HashMap<>();
+ outputColumns.put(ObjLvl.TRACK, Collections.singletonList(GEN_USERID));
List<String> segmentProps = new ArrayList<>();
segmentProps.add(GEN_USERID);
if (isSegmented) {
segmentProps.add(GEN_TRACKID);
}
- outputColumns.put(OBJLVL_SEGMENT, segmentProps);
+ outputColumns.put(ObjLvl.SEGMENT, segmentProps);
List<String> pointProps = new ArrayList<>(_pointColumns);
pointProps.add(GEN_TIMESTAMP);
- outputColumns.put(OBJLVL_POINT, pointProps);
+ outputColumns.put(ObjLvl.POINT, pointProps);

- return new DataStreamBuilder(ds.name, StreamType.Track, outputColumns).transformed(meta.verb, ds).build(output);
+ return new DataStreamBuilder(ds.name, outputColumns)
+ .transformed(meta.verb, StreamType.Track, ds)
+ .build(output);
};
}
}
@@ -19,7 +19,7 @@

import java.util.*;

- import static io.github.pastorgl.datacooker.Constants.OBJLVL_POINT;
+ import static io.github.pastorgl.datacooker.data.ObjLvl.POINT;

@SuppressWarnings("unused")
public class GeoJsonToPointTransform extends Transform {
@@ -48,10 +48,10 @@ public StreamConverter converter() {
String radiusColumn = params.get(RADIUS_PROP);
final double defaultRadius = params.get(RADIUS_DEFAULT);

- List<String> _outputColumns = newColumns.get(OBJLVL_POINT);
+ List<String> _outputColumns = newColumns.get(POINT);

- return new DataStreamBuilder(ds.name, StreamType.Point, newColumns)
- .transformed(meta.verb, ds)
+ return new DataStreamBuilder(ds.name, newColumns)
+ .transformed(meta.verb, StreamType.Point, ds)
.build(ds.rdd.flatMapToPair(line -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

@@ -19,7 +19,7 @@

import java.util.*;

- import static io.github.pastorgl.datacooker.Constants.OBJLVL_POLYGON;
+ import static io.github.pastorgl.datacooker.data.ObjLvl.POLYGON;

@SuppressWarnings("unused")
public class GeoJsonToPolygonTransform extends Transform {
@@ -37,10 +37,10 @@ public TransformMeta meta() {
@Override
public StreamConverter converter() {
return (ds, newColumns, params) -> {
- List<String> _outputColumns = newColumns.get(OBJLVL_POLYGON);
+ List<String> _outputColumns = newColumns.get(POLYGON);

- return new DataStreamBuilder(ds.name, StreamType.Polygon, newColumns)
- .transformed(meta.verb, ds)
+ return new DataStreamBuilder(ds.name, newColumns)
+ .transformed(meta.verb, StreamType.Polygon, ds)
.build(ds.rdd.flatMapToPair(line -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

@@ -52,8 +52,8 @@ public StreamConverter converter() {
final String useridAttr = params.get(USERID_ATTR);
final String tsAttr = params.get(TIMESTAMP_ATTR);

- return new DataStreamBuilder(ds.name, StreamType.Track, newColumns)
- .transformed(meta.verb, ds)
+ return new DataStreamBuilder(ds.name, newColumns)
+ .transformed(meta.verb, StreamType.Track, ds)
.build(ds.rdd.flatMapToPair(line -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

@@ -27,8 +27,8 @@ public TransformMeta meta() {

@Override
public StreamConverter converter() {
- return (ds, newColumns, params) -> new DataStreamBuilder(ds.name, StreamType.Structured, newColumns)
- .transformed(meta.verb, ds)
+ return (ds, newColumns, params) -> new DataStreamBuilder(ds.name, newColumns)
+ .transformed(meta.verb, StreamType.Structured, ds)
.build(ds.rdd.mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();
