Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix transactional stream load with warehouse property not work rightly #56464

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TransactionState.LoadJobSourceType;
import com.starrocks.warehouse.Warehouse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -224,20 +225,18 @@ protected void executeTransaction(BaseRequest request, BaseResponse response) th
// redirect transaction op to BE
TNetworkAddress redirectAddress = result.getRedirectAddress();
if (null == redirectAddress) {
Long nodeId = getNodeId(txnOperation, label);
ComputeNode node = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getBackend(nodeId);
Long nodeId = getNodeId(txnOperation, label, txnOperationParams.getWarehouseName());
ComputeNode node = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getBackendOrComputeNode(nodeId);
if (node == null) {
node = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getComputeNode(nodeId);
if (node == null) {
throw new StarRocksException("Node " + nodeId + " is not alive");
}
throw new StarRocksException("Node " + nodeId + " is not alive");
}

redirectAddress = new TNetworkAddress(node.getHost(), node.getHttpPort());
}

LOG.info("Redirect transaction action to destination={}, db: {}, table: {}, op: {}, label: {}",
redirectAddress, txnOperationParams.getDbName(), txnOperationParams.getTableName(), txnOperation, label);
LOG.info("Redirect transaction action to destination={}, db: {}, table: {}, op: {}, label: {}, warehouse: {}",
redirectAddress, txnOperationParams.getDbName(), txnOperationParams.getTableName(), txnOperation, label,
txnOperationParams.getWarehouseName());
redirectTo(request, response, redirectAddress);
}

Expand Down Expand Up @@ -279,15 +278,20 @@ private TransactionOperationHandler getTxnOperationHandler(TransactionOperationP
? new BypassWriteTransactionHandler(params) : new TransactionWithoutChannelHandler(params);
}

private Long getNodeId(TransactionOperation txnOperation, String label) throws StarRocksException {
private Long getNodeId(TransactionOperation txnOperation, String label, String warehouseName) throws StarRocksException {
Long nodeId;
// save label->be hashmap when begin transaction, so that subsequent operator can send to same BE
if (TXN_BEGIN.equals(txnOperation)) {
Long chosenNodeId = GlobalStateMgr.getCurrentState().getNodeMgr()
.getClusterInfo().getNodeSelector().seqChooseBackendOrComputeId();
nodeId = chosenNodeId;
if (StringUtils.isNotEmpty(warehouseName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can provide a utility function to unify the same logic in LoadAction

Warehouse warehouse = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(warehouseName);
nodeId = GlobalStateMgr.getCurrentState().getNodeMgr()
.getClusterInfo().getNodeSelector().seqChooseComputeIdFromWarehouse(warehouse.getId());
} else {
nodeId = GlobalStateMgr.getCurrentState().getNodeMgr()
.getClusterInfo().getNodeSelector().seqChooseBackendOrComputeId();
}
// txnNodeMap is LRU cache, it atomic remove unused entry
accessTxnNodeMapWithWriteLock(txnNodeMap -> txnNodeMap.put(label, chosenNodeId));
accessTxnNodeMapWithWriteLock(txnNodeMap -> txnNodeMap.put(label, nodeId));
} else {
nodeId = accessTxnNodeMapWithReadLock(txnNodeMap -> txnNodeMap.get(label));
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
Potential NullPointerException when warehouseName is empty.

You can modify the code like this:

private Long getNodeId(TransactionOperation txnOperation, String label, String warehouseName) throws StarRocksException {
    Long nodeId = null;  // Initialize to null for safety
    // save label->be hashmap when begin transaction, so that subsequent operator can send to same BE
    if (TXN_BEGIN.equals(txnOperation)) {
        if (StringUtils.isNotEmpty(warehouseName)) {
            Warehouse warehouse = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(warehouseName);
            if (warehouse != null) {  // Check if the warehouse exists
                nodeId = GlobalStateMgr.getCurrentState().getNodeMgr()
                        .getClusterInfo().getNodeSelector().seqChooseComputeIdFromWarehouse(warehouse.getId());
            }
            // Handle case where warehouse is not found
            if (nodeId == null) {
                throw new StarRocksException("Warehouse " + warehouseName + " is not valid");
            }
        }
        if (nodeId == null) {  // If nodeId was not set, fall back to default behavior
            nodeId = GlobalStateMgr.getCurrentState().getNodeMgr()
                    .getClusterInfo().getNodeSelector().seqChooseBackendOrComputeId();
        }
        // txnNodeMap is LRU cache, it atomic remove unused entry
        accessTxnNodeMapWithWriteLock(txnNodeMap -> txnNodeMap.put(label, nodeId));
    } else {
        nodeId = accessTxnNodeMapWithReadLock(txnNodeMap -> txnNodeMap.get(label));
    }

    if (nodeId == null) {
        throw new StarRocksException("Could not obtain a valid node ID for label: " + label);
    }

    return nodeId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public ResultWrapper handle(BaseRequest request, BaseResponse response) throws S
Long timeoutMillis = txnOperationParams.getTimeoutMillis();
String label = txnOperationParams.getLabel();
Channel channel = txnOperationParams.getChannel();
LOG.info("Handle transaction with channel info, label: {}", label);
LOG.info("Handle transaction with channel info, label: {}, warehouse: {}", label,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this log may not be accurate, because only the begin txn will take the warehouse name from parameter, subsequent ops are not using the warehousename parameter at all.

txnOperationParams.getWarehouseName());

TransactionResult result = new TransactionResult();
switch (txnOperation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public ResultWrapper handle(BaseRequest request, BaseResponse response) throws S
String dbName = txnOperationParams.getDbName();
String label = txnOperationParams.getLabel();
Long timeoutMillis = txnOperationParams.getTimeoutMillis();
LOG.info("Handle transaction without channel info, label: {}", label);
LOG.info("Handle transaction without channel info, label: {}, warehouse: {}", label,
txnOperationParams.getWarehouseName());

Database db = Optional.ofNullable(GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbName))
.orElseThrow(() -> new StarRocksException(String.format("Database[%s] does not exist.", dbName)));
Expand Down
1 change: 0 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/lake/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ public static Optional<Long> getWarehouseIdByNodeId(SystemInfoService systemInfo
LOG.warn("failed to get warehouse id by node id: {}", nodeId);
return Optional.empty();
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated code change.

return Optional.of(node.getWarehouseId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@ public void beginTxn(int channelId, int channelNum, TUniqueId requestId, boolean
resp.addResultEntry("TxnId", this.txnId);
resp.addResultEntry("BeginChannel", channelNum);
resp.addResultEntry("BeginTxnTimeMs", this.beforeLoadTimeMs - this.createTimeMs);
LOG.info("stream load {} channel_id {} begin. db: {}, tbl: {}, txn_id: {}",
label, channelId, dbName, tableName, txnId);
LOG.info("stream load {} channel_id {} begin. db: {}, tbl: {}, txn_id: {}, warehouse: {}",
label, channelId, dbName, tableName, txnId, warehouseId);
break;
}
case BEFORE_LOAD:
Expand All @@ -328,8 +328,8 @@ public void beginTxn(int channelId, int channelNum, TUniqueId requestId, boolean
}
this.channels.set(channelId, State.BEFORE_LOAD);
resp.addResultEntry("BeginChannel", channelNum);
LOG.info("stream load {} channel_id {} begin. db: {}, tbl: {}, txn_id: {}",
label, channelId, dbName, tableName, txnId);
LOG.info("stream load {} channel_id {} begin. db: {}, tbl: {}, txn_id: {}, warehouse: {}",
label, channelId, dbName, tableName, txnId, warehouseId);
break;
}
case PREPARED: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1230,8 +1230,8 @@ private void checkPasswordAndLoadPriv(String user, String passwd, String db, Str
@Override
public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException {
String clientAddr = getClientAddrAsString();
LOG.info("receive txn begin request, db: {}, tbl: {}, label: {}, backend: {}",
request.getDb(), request.getTbl(), request.getLabel(), clientAddr);
LOG.info("receive txn begin request, db: {}, tbl: {}, label: {}, backendAddr: {}, backendId: {}",
request.getDb(), request.getTbl(), request.getLabel(), clientAddr, request.getBackend_id());
LOG.debug("txn begin request: {}", request);

TLoadTxnBeginResult result = new TLoadTxnBeginResult();
Expand Down
19 changes: 19 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/system/NodeSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import com.starrocks.clone.TabletChecker;
import com.starrocks.common.Pair;
import com.starrocks.common.StarRocksException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.thrift.TStorageMedium;
import com.starrocks.warehouse.Warehouse;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -65,6 +67,23 @@ public List<Long> seqChooseBackendIdsByStorageMedium(int backendNum, boolean nee
v -> !v.checkDiskExceedLimitForCreate(storageMedium));
}

/**
* It's the caller's responsibility to make sure warehouse existence is pre-checked
*/
public Long seqChooseComputeIdFromWarehouse(long warehouseId) throws StarRocksException {
Warehouse warehouse = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(warehouseId);
assert warehouse != null;
List<ComputeNode> aliveComputeNodes =
GlobalStateMgr.getCurrentState().getWarehouseMgr().getAliveComputeNodes(warehouseId);
if (CollectionUtils.isNotEmpty(aliveComputeNodes)) {
List<Long> computeNodes = seqChooseNodeIds(1, false, null, aliveComputeNodes);
if (CollectionUtils.isNotEmpty(computeNodes)) {
return computeNodes.get(0);
}
}
throw new StarRocksException("No compute node alive in warehouse: " + warehouseId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new StarRocksException("No compute node alive in warehouse: " + warehouseId);
throw new StarRocksException("No compute node alive in warehouse: " + warehouse.getName());

}

public Long seqChooseBackendOrComputeId() throws StarRocksException {
List<Long> backendIds = seqChooseBackendIds(1, true, false, null);
if (CollectionUtils.isNotEmpty(backendIds)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
A null pointer exception could be thrown if GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(warehouseId) returns null.

You can modify the code like this:

public Long seqChooseComputeIdFromWarehouse(long warehouseId) throws StarRocksException {
    Warehouse warehouse = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(warehouseId);
    if (warehouse == null) {
        throw new StarRocksException("Warehouse not found for ID: " + warehouseId);
    }
    List<ComputeNode> aliveComputeNodes =
            GlobalStateMgr.getCurrentState().getWarehouseMgr().getAliveComputeNodes(warehouseId);
    if (CollectionUtils.isNotEmpty(aliveComputeNodes)) {
        List<Long> computeNodes = seqChooseNodeIds(1, false, null, aliveComputeNodes);
        if (CollectionUtils.isNotEmpty(computeNodes)) {
            return computeNodes.get(0);
        }
    }
    throw new StarRocksException("No compute node alive in warehouse: " + warehouseId);
}

Expand Down
Loading