Skip to content

Commit

Permalink
Increased compatibility with Flight reference implementation of `Do…
Browse files Browse the repository at this point in the history
…Exchange` (#1964)

* added flight reference impl compatibility for DoExchange

* updated to require magic bytes in FlightData.FlightDescriptor.CMD
  • Loading branch information
lbooker42 authored Feb 28, 2022
1 parent 05c79ad commit e57e24b
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 27 deletions.
4 changes: 4 additions & 0 deletions java-client/flight-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ dependencies {
implementation project(':java-client-flight-dagger')
implementation project(':java-client-example-utilities')

implementation "io.deephaven.barrage:barrage-format:0.4.0"

Classpaths.inheritJUnitPlatform(project)
Classpaths.inheritAssertJ(project)
testImplementation 'org.junit.jupiter:junit-jupiter'
Expand Down Expand Up @@ -43,6 +45,8 @@ applicationDistribution.into('bin') {
from(createApplication('aggregate-all', 'io.deephaven.client.examples.AggregateAllExample'))
from(createApplication('agg-by', 'io.deephaven.client.examples.AggByExample'))

from(createApplication('do-exchange', 'io.deephaven.client.examples.DoExchange'))

from(createApplication('do-put-new', 'io.deephaven.client.examples.DoPutNew'))
from(createApplication('do-put-spray', 'io.deephaven.client.examples.DoPutSpray'))
from(createApplication('do-put-table', 'io.deephaven.client.examples.DoPutTable'))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.deephaven.client.examples;

import com.google.flatbuffers.FlatBufferBuilder;
import com.google.protobuf.ByteString;
import io.deephaven.client.impl.FlightSession;
import io.deephaven.proto.util.ScopeTicketHelper;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.RootAllocator;
import picocli.CommandLine;
import picocli.CommandLine.ArgGroup;
import picocli.CommandLine.Command;

import io.deephaven.barrage.flatbuf.*;

@Command(name = "do-exchange", mixinStandardHelpOptions = true,
description = "Start a DoExchange session with the server", version = "0.1.0")
class DoExchange extends FlightExampleBase {

@ArgGroup(exclusive = true, multiplicity = "1")
Ticket ticket;

@Override
protected void execute(FlightSession flight) throws Exception {

// need to provide the MAGIC bytes as the FlightDescriptor.cmd in the initial message
byte[] cmd = new byte[] {100, 112, 104, 110}; // equivalent to '0x6E687064' (ASCII "dphn")

FlightDescriptor fd = FlightDescriptor.command(cmd);

// create the bi-directional reader/writer
try (FlightClient.ExchangeReaderWriter erw = flight.startExchange(fd);
final RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE)) {

/////////////////////////////////////////////////////////////
// create a BarrageSnapshotRequest for ticket 's/timetable'
/////////////////////////////////////////////////////////////

// inner metadata for the snapshot request
final FlatBufferBuilder metadata = new FlatBufferBuilder();

int optOffset =
BarrageSnapshotOptions.createBarrageSnapshotOptions(metadata, ColumnConversionMode.Stringify,
false, 1000);

final int ticOffset =
BarrageSnapshotRequest.createTicketVector(metadata,
ScopeTicketHelper.nameToBytes(ticket.scopeField.variable));
BarrageSnapshotRequest.startBarrageSnapshotRequest(metadata);
BarrageSnapshotRequest.addColumns(metadata, 0);
BarrageSnapshotRequest.addViewport(metadata, 0);
BarrageSnapshotRequest.addSnapshotOptions(metadata, optOffset);
BarrageSnapshotRequest.addTicket(metadata, ticOffset);
metadata.finish(BarrageSnapshotRequest.endBarrageSnapshotRequest(metadata));

// outer metadata to ID the message type and provide the MAGIC bytes
final FlatBufferBuilder wrapper = new FlatBufferBuilder();
final int innerOffset = wrapper.createByteVector(metadata.dataBuffer());
wrapper.finish(BarrageMessageWrapper.createBarrageMessageWrapper(
wrapper,
0x6E687064, // the numerical representation of the ASCII "dphn".
BarrageMessageType.BarrageSnapshotRequest,
innerOffset));

// extract the bytes and package them in an ArrowBuf for transmission
cmd = wrapper.sizedByteArray();
ArrowBuf data = allocator.buffer(cmd.length);
data.writeBytes(cmd);

// `putMetadata()` makes the GRPC call
erw.getWriter().putMetadata(data);

// snapshot requests do not need to stay open on the client side
erw.getWriter().completed();

// read everything from the server
while (erw.getReader().next()) {
// NOP
}

// print the table data
System.out.println(erw.getReader().getSchema().toString());
System.out.println(erw.getReader().getRoot().contentToTSVString());
}
}

public static void main(String[] args) {
int execute = new CommandLine(new DoExchange()).execute(args);
System.exit(execute);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@
import io.deephaven.proto.flight.util.SchemaHelper;
import io.deephaven.qst.table.NewTable;
import io.grpc.ManagedChannel;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightGrpcUtilsExtension;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.*;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.types.pojo.Schema;

Expand Down Expand Up @@ -79,6 +75,18 @@ public FlightStream stream(HasTicketId ticketId) {
return FlightClientHelper.get(client, ticketId);
}

/**
* Creates a new server side DoExchange session.
*
* @param descriptor the FlightDescriptor object to include on the first FlightData message (other fields will
* remain null)
* @param options the GRPC otions to apply to this call
* @return the bi-directional ReaderWriter object
*/
public FlightClient.ExchangeReaderWriter startExchange(FlightDescriptor descriptor, CallOption... options) {
return client.doExchange(descriptor, options);
}

/**
* Creates a new server side exported table backed by the server semantics of DoPut with a {@link NewTable} payload.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import gnu.trove.list.array.TLongArrayList;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.barrage.flatbuf.BarrageMessageType;
import io.deephaven.barrage.flatbuf.BarrageMessageWrapper;
import io.deephaven.barrage.flatbuf.BarrageSnapshotRequest;
import io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest;
import io.deephaven.chunk.ChunkType;
Expand Down Expand Up @@ -49,6 +50,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Iterator;
Expand Down Expand Up @@ -296,6 +298,8 @@ public interface Factory {

private boolean isClosed = false;

private boolean isFirstMsg = true;

private final TicketRouter ticketRouter;
private final BarrageMessageProducer.Operation.Factory<BarrageStreamGenerator.View> operationFactory;
private final BarrageMessageProducer.Adapter<BarrageSubscriptionRequest, BarrageSubscriptionOptions> optionsAdapter;
Expand Down Expand Up @@ -336,14 +340,19 @@ public void onNext(final InputStream request) {
GrpcUtil.rpcWrapper(log, listener, () -> {
BarrageProtoUtil.MessageInfo message = BarrageProtoUtil.parseProtoMessage(request);
synchronized (this) {
if (message.app_metadata == null
|| message.app_metadata.magic() != BarrageUtil.FLATBUFFER_MAGIC) {
log.warn().append(myPrefix).append("received a message without app_metadata").endl();

// `FlightData` messages from Barrage clients will provide app_metadata describing the request but
// official Flight implementations may force a NULL metadata field in the first message. In that
// case, identify a valid Barrage connection by verifying the `FlightDescriptor.CMD` field contains
// the `Barrage` magic bytes

if (requestHandler != null) {
// rely on the handler to verify message type
requestHandler.handleMessage(message);
return;
}

// handle the different message types that can come over DoExchange
if (requestHandler == null) {
if (message.app_metadata != null) {
// handle the different message types that can come over DoExchange
switch (message.app_metadata.msgType()) {
case BarrageMessageType.BarrageSubscriptionRequest:
Expand All @@ -356,9 +365,37 @@ public void onNext(final InputStream request) {
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT,
myPrefix + "received a message with unhandled BarrageMessageType");
}
requestHandler.handleMessage(message);
return;
}

// handle the possible error cases
if (!isFirstMsg) {
// only the first messages is allowed to have null metadata
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT,
myPrefix + "failed to receive Barrage request metadata");
}

isFirstMsg = false;

// The magic value is '0x6E687064'. It is the numerical representation of the ASCII "dphn".
int size = message.descriptor.getCmd().size();
if (size == 4) {
ByteBuffer bb = message.descriptor.getCmd().asReadOnlyByteBuffer();

// set the order to little-endian (FlatBuffers default)
bb.order(ByteOrder.LITTLE_ENDIAN);

// read and compare the value to the "magic" bytes
long value = (long) bb.getInt(0) & 0xFFFFFFFFL;
if (value != BarrageUtil.FLATBUFFER_MAGIC) {
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT,
myPrefix + "expected BarrageMessageWrapper magic bytes in FlightDescriptor.cmd");
}
} else {
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT,
myPrefix + "expected BarrageMessageWrapper magic bytes in FlightDescriptor.cmd");
}
// rely on the handler to verify message type
requestHandler.handleMessage(message);
}
});
}
Expand Down
Loading

0 comments on commit e57e24b

Please sign in to comment.