[BugFix] Fix transactional stream load with warehouse property not work rightly #56464
…rk rightly Signed-off-by: drake_wang <[email protected]>
}
throw new StarRocksException("No compute node alive in warehouse: " + warehouseId);
}

public Long seqChooseBackendOrComputeId() throws StarRocksException {
List<Long> backendIds = seqChooseBackendIds(1, true, false, null);
if (CollectionUtils.isNotEmpty(backendIds)) {
The most risky bug in this code is:
A NullPointerException could be thrown if GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(warehouseId) returns null.
You can modify the code like this:
public Long seqChooseComputeIdFromWarehouse(long warehouseId) throws StarRocksException {
    // Fail fast with a clear message if the warehouse does not exist.
    Warehouse warehouse = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(warehouseId);
    if (warehouse == null) {
        throw new StarRocksException("Warehouse not found for ID: " + warehouseId);
    }
    List<ComputeNode> aliveComputeNodes =
            GlobalStateMgr.getCurrentState().getWarehouseMgr().getAliveComputeNodes(warehouseId);
    if (CollectionUtils.isNotEmpty(aliveComputeNodes)) {
        List<Long> computeNodes = seqChooseNodeIds(1, false, null, aliveComputeNodes);
        if (CollectionUtils.isNotEmpty(computeNodes)) {
            return computeNodes.get(0);
        }
    }
    throw new StarRocksException("No compute node alive in warehouse: " + warehouseId);
}
// txnNodeMap is LRU cache, it atomic remove unused entry
accessTxnNodeMapWithWriteLock(txnNodeMap -> txnNodeMap.put(label, chosenNodeId));
accessTxnNodeMapWithWriteLock(txnNodeMap -> txnNodeMap.put(label, nodeId));
} else {
nodeId = accessTxnNodeMapWithReadLock(txnNodeMap -> txnNodeMap.get(label));
}
The most risky bug in this code is:
A potential NullPointerException when warehouseName is empty.
You can modify the code like this:
private Long getNodeId(TransactionOperation txnOperation, String label, String warehouseName)
        throws StarRocksException {
    Long nodeId = null; // Initialize to null for safety
    // Save the label -> node mapping when the transaction begins, so that subsequent operations go to the same node.
    if (TXN_BEGIN.equals(txnOperation)) {
        if (StringUtils.isNotEmpty(warehouseName)) {
            Warehouse warehouse = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(warehouseName);
            if (warehouse != null) { // Check if the warehouse exists
                nodeId = GlobalStateMgr.getCurrentState().getNodeMgr()
                        .getClusterInfo().getNodeSelector().seqChooseComputeIdFromWarehouse(warehouse.getId());
            }
            // Handle the case where the warehouse is not found
            if (nodeId == null) {
                throw new StarRocksException("Warehouse " + warehouseName + " is not valid");
            }
        }
        if (nodeId == null) { // If nodeId was not set, fall back to the default behavior
            nodeId = GlobalStateMgr.getCurrentState().getNodeMgr()
                    .getClusterInfo().getNodeSelector().seqChooseBackendOrComputeId();
        }
        // txnNodeMap is an LRU cache; it atomically removes unused entries.
        // nodeId is reassigned above, so a lambda cannot capture it; copy it to a final local first.
        final Long chosenNodeId = nodeId;
        accessTxnNodeMapWithWriteLock(txnNodeMap -> txnNodeMap.put(label, chosenNodeId));
    } else {
        nodeId = accessTxnNodeMapWithReadLock(txnNodeMap -> txnNodeMap.get(label));
    }
    if (nodeId == null) {
        throw new StarRocksException("Could not obtain a valid node ID for label: " + label);
    }
    return nodeId;
}
[Java-Extensions Incremental Coverage Report]✅ pass : 0 / 0 (0%)
[BE Incremental Coverage Report]✅ pass : 0 / 0 (0%)
return computeNodes.get(0);
}
}
throw new StarRocksException("No compute node alive in warehouse: " + warehouseId);
- throw new StarRocksException("No compute node alive in warehouse: " + warehouseId);
+ throw new StarRocksException("No compute node alive in warehouse: " + warehouse.getName());
@@ -250,7 +250,6 @@ public static Optional<Long> getWarehouseIdByNodeId(SystemInfoService systemInfo
LOG.warn("failed to get warehouse id by node id: {}", nodeId);
return Optional.empty();
}
Unrelated code change.
@@ -49,7 +49,8 @@ public ResultWrapper handle(BaseRequest request, BaseResponse response) throws S
Long timeoutMillis = txnOperationParams.getTimeoutMillis();
String label = txnOperationParams.getLabel();
Channel channel = txnOperationParams.getChannel();
LOG.info("Handle transaction with channel info, label: {}", label);
LOG.info("Handle transaction with channel info, label: {}, warehouse: {}", label,
This log may not be accurate: only the begin operation takes the warehouse name from the request parameter. Subsequent operations do not use the warehouse name parameter at all.
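To make the point above concrete, here is a minimal, self-contained sketch of label-sticky routing. It is not the StarRocks implementation: `StickyTxnRouter`, `begin`, `route`, `chooseNode`, and the tiny capacity are all hypothetical, and the map is assumed to evict in insertion order so that reads are safe under a read lock. It only illustrates why a warehouse name passed to a later operation has no effect on which node is used.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for label -> node routing; not StarRocks code. */
class StickyTxnRouter {
    private static final int MAX_ENTRIES = 2; // tiny on purpose, to make eviction visible

    // Insertion-ordered map that drops the oldest label once the capacity is exceeded.
    private final Map<String, Long> txnNodeMap = new LinkedHashMap<String, Long>() {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
            return size() > MAX_ENTRIES;
        }
    };
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Begin: the warehouse name decides the node, and the choice is remembered by label. */
    long begin(String label, String warehouseName) {
        long nodeId = chooseNode(warehouseName);
        lock.writeLock().lock();
        try {
            txnNodeMap.put(label, nodeId);
        } finally {
            lock.writeLock().unlock();
        }
        return nodeId;
    }

    /** Any later operation: the warehouse argument is ignored, only the label matters. */
    Long route(String label, String ignoredWarehouseName) {
        lock.readLock().lock();
        try {
            return txnNodeMap.get(label); // null if the label is unknown or was evicted
        } finally {
            lock.readLock().unlock();
        }
    }

    // Placeholder selection: a fake node id derived from the warehouse name.
    private static long chooseNode(String warehouseName) {
        if (warehouseName == null || warehouseName.isEmpty()) {
            return 1L; // pretend this is the default node
        }
        return 100L + warehouseName.length();
    }
}
```

Under these assumptions, logging the warehouse parameter on every operation would report a value that played no part in routing for anything except begin.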
Long chosenNodeId = GlobalStateMgr.getCurrentState().getNodeMgr()
.getClusterInfo().getNodeSelector().seqChooseBackendOrComputeId();
nodeId = chosenNodeId;
if (StringUtils.isNotEmpty(warehouseName)) {
We can provide a utility function to unify this with the same logic in LoadAction.
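One possible shape for such a shared helper is sketched below. Everything in it is an assumption for illustration: `NodeChoice`, `chooseNodeId`, and the three lookup parameters are hypothetical and stand in for the warehouse manager and node selector calls, which are passed as functions so the sketch compiles on its own. It uses unchecked exceptions instead of `StarRocksException` for the same reason.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.Supplier;

/** Hypothetical helper; every name here is illustrative, not a StarRocks API. */
final class NodeChoice {
    private NodeChoice() {
    }

    /**
     * Picks a node for a load request.
     * With an empty warehouse name the default selector is used;
     * otherwise the warehouse must exist and must have an alive node.
     */
    static long chooseNodeId(String warehouseName,
                             Function<String, Optional<Long>> warehouseIdByName,
                             LongFunction<Optional<Long>> nodeInWarehouse,
                             Supplier<Optional<Long>> defaultNode) {
        if (warehouseName == null || warehouseName.isEmpty()) {
            return defaultNode.get().orElseThrow(
                    () -> new IllegalStateException("No backend or compute node alive"));
        }
        long warehouseId = warehouseIdByName.apply(warehouseName).orElseThrow(
                () -> new IllegalArgumentException("Warehouse " + warehouseName + " does not exist"));
        return nodeInWarehouse.apply(warehouseId).orElseThrow(
                () -> new IllegalStateException("No compute node alive in warehouse: " + warehouseName));
    }
}
```

Both call sites could then delegate to the one method, so the empty-name fallback and the error messages stay identical in the two actions.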
Should add some tests.
Why I'm doing:
What I'm doing:
Fixes #issue
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: