diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkWriter.java index d36ebd73834..33300a800b3 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkWriter.java @@ -33,16 +33,20 @@ public interface IsRowNullProvider> { protected final int elementSize; /** whether we can use the wire value as a deephaven null for clients that support dh nulls */ protected final boolean dhNullable; + /** whether the field is nullable */ + protected final boolean fieldNullable; BaseChunkWriter( @NotNull final IsRowNullProvider isRowNullProvider, @NotNull final Supplier emptyChunkSupplier, final int elementSize, - final boolean dhNullable) { + final boolean dhNullable, + final boolean fieldNullable) { this.isRowNullProvider = isRowNullProvider; this.emptyChunkSupplier = emptyChunkSupplier; this.elementSize = elementSize; this.dhNullable = dhNullable; + this.fieldNullable = fieldNullable; } @Override @@ -127,12 +131,12 @@ public int available() throws IOException { * the consumer understands which value is the assigned NULL. */ protected boolean sendValidityBuffer() { - return nullCount() != 0; + return !fieldNullable || nullCount() != 0; } @Override public int nullCount() { - return nullCount; + return fieldNullable ? nullCount : 0; } protected long writeValidityBuffer(final DataOutput dos) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkWriter.java index a892a4d01da..8ab636b44be 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkWriter.java @@ -20,10 +20,15 @@ public class BooleanChunkWriter extends BaseChunkWriter> { private static final String DEBUG_NAME = "BooleanChunkWriter"; - public static final BooleanChunkWriter INSTANCE = new BooleanChunkWriter(); + private static final BooleanChunkWriter NULLABLE_IDENTITY_INSTANCE = new BooleanChunkWriter(true); + private static final BooleanChunkWriter NON_NULLABLE_IDENTITY_INSTANCE = new BooleanChunkWriter(false); - public BooleanChunkWriter() { - super(ByteChunk::isNull, ByteChunk::getEmptyChunk, 0, false); + public static BooleanChunkWriter getIdentity(boolean isNullable) { + return isNullable ? NULLABLE_IDENTITY_INSTANCE : NON_NULLABLE_IDENTITY_INSTANCE; + } + + private BooleanChunkWriter(final boolean isNullable) { + super(ByteChunk::isNull, ByteChunk::getEmptyChunk, 0, false, isNullable); } @Override diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkWriter.java index f15d0a0b7c6..b608f71fe2e 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkWriter.java @@ -24,8 +24,15 @@ public class ByteChunkWriter> extends BaseChunkWriter { private static final String DEBUG_NAME = "ByteChunkWriter"; - public static final ByteChunkWriter> IDENTITY_INSTANCE = new ByteChunkWriter<>( - ByteChunk::isNull, ByteChunk::getEmptyChunk, ByteChunk::get); + private static final ByteChunkWriter> NULLABLE_IDENTITY_INSTANCE = new ByteChunkWriter<>( + ByteChunk::isNull, ByteChunk::getEmptyChunk, ByteChunk::get, false); + private static final ByteChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new ByteChunkWriter<>( + ByteChunk::isNull, ByteChunk::getEmptyChunk, ByteChunk::get, true); + + + public static ByteChunkWriter> getIdentity(boolean isNullable) { + return isNullable ? NULLABLE_IDENTITY_INSTANCE : NON_NULLABLE_IDENTITY_INSTANCE; + } @FunctionalInterface public interface ToByteTransformFunction> { @@ -37,8 +44,9 @@ public interface ToByteTransformFunction> public ByteChunkWriter( @NotNull final IsRowNullProvider isRowNullProvider, @NotNull final Supplier emptyChunkSupplier, - @Nullable final ToByteTransformFunction transform) { - super(isRowNullProvider, emptyChunkSupplier, Byte.BYTES, true); + @Nullable final ToByteTransformFunction transform, + final boolean fieldNullable) { + super(isRowNullProvider, emptyChunkSupplier, Byte.BYTES, true, fieldNullable); this.transform = transform; } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkWriter.java index 60117620765..dd96e696bfe 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkWriter.java @@ -20,8 +20,15 @@ public class CharChunkWriter> extends BaseChunkWriter { private static final String DEBUG_NAME = "CharChunkWriter"; - public static final CharChunkWriter> IDENTITY_INSTANCE = new CharChunkWriter<>( - CharChunk::isNull, CharChunk::getEmptyChunk, CharChunk::get); + private static final CharChunkWriter> NULLABLE_IDENTITY_INSTANCE = new CharChunkWriter<>( + CharChunk::isNull, CharChunk::getEmptyChunk, CharChunk::get, false); + private static final CharChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new CharChunkWriter<>( + CharChunk::isNull, CharChunk::getEmptyChunk, CharChunk::get, true); + + + public static CharChunkWriter> getIdentity(boolean isNullable) { + return isNullable ? NULLABLE_IDENTITY_INSTANCE : NON_NULLABLE_IDENTITY_INSTANCE; + } @FunctionalInterface public interface ToCharTransformFunction> { @@ -33,8 +40,9 @@ public interface ToCharTransformFunction> public CharChunkWriter( @NotNull final IsRowNullProvider isRowNullProvider, @NotNull final Supplier emptyChunkSupplier, - @Nullable final ToCharTransformFunction transform) { - super(isRowNullProvider, emptyChunkSupplier, Character.BYTES, true); + @Nullable final ToCharTransformFunction transform, + final boolean fieldNullable) { + super(isRowNullProvider, emptyChunkSupplier, Character.BYTES, true, fieldNullable); this.transform = transform; } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java index 7eda2504013..f18d56e4a6f 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java @@ -32,6 +32,7 @@ import io.deephaven.vector.Vector; import org.apache.arrow.vector.PeriodDuration; import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.UnionMode; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; @@ -191,6 +192,7 @@ public > ChunkWriter newWriterPojo( if (toStringUnknownTypes) { // noinspection unchecked return (ChunkWriter) new VarBinaryChunkWriter<>( + field.isNullable(), (out, item) -> out.write(item.toString().getBytes(StandardCharsets.UTF_8))); } throw new UnsupportedOperationException(String.format( @@ -253,7 +255,8 @@ public > ChunkWriter newWriterPojo( final ChunkWriter> componentWriter = newWriterPojo(componentTypeInfo); // noinspection unchecked - return (ChunkWriter) new ListChunkWriter<>(mode, fixedSizeLength, kernel, componentWriter); + return (ChunkWriter) new ListChunkWriter<>( + mode, fixedSizeLength, kernel, componentWriter, field.isNullable()); } if (typeId == ArrowType.ArrowTypeID.Map) { @@ -267,7 +270,7 @@ public > ChunkWriter newWriterPojo( // noinspection unchecked return (ChunkWriter) new MapChunkWriter<>( - keyWriter, valueWriter, keyTypeInfo.chunkType(), valueTypeInfo.chunkType()); + keyWriter, valueWriter, keyTypeInfo.chunkType(), valueTypeInfo.chunkType(), field.isNullable()); } // TODO: if (typeId == ArrowType.ArrowTypeID.Struct) { @@ -290,8 +293,10 @@ public > ChunkWriter newWriterPojo( .map(BarrageTypeInfo::chunkType) .collect(Collectors.toList()); + UnionChunkWriter.Mode mode = unionType.getMode() == UnionMode.Sparse ? UnionChunkWriter.Mode.Sparse + : UnionChunkWriter.Mode.Dense; // noinspection unchecked - return (ChunkWriter) new UnionChunkWriter<>(unionType.getMode(), childClassMatcher, childWriters, + return (ChunkWriter) new UnionChunkWriter<>(mode, childClassMatcher, childWriters, childChunkTypes); } @@ -317,37 +322,44 @@ protected void register( registeredFactories.computeIfAbsent(arrowType, k -> new HashMap<>()) .put(Byte.class, typeInfo -> new ByteChunkWriter>( ObjectChunk::isNull, ObjectChunk::getEmptyChunk, - (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)))); + (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)), + typeInfo.arrowField().isNullable())); } else if (deephavenType == short.class) { registeredFactories.computeIfAbsent(arrowType, k -> new HashMap<>()) .put(Short.class, typeInfo -> new ShortChunkWriter>( ObjectChunk::isNull, ObjectChunk::getEmptyChunk, - (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)))); + (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)), + typeInfo.arrowField().isNullable())); } else if (deephavenType == int.class) { registeredFactories.computeIfAbsent(arrowType, k -> new HashMap<>()) .put(Integer.class, typeInfo -> new IntChunkWriter>( ObjectChunk::isNull, ObjectChunk::getEmptyChunk, - (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)))); + (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)), + typeInfo.arrowField().isNullable())); } else if (deephavenType == long.class) { registeredFactories.computeIfAbsent(arrowType, k -> new HashMap<>()) .put(Long.class, typeInfo -> new LongChunkWriter>( ObjectChunk::isNull, ObjectChunk::getEmptyChunk, - (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)))); + (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)), + typeInfo.arrowField().isNullable())); } else if (deephavenType == char.class) { registeredFactories.computeIfAbsent(arrowType, k -> new HashMap<>()) .put(Character.class, typeInfo -> new CharChunkWriter>( ObjectChunk::isNull, ObjectChunk::getEmptyChunk, - (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)))); + (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)), + typeInfo.arrowField().isNullable())); } else if (deephavenType == float.class) { registeredFactories.computeIfAbsent(arrowType, k -> new HashMap<>()) .put(Float.class, typeInfo -> new FloatChunkWriter>( ObjectChunk::isNull, ObjectChunk::getEmptyChunk, - (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)))); + (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)), + typeInfo.arrowField().isNullable())); } else if (deephavenType == double.class) { registeredFactories.computeIfAbsent(arrowType, k -> new HashMap<>()) .put(Double.class, typeInfo -> new DoubleChunkWriter>( ObjectChunk::isNull, ObjectChunk::getEmptyChunk, - (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)))); + (chunk, ii) -> TypeUtils.unbox(chunk.get(ii)), + typeInfo.arrowField().isNullable())); } } @@ -389,7 +401,7 @@ private static ChunkWriter> timestampFromLong( final ZonedDateTime value = source.asObjectChunk().get(offset); return value == null ? QueryConstants.NULL_LONG : DateTimeUtils.epochNanos(value) / factor; - }); + }, typeInfo.arrowField().isNullable()); } private static ChunkWriter> timestampFromInstant( @@ -398,7 +410,7 @@ private static ChunkWriter> timestampFromInstant( return new LongChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (source, offset) -> { final Instant value = source.get(offset); return value == null ? QueryConstants.NULL_LONG : DateTimeUtils.epochNanos(value) / factor; - }); + }, typeInfo.arrowField().isNullable()); } private static ChunkWriter> timestampFromZonedDateTime( @@ -408,33 +420,36 @@ private static ChunkWriter> timestampFromZone return new LongChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (source, offset) -> { final ZonedDateTime value = source.get(offset); return value == null ? QueryConstants.NULL_LONG : DateTimeUtils.epochNanos(value) / factor; - }); + }, typeInfo.arrowField().isNullable()); } private static ChunkWriter> utf8FromString( final BarrageTypeInfo typeInfo) { - return new VarBinaryChunkWriter<>((out, item) -> out.write(item.getBytes(StandardCharsets.UTF_8))); + return new VarBinaryChunkWriter<>(typeInfo.arrowField().isNullable(), + (out, item) -> out.write(item.getBytes(StandardCharsets.UTF_8))); } private static ChunkWriter> utf8FromObject( final BarrageTypeInfo typeInfo) { - return new VarBinaryChunkWriter<>((out, item) -> out.write(item.toString().getBytes(StandardCharsets.UTF_8))); + return new VarBinaryChunkWriter<>(typeInfo.arrowField().isNullable(), + (out, item) -> out.write(item.toString().getBytes(StandardCharsets.UTF_8))); } private static ChunkWriter> utf8FromPyObject( final BarrageTypeInfo typeInfo) { - return new VarBinaryChunkWriter<>((out, item) -> out.write(item.toString().getBytes(StandardCharsets.UTF_8))); + return new VarBinaryChunkWriter<>(typeInfo.arrowField().isNullable(), + (out, item) -> out.write(item.toString().getBytes(StandardCharsets.UTF_8))); } private static ChunkWriter> durationFromLong( final BarrageTypeInfo typeInfo) { final long factor = factorForTimeUnit(((ArrowType.Duration) typeInfo.arrowField().getType()).getUnit()); return factor == 1 - ? LongChunkWriter.IDENTITY_INSTANCE + ? LongChunkWriter.getIdentity(typeInfo.arrowField().isNullable()) : new LongChunkWriter<>(LongChunk::isNull, LongChunk::getEmptyChunk, (source, offset) -> { final long value = source.get(offset); return value == QueryConstants.NULL_LONG ? QueryConstants.NULL_LONG : value / factor; - }); + }, typeInfo.arrowField().isNullable()); } private static ChunkWriter> durationFromDuration( @@ -443,7 +458,7 @@ private static ChunkWriter> durationFromDuration( return new LongChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (source, offset) -> { final Duration value = source.get(offset); return value == null ? QueryConstants.NULL_LONG : value.toNanos() / factor; - }); + }, typeInfo.arrowField().isNullable()); } private static ChunkWriter> floatingPointFromFloat( @@ -456,14 +471,15 @@ private static ChunkWriter> floatingPointFromFloat( return value == QueryConstants.NULL_FLOAT ? QueryConstants.NULL_SHORT : Float16.toFloat16((float) value); - }); + }, typeInfo.arrowField().isNullable()); case SINGLE: - return FloatChunkWriter.IDENTITY_INSTANCE; + return FloatChunkWriter.getIdentity(typeInfo.arrowField().isNullable()); case DOUBLE: return new DoubleChunkWriter<>(FloatChunk::isNull, FloatChunk::getEmptyChunk, - (source, offset) -> QueryLanguageFunctionUtils.doubleCast(source.get(offset))); + (source, offset) -> QueryLanguageFunctionUtils.doubleCast(source.get(offset)), + typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected floating point precision: " + fpType.getPrecision()); @@ -480,13 +496,14 @@ private static ChunkWriter> floatingPointFromDouble( return value == QueryConstants.NULL_DOUBLE ? QueryConstants.NULL_SHORT : Float16.toFloat16((float) value); - }); + }, typeInfo.arrowField().isNullable()); case SINGLE: return new FloatChunkWriter<>(DoubleChunk::isNull, DoubleChunk::getEmptyChunk, - (source, offset) -> QueryLanguageFunctionUtils.floatCast(source.get(offset))); + (source, offset) -> QueryLanguageFunctionUtils.floatCast(source.get(offset)), + typeInfo.arrowField().isNullable()); case DOUBLE: - return DoubleChunkWriter.IDENTITY_INSTANCE; + return DoubleChunkWriter.getIdentity(typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected floating point precision: " + fpType.getPrecision()); @@ -503,15 +520,17 @@ private static ChunkWriter> floatingPointFromBig return value == null ? QueryConstants.NULL_SHORT : Float16.toFloat16(value.floatValue()); - }); + }, typeInfo.arrowField().isNullable()); case SINGLE: return new FloatChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, - (source, offset) -> QueryLanguageFunctionUtils.floatCast(source.get(offset))); + (source, offset) -> QueryLanguageFunctionUtils.floatCast(source.get(offset)), + typeInfo.arrowField().isNullable()); case DOUBLE: return new DoubleChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, - (source, offset) -> QueryLanguageFunctionUtils.doubleCast(source.get(offset))); + (source, offset) -> QueryLanguageFunctionUtils.doubleCast(source.get(offset)), + typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected floating point precision: " + fpType.getPrecision()); @@ -520,31 +539,35 @@ private static ChunkWriter> floatingPointFromBig private static ChunkWriter> binaryFromByteArray( final BarrageTypeInfo typeInfo) { - return new VarBinaryChunkWriter<>(OutputStream::write); + return new VarBinaryChunkWriter<>(typeInfo.arrowField().isNullable(), + OutputStream::write); } private static ChunkWriter> binaryFromBigInt( final BarrageTypeInfo typeInfo) { - return new VarBinaryChunkWriter<>((out, item) -> out.write(item.toByteArray())); + return new VarBinaryChunkWriter<>(typeInfo.arrowField().isNullable(), + (out, item) -> out.write(item.toByteArray())); } private static ChunkWriter> binaryFromBigDecimal( final BarrageTypeInfo typeInfo) { - return new VarBinaryChunkWriter<>((out, item) -> { - final BigDecimal normal = item.stripTrailingZeros(); - final int v = normal.scale(); - // Write as little endian, arrow endianness. - out.write(0xFF & v); - out.write(0xFF & (v >> 8)); - out.write(0xFF & (v >> 16)); - out.write(0xFF & (v >> 24)); - out.write(normal.unscaledValue().toByteArray()); - }); + return new VarBinaryChunkWriter<>(typeInfo.arrowField().isNullable(), + (out, item) -> { + final BigDecimal normal = item.stripTrailingZeros(); + final int v = normal.scale(); + // Write as little endian, arrow endianness. + out.write(0xFF & v); + out.write(0xFF & (v >> 8)); + out.write(0xFF & (v >> 16)); + out.write(0xFF & (v >> 24)); + out.write(normal.unscaledValue().toByteArray()); + }); } private static ChunkWriter> binaryFromSchema( final BarrageTypeInfo typeInfo) { - return new VarBinaryChunkWriter<>(ArrowIpcUtil::serialize); + return new VarBinaryChunkWriter<>(typeInfo.arrowField().isNullable(), + ArrowIpcUtil::serialize); } private static ChunkWriter> timeFromLong( @@ -560,13 +583,13 @@ private static ChunkWriter> timeFromLong( long value = chunk.get(ii); value = value == QueryConstants.NULL_LONG ? QueryConstants.NULL_LONG : value / factor; return QueryLanguageFunctionUtils.intCast(value); - }); + }, typeInfo.arrowField().isNullable()); case 64: return new LongChunkWriter<>(LongChunk::isNull, LongChunk::getEmptyChunk, (chunk, ii) -> { long value = chunk.get(ii); return value == QueryConstants.NULL_LONG ? QueryConstants.NULL_LONG : value / factor; - }); + }, typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected bit width: " + bitWidth); @@ -601,13 +624,13 @@ private static ChunkWriter> timeFromLocalTime( final LocalTime lt = chunk.get(ii); final long value = lt == null ? QueryConstants.NULL_LONG : lt.toNanoOfDay() / factor; return QueryLanguageFunctionUtils.intCast(value); - }); + }, typeInfo.arrowField().isNullable()); case 64: return new LongChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (chunk, ii) -> { final LocalTime lt = chunk.get(ii); return lt == null ? QueryConstants.NULL_LONG : lt.toNanoOfDay() / factor; - }); + }, typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected bit width: " + bitWidth); @@ -626,6 +649,7 @@ private static ChunkWriter> decimalFromByte( .negate(); return new FixedWidthChunkWriter<>(ByteChunk::isNull, ByteChunk::getEmptyChunk, byteWidth, false, + typeInfo.arrowField().isNullable(), (out, chunk, offset) -> { byte value = chunk.get(offset); if (value == QueryConstants.NULL_BYTE) { @@ -649,6 +673,7 @@ private static ChunkWriter> decimalFromChar( .negate(); return new FixedWidthChunkWriter<>(CharChunk::isNull, CharChunk::getEmptyChunk, byteWidth, false, + typeInfo.arrowField().isNullable(), (out, chunk, offset) -> { char value = chunk.get(offset); if (value == QueryConstants.NULL_CHAR) { @@ -672,6 +697,7 @@ private static ChunkWriter> decimalFromShort( .negate(); return new FixedWidthChunkWriter<>(ShortChunk::isNull, ShortChunk::getEmptyChunk, byteWidth, false, + typeInfo.arrowField().isNullable(), (out, chunk, offset) -> { short value = chunk.get(offset); if (value == QueryConstants.NULL_SHORT) { @@ -695,6 +721,7 @@ private static ChunkWriter> decimalFromInt( .negate(); return new FixedWidthChunkWriter<>(IntChunk::isNull, IntChunk::getEmptyChunk, byteWidth, false, + typeInfo.arrowField().isNullable(), (out, chunk, offset) -> { int value = chunk.get(offset); if (value == QueryConstants.NULL_INT) { @@ -718,6 +745,7 @@ private static ChunkWriter> decimalFromLong( .negate(); return new FixedWidthChunkWriter<>(LongChunk::isNull, LongChunk::getEmptyChunk, byteWidth, false, + typeInfo.arrowField().isNullable(), (out, chunk, offset) -> { long value = chunk.get(offset); if (value == QueryConstants.NULL_LONG) { @@ -741,6 +769,7 @@ private static ChunkWriter> decimalFromBigIntege .negate(); return new FixedWidthChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, byteWidth, false, + typeInfo.arrowField().isNullable(), (out, chunk, offset) -> { BigInteger value = chunk.get(offset); if (value == null) { @@ -764,6 +793,7 @@ private static ChunkWriter> decimalFromFloat( .negate(); return new FixedWidthChunkWriter<>(FloatChunk::isNull, FloatChunk::getEmptyChunk, byteWidth, false, + typeInfo.arrowField().isNullable(), (out, chunk, offset) -> { float value = chunk.get(offset); if (value == QueryConstants.NULL_FLOAT) { @@ -787,6 +817,7 @@ private static ChunkWriter> decimalFromDouble( .negate(); return new FixedWidthChunkWriter<>(DoubleChunk::isNull, DoubleChunk::getEmptyChunk, byteWidth, false, + typeInfo.arrowField().isNullable(), (out, chunk, offset) -> { double value = chunk.get(offset); if (value == QueryConstants.NULL_DOUBLE) { @@ -810,6 +841,7 @@ private static ChunkWriter> decimalFromBigDecima .negate(); return new FixedWidthChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, byteWidth, false, + typeInfo.arrowField().isNullable(), (out, chunk, offset) -> { BigDecimal value = chunk.get(offset); if (value == null) { @@ -848,16 +880,19 @@ private static ChunkWriter> intFromByte( switch (bitWidth) { case 8: - return ByteChunkWriter.IDENTITY_INSTANCE; + return ByteChunkWriter.getIdentity(typeInfo.arrowField().isNullable()); case 16: return new ShortChunkWriter<>(ByteChunk::isNull, ByteChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 32: return new IntChunkWriter<>(ByteChunk::isNull, ByteChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 64: return new LongChunkWriter<>(ByteChunk::isNull, ByteChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected bit width: " + bitWidth); } @@ -871,15 +906,18 @@ private static ChunkWriter> intFromShort( switch (bitWidth) { case 8: return new ByteChunkWriter<>(ShortChunk::isNull, ShortChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 16: - return ShortChunkWriter.IDENTITY_INSTANCE; + return ShortChunkWriter.getIdentity(typeInfo.arrowField().isNullable()); case 32: return new IntChunkWriter<>(ShortChunk::isNull, ShortChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 64: return new LongChunkWriter<>(ShortChunk::isNull, ShortChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected bit width: " + bitWidth); } @@ -893,15 +931,18 @@ private static ChunkWriter> intFromInt( switch (bitWidth) { case 8: return new ByteChunkWriter<>(IntChunk::isNull, IntChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 16: return new ShortChunkWriter<>(IntChunk::isNull, IntChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 32: - return IntChunkWriter.IDENTITY_INSTANCE; + return IntChunkWriter.getIdentity(typeInfo.arrowField().isNullable()); case 64: return new LongChunkWriter<>(IntChunk::isNull, IntChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected bit width: " + bitWidth); } @@ -915,15 +956,18 @@ private static ChunkWriter> intFromLong( switch (bitWidth) { case 8: return new ByteChunkWriter<>(LongChunk::isNull, LongChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 16: return new ShortChunkWriter<>(LongChunk::isNull, LongChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 32: return new IntChunkWriter<>(LongChunk::isNull, LongChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 64: - return LongChunkWriter.IDENTITY_INSTANCE; + return LongChunkWriter.getIdentity(typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected bit width: " + bitWidth); } @@ -937,16 +981,20 @@ private static ChunkWriter> intFromObject( switch (bitWidth) { case 8: return new ByteChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 16: return new ShortChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 32: return new IntChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 64: return new LongChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected bit width: " + bitWidth); } @@ -961,20 +1009,24 @@ private static ChunkWriter> intFromChar( switch (bitWidth) { case 8: return new ByteChunkWriter<>(CharChunk::isNull, CharChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 16: if (unsigned) { - return CharChunkWriter.IDENTITY_INSTANCE; + return CharChunkWriter.getIdentity(typeInfo.arrowField().isNullable()); } else { return new ShortChunkWriter<>(CharChunk::isNull, CharChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); } case 32: return new IntChunkWriter<>(CharChunk::isNull, CharChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 64: return new LongChunkWriter<>(CharChunk::isNull, CharChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected bit width: " + bitWidth); } @@ -988,16 +1040,20 @@ private static ChunkWriter> intFromFloat( switch (bitWidth) { case 8: return new ByteChunkWriter<>(FloatChunk::isNull, FloatChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 16: return new ShortChunkWriter<>(FloatChunk::isNull, FloatChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 32: return new IntChunkWriter<>(FloatChunk::isNull, FloatChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 64: return new LongChunkWriter<>(FloatChunk::isNull, FloatChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected bit width: " + bitWidth); } @@ -1011,16 +1067,20 @@ private static ChunkWriter> intFromDouble( switch (bitWidth) { case 8: return new ByteChunkWriter<>(DoubleChunk::isNull, DoubleChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 16: return new ShortChunkWriter<>(DoubleChunk::isNull, DoubleChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 32: return new IntChunkWriter<>(DoubleChunk::isNull, DoubleChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case 64: return new LongChunkWriter<>(DoubleChunk::isNull, DoubleChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected bit width: " + bitWidth); } @@ -1028,7 +1088,7 @@ private static ChunkWriter> intFromDouble( private static ChunkWriter> boolFromBoolean( final BarrageTypeInfo typeInfo) { - return new BooleanChunkWriter(); + return BooleanChunkWriter.getIdentity(typeInfo.arrowField().isNullable()); } private static ChunkWriter> fixedSizeBinaryFromByteArray( @@ -1036,6 +1096,7 @@ private static ChunkWriter> fixedSizeBinaryFromByteA final ArrowType.FixedSizeBinary fixedSizeBinary = (ArrowType.FixedSizeBinary) typeInfo.arrowField().getType(); final int elementWidth = fixedSizeBinary.getByteWidth(); return new FixedWidthChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, elementWidth, false, + typeInfo.arrowField().isNullable(), (out, chunk, offset) -> { final byte[] data = chunk.get(offset); if (data.length != elementWidth) { @@ -1054,7 +1115,8 @@ private static ChunkWriter> dateFromInt( switch (dateType.getUnit()) { case DAY: return new IntChunkWriter<>(IntChunk::isNull, IntChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case MILLISECOND: final long factor = Duration.ofDays(1).toMillis(); @@ -1063,7 +1125,7 @@ private static ChunkWriter> dateFromInt( return value == QueryConstants.NULL_LONG ? QueryConstants.NULL_LONG : (value * factor); - }); + }, typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected date unit: " + dateType.getUnit()); } @@ -1076,7 +1138,8 @@ private static ChunkWriter> dateFromLong( switch (dateType.getUnit()) { case DAY: return new IntChunkWriter<>(LongChunk::isNull, LongChunk::getEmptyChunk, - (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii))); + (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii)), + typeInfo.arrowField().isNullable()); case MILLISECOND: final long factor = Duration.ofDays(1).toMillis(); @@ -1085,7 +1148,7 @@ private static ChunkWriter> dateFromLong( return value == QueryConstants.NULL_LONG ? QueryConstants.NULL_LONG : (value * factor); - }); + }, typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected date unit: " + dateType.getUnit()); } @@ -1110,13 +1173,13 @@ private static ChunkWriter> dateFromLocalDate( return new IntChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (chunk, ii) -> { final LocalDate value = chunk.get(ii); return value == null ? QueryConstants.NULL_INT : (int) value.toEpochDay(); - }); + }, typeInfo.arrowField().isNullable()); case MILLISECOND: final long factor = Duration.ofDays(1).toMillis(); return new LongChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (chunk, ii) -> { final LocalDate value = chunk.get(ii); return value == null ? QueryConstants.NULL_LONG : value.toEpochDay() * factor; - }); + }, typeInfo.arrowField().isNullable()); default: throw new IllegalArgumentException("Unexpected date unit: " + dateType.getUnit()); } @@ -1137,7 +1200,7 @@ private static ChunkWriter> intervalFromDurationLong( final long nsPerDay = Duration.ofDays(1).toNanos(); final long nsPerMs = Duration.ofMillis(1).toNanos(); return new FixedWidthChunkWriter<>(LongChunk::isNull, LongChunk::getEmptyChunk, Integer.BYTES * 2, - false, + false, typeInfo.arrowField().isNullable(), (out, source, offset) -> { final long value = source.get(offset); if (value == QueryConstants.NULL_LONG) { @@ -1169,7 +1232,7 @@ private static ChunkWriter> intervalFromDuration( case DAY_TIME: final long nsPerMs = Duration.ofMillis(1).toNanos(); return new FixedWidthChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, Integer.BYTES * 2, - false, + false, typeInfo.arrowField().isNullable(), (out, source, offset) -> { final Duration value = source.get(offset); if (value == null) { @@ -1217,10 +1280,10 @@ private static ChunkWriter> intervalFromPeriod( return new IntChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (chunk, ii) -> { final Period value = chunk.get(ii); return value == null ? QueryConstants.NULL_INT : value.getMonths() + value.getYears() * 12; - }); + }, typeInfo.arrowField().isNullable()); case DAY_TIME: return new FixedWidthChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, Integer.BYTES * 2, - false, + false, typeInfo.arrowField().isNullable(), (out, chunk, offset) -> { final Period value = chunk.get(offset); if (value == null) { @@ -1234,7 +1297,7 @@ private static ChunkWriter> intervalFromPeriod( }); case MONTH_DAY_NANO: return new FixedWidthChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, - Integer.BYTES * 2 + Long.BYTES, false, + Integer.BYTES * 2 + Long.BYTES, false, typeInfo.arrowField().isNullable(), (out, chunk, offset) -> { final Period value = chunk.get(offset); if (value == null) { @@ -1262,10 +1325,10 @@ private static ChunkWriter> intervalFromPeri return new IntChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, (chunk, ii) -> { final Period value = chunk.get(ii).getPeriod(); return value == null ? QueryConstants.NULL_INT : value.getMonths() + value.getYears() * 12; - }); + }, typeInfo.arrowField().isNullable()); case DAY_TIME: return new FixedWidthChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, Integer.BYTES * 2, - false, + false, typeInfo.arrowField().isNullable(), (out, chunk, offset) -> { final PeriodDuration value = chunk.get(offset); if (value == null) { @@ -1279,7 +1342,7 @@ private static ChunkWriter> intervalFromPeri }); case MONTH_DAY_NANO: return new FixedWidthChunkWriter<>(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, - Integer.BYTES * 2 + Long.BYTES, false, + Integer.BYTES * 2 + Long.BYTES, false, typeInfo.arrowField().isNullable(), (out, chunk, offset) -> { final PeriodDuration value = chunk.get(offset); if (value == null) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkWriter.java index 965ae4d1f47..60341a7df74 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkWriter.java @@ -24,8 +24,15 @@ public class DoubleChunkWriter> extends BaseChunkWriter { private static final String DEBUG_NAME = "DoubleChunkWriter"; - public static final DoubleChunkWriter> IDENTITY_INSTANCE = new DoubleChunkWriter<>( - DoubleChunk::isNull, DoubleChunk::getEmptyChunk, DoubleChunk::get); + private static final DoubleChunkWriter> NULLABLE_IDENTITY_INSTANCE = new DoubleChunkWriter<>( + DoubleChunk::isNull, DoubleChunk::getEmptyChunk, DoubleChunk::get, false); + private static final DoubleChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new DoubleChunkWriter<>( + DoubleChunk::isNull, DoubleChunk::getEmptyChunk, DoubleChunk::get, true); + + + public static DoubleChunkWriter> getIdentity(boolean isNullable) { + return isNullable ? NULLABLE_IDENTITY_INSTANCE : NON_NULLABLE_IDENTITY_INSTANCE; + } @FunctionalInterface public interface ToDoubleTransformFunction> { @@ -37,8 +44,9 @@ public interface ToDoubleTransformFunction public DoubleChunkWriter( @NotNull final IsRowNullProvider isRowNullProvider, @NotNull final Supplier emptyChunkSupplier, - @Nullable final ToDoubleTransformFunction transform) { - super(isRowNullProvider, emptyChunkSupplier, Double.BYTES, true); + @Nullable final ToDoubleTransformFunction transform, + final boolean fieldNullable) { + super(isRowNullProvider, emptyChunkSupplier, Double.BYTES, true, fieldNullable); this.transform = transform; } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FixedWidthChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FixedWidthChunkWriter.java index d159dc7f559..b823c0a60c1 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FixedWidthChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FixedWidthChunkWriter.java @@ -33,8 +33,9 @@ public FixedWidthChunkWriter( @NotNull final Supplier emptyChunkSupplier, final int elementSize, final boolean dhNullable, + final boolean fieldNullable, final Appender appendItem) { - super(isRowNullProvider, emptyChunkSupplier, elementSize, dhNullable); + super(isRowNullProvider, emptyChunkSupplier, elementSize, dhNullable, fieldNullable); this.appendItem = appendItem; } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkWriter.java index 1b3b58689a0..de0ec413fff 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkWriter.java @@ -24,8 +24,15 @@ public class FloatChunkWriter> extends BaseChunkWriter { private static final String DEBUG_NAME = "FloatChunkWriter"; - public static final FloatChunkWriter> IDENTITY_INSTANCE = new FloatChunkWriter<>( - FloatChunk::isNull, FloatChunk::getEmptyChunk, FloatChunk::get); + private static final FloatChunkWriter> NULLABLE_IDENTITY_INSTANCE = new FloatChunkWriter<>( + FloatChunk::isNull, FloatChunk::getEmptyChunk, FloatChunk::get, false); + private static final FloatChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new FloatChunkWriter<>( + FloatChunk::isNull, FloatChunk::getEmptyChunk, FloatChunk::get, true); + + + public static FloatChunkWriter> getIdentity(boolean isNullable) { + return isNullable ? NULLABLE_IDENTITY_INSTANCE : NON_NULLABLE_IDENTITY_INSTANCE; + } @FunctionalInterface public interface ToFloatTransformFunction> { @@ -37,8 +44,9 @@ public interface ToFloatTransformFunction> public FloatChunkWriter( @NotNull final IsRowNullProvider isRowNullProvider, @NotNull final Supplier emptyChunkSupplier, - @Nullable final ToFloatTransformFunction transform) { - super(isRowNullProvider, emptyChunkSupplier, Float.BYTES, true); + @Nullable final ToFloatTransformFunction transform, + final boolean fieldNullable) { + super(isRowNullProvider, emptyChunkSupplier, Float.BYTES, true, fieldNullable); this.transform = transform; } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkWriter.java index f598e8a629b..4f566b586a2 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkWriter.java @@ -24,8 +24,15 @@ public class IntChunkWriter> extends BaseChunkWriter { private static final String DEBUG_NAME = "IntChunkWriter"; - public static final IntChunkWriter> IDENTITY_INSTANCE = new IntChunkWriter<>( - IntChunk::isNull, IntChunk::getEmptyChunk, IntChunk::get); + private static final IntChunkWriter> NULLABLE_IDENTITY_INSTANCE = new IntChunkWriter<>( + IntChunk::isNull, IntChunk::getEmptyChunk, IntChunk::get, false); + private static final IntChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new IntChunkWriter<>( + IntChunk::isNull, IntChunk::getEmptyChunk, IntChunk::get, true); + + + public static IntChunkWriter> getIdentity(boolean isNullable) { + return isNullable ? NULLABLE_IDENTITY_INSTANCE : NON_NULLABLE_IDENTITY_INSTANCE; + } @FunctionalInterface public interface ToIntTransformFunction> { @@ -37,8 +44,9 @@ public interface ToIntTransformFunction> { public IntChunkWriter( @NotNull final IsRowNullProvider isRowNullProvider, @NotNull final Supplier emptyChunkSupplier, - @Nullable final ToIntTransformFunction transform) { - super(isRowNullProvider, emptyChunkSupplier, Integer.BYTES, true); + @Nullable final ToIntTransformFunction transform, + final boolean fieldNullable) { + super(isRowNullProvider, emptyChunkSupplier, Integer.BYTES, true, fieldNullable); this.transform = transform; } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ListChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ListChunkWriter.java index c158be06149..dfa2477a2bd 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ListChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ListChunkWriter.java @@ -33,8 +33,9 @@ public ListChunkWriter( final ListChunkReader.Mode mode, final int fixedSizeLength, final ExpansionKernel kernel, - final ChunkWriter componentWriter) { - super(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, 0, false); + final ChunkWriter componentWriter, + final boolean fieldNullable) { + super(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, 0, false, fieldNullable); this.mode = mode; this.fixedSizeLength = fixedSizeLength; this.kernel = kernel; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkWriter.java index 016a0ec4bb7..02e1e92e207 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkWriter.java @@ -24,8 +24,15 @@ public class LongChunkWriter> extends BaseChunkWriter { private static final String DEBUG_NAME = "LongChunkWriter"; - public static final LongChunkWriter> IDENTITY_INSTANCE = new LongChunkWriter<>( - LongChunk::isNull, LongChunk::getEmptyChunk, LongChunk::get); + private static final LongChunkWriter> NULLABLE_IDENTITY_INSTANCE = new LongChunkWriter<>( + LongChunk::isNull, LongChunk::getEmptyChunk, LongChunk::get, false); + private static final LongChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new LongChunkWriter<>( + LongChunk::isNull, LongChunk::getEmptyChunk, LongChunk::get, true); + + + public static LongChunkWriter> getIdentity(boolean isNullable) { + return isNullable ? NULLABLE_IDENTITY_INSTANCE : NON_NULLABLE_IDENTITY_INSTANCE; + } @FunctionalInterface public interface ToLongTransformFunction> { @@ -37,8 +44,9 @@ public interface ToLongTransformFunction> public LongChunkWriter( @NotNull final IsRowNullProvider isRowNullProvider, @NotNull final Supplier emptyChunkSupplier, - @Nullable final ToLongTransformFunction transform) { - super(isRowNullProvider, emptyChunkSupplier, Long.BYTES, true); + @Nullable final ToLongTransformFunction transform, + final boolean fieldNullable) { + super(isRowNullProvider, emptyChunkSupplier, Long.BYTES, true, fieldNullable); this.transform = transform; } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/MapChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/MapChunkWriter.java index 7ae00513b3c..eb7b68cabd1 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/MapChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/MapChunkWriter.java @@ -38,8 +38,9 @@ public MapChunkWriter( final ChunkWriter> keyWriter, final ChunkWriter> valueWriter, final ChunkType keyWriterChunkType, - final ChunkType valueWriterChunkType) { - super(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, 0, false); + final ChunkType valueWriterChunkType, + final boolean fieldNullable) { + super(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, 0, false, fieldNullable); this.keyWriter = keyWriter; this.valueWriter = valueWriter; this.keyWriterChunkType = keyWriterChunkType; @@ -67,7 +68,7 @@ public Context( int numOffsets = chunk.size() + 1; offsets = WritableIntChunk.makeWritableChunk(numOffsets); offsets.setSize(0); - if (chunk.size() != 0) { + if (chunk.size() != 0) { offsets.add(0); } for (int ii = 0; ii < chunk.size(); ++ii) { @@ -200,6 +201,9 @@ public void visitBuffers(final BufferListener listener) { } listener.noteLogicalBuffer(padBufferSize(numOffsetBytes)); + // a validity buffer for the inner struct ?? + listener.noteLogicalBuffer(0); + // payload keyColumn.visitBuffers(listener); valueColumn.visitBuffers(listener); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/NullChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/NullChunkWriter.java index 20d399d4125..c1f56c5512c 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/NullChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/NullChunkWriter.java @@ -17,7 +17,7 @@ public class NullChunkWriter> extends Base public static final NullChunkWriter> INSTANCE = new NullChunkWriter<>(); public NullChunkWriter() { - super((chunk, idx) -> true, () -> null, 0, true); + super((chunk, idx) -> true, () -> null, 0, true, true); } @Override diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkWriter.java index eb257457b2c..47571306816 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkWriter.java @@ -24,8 +24,15 @@ public class ShortChunkWriter> extends BaseChunkWriter { private static final String DEBUG_NAME = "ShortChunkWriter"; - public static final ShortChunkWriter> IDENTITY_INSTANCE = new ShortChunkWriter<>( - ShortChunk::isNull, ShortChunk::getEmptyChunk, ShortChunk::get); + private static final ShortChunkWriter> NULLABLE_IDENTITY_INSTANCE = new ShortChunkWriter<>( + ShortChunk::isNull, ShortChunk::getEmptyChunk, ShortChunk::get, false); + private static final ShortChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new ShortChunkWriter<>( + ShortChunk::isNull, ShortChunk::getEmptyChunk, ShortChunk::get, true); + + + public static ShortChunkWriter> getIdentity(boolean isNullable) { + return isNullable ? NULLABLE_IDENTITY_INSTANCE : NON_NULLABLE_IDENTITY_INSTANCE; + } @FunctionalInterface public interface ToShortTransformFunction> { @@ -37,8 +44,9 @@ public interface ToShortTransformFunction> public ShortChunkWriter( @NotNull final IsRowNullProvider isRowNullProvider, @NotNull final Supplier emptyChunkSupplier, - @Nullable final ToShortTransformFunction transform) { - super(isRowNullProvider, emptyChunkSupplier, Short.BYTES, true); + @Nullable final ToShortTransformFunction transform, + final boolean fieldNullable) { + super(isRowNullProvider, emptyChunkSupplier, Short.BYTES, true, fieldNullable); this.transform = transform; } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/UnionChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/UnionChunkWriter.java index e37184141fb..239d421905a 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/UnionChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/UnionChunkWriter.java @@ -3,11 +3,10 @@ // package io.deephaven.extensions.barrage.chunk; +import com.google.common.io.LittleEndianDataOutputStream; import io.deephaven.base.verify.Assert; -import io.deephaven.chunk.ByteChunk; import io.deephaven.chunk.Chunk; import io.deephaven.chunk.ChunkType; -import io.deephaven.chunk.IntChunk; import io.deephaven.chunk.ObjectChunk; import io.deephaven.chunk.WritableByteChunk; import io.deephaven.chunk.WritableIntChunk; @@ -18,7 +17,6 @@ import io.deephaven.extensions.barrage.BarrageOptions; import io.deephaven.util.BooleanUtils; import io.deephaven.util.datastructures.LongSizedDataStructure; -import org.apache.arrow.vector.types.UnionMode; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -28,19 +26,23 @@ import java.util.stream.Collectors; public class UnionChunkWriter extends BaseChunkWriter> { + public enum Mode { + Dense, Sparse + } + private static final String DEBUG_NAME = "UnionChunkWriter"; - private final UnionMode mode; + private final Mode mode; private final List> classMatchers; private final List>> writers; private final List writerChunkTypes; public UnionChunkWriter( - final UnionMode mode, + final Mode mode, final List> classMatchers, final List>> writers, final List writerChunkTypes) { - super(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, 0, false); + super(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, 0, false, false); this.mode = mode; this.classMatchers = classMatchers; this.writers = writers; @@ -75,8 +77,8 @@ public DrainableColumn getInputStream( private class UnionChunkInputStream extends BaseChunkInputStream { private int cachedSize = -1; - private final DrainableColumn columnOfInterest; - private final DrainableColumn columnOffset; + private final WritableByteChunk columnOfInterest; + private final WritableIntChunk columnOffset; private final DrainableColumn[] innerColumns; private UnionChunkInputStream( @@ -86,8 +88,7 @@ private UnionChunkInputStream( super(context, mySubset, options); final int numColumns = classMatchers.size(); final ObjectChunk chunk = context.getChunk(); - final WritableIntChunk columnOffset; - if (mode == UnionMode.Sparse) { + if (mode == Mode.Sparse) { columnOffset = null; } else { // noinspection resource @@ -96,14 +97,14 @@ private UnionChunkInputStream( // noinspection resource - final WritableByteChunk columnOfInterest = WritableByteChunk.makeWritableChunk(chunk.size()); + columnOfInterest = WritableByteChunk.makeWritableChunk(chunk.size()); // noinspection unchecked final WritableObjectChunk[] innerChunks = new WritableObjectChunk[numColumns]; for (int ii = 0; ii < numColumns; ++ii) { // noinspection resource innerChunks[ii] = WritableObjectChunk.makeWritableChunk(chunk.size()); - if (mode == UnionMode.Sparse) { + if (mode == Mode.Sparse) { innerChunks[ii].fillWithNullValue(0, chunk.size()); } else { innerChunks[ii].setSize(0); @@ -114,11 +115,12 @@ private UnionChunkInputStream( int jj; for (jj = 0; jj < classMatchers.size(); ++jj) { if (value.getClass().isAssignableFrom(classMatchers.get(jj))) { - if (mode == UnionMode.Sparse) { + if (mode == Mode.Sparse) { columnOfInterest.set(ii, (byte) jj); innerChunks[jj].set(ii, value); } else { columnOfInterest.set(ii, (byte) jj); + columnOffset.set(ii, innerChunks[jj].size()); innerChunks[jj].add(value); } break; @@ -147,7 +149,8 @@ private UnionChunkInputStream( // note that we do not close the kernel since we steal the inner chunk into the context final ChunkUnboxer.UnboxerKernel kernel = chunkType == ChunkType.Object - ? null : ChunkUnboxer.getUnboxer(chunkType, innerChunk.size()); + ? null + : ChunkUnboxer.getUnboxer(chunkType, innerChunk.size()); // noinspection unchecked try (ChunkWriter.Context> innerContext = writer.makeContext(kernel != null @@ -157,25 +160,11 @@ private UnionChunkInputStream( innerColumns[ii] = writer.getInputStream(innerContext, null, options); } } - - if (columnOffset == null) { - this.columnOffset = new NullChunkWriter.NullDrainableColumn(); - } else { - final IntChunkWriter> writer = IntChunkWriter.IDENTITY_INSTANCE; - try (ChunkWriter.Context> innerContext = writer.makeContext(columnOffset, 0)) { - this.columnOffset = writer.getInputStream(innerContext, null, options); - } - } - - final ByteChunkWriter> coiWriter = ByteChunkWriter.IDENTITY_INSTANCE; - try (ChunkWriter.Context> innerContext = coiWriter.makeContext(columnOfInterest, 0)) { - this.columnOfInterest = coiWriter.getInputStream(innerContext, null, options); - } } @Override public void visitFieldNodes(final FieldNodeListener listener) { - columnOfInterest.visitFieldNodes(listener); + listener.noteLogicalFieldNode(subset.intSize(), nullCount()); for (DrainableColumn innerColumn : innerColumns) { innerColumn.visitFieldNodes(listener); } @@ -183,8 +172,13 @@ public void visitFieldNodes(final FieldNodeListener listener) { @Override public void visitBuffers(final BufferListener listener) { - columnOfInterest.visitBuffers(listener); - columnOffset.visitBuffers(listener); + // one buffer for the column of interest + listener.noteLogicalBuffer(padBufferSize(subset.intSize(DEBUG_NAME))); + // one buffer for the column offset + if (columnOffset != null) { + listener.noteLogicalBuffer(padBufferSize((long) Integer.BYTES * subset.intSize(DEBUG_NAME))); + } + for (DrainableColumn innerColumn : innerColumns) { innerColumn.visitBuffers(listener); } @@ -204,8 +198,8 @@ public void close() throws IOException { protected int getRawSize() throws IOException { if (cachedSize == -1) { long size = 0; - size += columnOfInterest.available(); - size += columnOffset.available(); + size += padBufferSize(subset.intSize(DEBUG_NAME)); + size += padBufferSize(Integer.BYTES * subset.size()); for (DrainableColumn innerColumn : innerColumns) { size += innerColumn.available(); } @@ -223,8 +217,21 @@ public int drainTo(final OutputStream outputStream) throws IOException { read = true; long bytesWritten = 0; - bytesWritten += columnOfInterest.drainTo(outputStream); - bytesWritten += columnOffset.drainTo(outputStream); + final LittleEndianDataOutputStream dos = new LittleEndianDataOutputStream(outputStream); + // must write out the column of interest + for (int ii = 0; ii < columnOfInterest.size(); ++ii) { + dos.writeByte(columnOfInterest.get(ii)); + } + bytesWritten += columnOfInterest.size(); + bytesWritten += writePadBuffer(dos, bytesWritten); + + // must write out the column offset + for (int ii = 0; ii < columnOffset.size(); ++ii) { + dos.writeInt(columnOffset.get(ii)); + } + bytesWritten += LongSizedDataStructure.intSize(DEBUG_NAME, (long) Integer.BYTES * columnOffset.size()); + bytesWritten += writePadBuffer(dos, bytesWritten); + for (DrainableColumn innerColumn : innerColumns) { bytesWritten += innerColumn.drainTo(outputStream); } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarBinaryChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarBinaryChunkWriter.java index d6084b0e289..d509cbbac51 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarBinaryChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarBinaryChunkWriter.java @@ -33,8 +33,9 @@ public interface Appender { private final Appender appendItem; public VarBinaryChunkWriter( + final boolean fieldNullable, final Appender appendItem) { - super(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, 0, false); + super(ObjectChunk::isNull, ObjectChunk::getEmptyChunk, 0, false, fieldNullable); this.appendItem = appendItem; } diff --git a/extensions/barrage/src/main/resources/io/deephaven/extensions/barrage/Barrage.gwt.xml b/extensions/barrage/src/main/resources/io/deephaven/extensions/barrage/Barrage.gwt.xml index 6607a6cbfca..3360608b9a7 100644 --- a/extensions/barrage/src/main/resources/io/deephaven/extensions/barrage/Barrage.gwt.xml +++ b/extensions/barrage/src/main/resources/io/deephaven/extensions/barrage/Barrage.gwt.xml @@ -10,5 +10,6 @@ + diff --git a/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java b/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java index 0b56d813c59..c366adaeab9 100644 --- a/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java +++ b/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java @@ -46,6 +46,7 @@ import org.apache.arrow.flight.sql.FlightSqlClient.SubstraitPlan; import org.apache.arrow.flight.sql.FlightSqlClient.Transaction; import org.apache.arrow.flight.sql.FlightSqlUtils; +import org.apache.arrow.flight.sql.impl.FlightSql; import org.apache.arrow.flight.sql.impl.FlightSql.ActionBeginSavepointRequest; import org.apache.arrow.flight.sql.impl.FlightSql.ActionBeginTransactionRequest; import org.apache.arrow.flight.sql.impl.FlightSql.ActionCancelQueryRequest; @@ -71,6 +72,7 @@ import org.apache.arrow.flight.sql.util.TableRef; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.ArrowType.Utf8; import org.apache.arrow.vector.types.pojo.Field; @@ -648,10 +650,22 @@ public void getSqlInfo() throws Exception { int numRows = 0; int flightCount = 0; + boolean found = false; while (stream.next()) { ++flightCount; numRows += stream.getRoot().getRowCount(); + + // validate the data: + final List vs = stream.getRoot().getFieldVectors(); + for (int ii = 0; ii < stream.getRoot().getRowCount(); ++ii) { + if (vs.get(0).getObject(ii).equals(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_NAME_VALUE)) { + found = true; + assertThat(vs.get(1).getObject(ii).toString()).isEqualTo("Deephaven"); + break; + } + } } + assertThat(found).isTrue(); assertThat(flightCount).isEqualTo(1); assertThat(numRows).isEqualTo(8); }