PHOENIX-6698 hive-connector will take long time to generate splits fo… #79
…r large phoenix tables.
💔 -1 overall
This message was automatically generated. |
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
||
(qplan.getScans().size() < parallelThreshould)
) {
LOG.info("generate splits in serial");
Make the log message clearer, e.g. "Generate Input Splits in serial".
LOG.info("generate splits in serial");
for (final List<Scan> scans : qplan.getScans()) {
psplits.addAll(
generateSplitsInternal(
Reduce the number of lines used to call the method.
);
}
} else {
final int parallism = jobConf.getInt(
What's the difference between this parallelism level config and the parallel threshold?
The parallelism level config controls the number of worker threads used by the parallel split method; the parallel threshold controls which split-generation method is used, serial or parallel.
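To make the distinction concrete, here is a minimal sketch of how the two settings could interact. It is not the PR's code: `SplitModeSketch`, `useParallel`, and `workerThreads` are hypothetical names, and the rules (serial when the threshold is negative or the scan count is below it, and 2 x CPU cores as the default thread count) are taken from the PR description.

```java
final class SplitModeSketch {
    // Hypothetical helper: the threshold only selects the method (serial vs. parallel).
    static boolean useParallel(int parallelThreshold, int scanCount) {
        if (parallelThreshold < 0) {
            return false; // a negative threshold forces serial generation
        }
        return scanCount >= parallelThreshold; // fewer scans than the threshold: serial
    }

    // Hypothetical helper: the level only sizes the worker pool for the parallel method.
    static int workerThreads(Integer configuredLevel) {
        if (configuredLevel != null && configuredLevel > 0) {
            return configuredLevel;
        }
        // Assumed default, per the PR description: 2 x CPU cores.
        return 2 * Runtime.getRuntime().availableProcessors();
    }
}
```

With a threshold of 32, a plan with 8 scans would stay serial and a plan with 128 scans would go parallel; the level is consulted only in the second case.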
final Path[] tablePaths) throws IOException {

final List<InputSplit> psplits = new ArrayList<>(scans.size());
try (org.apache.hadoop.hbase.client.Connection connection =
The connection can be created once, then shared and reused when generating the input splits.
"[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
+ ", " + scans.get(0).getBatch() + "] and regionLocation : " +
regionLocation);
LOG.debug("Scan count["
Format properly.
psplits.addAll(task.get());
}
} catch (ExecutionException | InterruptedException exception) {
throw new IOException("failed to get splits,reason:",
Log message can be improved.
ConnectionFactory.createConnection(
PhoenixConnectionUtil.getConfiguration(jobConf))) {
RegionLocator regionLocator =
connection.getRegionLocator(TableName.valueOf(
The region locator can also be shared across calls.
InputSplit[] inputSplitsSerial = inputFormat.getSplits(jobConf,SPLITS);
end = System.currentTimeMillis();
long durationInSerial=end - start;
System.out.println(String.format(
We need to use assertions; there is no use in printing these values to the logs.
private static void buildPreparedSqlWithBinarySplits(
StringBuffer sb,
int splits)
{
Code formatting required.
throw new IOException("Failed to Generate Input Splits in Parallel, reason:",
exception);
Good to unwrap the ExecutionException and throw back the real exception. It may already be an IOException which you can throw with a cast, rather than rewrapping in another IOException.
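One way to do this, as a rough sketch rather than the code that was eventually merged: `UnwrapSketch` and `getOrRethrow` are hypothetical names, and only standard `java.util.concurrent` calls are used. The cause is rethrown as-is when it is already an `IOException` or an unchecked throwable, and wrapped only as a last resort.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class UnwrapSketch {
    // Hypothetical helper: waits for one split-generation task and surfaces its real failure.
    static <T> T getOrRethrow(Future<T> task) throws IOException {
        try {
            return task.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause; // already the right type: no rewrapping
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            // Any other checked exception: wrap once, keeping the cause.
            throw new IOException("Failed to generate input splits in parallel", cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IOException("Interrupted while generating input splits", e);
        }
    }
}
```

The caller then sees the original `IOException` and its message directly instead of a generic wrapper around an `ExecutionException`.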
PhoenixStorageHandlerConstants
.DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT);
ExecutorService executorService = Executors.newFixedThreadPool(parallism);
LOG.info("Generate Input Splits in Parallel with {} threads", parallism);
Suggested change:
- LOG.info("Generate Input Splits in Parallel with {} threads", parallism);
+ LOG.info("Generating Input Splits in Parallel with {} threads", parallism);
qplan.getTableRef().getTable().getPhysicalName().toString()));
final int scanSize = qplan.getScans().size();
if (useParallelInputGeneration(parallelThreshold, scanSize)) {
final int parallism = jobConf.getInt(
Suggested change:
- final int parallism = jobConf.getInt(
+ final int parallelism = jobConf.getInt(
String.valueOf(durationInParallel)));

// Test if performance of parallel method is better than serial method
Assert.assertTrue(durationInParallel < durationInSerial);
This will result in flaky tests as the environments which will run this test are guaranteed to not be deterministic. Unit tests should be about functional correctness, not performance.
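A functional check could instead assert that both methods produce the same splits. The following is a self-contained sketch of that idea, not the PR's test: `SplitEquivalenceSketch` and its methods are hypothetical, and plain integers and strings stand in for scans and input splits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SplitEquivalenceSketch {
    // Hypothetical stand-in for generating the splits of one scan group.
    static List<String> splitsFor(List<Integer> scanGroup) {
        List<String> out = new ArrayList<>();
        for (int scan : scanGroup) {
            out.add("split-" + scan);
        }
        return out;
    }

    // Serial method: process scan groups one after another.
    static List<String> serial(List<List<Integer>> groups) {
        List<String> all = new ArrayList<>();
        for (List<Integer> group : groups) {
            all.addAll(splitsFor(group));
        }
        return all;
    }

    // Parallel method: one task per scan group on a fixed-size pool.
    static List<String> parallel(List<List<Integer>> groups, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<String>>> tasks = new ArrayList<>();
            for (List<Integer> group : groups) {
                tasks.add(pool.submit(() -> splitsFor(group)));
            }
            List<String> all = new ArrayList<>();
            for (Future<List<String>> task : tasks) {
                all.addAll(task.get()); // collected in submission order
            }
            return all;
        } finally {
            pool.shutdown();
        }
    }
}
```

A test would then assert `serial(groups).equals(parallel(groups, n))`, which holds on any machine regardless of load, whereas a timing comparison does not.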
*/
@NotThreadSafe
@Category(ParallelStatsDisabledTest.class)
public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT {
Do the existing Phoenix-hive tests activate your new property and implicitly validate that it is functional? I think we have some test classes but do we create multi-region Phoenix tables in those tests (or those with enough data to have multiple guideposts)?
Q: Do the existing Phoenix-hive tests activate your new property and implicitly validate that it is functional?
A: Yes, they implicitly validate its functionality using the default settings of the new properties.
Q: I think we have some test classes but do we create multi-region Phoenix tables in those tests (or those with enough data to have multiple guideposts)?
A: I have discussed this with chrajeshbabu and updated the test class to be based on ParallelStatsEnabledIT instead of ParallelStatsDisabledIT, and to create tables with multiple regions and multiple guideposts.
This patch enables PhoenixInputFormat to generate splits in parallel. It introduces two parameters to control the degree of parallelism.
1. 'hive.phoenix.split.parallel.threshold' controls whether splits are generated in parallel. Splits are generated in serial under either of the following conditions:
(1) hive.phoenix.split.parallel.threshold < 0.
(2) The number of scans in the query plan is less than the configured value.
In all other cases, splits are generated in parallel.
2. 'hive.phoenix.split.parallel.level' controls the number of worker threads used for split generation (2 * CPU cores by default).
A unit test is included; it compares the time cost of generating splits in serial and in parallel for a Phoenix table with 128 regions.
The output shows that the parallel method is 6x faster than the serial method, and the improvement should be greater for tables with more regions.
In a production environment, we tested a table with 2048 regions: the time cost dropped from nearly 30 minutes to 2 minutes with the default configuration.