Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADH-5241]: Implement the trino-adb connector reading function #4

Merged
merged 5 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/docker/arenadata/coordinator/etc/log.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# Enable verbose logging from Trino
#io.trino=DEBUG
io.trino=DEBUG
2 changes: 1 addition & 1 deletion core/docker/arenadata/worker/etc/log.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# Enable verbose logging from Trino
#io.trino=DEBUG
io.trino=DEBUG
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,41 @@
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.MinDataSize;
import io.trino.plugin.adb.connector.protocol.TransferDataProtocol;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

public class AdbPluginConfig
{
public static final String IDENTIFIER_QUOTE = "\"";
private AdbPluginConfig.ArrayMapping arrayMapping = AdbPluginConfig.ArrayMapping.DISABLED;
private int maxScanParallelism = 1;
private boolean includeSystemTables;
private DataSize writeBufferSize = DataSize.of(16L, DataSize.Unit.MEGABYTE);
private Integer fetchSize;
private DataSize writeBufferSize = DataSize.of(64L, DataSize.Unit.MEGABYTE);
private DataSize readBufferSize = DataSize.of(64L, DataSize.Unit.MEGABYTE);
private final TransferDataProtocol dataProtocol = TransferDataProtocol.GPFDIST;
private Duration gpfdistRetryTimeout;

public TransferDataProtocol getDataProtocol()
{
return dataProtocol;
}

public Integer getFetchSize()
{
return fetchSize;
}

@Config("adb.fetch-size")
public AdbPluginConfig setFetchSize(int fetchSize)
{
this.fetchSize = fetchSize;
return this;
}

@NotNull
public AdbPluginConfig.ArrayMapping getArrayMapping()
{
Expand Down Expand Up @@ -81,13 +98,41 @@ public DataSize getWriteBufferSize()
}

@Config("adb.connector.write-buffer-size")
@ConfigDescription("Maximum amount of memory that could be allocated per sink when executing write queries. Defaults to 16MB")
@ConfigDescription("Maximum amount of memory that could be allocated per sink when executing write queries. Defaults to 64MB")
public AdbPluginConfig setWriteBufferSize(DataSize writeBufferSize)
{
this.writeBufferSize = writeBufferSize;
return this;
}

@MinDataSize("1kB")
@NotNull
public DataSize getReadBufferSize()
{
return readBufferSize;
}

@Config("adb.connector.read-buffer-size")
@ConfigDescription("Maximum amount of memory that could be allocated per record cursor when executing read queries. Defaults to 64MB")
public AdbPluginConfig setReadBufferSize(DataSize readBufferSize)
{
this.readBufferSize = readBufferSize;
return this;
}

public Duration getGpfdistRetryTimeout()
{
return this.gpfdistRetryTimeout;
}

@Config("adb.gpfdist.retry-timeout")
@ConfigDescription("Value of adb gpfdist_retry_timeout property. Defaults to null (use adb defaults)")
public AdbPluginConfig setGpfdistRetryTimeout(Duration gpfdistRetryTimeout)
{
this.gpfdistRetryTimeout = gpfdistRetryTimeout;
return this;
}

public static enum ArrayMapping
{
DISABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.google.common.primitives.Shorts;
import com.google.common.primitives.SignedBytes;
import io.airlift.slice.Slice;
import io.trino.plugin.adb.connector.AdbSqlClient;
import io.trino.plugin.adb.connector.datatype.mapper.DataTypeMapper;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorSession;
Expand Down Expand Up @@ -131,7 +131,7 @@ public static Object[] getJdbcObjectArray(ConnectorSession session, Type element
return valuesArray;
}

public static String getArrayElementPgTypeName(ConnectorSession session, AdbSqlClient client, Type elementType)
public static String getArrayElementPgTypeName(ConnectorSession session, DataTypeMapper typeMapper, Type elementType)
{
if (DOUBLE.equals(elementType)) {
return "float8";
Expand Down Expand Up @@ -164,10 +164,10 @@ public static String getArrayElementPgTypeName(ConnectorSession session, AdbSqlC
}

if (elementType instanceof ArrayType) {
return getArrayElementPgTypeName(session, client, ((ArrayType) elementType).getElementType());
return getArrayElementPgTypeName(session, typeMapper, ((ArrayType) elementType).getElementType());
}

return client.toWriteMapping(session, elementType).getDataType();
return typeMapper.toWriteMapping(session, elementType).getDataType();
}

private static Object trinoNativeToJdbcObject(ConnectorSession session, Type trinoType, Object trinoNative)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.airlift.configuration.ConfigBinder;
import io.trino.plugin.adb.AdbPluginConfig;
import io.trino.plugin.adb.connector.encode.EncoderModule;
import io.trino.plugin.adb.connector.datatype.mapper.DataTypeMapper;
import io.trino.plugin.adb.connector.datatype.mapper.DataTypeMapperImpl;
import io.trino.plugin.adb.connector.encode.DataFormatModule;
import io.trino.plugin.adb.connector.metadata.AdbMetadataDao;
import io.trino.plugin.adb.connector.metadata.impl.AdbMetadataDaoImpl;
import io.trino.plugin.adb.connector.protocol.TransferDataProtocol;
import io.trino.plugin.adb.connector.protocol.gpfdist.GpfdistModule;
import io.trino.plugin.adb.connector.table.AdbCreateTableStorageConfig;
import io.trino.plugin.adb.connector.table.AdbTableProperties;
import io.trino.plugin.adb.connector.table.SplitSourceManager;
import io.trino.plugin.adb.connector.table.SplitSourceManagerImpl;
import io.trino.plugin.adb.connector.table.StatisticsManager;
import io.trino.plugin.adb.connector.table.StatisticsManagerImpl;
import io.trino.plugin.jdbc.DecimalModule;
import io.trino.plugin.jdbc.ForBaseJdbc;
import io.trino.plugin.jdbc.JdbcClient;
Expand All @@ -44,21 +50,26 @@ public class AdbClientModule
@Override
protected void setup(Binder binder)
{
install(new EncoderModule());
binder.bind(AdbMetadataDao.class).to(AdbMetadataDaoImpl.class).in(Scopes.SINGLETON);
binder.bind(DataTypeMapper.class).to(DataTypeMapperImpl.class).in(Scopes.SINGLETON);
binder.bind(StatisticsManager.class).to(StatisticsManagerImpl.class).in(Scopes.SINGLETON);
binder.bind(SplitSourceManager.class).to(SplitSourceManagerImpl.class).in(Scopes.SINGLETON);
binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(AdbSqlClient.class).in(Scopes.SINGLETON);
ConfigBinder.configBinder(binder).bindConfig(AdbCreateTableStorageConfig.class);
ConfigBinder.configBinder(binder).bindConfig(JdbcStatisticsConfig.class);
JdbcModule.bindSessionPropertiesProvider(binder, AdbSessionProperties.class);
JdbcModule.bindTablePropertiesProvider(binder, AdbTableProperties.class);
OptionalBinder.newOptionalBinder(binder, QueryBuilder.class).setBinding().to(CollationAwareQueryBuilder.class).in(Scopes.SINGLETON);
this.install(new DecimalModule());
this.install(new JdbcJoinPushdownSupportModule());
this.install(new RemoteQueryCancellationModule());
Multibinder.newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(Scopes.SINGLETON);
AdbPluginConfig pluginConfig = this.buildConfigObject(AdbPluginConfig.class);

install(new DataFormatModule());
install(new DecimalModule());
install(new JdbcJoinPushdownSupportModule());
install(new RemoteQueryCancellationModule());

if (pluginConfig.getDataProtocol() == TransferDataProtocol.GPFDIST) {
this.install(new GpfdistModule());
install(new GpfdistModule());
}
else {
throw new UnsupportedOperationException("Unsupported data protocol: " + pluginConfig.getDataProtocol());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.units.Duration;
import io.trino.plugin.adb.AdbPluginConfig;
import io.trino.plugin.base.session.PropertyMetadataUtil;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.session.PropertyMetadata;

import java.util.List;
import java.util.Optional;

public class AdbSessionProperties
implements SessionPropertiesProvider
Expand All @@ -33,7 +36,9 @@ public AdbSessionProperties(AdbPluginConfig config)
this.sessionProperties = ImmutableList.of(
PropertyMetadata.enumProperty("array_mapping", "Handling of PostgreSql arrays", AdbPluginConfig.ArrayMapping.class, config.getArrayMapping(), false),
PropertyMetadata.integerProperty(
"max_scan_parallelism", "Maximum degree of parallelism when scanning tables. Defaults to 1.", config.getMaxScanParallelism(), false));
"max_scan_parallelism", "Maximum degree of parallelism when scanning tables. Defaults to 1.", config.getMaxScanParallelism(), false),
PropertyMetadataUtil.durationProperty(
"gpfdist_retry_timeout", "Value of adb gpfdist_retry_timeout property", config.getGpfdistRetryTimeout(), false));
}

@Override
Expand All @@ -56,4 +61,9 @@ public static int getMaxScanParallelism(ConnectorSession session)
{
return session.getProperty("max_scan_parallelism", Integer.class);
}

public static Optional<Duration> getGpfdistRetryTimeout(ConnectorSession session)
{
return Optional.ofNullable(session.getProperty("gpfdist_retry_timeout", Duration.class));
}
}
Loading
Loading