Skip to content

Commit

Permalink
Merge pull request #163 from oracle/160-lob-1gb
Browse files Browse the repository at this point in the history
Set Default LOB Prefetch to 1GB
  • Loading branch information
jeandelavarene authored Dec 9, 2024
2 parents dbb4daa + ce7f5d9 commit 2db2fae
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 11 deletions.
14 changes: 5 additions & 9 deletions src/main/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -650,15 +648,13 @@ private static void configureJdbcDefaults(OracleDataSource oracleDataSource) {
setPropertyIfAbsent(oracleDataSource,
OracleConnection.CONNECTION_PROPERTY_IMPLICIT_STATEMENT_CACHE_SIZE, "25");

// Prefetch LOB values by default. The database's maximum supported
// prefetch size, 1GB, is configured by default. This is done so that
// Row.get(...) can map LOB values into ByteBuffer/String without a
// blocking database call. If the entire value is prefetched, then JDBC
// won't need to fetch the remainder from the database when the entire is
// value requested as a ByteBuffer or String.
// Prefetch LOB values by default. This allows Row.get(...) to map most LOB
// values into ByteBuffer/String without a blocking database call to fetch
// the remaining bytes. 1GB is configured by default, as this is close to
// the maximum allowed by the Autonomous Database service.
setPropertyIfAbsent(oracleDataSource,
OracleConnection.CONNECTION_PROPERTY_DEFAULT_LOB_PREFETCH_SIZE,
"1048576");
"1000000000");

// TODO: Disable the result set cache? This is needed to support the
// SERIALIZABLE isolation level, which requires result set caching to be
Expand Down
135 changes: 134 additions & 1 deletion src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,33 @@
import io.r2dbc.spi.Blob;
import io.r2dbc.spi.Clob;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.Parameters;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.Statement;
import oracle.r2dbc.OracleR2dbcObject;
import oracle.r2dbc.OracleR2dbcTypes;
import oracle.r2dbc.test.DatabaseConfig;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static io.r2dbc.spi.ConnectionFactoryOptions.HOST;
import static io.r2dbc.spi.ConnectionFactoryOptions.PORT;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.util.Arrays.asList;
import static oracle.r2dbc.test.DatabaseConfig.connectTimeout;
import static oracle.r2dbc.test.DatabaseConfig.sharedConnection;
Expand Down Expand Up @@ -540,7 +548,7 @@ public Publisher<Void> discard() {

class TestClob implements Clob {
final CharBuffer clobData =
CharBuffer.wrap(new String(data, StandardCharsets.US_ASCII));
CharBuffer.wrap(new String(data, US_ASCII));

boolean isDiscarded = false;

Expand Down Expand Up @@ -640,4 +648,129 @@ public void testNullLob() {
}
}

/**
* Verifies that the default LOB prefetch size is at least large enough to
* fully prefetch 1MB of data.
*/
@Test
public void testDefaultLobPrefetch() throws Exception {
Assumptions.assumeTrue(
null == DatabaseConfig.protocol(), "Test requires TCP protocol");

// A local server will monitor network I/O
try (ServerSocketChannel localServer = ServerSocketChannel.open()) {
localServer.configureBlocking(true);
localServer.bind(null);

class TestThread extends Thread {

/** Count of bytes exchanged between JDBC and the database */
int ioCount = 0;

@Override
public void run() {
InetSocketAddress databaseAddress =
new InetSocketAddress(DatabaseConfig.host(), DatabaseConfig.port());

try (
SocketChannel jdbcChannel = localServer.accept();
SocketChannel databaseChannel =
SocketChannel.open(databaseAddress)){

jdbcChannel.configureBlocking(false);
databaseChannel.configureBlocking(false);

ByteBuffer byteBuffer = ByteBuffer.allocateDirect(8192);
while (true) {

byteBuffer.clear();
if (-1 == jdbcChannel.read(byteBuffer))
break;
byteBuffer.flip();
ioCount += byteBuffer.remaining();

while (byteBuffer.hasRemaining())
databaseChannel.write(byteBuffer);

byteBuffer.clear();
databaseChannel.read(byteBuffer);
byteBuffer.flip();
ioCount += byteBuffer.remaining();

while (byteBuffer.hasRemaining())
jdbcChannel.write(byteBuffer);
}
}
catch (Exception exception) {
exception.printStackTrace();
}
}
}

TestThread testThread = new TestThread();
testThread.start();


int lobSize = 99 + (1024 * 1024); // <-- 99 + 1MB
Connection connection = awaitOne(ConnectionFactories.get(
DatabaseConfig.connectionFactoryOptions()
.mutate()
.option(HOST, "localhost")
.option(PORT,
((InetSocketAddress)localServer.getLocalAddress()).getPort())
.build())
.create());
try {
awaitExecution(connection.createStatement(
"CREATE TABLE testLobPrefetch ("
+ " id NUMBER GENERATED ALWAYS AS IDENTITY,"
+ " blobValue BLOB,"
+ " clobValue CLOB,"
+ " PRIMARY KEY(id))"));

// Insert two rows of LOBs larger than 1MB
byte[] bytes = getBytes(lobSize);
ByteBuffer blobValue = ByteBuffer.wrap(bytes);
String clobValue = new String(bytes, US_ASCII);
awaitUpdate(List.of(1,1), connection.createStatement(
"INSERT INTO testLobPrefetch (blobValue, clobValue)"
+ " VALUES (:blobValue, :clobValue)")
.bind("blobValue", blobValue)
.bind("clobValue", clobValue)
.add()
.bind("blobValue", blobValue)
.bind("clobValue", clobValue));

// Query two rows of LOBs larger than 1MB
awaitQuery(
List.of(
List.of(blobValue, clobValue),
List.of(blobValue, clobValue)),
row -> {
try {
// Expect no I/O to result from mapping a fully prefetched BLOB or
// CLOB:
int ioCount = testThread.ioCount;
var result = List.of(row.get("blobValue"), row.get("clobValue"));
assertEquals(ioCount, testThread.ioCount);
return result;
}
catch (Exception exception) {
throw new RuntimeException(exception);
}
},
connection.createStatement(
"SELECT blobValue, clobValue FROM testLobPrefetch ORDER BY id")
.fetchSize(1));
}
finally {
tryAwaitExecution(connection.createStatement(
"DROP TABLE testLobPrefetch"));
tryAwaitNone(connection.close());
testThread.join(10_000);
testThread.interrupt();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ private static Properties getJdbcDefaultProperties() throws SQLException {
OracleConnection.CONNECTION_PROPERTY_IMPLICIT_STATEMENT_CACHE_SIZE, "25");
defaultProperties.setProperty(
OracleConnection.CONNECTION_PROPERTY_DEFAULT_LOB_PREFETCH_SIZE,
"1048576");
"1000000000");
defaultProperties.setProperty(
OracleConnection.CONNECTION_PROPERTY_THIN_NET_USE_ZERO_COPY_IO,
"false");
Expand Down

0 comments on commit 2db2fae

Please sign in to comment.