Skip to content

Commit

Permalink
small rename, refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
devinrsmith committed Jan 20, 2025
1 parent c4b42b4 commit e1f6cb0
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public interface Builder {
final void checkMapping() {
for (Map.Entry<String, ColumnDescriptor> e : mapping().entrySet()) {
final ColumnDescriptor columnDescriptor = e.getValue();
if (!ParquetUtil.contains(schema(), columnDescriptor)) {
if (!ParquetSchemaUtil.contains(schema(), columnDescriptor)) {
throw new IllegalArgumentException(
String.format("schema does not contain Deephaven columnName=%s columnDescriptor=%s", e.getKey(),
columnDescriptor));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private ParquetFieldIdColumnResolverFactory(Map<Integer, Set<String>> fieldIdsTo
*/
public ParquetColumnResolverMap of(MessageType schema) {
final FieldIdMappingVisitor visitor = new FieldIdMappingVisitor();
ParquetUtil.walk(schema, visitor);
ParquetSchemaUtil.walk(schema, visitor);
return ParquetColumnResolverMap.builder()
.schema(schema)
.putAllMapping(visitor.nameToColumnDescriptor)
Expand All @@ -115,7 +115,7 @@ public ParquetColumnResolverMap of(TableKey tableKey, ParquetTableLocationKey ta
return of(tableLocationKey.getSchema());
}

private class FieldIdMappingVisitor implements ParquetUtil.Visitor {
private class FieldIdMappingVisitor implements ParquetSchemaUtil.Visitor {
private final Map<String, ColumnDescriptor> nameToColumnDescriptor = new HashMap<>();

@Override
Expand All @@ -133,7 +133,7 @@ public void accept(Collection<Type> path, PrimitiveType primitiveType) {
if (set == null) {
continue;
}
final ColumnDescriptor columnDescriptor = ParquetUtil.makeColumnDescriptor(path, primitiveType);
final ColumnDescriptor columnDescriptor = ParquetSchemaUtil.makeColumnDescriptor(path, primitiveType);
for (String columnName : set) {
final ColumnDescriptor existing = nameToColumnDescriptor.putIfAbsent(columnName, columnDescriptor);
if (existing != null && !existing.equals(columnDescriptor)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.function.Consumer;
import java.util.function.Predicate;

final class ParquetUtil {
final class ParquetSchemaUtil {

interface Visitor {

Expand Down Expand Up @@ -108,8 +108,8 @@ static ColumnDescriptor getColumnDescriptor(MessageType schema, String[] path) {

static ColumnDescriptor makeColumnDescriptor(Collection<Type> path, PrimitiveType primitiveType) {
final String[] namePath = path.stream().map(Type::getName).toArray(String[]::new);
final int maxRep = (int) path.stream().filter(ParquetUtil::isRepeated).count();
final int maxDef = (int) path.stream().filter(Predicate.not(ParquetUtil::isRequired)).count();
final int maxRep = (int) path.stream().filter(ParquetSchemaUtil::isRepeated).count();
final int maxDef = (int) path.stream().filter(Predicate.not(ParquetSchemaUtil::isRequired)).count();
return new ColumnDescriptor(namePath, primitiveType, maxRep, maxDef);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,25 +185,27 @@ public List<SortColumn> getSortedColumns() {
@Override
@NotNull
protected ColumnLocation makeColumnLocation(@NotNull final String columnName) {
final List<String> nameList;
final String parquetColumnName = readInstructions.getParquetColumnNameFromColumnNameOrDefault(columnName);
if (resolver == null) {
final String[] columnPath = parquetColumnNameToPath.get(parquetColumnName);
nameList = columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
} else {
// empty list will result in exists=false
nameList = resolver.of(columnName)
.map(Arrays::asList)
.orElse(List.of());
}
final List<String> columnPath = getColumnPath(columnName, parquetColumnName);
final ColumnChunkReader[] columnChunkReaders = Arrays.stream(getRowGroupReaders())
.map(rgr -> rgr.getColumnChunk(columnName, nameList))
.map(rgr -> rgr.getColumnChunk(columnName, columnPath))
.toArray(ColumnChunkReader[]::new);
final boolean exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0);
return new ParquetColumnLocation<>(this, columnName, parquetColumnName,
exists ? columnChunkReaders : null);
}

private List<String> getColumnPath(@NotNull String columnName, String parquetColumnNameOrDefault) {
if (resolver != null) {
// empty list will result in exists=false
return resolver.of(columnName).map(Arrays::asList).orElse(List.of());
}
final String[] columnPath = parquetColumnNameToPath.get(parquetColumnNameOrDefault);
return columnPath == null
? Collections.singletonList(parquetColumnNameOrDefault)
: Arrays.asList(columnPath);
}

private RowSet computeIndex() {
final RowSetBuilderSequential sequentialBuilder = RowSetFactory.builderSequential();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import static org.apache.parquet.schema.Types.required;
import static org.assertj.core.api.Assertions.assertThat;

public class ParquetUtilTest {
public class ParquetSchemaUtilTest {

private static final MessageType SCHEMA;

Expand Down Expand Up @@ -57,36 +57,36 @@ public class ParquetUtilTest {
@Test
public void getColumnsEmpty() {
final MessageType schema = Types.buildMessage().named("root");
final List<ColumnDescriptor> columns = ParquetUtil.getColumns(schema);
final List<ColumnDescriptor> columns = ParquetSchemaUtil.getColumns(schema);
assertThat(columns)
.usingElementComparator(equalityMethod(ParquetUtil::columnDescriptorEquals))
.usingElementComparator(equalityMethod(ParquetSchemaUtil::columnDescriptorEquals))
.isEqualTo(schema.getColumns());
}

@Test
public void getColumns() {
final List<ColumnDescriptor> columns = ParquetUtil.getColumns(SCHEMA);
final List<ColumnDescriptor> columns = ParquetSchemaUtil.getColumns(SCHEMA);
assertThat(columns)
.usingElementComparator(equalityMethod(ParquetUtil::columnDescriptorEquals))
.usingElementComparator(equalityMethod(ParquetSchemaUtil::columnDescriptorEquals))
.isEqualTo(SCHEMA.getColumns());

}

@Test
public void getColumnDescriptor() {
for (ColumnDescriptor expected : ParquetUtil.getColumns(SCHEMA)) {
assertThat(ParquetUtil.getColumnDescriptor(SCHEMA, expected.getPath()))
.usingComparator(equalityMethod(ParquetUtil::columnDescriptorEquals))
for (ColumnDescriptor expected : ParquetSchemaUtil.getColumns(SCHEMA)) {
assertThat(ParquetSchemaUtil.getColumnDescriptor(SCHEMA, expected.getPath()))
.usingComparator(equalityMethod(ParquetSchemaUtil::columnDescriptorEquals))
.isEqualTo(expected);
}
}

@Test
public void contains() {
for (ColumnDescriptor column : ParquetUtil.getColumns(SCHEMA)) {
assertThat(ParquetUtil.contains(SCHEMA, column)).isTrue();
for (ColumnDescriptor column : ParquetSchemaUtil.getColumns(SCHEMA)) {
assertThat(ParquetSchemaUtil.contains(SCHEMA, column)).isTrue();
}
assertThat(ParquetUtil.contains(SCHEMA,
assertThat(ParquetSchemaUtil.contains(SCHEMA,
new ColumnDescriptor(new String[] {"Required"}, required(INT32).named("Required"), 0, 0))).isTrue();
for (ColumnDescriptor column : new ColumnDescriptor[] {
new ColumnDescriptor(new String[] {"Required"}, optional(INT32).named("Required"), 0, 0),
Expand All @@ -102,7 +102,7 @@ public void contains() {
new ColumnDescriptor(new String[] {"Required"}, optional(INT32).named("Required"), 0, 0),
new ColumnDescriptor(new String[] {}, repeated(INT32).named("Required"), 0, 0)
}) {
assertThat(ParquetUtil.contains(SCHEMA, column)).isFalse();
assertThat(ParquetSchemaUtil.contains(SCHEMA, column)).isFalse();
}
}

Expand Down

0 comments on commit e1f6cb0

Please sign in to comment.