From b5214a83d343e9c9861a5d735dac1bc5cb1292bc Mon Sep 17 00:00:00 2001 From: Lei Zhang <27994433+SWJTU-ZhangLei@users.noreply.github.com> Date: Wed, 15 Nov 2023 20:02:52 +0800 Subject: [PATCH] [refact](fe) Refact GlobalTransactionMgr * Only export `GlobalTransactionMgr` method, avoid using `DatabaseTransactionMgr` --- .../java/org/apache/doris/catalog/Env.java | 2 +- .../apache/doris/clone/TabletScheduler.java | 7 ++-- .../doris/datasource/InternalCatalog.java | 6 ++-- .../apache/doris/load/loadv2/LoadManager.java | 5 ++- .../load/routineload/RoutineLoadJob.java | 6 ++-- .../load/sync/canal/CanalSyncChannel.java | 7 ++-- .../doris/service/FrontendServiceImpl.java | 35 +++++++++---------- .../org/apache/doris/system/HeartbeatMgr.java | 2 +- .../transaction/DatabaseTransactionMgr.java | 26 +++++++------- .../transaction/GlobalTransactionMgr.java | 26 ++++++++++++++ 10 files changed, 70 insertions(+), 52 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 24945649912a04..3eededfdae3a6e 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -5429,7 +5429,7 @@ public void eraseDatabase(long dbId, boolean needEditLog) { Env.getCurrentEnv().getLoadInstance().removeDbLoadJob(dbId); // remove database transaction manager - Env.getCurrentEnv().getGlobalTransactionMgr().removeDatabaseTransactionMgr(dbId); + Env.getCurrentGlobalTransactionMgr().removeDatabaseTransactionMgr(dbId); if (needEditLog) { Env.getCurrentEnv().getEditLog().logEraseDb(dbId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 4960fc6be144b7..dcf503b8ad70a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -62,7 +62,6 @@ import org.apache.doris.thrift.TFinishTaskRequest; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStorageMedium; -import org.apache.doris.transaction.DatabaseTransactionMgr; import org.apache.doris.transaction.TransactionState; import com.google.common.base.Joiner; @@ -557,9 +556,8 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) } try { - DatabaseTransactionMgr dbTransactionMgr - = Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(db.getId()); - for (TransactionState transactionState : dbTransactionMgr.getPreCommittedTxnList()) { + for (TransactionState transactionState : + Env.getCurrentGlobalTransactionMgr().getPreCommittedTxnList(db.getId())) { if (transactionState.getTableIdList().contains(tbl.getId())) { // If table releate to transaction with precommitted status, do not allow to do balance. throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, @@ -568,6 +566,7 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) } } catch (AnalysisException e) { // CHECKSTYLE IGNORE THIS LINE + LOG.warn("Exception:", e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 6b1e3d90e60d13..450e09e0724db4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -500,7 +500,7 @@ public void dropDb(DropDbStmt stmt) throws DdlException { long recycleTime = 0; try { if (!stmt.isForceDrop()) { - if (Env.getCurrentEnv().getGlobalTransactionMgr().existCommittedTxns(db.getId(), null, null)) { + if (Env.getCurrentGlobalTransactionMgr().existCommittedTxns(db.getId(), null, null)) { throw new DdlException( "There are still some transactions in the COMMITTED state waiting to be completed. " + "The database [" + dbName @@ -885,7 +885,7 @@ public void dropTable(DropTableStmt stmt) throws DdlException { } if (!stmt.isForceDrop()) { - if (Env.getCurrentEnv().getGlobalTransactionMgr().existCommittedTxns(db.getId(), table.getId(), null)) { + if (Env.getCurrentGlobalTransactionMgr().existCommittedTxns(db.getId(), table.getId(), null)) { throw new DdlException( "There are still some transactions in the COMMITTED state waiting to be completed. " + "The table [" + tableName @@ -1734,7 +1734,7 @@ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause if (!clause.isForceDrop()) { partition = olapTable.getPartition(partitionName); if (partition != null) { - if (Env.getCurrentEnv().getGlobalTransactionMgr() + if (Env.getCurrentGlobalTransactionMgr() .existCommittedTxns(db.getId(), olapTable.getId(), partition.getId())) { throw new DdlException( "There are still some transactions in the COMMITTED state waiting to be completed." diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 3f05e2c5a9887a..009c5144fad29e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -50,7 +50,6 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.thrift.TUniqueId; -import org.apache.doris.transaction.DatabaseTransactionMgr; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; @@ -764,10 +763,10 @@ private void cleanLabelInternal(long dbId, String label, boolean isReplay) { // 2. Remove from DatabaseTransactionMgr try { - DatabaseTransactionMgr dbTxnMgr = Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(dbId); - dbTxnMgr.cleanLabel(label); + Env.getCurrentGlobalTransactionMgr().cleanLabel(dbId, label); } catch (AnalysisException e) { // just ignore, because we don't want to throw any exception here. + LOG.warn("Exception:", e); } // 3. Log diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 86fe00a01a9a58..ca650068c07235 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -64,7 +64,6 @@ import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.AbstractTxnStateChangeCallback; -import org.apache.doris.transaction.DatabaseTransactionMgr; import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; @@ -1457,8 +1456,6 @@ public List> getTasksShowInfo() throws AnalysisException { if (null == routineLoadTaskInfoList || routineLoadTaskInfoList.isEmpty()) { return rows; } - DatabaseTransactionMgr databaseTransactionMgr = Env.getCurrentEnv().getGlobalTransactionMgr() - .getDatabaseTransactionMgr(dbId); routineLoadTaskInfoList.forEach(entity -> { long txnId = entity.getTxnId(); @@ -1466,7 +1463,8 @@ public List> getTasksShowInfo() throws AnalysisException { rows.add(entity.getTaskShowInfo()); return; } - TransactionState transactionState = databaseTransactionMgr.getTransactionState(entity.getTxnId()); + TransactionState transactionState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(dbId, entity.getTxnId()); if (null != transactionState && null != transactionState.getTransactionStatus()) { entity.setTxnStatus(transactionState.getTransactionStatus()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java index 5b10ecea818ffa..73cfc77f0cb217 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java @@ -42,7 +42,6 @@ import org.apache.doris.thrift.TTxnParams; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.BeginTransactionException; -import org.apache.doris.transaction.DatabaseTransactionMgr; import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.TransactionEntry; import org.apache.doris.transaction.TransactionState; @@ -123,9 +122,9 @@ public void beginTxn(long batchId) throws UserException, TException, TimeoutExce + "_batch" + batchId + "_" + currentTime; String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN; GlobalTransactionMgr globalTransactionMgr = Env.getCurrentGlobalTransactionMgr(); - DatabaseTransactionMgr databaseTransactionMgr = globalTransactionMgr.getDatabaseTransactionMgr(db.getId()); long txnLimit = db.getTransactionQuotaSize(); - if (databaseTransactionMgr.getRunningTxnNums() < txnLimit) { + long runningTxnNums = globalTransactionMgr.getRunningTxnNums(db.getId()); + if (runningTxnNums < txnLimit) { TransactionEntry txnEntry = txnExecutor.getTxnEntry(); TTxnParams txnConf = txnEntry.getTxnConf(); TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING; @@ -184,7 +183,7 @@ public void beginTxn(long batchId) throws UserException, TException, TimeoutExce } } else { String failMsg = "current running txns on db " + db.getId() + " is " - + databaseTransactionMgr.getRunningTxnNums() + + runningTxnNums + ", larger than limit " + txnLimit; LOG.warn(failMsg); throw new BeginTransactionException(failMsg); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 65fc162279b5d9..a461a97e42c62c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -216,7 +216,6 @@ import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest; import org.apache.doris.thrift.TWaitingTxnStatusRequest; import org.apache.doris.thrift.TWaitingTxnStatusResult; -import org.apache.doris.transaction.DatabaseTransactionMgr; import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.TxnCoordinator; @@ -1443,8 +1442,8 @@ private List queryLoadCommitTables(TLoadTxnCommitRequest request, Databas // if it has multi table, use multi table and update multi table running transaction table ids if (CollectionUtils.isNotEmpty(request.getTbls())) { List multiTableIds = tables.stream().map(Table::getId).collect(Collectors.toList()); - Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(db.getId()) - .updateMultiTableRunningTransactionTableIds(request.getTxnId(), multiTableIds); + Env.getCurrentGlobalTransactionMgr() + .updateMultiTableRunningTransactionTableIds(db.getId(), request.getTxnId(), multiTableIds); LOG.debug("txn {} has multi table {}", request.getTxnId(), request.getTbls()); } return tables; @@ -1551,8 +1550,6 @@ private void loadTxn2PCImpl(TLoadTxn2PCRequest request) throws UserException { throw new UserException("unknown database, database=" + fullDbName); } - DatabaseTransactionMgr dbTransactionMgr = Env.getCurrentGlobalTransactionMgr() - .getDatabaseTransactionMgr(database.getId()); String txnOperation = request.getOperation().trim(); if (!request.isSetTxnId()) { List statusList = new ArrayList<>(); @@ -1560,13 +1557,15 @@ private void loadTxn2PCImpl(TLoadTxn2PCRequest request) throws UserException { if (txnOperation.equalsIgnoreCase("abort")) { statusList.add(TransactionStatus.PREPARE); } - request.setTxnId(dbTransactionMgr.getTransactionIdByLabel(request.getLabel(), statusList)); + request.setTxnId(Env.getCurrentGlobalTransactionMgr() + .getTransactionIdByLabel(database.getId(), request.getLabel(), statusList)); } - TransactionState transactionState = dbTransactionMgr.getTransactionState(request.getTxnId()); - LOG.debug("txn {} has multi table {}", request.getTxnId(), transactionState.getTableIdList()); + TransactionState transactionState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(database.getId(), request.getTxnId()); if (transactionState == null) { throw new UserException("transaction [" + request.getTxnId() + "] not found"); } + LOG.debug("txn {} has multi table {}", request.getTxnId(), transactionState.getTableIdList()); List tableIdList = transactionState.getTableIdList(); List
tableList = new ArrayList<>(); // if table was dropped, stream load must can abort. @@ -1753,9 +1752,9 @@ private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException { } // Step 2: get tables - DatabaseTransactionMgr dbTransactionMgr = Env.getCurrentGlobalTransactionMgr() - .getDatabaseTransactionMgr(db.getId()); - TransactionState transactionState = dbTransactionMgr.getTransactionState(request.getTxnId()); + TransactionState transactionState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(db.getId(), request.getTxnId()); + if (transactionState == null) { throw new UserException("transaction [" + request.getTxnId() + "] not found"); } @@ -1855,8 +1854,8 @@ private void loadTxnRollbackImpl(TLoadTxnRollbackRequest request) throws UserExc throw new MetaNotFoundException("db " + request.getDb() + " does not exist"); } long dbId = db.getId(); - DatabaseTransactionMgr dbTransactionMgr = Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(dbId); - TransactionState transactionState = dbTransactionMgr.getTransactionState(request.getTxnId()); + TransactionState transactionState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(dbId, request.getTxnId()); if (transactionState == null) { throw new UserException("transaction [" + request.getTxnId() + "] not found"); } @@ -1935,9 +1934,8 @@ private void rollbackTxnImpl(TRollbackTxnRequest request) throws UserException { } // Step 2: get tables - DatabaseTransactionMgr dbTransactionMgr = Env.getCurrentGlobalTransactionMgr() - .getDatabaseTransactionMgr(db.getId()); - TransactionState transactionState = dbTransactionMgr.getTransactionState(request.getTxnId()); + TransactionState transactionState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(db.getId(), request.getTxnId()); if (transactionState == null) { throw new UserException("transaction [" + request.getTxnId() + "] not found"); } @@ -2092,9 +2090,8 @@ public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequ } multiTableFragmentInstanceIdIndexMap.put(request.getTxnId(), ++index); } - Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(db.getId()) - .putTransactionTableNames(request.getTxnId(), - tableIds); + Env.getCurrentGlobalTransactionMgr() + .putTransactionTableNames(db.getId(), request.getTxnId(), tableIds); LOG.debug("receive stream load multi table put request result: {}", result); } catch (Throwable e) { LOG.warn("catch unknown result.", e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index a285d529a268f8..4281ba8f3765f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -175,7 +175,7 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) { // invalid all connections cached in ClientPool ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort())); if (!isReplay && System.currentTimeMillis() - be.getLastUpdateMs() > 60 * 1000L) { - Env.getCurrentEnv().getGlobalTransactionMgr() + Env.getCurrentGlobalTransactionMgr() .abortTxnWhenCoordinateBeDown(be.getHost(), 100); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index cac271af4aab97..4e36891867ebaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -166,20 +166,20 @@ private enum PublishResult { private long lockReportingThresholdMs = Config.lock_reporting_threshold_ms; - protected void readLock() { + private void readLock() { this.transactionLock.readLock().lock(); } - protected void readUnlock() { + private void readUnlock() { this.transactionLock.readLock().unlock(); } - protected void writeLock() { + private void writeLock() { this.transactionLock.writeLock().lock(); lockWriteStart = System.currentTimeMillis(); } - protected void writeUnlock() { + private void writeUnlock() { checkAndLogWriteLockDuration(lockWriteStart, System.currentTimeMillis()); this.transactionLock.writeLock().unlock(); } @@ -195,7 +195,7 @@ public long getDbId() { return dbId; } - public TransactionState getTransactionState(Long transactionId) { + protected TransactionState getTransactionState(Long transactionId) { readLock(); try { TransactionState transactionState = idToRunningTransactionState.get(transactionId); @@ -223,7 +223,7 @@ protected Set unprotectedGetTxnIdsByLabel(String label) { return labelToTxnIds.get(label); } - public int getRunningTxnNums() { + protected int getRunningTxnNums() { return runningTxnNums; } @@ -863,7 +863,7 @@ public TransactionStatus getLabelState(String label) { } } - public Long getTransactionIdByLabel(String label) { + protected Long getTransactionIdByLabel(String label) { readLock(); try { Set existingTxnIds = unprotectedGetTxnIdsByLabel(label); @@ -877,7 +877,7 @@ public Long getTransactionIdByLabel(String label) { } } - public Long getTransactionIdByLabel(String label, List statusList) throws UserException { + protected Long getTransactionIdByLabel(String label, List statusList) throws UserException { readLock(); try { TransactionState findTxn = null; @@ -905,7 +905,7 @@ public Long getTransactionIdByLabel(String label, List status } } - public List getPreCommittedTxnList() { + protected List getPreCommittedTxnList() { readLock(); try { // only send task to preCommitted transaction @@ -919,7 +919,7 @@ public List getPreCommittedTxnList() { } } - public List getCommittedTxnList() { + protected List getCommittedTxnList() { readLock(); try { // only send task to committed transaction @@ -2027,7 +2027,7 @@ public void unprotectWriteAllTransactionStates(DataOutput out) throws IOExceptio } } - public void cleanLabel(String label) { + protected void cleanLabel(String label) { Set removedTxnIds = Sets.newHashSet(); writeLock(); try { @@ -2120,7 +2120,7 @@ private static String getStackTrace(Thread t) { return msgBuilder.toString(); } - public void putTransactionTableNames(long transactionId, List tableIds) { + protected void putTransactionTableNames(long transactionId, List tableIds) { if (CollectionUtils.isEmpty(tableIds)) { return; } @@ -2135,7 +2135,7 @@ public void putTransactionTableNames(long transactionId, List tableIds) { * Update transaction table ids by transaction id. * it's used for multi table transaction. */ - public void updateMultiTableRunningTransactionTableIds(long transactionId, List tableIds) { + protected void updateMultiTableRunningTransactionTableIds(long transactionId, List tableIds) { if (CollectionUtils.isEmpty(tableIds)) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 8053e9f382cbbc..ee228a10fc867d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -750,4 +750,30 @@ private long updateTxnMetric(Function metricSuppli return total; } + public List getPreCommittedTxnList(Long dbId) throws AnalysisException { + return getDatabaseTransactionMgr(dbId).getPreCommittedTxnList(); + } + + public void cleanLabel(Long dbId, String label) throws AnalysisException { + getDatabaseTransactionMgr(dbId).cleanLabel(label); + } + + public Long getTransactionIdByLabel(Long dbId, String label, List statusList) + throws UserException { + return getDatabaseTransactionMgr(dbId).getTransactionIdByLabel(label, statusList); + } + + public int getRunningTxnNums(Long dbId) throws AnalysisException { + return getDatabaseTransactionMgr(dbId).getRunningTxnNums(); + } + + public void updateMultiTableRunningTransactionTableIds(Long dbId, Long transactionId, List tableIds) + throws AnalysisException { + getDatabaseTransactionMgr(dbId).updateMultiTableRunningTransactionTableIds(transactionId, tableIds); + } + + public void putTransactionTableNames(Long dbId, Long transactionId, List tableIds) + throws AnalysisException { + getDatabaseTransactionMgr(dbId).putTransactionTableNames(transactionId, tableIds); + } }