Skip to content

Commit

Permalink
Dense Union Reader + fixedChunkLength fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Dec 4, 2024
1 parent 675e745 commit 4eadb96
Show file tree
Hide file tree
Showing 25 changed files with 773 additions and 263 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ public enum CopyAttributeOperation {
CopyAttributeOperation.Flatten, // add flatten for now because web flattens all views
CopyAttributeOperation.Preview));

tempMap.put(BARRAGE_SCHEMA_ATTRIBUTE, EnumSet.of(
CopyAttributeOperation.Filter));

attributeToCopySet = Collections.unmodifiableMap(tempMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@
import java.time.Period;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -272,18 +274,12 @@ public <T extends WritableChunk<Values>> ChunkReader<T> newReaderPojo(
// maybe defaults to Map<String, Object>

if (typeId == ArrowType.ArrowTypeID.Union) {
// TODO: defaults to Object
final ArrowType.Union unionType = (ArrowType.Union) field.getType();
switch (unionType.getMode()) {
case Sparse:
// TODO NATE NOCOMMIT: implement
break;
case Dense:
// TODO NATE NOCOMMIT: implement
break;
default:
throw new IllegalArgumentException("Unexpected union mode: " + unionType.getMode());
}
final List<ChunkReader<WritableChunk<Values>>> innerReaders = new ArrayList<>();

// noinspection unchecked
return (ChunkReader<T>) new UnionChunkReader<T>(
UnionChunkReader.mode(unionType.getMode()), innerReaders);
}

throw new UnsupportedOperationException(String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ public <T extends Chunk<Values>> ChunkWriter<T> newWriterPojo(
.map(BarrageTypeInfo::chunkType)
.collect(Collectors.toList());

UnionChunkWriter.Mode mode = unionType.getMode() == UnionMode.Sparse ? UnionChunkWriter.Mode.Sparse
: UnionChunkWriter.Mode.Dense;
UnionChunkReader.Mode mode = unionType.getMode() == UnionMode.Sparse ? UnionChunkReader.Mode.Sparse
: UnionChunkReader.Mode.Dense;
// noinspection unchecked
return (ChunkWriter<T>) new UnionChunkWriter<>(mode, childClassMatcher, childWriters,
childChunkTypes);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableByteChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableIntChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.ChunkPositions;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.table.impl.chunkboxer.ChunkBoxer;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.SafeCloseableList;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import org.apache.arrow.vector.types.UnionMode;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.DataInput;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.PrimitiveIterator;

/**
 * A {@link ChunkReader} for Arrow Union columns (both {@code Dense} and {@code Sparse} modes).
 *
 * <p>The wire layout (per the Arrow columnar specification) is a types buffer of one {@code int8} per row
 * identifying the child ("column of interest"), followed — in {@code Dense} mode only — by an {@code int32}
 * offsets buffer mapping each row into its child vector. Each child column is then decoded by the
 * corresponding reader in {@code readers}, and the final chunk is assembled by boxing each child value into
 * an {@code Object} slot selected by the types/offsets buffers.
 */
public class UnionChunkReader<T> extends BaseChunkReader<WritableObjectChunk<T, Values>> {
    /** Union layout mode; mirrors Arrow's {@link UnionMode} without depending on it in signatures. */
    public enum Mode {
        Dense, Sparse
    }

    /** Maps the Arrow {@link UnionMode} onto the local {@link Mode}. */
    public static Mode mode(UnionMode mode) {
        return mode == UnionMode.Dense ? Mode.Dense : Mode.Sparse;
    }

    private static final String DEBUG_NAME = "UnionChunkReader";

    private final Mode mode;
    private final List<ChunkReader<WritableChunk<Values>>> readers;

    /**
     * @param mode whether the union is {@code Dense} or {@code Sparse}
     * @param readers one reader per child column, in child order; the child index on the wire is the index
     *        into this list
     */
    public UnionChunkReader(
            final Mode mode,
            final List<ChunkReader<WritableChunk<Values>>> readers) {
        this.mode = mode;
        this.readers = readers;
        // the specification doesn't allow the union column to have more than signed byte number of types
        Assert.leq(readers.size(), "readers.size()", Byte.MAX_VALUE, "Byte.MAX_VALUE");
    }

    @Override
    public WritableObjectChunk<T, Values> readChunk(
            @NotNull final Iterator<ChunkWriter.FieldNodeInfo> fieldNodeIter,
            @NotNull final PrimitiveIterator.OfLong bufferInfoIter,
            @NotNull final DataInput is,
            @Nullable final WritableChunk<Values> outChunk,
            final int outOffset,
            final int totalRows) throws IOException {
        final ChunkWriter.FieldNodeInfo nodeInfo = fieldNodeIter.next();
        // column of interest buffer
        final long coiBufferLength = bufferInfoIter.nextLong();
        // if Dense we also have an offset buffer
        final long offsetsBufferLength = mode == Mode.Dense ? bufferInfoIter.nextLong() : 0;

        final int numRows = nodeInfo.numElements;
        if (numRows == 0) {
            is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, coiBufferLength + offsetsBufferLength));
            for (final ChunkReader<WritableChunk<Values>> reader : readers) {
                // noinspection EmptyTryBlock
                try (final SafeCloseable ignored = reader.readChunk(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
                    // do nothing; we need each reader to consume fieldNodeIter and bufferInfoIter
                }
            }
            // honor a caller-provided destination chunk even when there is nothing to append
            return outChunk != null
                    ? outChunk.asWritableObjectChunk()
                    : WritableObjectChunk.makeWritableChunk(numRows);
        }

        try (final WritableByteChunk<ChunkPositions> columnsOfInterest =
                WritableByteChunk.makeWritableChunk(numRows);
                final WritableIntChunk<ChunkPositions> offsets = mode == Mode.Sparse
                        ? null
                        : WritableIntChunk.makeWritableChunk(numRows);
                final SafeCloseableList closeableList = new SafeCloseableList()) {

            // Populate the column-of-interest (types) buffer: one int8 per row, then skip any padding so the
            // stream stays aligned for the child readers. NOTE(review): assumes `is` honors barrage's
            // little-endian wire convention — confirm against the stream wrapper used by callers.
            for (int ii = 0; ii < numRows; ++ii) {
                columnsOfInterest.set(ii, is.readByte());
            }
            columnsOfInterest.setSize(numRows);
            final long coiPadding = coiBufferLength - numRows;
            if (coiPadding > 0) {
                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, coiPadding));
            }

            // Dense mode also carries an int32 offset into the selected child per row.
            if (offsets != null) {
                for (int ii = 0; ii < numRows; ++ii) {
                    offsets.set(ii, is.readInt());
                }
                offsets.setSize(numRows);
                final long offsetsPadding = offsetsBufferLength - (long) numRows * Integer.BYTES;
                if (offsetsPadding > 0) {
                    is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, offsetsPadding));
                }
            }

            // noinspection unchecked
            final ObjectChunk<T, Values>[] chunks = new ObjectChunk[readers.size()];

            for (int ii = 0; ii < readers.size(); ++ii) {
                final WritableChunk<Values> chunk =
                        readers.get(ii).readChunk(fieldNodeIter, bufferInfoIter, is, null, 0, 0);
                closeableList.add(chunk);

                // box each child so every value can live in the Object result chunk
                final ChunkBoxer.BoxerKernel boxer = ChunkBoxer.getBoxer(chunk.getChunkType(), chunk.size());
                closeableList.add(boxer);

                // noinspection unchecked
                chunks[ii] = (ObjectChunk<T, Values>) boxer.box(chunk);
            }

            final WritableObjectChunk<T, Values> result;
            if (outChunk != null) {
                result = outChunk.asWritableObjectChunk();
            } else {
                // when allocating fresh, size for the full expected payload, not just this record batch
                final int numChunkRows = Math.max(numRows, totalRows);
                result = WritableObjectChunk.makeWritableChunk(numChunkRows);
                result.setSize(numChunkRows);
            }

            for (int ii = 0; ii < numRows; ++ii) {
                final byte coi = columnsOfInterest.get(ii);
                final int offset;
                if (mode == Mode.Dense) {
                    offset = offsets.get(ii);
                } else {
                    // sparse unions index every child at the row's own position
                    offset = ii;
                }

                // append at outOffset so repeated calls can accumulate into the same destination chunk
                result.set(outOffset + ii, chunks[coi].get(offset));
            }

            return result;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,15 @@
import java.util.stream.Collectors;

public class UnionChunkWriter<T> extends BaseChunkWriter<ObjectChunk<T, Values>> {
public enum Mode {
Dense, Sparse
}

private static final String DEBUG_NAME = "UnionChunkWriter";

private final Mode mode;
private final UnionChunkReader.Mode mode;
private final List<Class<?>> classMatchers;
private final List<ChunkWriter<Chunk<Values>>> writers;
private final List<ChunkType> writerChunkTypes;

public UnionChunkWriter(
final Mode mode,
final UnionChunkReader.Mode mode,
final List<Class<?>> classMatchers,
final List<ChunkWriter<Chunk<Values>>> writers,
final List<ChunkType> writerChunkTypes) {
Expand All @@ -48,7 +44,7 @@ public UnionChunkWriter(
this.writers = writers;
this.writerChunkTypes = writerChunkTypes;
// the specification doesn't allow the union column to have more than signed byte number of types
Assert.leq(classMatchers.size(), "classMatchers.size()", 127);
Assert.leq(classMatchers.size(), "classMatchers.size()", Byte.MAX_VALUE, "Byte.MAX_VALUE");
}

@Override
Expand Down Expand Up @@ -88,7 +84,7 @@ private UnionChunkInputStream(
super(context, mySubset, options);
final int numColumns = classMatchers.size();
final ObjectChunk<T, Values> chunk = context.getChunk();
if (mode == Mode.Sparse) {
if (mode == UnionChunkReader.Mode.Sparse) {
columnOffset = null;
} else {
// noinspection resource
Expand All @@ -104,7 +100,7 @@ private UnionChunkInputStream(
// noinspection resource
innerChunks[ii] = WritableObjectChunk.makeWritableChunk(chunk.size());

if (mode == Mode.Sparse) {
if (mode == UnionChunkReader.Mode.Sparse) {
innerChunks[ii].fillWithNullValue(0, chunk.size());
} else {
innerChunks[ii].setSize(0);
Expand All @@ -115,7 +111,7 @@ private UnionChunkInputStream(
int jj;
for (jj = 0; jj < classMatchers.size(); ++jj) {
if (value.getClass().isAssignableFrom(classMatchers.get(jj))) {
if (mode == Mode.Sparse) {
if (mode == UnionChunkReader.Mode.Sparse) {
columnOfInterest.set(ii, (byte) jj);
innerChunks[jj].set(ii, value);
} else {
Expand Down Expand Up @@ -156,6 +152,10 @@ private UnionChunkInputStream(
try (ChunkWriter.Context<Chunk<Values>> innerContext = writer.makeContext(kernel != null
? (Chunk<Values>) kernel.unbox(innerChunk)
: innerChunk, 0)) {
if (kernel != null) {
// while we did steal the kernel's chunk after unboxing, now no one owns the original chunk
innerChunk.close();
}

innerColumns[ii] = writer.getInputStream(innerContext, null, options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import org.jetbrains.annotations.Nullable;

public class BooleanArrayExpansionKernel implements ArrayExpansionKernel<boolean[]> {
private final static boolean[] ZERO_LEN_ARRAY = new boolean[0];
public final static BooleanArrayExpansionKernel INSTANCE = new BooleanArrayExpansionKernel();
public static final BooleanArrayExpansionKernel INSTANCE = new BooleanArrayExpansionKernel();

private static final String DEBUG_NAME = "BooleanArrayExpansionKernel";
private static final boolean[] ZERO_LEN_ARRAY = new boolean[0];

@Override
public <A extends Any> WritableChunk<A> expand(
Expand All @@ -39,15 +41,17 @@ public <A extends Any> WritableChunk<A> expand(

long totalSize = 0;
for (int ii = 0; ii < typedSource.size(); ++ii) {
final boolean[] row = typedSource.get(ii);
int rowLen = row == null ? 0 : row.length;
if (fixedSizeLength > 0) {
rowLen = Math.min(rowLen, fixedSizeLength);
int rowLen;
if (fixedSizeLength != 0) {
rowLen = Math.abs(fixedSizeLength);
} else {
final boolean[] row = typedSource.get(ii);
rowLen = row == null ? 0 : row.length;
}
totalSize += rowLen;
}
final WritableByteChunk<A> result = WritableByteChunk.makeWritableChunk(
LongSizedDataStructure.intSize("ExpansionKernel", totalSize));
LongSizedDataStructure.intSize(DEBUG_NAME, totalSize));

int lenWritten = 0;
if (offsetsDest != null) {
Expand All @@ -58,18 +62,36 @@ public <A extends Any> WritableChunk<A> expand(
if (offsetsDest != null) {
offsetsDest.set(ii, lenWritten);
}
if (row == null) {
continue;
}
int rowLen = row.length;
if (fixedSizeLength > 0) {
rowLen = Math.min(rowLen, fixedSizeLength);
int written = 0;
if (row != null) {
int offset = 0;
if (fixedSizeLength != 0) {
// limit length to fixedSizeLength
written = Math.min(row.length, Math.abs(fixedSizeLength));
if (fixedSizeLength < 0 && written < row.length) {
// read from the end of the array when fixedSizeLength is negative
offset = row.length - written;
}
} else {
written = row.length;
}

// copy the row into the result
for (int j = 0; j < written; ++j) {
final byte value = row[j] ? BooleanUtils.TRUE_BOOLEAN_AS_BYTE : BooleanUtils.FALSE_BOOLEAN_AS_BYTE;
result.set(lenWritten + j, value);
}
}
for (int j = 0; j < rowLen; ++j) {
final byte value = row[j] ? BooleanUtils.TRUE_BOOLEAN_AS_BYTE : BooleanUtils.FALSE_BOOLEAN_AS_BYTE;
result.set(lenWritten + j, value);
if (fixedSizeLength != 0) {
final int toNull = LongSizedDataStructure.intSize(
DEBUG_NAME, Math.max(0, Math.abs(fixedSizeLength) - written));
if (toNull > 0) {
// fill the rest of the row with nulls
result.fillWithNullValue(lenWritten + written, toNull);
written += toNull;
}
}
lenWritten += rowLen;
lenWritten += written;
}
if (offsetsDest != null) {
offsetsDest.set(typedSource.size(), lenWritten);
Expand Down
Loading

0 comments on commit 4eadb96

Please sign in to comment.