-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ADH-5241]: Implement the trino-adb connector reading function #4
Conversation
VitekArkhipov
commented
Nov 26, 2024
- implemented trino-adb plugin (for reading)
- done refactoring
@@ -131,7 +131,7 @@ public static Object[] getJdbcObjectArray(ConnectorSession session, Type element | |||
return valuesArray; | |||
} | |||
|
|||
public static String getArrayElementPgTypeName(ConnectorSession session, AdbSqlClient client, Type elementType) | |||
public static String getArrayElementPgTypeName(ConnectorSession session, DataTypeMapper client, Type elementType) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: DataTypeMapper typeMapper
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
@@ -44,8 +50,11 @@ public class AdbClientModule | |||
@Override | |||
protected void setup(Binder binder) | |||
{ | |||
install(new EncoderModule()); | |||
install(new DataFormatModule()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove this.
before install method calls in this method for uniformity and use empty lines between logical blocks like in JdbcModule
for readability
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
private final AdbMetadataDao metadata; | ||
private final List<String> tableTypes; | ||
private final Optional<Integer> fetchSize; | ||
private final Type jsonType; | ||
private final Type uuidType; | ||
private final boolean statisticsEnabled; | ||
private final AdbPluginConfig pluginConfig; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this field is not used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
private static final int PRECISION_OF_UNSPECIFIED_DECIMAL = 0; | ||
private static final int ARRAY_RESULT_SET_VALUE_COLUMN = 2; | ||
private static final String IDENTIFIER_QUOTE = "\""; | ||
public static final String COLUMN_TYPE_NOT_SUPPORTED_ERROR_MSG_TEMPLATE = "Column type %s is not supported"; | ||
private final AdbMetadataDao metadata; | ||
private final List<String> tableTypes; | ||
private final Optional<Integer> fetchSize; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the value of this field is always Optional.empty()
, do we need it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
this.fetchSize = Optional.empty(); | ||
this.jsonType = typeManager.getType(new TypeSignature("json")); | ||
this.uuidType = typeManager.getType(new TypeSignature("uuid")); | ||
statisticsEnabled = statisticsConfig.isEnabled(); | ||
this.pluginConfig = config; | ||
connectorExpressionRewriter = JdbcConnectorExpressionRewriterBuilder.newBuilder() | ||
.add(new RewriteVariable(this::quoted)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can replace adding of this and other standard expression rules with addStandardRules(this::quoted)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea, fixed
|
||
import java.util.Optional; | ||
|
||
public interface ContextManager<T> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should the type parameter be bounded, like ContextManager<T extends Context>
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
try { | ||
try (InputStreamReader streamReader = new InputStreamReader(dataStream, Charset.forName(dataFormatConfig.getEncoding())); | ||
BufferedReader bufferedStreamReader = new BufferedReader(streamReader)) { | ||
CSVReader csvReader = new CSVReaderBuilder(bufferedStreamReader) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we close CSVReader
in case of normal execution completion? if yes, consider moving it to try
block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
ConnectorRow element; | ||
lock.lock(); | ||
try { | ||
element = rowsQueue.poll(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can rowsQueue.poll()
return null
element here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now yes
} | ||
while (queryExecutionException == null && (isDataTransferNotInitialized() || isDataNotProcessed())) { | ||
try { | ||
Thread.sleep(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to replace sleep
with Future-based notification of initialization completion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactored
return segmentId; | ||
} | ||
|
||
public ProcessingDataResult getProcessedDataResult() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need getSegmentId
and getProcessedDataResult
methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed some comments