Skip to content

Commit

Permalink
[refact](fe) Refact GlobalTransactionMgr
Browse files Browse the repository at this point in the history
* Only export `GlobalTransactionMgr` method, avoid
  using `DatabaseTransactionMgr`
  • Loading branch information
SWJTU-ZhangLei committed Nov 15, 2023
1 parent 3ad865f commit b5214a8
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 52 deletions.
2 changes: 1 addition & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -5429,7 +5429,7 @@ public void eraseDatabase(long dbId, boolean needEditLog) {
Env.getCurrentEnv().getLoadInstance().removeDbLoadJob(dbId);

// remove database transaction manager
Env.getCurrentEnv().getGlobalTransactionMgr().removeDatabaseTransactionMgr(dbId);
Env.getCurrentGlobalTransactionMgr().removeDatabaseTransactionMgr(dbId);

if (needEditLog) {
Env.getCurrentEnv().getEditLog().logEraseDb(dbId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.TransactionState;

import com.google.common.base.Joiner;
Expand Down Expand Up @@ -557,9 +556,8 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
}

try {
DatabaseTransactionMgr dbTransactionMgr
= Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(db.getId());
for (TransactionState transactionState : dbTransactionMgr.getPreCommittedTxnList()) {
for (TransactionState transactionState :
Env.getCurrentGlobalTransactionMgr().getPreCommittedTxnList(db.getId())) {
if (transactionState.getTableIdList().contains(tbl.getId())) {
// If table releate to transaction with precommitted status, do not allow to do balance.
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
Expand All @@ -568,6 +566,7 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
}
} catch (AnalysisException e) {
// CHECKSTYLE IGNORE THIS LINE
LOG.warn("Exception:", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ public void dropDb(DropDbStmt stmt) throws DdlException {
long recycleTime = 0;
try {
if (!stmt.isForceDrop()) {
if (Env.getCurrentEnv().getGlobalTransactionMgr().existCommittedTxns(db.getId(), null, null)) {
if (Env.getCurrentGlobalTransactionMgr().existCommittedTxns(db.getId(), null, null)) {
throw new DdlException(
"There are still some transactions in the COMMITTED state waiting to be completed. "
+ "The database [" + dbName
Expand Down Expand Up @@ -885,7 +885,7 @@ public void dropTable(DropTableStmt stmt) throws DdlException {
}

if (!stmt.isForceDrop()) {
if (Env.getCurrentEnv().getGlobalTransactionMgr().existCommittedTxns(db.getId(), table.getId(), null)) {
if (Env.getCurrentGlobalTransactionMgr().existCommittedTxns(db.getId(), table.getId(), null)) {
throw new DdlException(
"There are still some transactions in the COMMITTED state waiting to be completed. "
+ "The table [" + tableName
Expand Down Expand Up @@ -1734,7 +1734,7 @@ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause
if (!clause.isForceDrop()) {
partition = olapTable.getPartition(partitionName);
if (partition != null) {
if (Env.getCurrentEnv().getGlobalTransactionMgr()
if (Env.getCurrentGlobalTransactionMgr()
.existCommittedTxns(db.getId(), olapTable.getId(), partition.getId())) {
throw new DdlException(
"There are still some transactions in the COMMITTED state waiting to be completed."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.DatabaseTransactionMgr;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -764,10 +763,10 @@ private void cleanLabelInternal(long dbId, String label, boolean isReplay) {

// 2. Remove from DatabaseTransactionMgr
try {
DatabaseTransactionMgr dbTxnMgr = Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(dbId);
dbTxnMgr.cleanLabel(label);
Env.getCurrentGlobalTransactionMgr().cleanLabel(dbId, label);
} catch (AnalysisException e) {
// just ignore, because we don't want to throw any exception here.
LOG.warn("Exception:", e);
}

// 3. Log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
Expand Down Expand Up @@ -1457,16 +1456,15 @@ public List<List<String>> getTasksShowInfo() throws AnalysisException {
if (null == routineLoadTaskInfoList || routineLoadTaskInfoList.isEmpty()) {
return rows;
}
DatabaseTransactionMgr databaseTransactionMgr = Env.getCurrentEnv().getGlobalTransactionMgr()
.getDatabaseTransactionMgr(dbId);

routineLoadTaskInfoList.forEach(entity -> {
long txnId = entity.getTxnId();
if (RoutineLoadTaskInfo.INIT_TXN_ID == txnId) {
rows.add(entity.getTaskShowInfo());
return;
}
TransactionState transactionState = databaseTransactionMgr.getTransactionState(entity.getTxnId());
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, entity.getTxnId());
if (null != transactionState && null != transactionState.getTransactionStatus()) {
entity.setTxnStatus(transactionState.getTransactionStatus());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionState;
Expand Down Expand Up @@ -123,9 +122,9 @@ public void beginTxn(long batchId) throws UserException, TException, TimeoutExce
+ "_batch" + batchId + "_" + currentTime;
String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN;
GlobalTransactionMgr globalTransactionMgr = Env.getCurrentGlobalTransactionMgr();
DatabaseTransactionMgr databaseTransactionMgr = globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
long txnLimit = db.getTransactionQuotaSize();
if (databaseTransactionMgr.getRunningTxnNums() < txnLimit) {
long runningTxnNums = globalTransactionMgr.getRunningTxnNums(db.getId());
if (runningTxnNums < txnLimit) {
TransactionEntry txnEntry = txnExecutor.getTxnEntry();
TTxnParams txnConf = txnEntry.getTxnConf();
TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
Expand Down Expand Up @@ -184,7 +183,7 @@ public void beginTxn(long batchId) throws UserException, TException, TimeoutExce
}
} else {
String failMsg = "current running txns on db " + db.getId() + " is "
+ databaseTransactionMgr.getRunningTxnNums()
+ runningTxnNums
+ ", larger than limit " + txnLimit;
LOG.warn(failMsg);
throw new BeginTransactionException(failMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@
import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
import org.apache.doris.thrift.TWaitingTxnStatusResult;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
Expand Down Expand Up @@ -1443,8 +1442,8 @@ private List<Table> queryLoadCommitTables(TLoadTxnCommitRequest request, Databas
// if it has multi table, use multi table and update multi table running transaction table ids
if (CollectionUtils.isNotEmpty(request.getTbls())) {
List<Long> multiTableIds = tables.stream().map(Table::getId).collect(Collectors.toList());
Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(db.getId())
.updateMultiTableRunningTransactionTableIds(request.getTxnId(), multiTableIds);
Env.getCurrentGlobalTransactionMgr()
.updateMultiTableRunningTransactionTableIds(db.getId(), request.getTxnId(), multiTableIds);
LOG.debug("txn {} has multi table {}", request.getTxnId(), request.getTbls());
}
return tables;
Expand Down Expand Up @@ -1551,22 +1550,22 @@ private void loadTxn2PCImpl(TLoadTxn2PCRequest request) throws UserException {
throw new UserException("unknown database, database=" + fullDbName);
}

DatabaseTransactionMgr dbTransactionMgr = Env.getCurrentGlobalTransactionMgr()
.getDatabaseTransactionMgr(database.getId());
String txnOperation = request.getOperation().trim();
if (!request.isSetTxnId()) {
List<TransactionStatus> statusList = new ArrayList<>();
statusList.add(TransactionStatus.PRECOMMITTED);
if (txnOperation.equalsIgnoreCase("abort")) {
statusList.add(TransactionStatus.PREPARE);
}
request.setTxnId(dbTransactionMgr.getTransactionIdByLabel(request.getLabel(), statusList));
request.setTxnId(Env.getCurrentGlobalTransactionMgr()
.getTransactionIdByLabel(database.getId(), request.getLabel(), statusList));
}
TransactionState transactionState = dbTransactionMgr.getTransactionState(request.getTxnId());
LOG.debug("txn {} has multi table {}", request.getTxnId(), transactionState.getTableIdList());
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(database.getId(), request.getTxnId());
if (transactionState == null) {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
}
LOG.debug("txn {} has multi table {}", request.getTxnId(), transactionState.getTableIdList());
List<Long> tableIdList = transactionState.getTableIdList();
List<Table> tableList = new ArrayList<>();
// if table was dropped, stream load must can abort.
Expand Down Expand Up @@ -1753,9 +1752,9 @@ private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException {
}

// Step 2: get tables
DatabaseTransactionMgr dbTransactionMgr = Env.getCurrentGlobalTransactionMgr()
.getDatabaseTransactionMgr(db.getId());
TransactionState transactionState = dbTransactionMgr.getTransactionState(request.getTxnId());
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(db.getId(), request.getTxnId());

if (transactionState == null) {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
}
Expand Down Expand Up @@ -1855,8 +1854,8 @@ private void loadTxnRollbackImpl(TLoadTxnRollbackRequest request) throws UserExc
throw new MetaNotFoundException("db " + request.getDb() + " does not exist");
}
long dbId = db.getId();
DatabaseTransactionMgr dbTransactionMgr = Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(dbId);
TransactionState transactionState = dbTransactionMgr.getTransactionState(request.getTxnId());
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, request.getTxnId());
if (transactionState == null) {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
}
Expand Down Expand Up @@ -1935,9 +1934,8 @@ private void rollbackTxnImpl(TRollbackTxnRequest request) throws UserException {
}

// Step 2: get tables
DatabaseTransactionMgr dbTransactionMgr = Env.getCurrentGlobalTransactionMgr()
.getDatabaseTransactionMgr(db.getId());
TransactionState transactionState = dbTransactionMgr.getTransactionState(request.getTxnId());
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(db.getId(), request.getTxnId());
if (transactionState == null) {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
}
Expand Down Expand Up @@ -2092,9 +2090,8 @@ public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequ
}
multiTableFragmentInstanceIdIndexMap.put(request.getTxnId(), ++index);
}
Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(db.getId())
.putTransactionTableNames(request.getTxnId(),
tableIds);
Env.getCurrentGlobalTransactionMgr()
.putTransactionTableNames(db.getId(), request.getTxnId(), tableIds);
LOG.debug("receive stream load multi table put request result: {}", result);
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) {
// invalid all connections cached in ClientPool
ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort()));
if (!isReplay && System.currentTimeMillis() - be.getLastUpdateMs() > 60 * 1000L) {
Env.getCurrentEnv().getGlobalTransactionMgr()
Env.getCurrentGlobalTransactionMgr()
.abortTxnWhenCoordinateBeDown(be.getHost(), 100);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,20 +166,20 @@ private enum PublishResult {

private long lockReportingThresholdMs = Config.lock_reporting_threshold_ms;

protected void readLock() {
private void readLock() {
this.transactionLock.readLock().lock();
}

protected void readUnlock() {
private void readUnlock() {
this.transactionLock.readLock().unlock();
}

protected void writeLock() {
private void writeLock() {
this.transactionLock.writeLock().lock();
lockWriteStart = System.currentTimeMillis();
}

protected void writeUnlock() {
private void writeUnlock() {
checkAndLogWriteLockDuration(lockWriteStart, System.currentTimeMillis());
this.transactionLock.writeLock().unlock();
}
Expand All @@ -195,7 +195,7 @@ public long getDbId() {
return dbId;
}

public TransactionState getTransactionState(Long transactionId) {
protected TransactionState getTransactionState(Long transactionId) {
readLock();
try {
TransactionState transactionState = idToRunningTransactionState.get(transactionId);
Expand Down Expand Up @@ -223,7 +223,7 @@ protected Set<Long> unprotectedGetTxnIdsByLabel(String label) {
return labelToTxnIds.get(label);
}

public int getRunningTxnNums() {
protected int getRunningTxnNums() {
return runningTxnNums;
}

Expand Down Expand Up @@ -863,7 +863,7 @@ public TransactionStatus getLabelState(String label) {
}
}

public Long getTransactionIdByLabel(String label) {
protected Long getTransactionIdByLabel(String label) {
readLock();
try {
Set<Long> existingTxnIds = unprotectedGetTxnIdsByLabel(label);
Expand All @@ -877,7 +877,7 @@ public Long getTransactionIdByLabel(String label) {
}
}

public Long getTransactionIdByLabel(String label, List<TransactionStatus> statusList) throws UserException {
protected Long getTransactionIdByLabel(String label, List<TransactionStatus> statusList) throws UserException {
readLock();
try {
TransactionState findTxn = null;
Expand Down Expand Up @@ -905,7 +905,7 @@ public Long getTransactionIdByLabel(String label, List<TransactionStatus> status
}
}

public List<TransactionState> getPreCommittedTxnList() {
protected List<TransactionState> getPreCommittedTxnList() {
readLock();
try {
// only send task to preCommitted transaction
Expand All @@ -919,7 +919,7 @@ public List<TransactionState> getPreCommittedTxnList() {
}
}

public List<TransactionState> getCommittedTxnList() {
protected List<TransactionState> getCommittedTxnList() {
readLock();
try {
// only send task to committed transaction
Expand Down Expand Up @@ -2027,7 +2027,7 @@ public void unprotectWriteAllTransactionStates(DataOutput out) throws IOExceptio
}
}

public void cleanLabel(String label) {
protected void cleanLabel(String label) {
Set<Long> removedTxnIds = Sets.newHashSet();
writeLock();
try {
Expand Down Expand Up @@ -2120,7 +2120,7 @@ private static String getStackTrace(Thread t) {
return msgBuilder.toString();
}

public void putTransactionTableNames(long transactionId, List<Long> tableIds) {
protected void putTransactionTableNames(long transactionId, List<Long> tableIds) {
if (CollectionUtils.isEmpty(tableIds)) {
return;
}
Expand All @@ -2135,7 +2135,7 @@ public void putTransactionTableNames(long transactionId, List<Long> tableIds) {
* Update transaction table ids by transaction id.
* it's used for multi table transaction.
*/
public void updateMultiTableRunningTransactionTableIds(long transactionId, List<Long> tableIds) {
protected void updateMultiTableRunningTransactionTableIds(long transactionId, List<Long> tableIds) {
if (CollectionUtils.isEmpty(tableIds)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -750,4 +750,30 @@ private long updateTxnMetric(Function<DatabaseTransactionMgr, Long> metricSuppli
return total;
}

public List<TransactionState> getPreCommittedTxnList(Long dbId) throws AnalysisException {
return getDatabaseTransactionMgr(dbId).getPreCommittedTxnList();
}

public void cleanLabel(Long dbId, String label) throws AnalysisException {
getDatabaseTransactionMgr(dbId).cleanLabel(label);
}

public Long getTransactionIdByLabel(Long dbId, String label, List<TransactionStatus> statusList)
throws UserException {
return getDatabaseTransactionMgr(dbId).getTransactionIdByLabel(label, statusList);
}

public int getRunningTxnNums(Long dbId) throws AnalysisException {
return getDatabaseTransactionMgr(dbId).getRunningTxnNums();
}

public void updateMultiTableRunningTransactionTableIds(Long dbId, Long transactionId, List<Long> tableIds)
throws AnalysisException {
getDatabaseTransactionMgr(dbId).updateMultiTableRunningTransactionTableIds(transactionId, tableIds);
}

public void putTransactionTableNames(Long dbId, Long transactionId, List<Long> tableIds)
throws AnalysisException {
getDatabaseTransactionMgr(dbId).putTransactionTableNames(transactionId, tableIds);
}
}

0 comments on commit b5214a8

Please sign in to comment.