Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix table load metrics missing for transaction load #44991

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1459,13 +1459,9 @@ void loadTxnCommitImpl(TLoadTxnCommitRequest request, TStatus status) throws Sta
if (!(attachment instanceof ManualLoadTxnCommitAttachment)) {
break;
}
ManualLoadTxnCommitAttachment streamAttachment = (ManualLoadTxnCommitAttachment) attachment;
entity.counterStreamLoadFinishedTotal.increase(1L);
entity.counterStreamLoadBytesTotal.increase(streamAttachment.getReceivedBytes());
entity.counterStreamLoadRowsTotal.increase(streamAttachment.getLoadedRows());

if (streamLoadtask != null) {
streamLoadtask.setLoadState(streamAttachment, "");
streamLoadtask.setLoadState(attachment, "");
}

break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,11 @@
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
import com.starrocks.lake.LakeTableHelper;
import com.starrocks.load.loadv2.ManualLoadTxnCommitAttachment;
import com.starrocks.load.routineload.RLTaskTxnCommitAttachment;
import com.starrocks.metric.MetricRepo;
import com.starrocks.metric.TableMetricsEntity;
import com.starrocks.metric.TableMetricsRegistry;
import com.starrocks.persist.EditLog;
import com.starrocks.persist.metablock.SRMetaBlockException;
import com.starrocks.persist.metablock.SRMetaBlockWriter;
Expand Down Expand Up @@ -1282,6 +1285,7 @@ public void finishTransaction(long transactionId, Set<Long> errorReplicaIds) thr
GlobalStateMgr.getCurrentState().getOperationListenerBus().onStreamJobTransactionFinish(transactionState);
GlobalStateMgr.getCurrentState().getLocalMetastore().handleMVRepair(transactionState);
LOG.info("finish transaction {} successfully", transactionState);
updateTransactionMetrics(transactionState);
}

protected void unprotectedCommitPreparedTransaction(TransactionState transactionState, Database db) {
Expand Down Expand Up @@ -1976,6 +1980,7 @@ public void finishTransactionNew(TransactionState transactionState, Set<Long> pu
GlobalStateMgr.getCurrentState().getOperationListenerBus().onStreamJobTransactionFinish(transactionState);
GlobalStateMgr.getCurrentState().getLocalMetastore().handleMVRepair(transactionState);
LOG.info("finish transaction {} successfully", transactionState);
updateTransactionMetrics(transactionState);
}

public void finishTransactionBatch(TransactionStateBatch stateBatch, Set<Long> errorReplicaIds) {
Expand Down Expand Up @@ -2041,6 +2046,25 @@ public void finishTransactionBatch(TransactionStateBatch stateBatch, Set<Long> e
}

LOG.info("finish transaction {} batch successfully", stateBatch);
for (TransactionState transactionState : stateBatch.getTransactionStates()) {
updateTransactionMetrics(transactionState);
}
}

private void updateTransactionMetrics(TransactionState txnState) {
if (txnState.getTableIdList().isEmpty()) {
return;
}
TxnCommitAttachment attachment = txnState.getTxnCommitAttachment();
if (!(attachment instanceof ManualLoadTxnCommitAttachment)) {
return;
}
long tableId = txnState.getTableIdList().get(0);
ManualLoadTxnCommitAttachment streamAttachment = (ManualLoadTxnCommitAttachment) attachment;
TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(tableId);
entity.counterStreamLoadFinishedTotal.increase(1L);
entity.counterStreamLoadRowsTotal.increase(streamAttachment.getLoadedRows());
entity.counterStreamLoadBytesTotal.increase(streamAttachment.getLoadedBytes());
}

public String getTxnPublishTimeoutDebugInfo(long txnId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import com.starrocks.common.StarRocksException;
import com.starrocks.common.jmockit.Deencapsulation;
import com.starrocks.common.util.concurrent.lock.LockTimeoutException;
import com.starrocks.load.loadv2.ManualLoadTxnCommitAttachment;
import com.starrocks.load.routineload.KafkaProgress;
import com.starrocks.load.routineload.KafkaRoutineLoadJob;
import com.starrocks.load.routineload.KafkaTaskInfo;
Expand All @@ -63,6 +64,8 @@
import com.starrocks.load.routineload.RoutineLoadMgr;
import com.starrocks.load.routineload.RoutineLoadTaskInfo;
import com.starrocks.metric.MetricRepo;
import com.starrocks.metric.TableMetricsEntity;
import com.starrocks.metric.TableMetricsRegistry;
import com.starrocks.persist.EditLog;
import com.starrocks.persist.ImageFormatVersion;
import com.starrocks.persist.ImageWriter;
Expand All @@ -71,6 +74,7 @@
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.thrift.TKafkaRLTaskProgress;
import com.starrocks.thrift.TLoadSourceType;
import com.starrocks.thrift.TManualLoadTxnCommitAttachment;
import com.starrocks.thrift.TRLTaskTxnCommitAttachment;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.transaction.TransactionState.LoadJobSourceType;
Expand Down Expand Up @@ -822,8 +826,18 @@ public void testPrepareTransaction() throws StarRocksException {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);

TManualLoadTxnCommitAttachment loadTxnCommitAttachment = new TManualLoadTxnCommitAttachment();
loadTxnCommitAttachment.setLoadedRows(100);
loadTxnCommitAttachment.setLoadedBytes(10000);
loadTxnCommitAttachment.setFilteredRows(0);
TxnCommitAttachment txnCommitAttachment = new ManualLoadTxnCommitAttachment(loadTxnCommitAttachment);
TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(GlobalStateMgrTestUtil.testTableId1);
assertEquals(0, entity.counterStreamLoadRowsTotal.getValue().intValue());
assertEquals(0, entity.counterStreamLoadBytesTotal.getValue().intValue());
assertEquals(0, entity.counterStreamLoadFinishedTotal.getValue().intValue());
masterTransMgr.prepareTransaction(GlobalStateMgrTestUtil.testDbId1, transactionId, transTablets,
Lists.newArrayList(), null);
Lists.newArrayList(), txnCommitAttachment);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
assertEquals(TransactionStatus.PREPARED, transactionState.getTransactionStatus());

Expand All @@ -841,6 +855,9 @@ public void testPrepareTransaction() throws StarRocksException {
Set<Long> errorReplicaIds = Sets.newHashSet();
errorReplicaIds.add(GlobalStateMgrTestUtil.testReplicaId1);
masterTransMgr.finishTransaction(GlobalStateMgrTestUtil.testDbId1, transactionId, errorReplicaIds);
assertEquals(100, entity.counterStreamLoadRowsTotal.getValue().intValue());
assertEquals(10000, entity.counterStreamLoadBytesTotal.getValue().intValue());
assertEquals(1, entity.counterStreamLoadFinishedTotal.getValue().intValue());
transactionState = fakeEditLog.getTransaction(transactionId);
assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus());
// check replica version
Expand Down
Loading