Skip to content

Commit

Permalink
poc v2
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-bli committed Feb 28, 2025
1 parent 90c82b8 commit 70112d0
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 8 deletions.
10 changes: 10 additions & 0 deletions src/main/java/net/snowflake/client/core/QueryExecDTO.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
public class QueryExecDTO {
private String sqlText;

private String dataframeAst;

@Deprecated private Integer sequenceId;

private Map<String, ParameterBindingDTO> bindings;
Expand Down Expand Up @@ -67,6 +69,14 @@ public void setSqlText(String sqlText) {
this.sqlText = sqlText;
}

public String getDataframeAst() {
return this.dataframeAst;
}

public void setDataframeAst(String dataframeAst) {
this.dataframeAst = dataframeAst;
}

@Deprecated
public Integer getSequenceId() {
return sequenceId;
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/net/snowflake/client/core/SFBaseStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,27 @@ public abstract SFBaseResultSet execute(
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException;

/**
* Executes the given SQL string.
*
* @param sql The SQL string to execute, synchronously.
* @param dataframeAst ...
* @param parametersBinding parameters to bind
* @param caller the JDBC interface method that called this method, if any
* @param execTimeData OOB telemetry object to record timings
* @return whether there is result set or not
* @throws SQLException if failed to execute sql
* @throws SFException exception raised from Snowflake components
* @throws SQLException if SQL error occurs
*/
public abstract SFBaseResultSet execute(
String sql,
String dataframeAst,
Map<String, ParameterBindingDTO> parametersBinding,
CallingMethod caller,
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException;

/**
* Execute sql asynchronously. Note that at a minimum, this does not have to be supported; if
* executeAsyncQuery() is called from SnowflakeStatement and the SFConnectionHandler's
Expand Down
32 changes: 28 additions & 4 deletions src/main/java/net/snowflake/client/core/SFStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,21 @@ private void sanityCheckQuery(String sql) throws SQLException {
}
}



/**
* Execute SQL query with an option for describe only
*
* @param sql sql statement
* @param dataframeAst ...
* @param describeOnly true if describe only
* @return query result set
* @throws SQLException if connection is already closed
* @throws SFException if result set is null
*/
private SFBaseResultSet executeQuery(
String sql,
String dataframeAst,
Map<String, ParameterBindingDTO> parametersBinding,
boolean describeOnly,
boolean asyncExec,
Expand Down Expand Up @@ -148,6 +152,7 @@ private SFBaseResultSet executeQuery(
// NOTE: It is intentional two describeOnly parameters are specified.
return executeQueryInternal(
sql,
dataframeAst,
parametersBinding,
describeOnly,
describeOnly, // internal query if describeOnly is true
Expand All @@ -167,7 +172,7 @@ private SFBaseResultSet executeQuery(
@Override
public SFPreparedStatementMetaData describe(String sql) throws SFException, SQLException {
SFBaseResultSet baseResultSet =
executeQuery(sql, null, true, false, null, new ExecTimeTelemetryData());
executeQuery(sql, null, null, true, false, null, new ExecTimeTelemetryData());

describeJobUUID = baseResultSet.getQueryId();

Expand Down Expand Up @@ -196,6 +201,7 @@ public SFPreparedStatementMetaData describe(String sql) throws SFException, SQLE
*/
SFBaseResultSet executeQueryInternal(
String sql,
String dataframeAst,
Map<String, ParameterBindingDTO> parameterBindings,
boolean describeOnly,
boolean internal,
Expand All @@ -214,6 +220,7 @@ SFBaseResultSet executeQueryInternal(
Object result =
executeHelper(
sql,
dataframeAst,
StmtUtil.SF_MEDIA_TYPE,
parameterBindings,
describeOnly,
Expand Down Expand Up @@ -326,6 +333,7 @@ public Void call() throws SQLException {
*/
public Object executeHelper(
String sql,
String dataframeAst,
String mediaType,
Map<String, ParameterBindingDTO> bindValues,
boolean describeOnly,
Expand Down Expand Up @@ -403,6 +411,7 @@ public Object executeHelper(
StmtUtil.StmtInput stmtInput = new StmtUtil.StmtInput();
stmtInput
.setSql(sql)
.setDataframeAst(dataframeAst)
.setMediaType(mediaType)
.setInternal(internal)
.setDescribeOnly(describeOnly)
Expand Down Expand Up @@ -698,7 +707,18 @@ public SFBaseResultSet execute(
CallingMethod caller,
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException {
return execute(sql, false, parametersBinding, caller, execTimeData);
return execute(sql, null, false, parametersBinding, caller, execTimeData);
}

@Override
public SFBaseResultSet execute(
String sql,
String dataframeAst,
Map<String, ParameterBindingDTO> parametersBinding,
CallingMethod caller,
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException {
return execute(sql, dataframeAst, false, parametersBinding, caller, execTimeData);
}

/**
Expand Down Expand Up @@ -747,6 +767,7 @@ private void cancelHelper(String sql, String mediaType, CancellationReason cance
* Execute sql
*
* @param sql sql statement.
* @param dataframeAst ...
* @param asyncExec is async exec
* @param parametersBinding parameters to bind
* @param caller the JDBC interface method that called this method, if any
Expand All @@ -758,12 +779,15 @@ private void cancelHelper(String sql, String mediaType, CancellationReason cance
*/
public SFBaseResultSet execute(
String sql,
String dataframeAst,
boolean asyncExec,
Map<String, ParameterBindingDTO> parametersBinding,
CallingMethod caller,
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException {
TelemetryService.getInstance().updateContext(session.getSnowflakeConnectionString());

// todo: if (dataframeAst == null)
sanityCheckQuery(sql);

session.injectedDelay();
Expand All @@ -780,7 +804,7 @@ public SFBaseResultSet execute(
executeSetProperty(sql);
return null;
}
return executeQuery(sql, parametersBinding, false, asyncExec, caller, execTimeData);
return executeQuery(sql, dataframeAst, parametersBinding, false, asyncExec, caller, execTimeData);
}

private SFBaseResultSet executeFileTransfer(String sql) throws SQLException, SFException {
Expand Down Expand Up @@ -956,6 +980,6 @@ public SFBaseResultSet asyncExecute(
CallingMethod caller,
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException {
return execute(sql, true, parametersBinding, caller, execTimeData);
return execute(sql, null, true, parametersBinding, caller, execTimeData);
}
}
6 changes: 6 additions & 0 deletions src/main/java/net/snowflake/client/core/StmtUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class StmtUtil {
/** Input for executing a statement on server */
static class StmtInput {
String sql;
String dataframeAst = null;

// default to snowflake (a special json format for snowflake query result
String mediaType = SF_MEDIA_TYPE;
Expand Down Expand Up @@ -103,6 +104,11 @@ public StmtInput setSql(String sql) {
return this;
}

public StmtInput setDataframeAst(String dataframeAst) {
this.dataframeAst = dataframeAst;
return this;
}

public StmtInput setMediaType(String mediaType) {
this.mediaType = mediaType;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,7 @@ private static JsonNode parseCommandInGS(SFStatement statement, String command)
result =
statement.executeHelper(
command,
null,
"application/json",
null, // bindValues
false, // describeOnly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public ResultSet executeQuery() throws SQLException {
} else {
logger.trace("executeQuery()", false);
}
ResultSet rs = executeQueryInternal(sql, false, parameterBindings, execTimeData);
ResultSet rs = executeQueryInternal(sql, null, false, parameterBindings, execTimeData);
execTimeData.setQueryEnd();
execTimeData.generateTelemetry();
logger.debug("Query completed. {}", execTimeData.getLogString());
Expand All @@ -190,7 +190,7 @@ public ResultSet executeAsyncQuery() throws SQLException {
} else {
logger.trace("executeAsyncQuery()", false);
}
ResultSet rs = executeQueryInternal(sql, true, parameterBindings, execTimeData);
ResultSet rs = executeQueryInternal(sql, null, true, parameterBindings, execTimeData);
execTimeData.setQueryEnd();
execTimeData.generateTelemetry();
logger.debug("Query completed. {}", execTimeData.getLogString());
Expand Down
18 changes: 16 additions & 2 deletions src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public ResultSet executeQuery(String sql) throws SQLException {
new ExecTimeTelemetryData("ResultSet Statement.executeQuery(String)", this.batchID);

raiseSQLExceptionIfStatementIsClosed();
ResultSet rs = executeQueryInternal(sql, false, null, execTimeData);
ResultSet rs = executeQueryInternal(sql, null, false, null, execTimeData);
execTimeData.setQueryEnd();
execTimeData.generateTelemetry();
logger.debug("Query completed. {}", execTimeData.getLogString());
Expand All @@ -168,13 +168,26 @@ public ResultSet executeAsyncQuery(String sql) throws SQLException {
ExecTimeTelemetryData execTimeData =
new ExecTimeTelemetryData("ResultSet Statement.executeAsyncQuery(String)", this.batchID);
raiseSQLExceptionIfStatementIsClosed();
ResultSet rs = executeQueryInternal(sql, true, null, execTimeData);
ResultSet rs = executeQueryInternal(sql, null, true, null, execTimeData);
execTimeData.setQueryEnd();
execTimeData.generateTelemetry();
logger.debug("Query completed. {}", queryID, execTimeData.getLogString());
return rs;
}

// todo: add doc
public ResultSet executeDataframeAst(String dataframeAst) throws SQLException {
ExecTimeTelemetryData execTimeData =
new ExecTimeTelemetryData("ResultSet Statement.executeQuery(String)", this.batchID);

raiseSQLExceptionIfStatementIsClosed();
ResultSet rs = executeQueryInternal("", dataframeAst, false, null, execTimeData);
execTimeData.setQueryEnd();
execTimeData.generateTelemetry();
logger.debug("Query completed. {}", execTimeData.getLogString());
return rs;
}

@Override
public void resultSetMetadataHandler(SFBaseResultSet resultSet) throws SQLException {
// No-Op.
Expand Down Expand Up @@ -278,6 +291,7 @@ private void setQueryIdWhenValidOrNull(String queryId) {
*/
ResultSet executeQueryInternal(
String sql,
String dataframeAst,
boolean asyncExec,
Map<String, ParameterBindingDTO> parameterBindings,
ExecTimeTelemetryData execTimeData)
Expand Down
20 changes: 20 additions & 0 deletions src/test/java/net/snowflake/client/jdbc/DataframeAstIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package net.snowflake.client.jdbc;

import net.snowflake.client.core.SFException;
import org.junit.jupiter.api.Test;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

public class DataframeAstIT {

@Test
public void simpleEndToEndTest() throws SQLException {
Properties props = new Properties();
String url = "jdbc:sqlite:test.db";
try(SnowflakeConnectionV1 conn = new SnowflakeConnectionV1(url, props)) {
ResultSet result = ((SnowflakeStatementV1) conn.createStatement()).executeDataframeAst("dummy");
}
}
}

0 comments on commit 70112d0

Please sign in to comment.