Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] Dataframe AST POC #2102

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/main/java/net/snowflake/client/core/QueryExecDTO.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
public class QueryExecDTO {
private String sqlText;

private String dataframeAst;

@Deprecated private Integer sequenceId;

private Map<String, ParameterBindingDTO> bindings;
Expand Down Expand Up @@ -67,6 +69,14 @@ public void setSqlText(String sqlText) {
this.sqlText = sqlText;
}

public String getDataframeAst() {
return this.dataframeAst;
}

public void setDataframeAst(String dataframeAst) {
this.dataframeAst = dataframeAst;
}

@Deprecated
public Integer getSequenceId() {
return sequenceId;
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/net/snowflake/client/core/SFBaseStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,27 @@ public abstract SFBaseResultSet execute(
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException;

/**
* Executes the given SQL string.
*
* @param sql The SQL string to execute, synchronously.
* @param dataframeAst ...
* @param parametersBinding parameters to bind
* @param caller the JDBC interface method that called this method, if any
* @param execTimeData OOB telemetry object to record timings
* @return whether there is result set or not
* @throws SQLException if failed to execute sql
* @throws SFException exception raised from Snowflake components
* @throws SQLException if SQL error occurs
*/
public abstract SFBaseResultSet execute(
String sql,
String dataframeAst,
Map<String, ParameterBindingDTO> parametersBinding,
CallingMethod caller,
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException;

/**
* Execute sql asynchronously. Note that at a minimum, this does not have to be supported; if
* executeAsyncQuery() is called from SnowflakeStatement and the SFConnectionHandler's
Expand Down
32 changes: 28 additions & 4 deletions src/main/java/net/snowflake/client/core/SFStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,21 @@ private void sanityCheckQuery(String sql) throws SQLException {
}
}



/**
* Execute SQL query with an option for describe only
*
* @param sql sql statement
* @param dataframeAst ...
* @param describeOnly true if describe only
* @return query result set
* @throws SQLException if connection is already closed
* @throws SFException if result set is null
*/
private SFBaseResultSet executeQuery(
String sql,
String dataframeAst,
Map<String, ParameterBindingDTO> parametersBinding,
boolean describeOnly,
boolean asyncExec,
Expand Down Expand Up @@ -148,6 +152,7 @@ private SFBaseResultSet executeQuery(
// NOTE: It is intentional two describeOnly parameters are specified.
return executeQueryInternal(
sql,
dataframeAst,
parametersBinding,
describeOnly,
describeOnly, // internal query if describeOnly is true
Expand All @@ -167,7 +172,7 @@ private SFBaseResultSet executeQuery(
@Override
public SFPreparedStatementMetaData describe(String sql) throws SFException, SQLException {
SFBaseResultSet baseResultSet =
executeQuery(sql, null, true, false, null, new ExecTimeTelemetryData());
executeQuery(sql, null, null, true, false, null, new ExecTimeTelemetryData());

describeJobUUID = baseResultSet.getQueryId();

Expand Down Expand Up @@ -196,6 +201,7 @@ public SFPreparedStatementMetaData describe(String sql) throws SFException, SQLE
*/
SFBaseResultSet executeQueryInternal(
String sql,
String dataframeAst,
Map<String, ParameterBindingDTO> parameterBindings,
boolean describeOnly,
boolean internal,
Expand All @@ -214,6 +220,7 @@ SFBaseResultSet executeQueryInternal(
Object result =
executeHelper(
sql,
dataframeAst,
StmtUtil.SF_MEDIA_TYPE,
parameterBindings,
describeOnly,
Expand Down Expand Up @@ -326,6 +333,7 @@ public Void call() throws SQLException {
*/
public Object executeHelper(
String sql,
String dataframeAst,
String mediaType,
Map<String, ParameterBindingDTO> bindValues,
boolean describeOnly,
Expand Down Expand Up @@ -403,6 +411,7 @@ public Object executeHelper(
StmtUtil.StmtInput stmtInput = new StmtUtil.StmtInput();
stmtInput
.setSql(sql)
.setDataframeAst(dataframeAst)
.setMediaType(mediaType)
.setInternal(internal)
.setDescribeOnly(describeOnly)
Expand Down Expand Up @@ -698,7 +707,18 @@ public SFBaseResultSet execute(
CallingMethod caller,
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException {
return execute(sql, false, parametersBinding, caller, execTimeData);
return execute(sql, null, false, parametersBinding, caller, execTimeData);
}

@Override
public SFBaseResultSet execute(
String sql,
String dataframeAst,
Map<String, ParameterBindingDTO> parametersBinding,
CallingMethod caller,
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException {
return execute(sql, dataframeAst, false, parametersBinding, caller, execTimeData);
}

/**
Expand Down Expand Up @@ -747,6 +767,7 @@ private void cancelHelper(String sql, String mediaType, CancellationReason cance
* Execute sql
*
* @param sql sql statement.
* @param dataframeAst ...
* @param asyncExec is async exec
* @param parametersBinding parameters to bind
* @param caller the JDBC interface method that called this method, if any
Expand All @@ -758,12 +779,15 @@ private void cancelHelper(String sql, String mediaType, CancellationReason cance
*/
public SFBaseResultSet execute(
String sql,
String dataframeAst,
boolean asyncExec,
Map<String, ParameterBindingDTO> parametersBinding,
CallingMethod caller,
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException {
TelemetryService.getInstance().updateContext(session.getSnowflakeConnectionString());

// todo: if (dataframeAst == null)
sanityCheckQuery(sql);

session.injectedDelay();
Expand All @@ -780,7 +804,7 @@ public SFBaseResultSet execute(
executeSetProperty(sql);
return null;
}
return executeQuery(sql, parametersBinding, false, asyncExec, caller, execTimeData);
return executeQuery(sql, dataframeAst, parametersBinding, false, asyncExec, caller, execTimeData);
}

private SFBaseResultSet executeFileTransfer(String sql) throws SQLException, SFException {
Expand Down Expand Up @@ -956,6 +980,6 @@ public SFBaseResultSet asyncExecute(
CallingMethod caller,
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException {
return execute(sql, true, parametersBinding, caller, execTimeData);
return execute(sql, null, true, parametersBinding, caller, execTimeData);
}
}
6 changes: 6 additions & 0 deletions src/main/java/net/snowflake/client/core/StmtUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class StmtUtil {
/** Input for executing a statement on server */
static class StmtInput {
String sql;
String dataframeAst = null;

// default to snowflake (a special json format for snowflake query result
String mediaType = SF_MEDIA_TYPE;
Expand Down Expand Up @@ -103,6 +104,11 @@ public StmtInput setSql(String sql) {
return this;
}

public StmtInput setDataframeAst(String dataframeAst) {
this.dataframeAst = dataframeAst;
return this;
}

public StmtInput setMediaType(String mediaType) {
this.mediaType = mediaType;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,7 @@ private static JsonNode parseCommandInGS(SFStatement statement, String command)
result =
statement.executeHelper(
command,
null,
"application/json",
null, // bindValues
false, // describeOnly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public ResultSet executeQuery() throws SQLException {
} else {
logger.trace("executeQuery()", false);
}
ResultSet rs = executeQueryInternal(sql, false, parameterBindings, execTimeData);
ResultSet rs = executeQueryInternal(sql, null, false, parameterBindings, execTimeData);
execTimeData.setQueryEnd();
execTimeData.generateTelemetry();
logger.debug("Query completed. {}", execTimeData.getLogString());
Expand All @@ -190,7 +190,7 @@ public ResultSet executeAsyncQuery() throws SQLException {
} else {
logger.trace("executeAsyncQuery()", false);
}
ResultSet rs = executeQueryInternal(sql, true, parameterBindings, execTimeData);
ResultSet rs = executeQueryInternal(sql, null, true, parameterBindings, execTimeData);
execTimeData.setQueryEnd();
execTimeData.generateTelemetry();
logger.debug("Query completed. {}", execTimeData.getLogString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public ResultSet executeQuery(String sql) throws SQLException {
new ExecTimeTelemetryData("ResultSet Statement.executeQuery(String)", this.batchID);

raiseSQLExceptionIfStatementIsClosed();
ResultSet rs = executeQueryInternal(sql, false, null, execTimeData);
ResultSet rs = executeQueryInternal(sql, null, false, null, execTimeData);
execTimeData.setQueryEnd();
execTimeData.generateTelemetry();
logger.debug("Query completed. {}", execTimeData.getLogString());
Expand All @@ -168,13 +168,26 @@ public ResultSet executeAsyncQuery(String sql) throws SQLException {
ExecTimeTelemetryData execTimeData =
new ExecTimeTelemetryData("ResultSet Statement.executeAsyncQuery(String)", this.batchID);
raiseSQLExceptionIfStatementIsClosed();
ResultSet rs = executeQueryInternal(sql, true, null, execTimeData);
ResultSet rs = executeQueryInternal(sql, null, true, null, execTimeData);
execTimeData.setQueryEnd();
execTimeData.generateTelemetry();
logger.debug("Query completed. {}", queryID, execTimeData.getLogString());
return rs;
}

// todo: add doc
public ResultSet executeDataframeAst(String dataframeAst) throws SQLException {
ExecTimeTelemetryData execTimeData =
new ExecTimeTelemetryData("ResultSet Statement.executeQuery(String)", this.batchID);

raiseSQLExceptionIfStatementIsClosed();
ResultSet rs = executeQueryInternal("", dataframeAst, false, null, execTimeData);
execTimeData.setQueryEnd();
execTimeData.generateTelemetry();
logger.debug("Query completed. {}", execTimeData.getLogString());
return rs;
}

@Override
public void resultSetMetadataHandler(SFBaseResultSet resultSet) throws SQLException {
// No-Op.
Expand Down Expand Up @@ -278,6 +291,7 @@ private void setQueryIdWhenValidOrNull(String queryId) {
*/
ResultSet executeQueryInternal(
String sql,
String dataframeAst,
boolean asyncExec,
Map<String, ParameterBindingDTO> parameterBindings,
ExecTimeTelemetryData execTimeData)
Expand Down
20 changes: 20 additions & 0 deletions src/test/java/net/snowflake/client/jdbc/DataframeAstIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package net.snowflake.client.jdbc;

import net.snowflake.client.core.SFException;
import org.junit.jupiter.api.Test;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

public class DataframeAstIT {

@Test
public void simpleEndToEndTest() throws SQLException {
Properties props = new Properties();
String url = "jdbc:sqlite:test.db";
try(SnowflakeConnectionV1 conn = new SnowflakeConnectionV1(url, props)) {
ResultSet result = ((SnowflakeStatementV1) conn.createStatement()).executeDataframeAst("dummy");
}
}
}
Loading