diff --git a/core/src/main/java/org/apache/seata/core/context/RootContext.java b/core/src/main/java/org/apache/seata/core/context/RootContext.java index 85453afb5d1..4bcb94dc240 100644 --- a/core/src/main/java/org/apache/seata/core/context/RootContext.java +++ b/core/src/main/java/org/apache/seata/core/context/RootContext.java @@ -25,6 +25,7 @@ import org.apache.seata.common.exception.ShouldNeverHappenException; import org.apache.seata.common.util.StringUtils; import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.model.TccLocalTxActive; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -60,6 +61,26 @@ private RootContext() { */ public static final String KEY_TIMEOUT = "TX_TIMEOUT"; + /** + * The constant KEY_RESOURCE_ID. + */ + public static final String KEY_RESOURCE_ID = "TX_RESOURCE_ID"; + + /** + * The constant KEY_TCC_LOCAL_TX_ACTIVE. + */ + public static final String KEY_TCC_LOCAL_TX_ACTIVE = "TCC_LOCAL_TX_ACTIVE"; + + /** + * The constant KEY_TCC_COMMIT_RESULT. + */ + public static final String KEY_TCC_COMMIT_RESULT = "KEY_TCC_COMMIT_RESULT"; + + /** + * The constant KEY_TCC_ROLLBACK_RESULT. + */ + public static final String KEY_TCC_ROLLBACK_RESULT = "KEY_TCC_ROLLBACK_RESULT"; + /** * The constant MDC_KEY_XID for logback * @since 1.5.0 @@ -142,6 +163,81 @@ public static void setTimeout(Integer timeout) { CONTEXT_HOLDER.put(KEY_TIMEOUT,timeout); } + public static String getBranchId() { + return (String) CONTEXT_HOLDER.get(KEY_BRANCHID); + } + + public static void bindBranchId(String branchId) { + CONTEXT_HOLDER.put(KEY_BRANCHID, branchId); + } + + public static void unbindBranchId() { + String branchId = (String) CONTEXT_HOLDER.remove(KEY_BRANCHID); + if (LOGGER.isDebugEnabled() && branchId != null) { + LOGGER.debug("unbind branch id"); + } + } + + public static String getResourceId() { + return (String) CONTEXT_HOLDER.get(KEY_RESOURCE_ID); + } + + public static void bindResourceId(String resourceId) { + CONTEXT_HOLDER.put(KEY_RESOURCE_ID, resourceId); + } + + public static void unbindResourceId() { + String resourceId = (String) CONTEXT_HOLDER.remove(KEY_RESOURCE_ID); + if (LOGGER.isDebugEnabled() && resourceId != null) { + LOGGER.debug("unbind tcc resource id"); + } + } + + public static TccLocalTxActive getTccLocalTxActive() { + return (TccLocalTxActive) CONTEXT_HOLDER.get(KEY_TCC_LOCAL_TX_ACTIVE); + } + + public static void bindTccLocalTxActive(TccLocalTxActive tccLocalTxActive) { + CONTEXT_HOLDER.put(KEY_TCC_LOCAL_TX_ACTIVE, tccLocalTxActive); + } + + public static void unbindTccLocalTxActive() { + TccLocalTxActive tccLocalTxActive = (TccLocalTxActive) CONTEXT_HOLDER.remove(KEY_TCC_LOCAL_TX_ACTIVE); + if (LOGGER.isDebugEnabled() && tccLocalTxActive != null) { + LOGGER.debug("unbind tcc local tx active identification"); + } + } + + public static Boolean getTccCommitResult() { + return (Boolean) CONTEXT_HOLDER.get(KEY_TCC_COMMIT_RESULT); + } + + public static void bindTccCommitResult(Boolean tccCommitResult) { + CONTEXT_HOLDER.put(KEY_TCC_COMMIT_RESULT, tccCommitResult); + } + + public static void unbindTccCommitResult() { + Boolean tccCommitResult = (Boolean) CONTEXT_HOLDER.remove(KEY_TCC_COMMIT_RESULT); + if (LOGGER.isDebugEnabled() && tccCommitResult != null) { + LOGGER.debug("unbind tcc commit result"); + } + } + + public static Boolean getTccRollbackResult() { + return (Boolean) CONTEXT_HOLDER.get(KEY_TCC_ROLLBACK_RESULT); + } + + public static void bindTccRollbackResult(Boolean tccRollbackResult) { + CONTEXT_HOLDER.put(KEY_TCC_ROLLBACK_RESULT, tccRollbackResult); + } + + public static void unbindTccRollbackResult() { + Boolean tccRollbackResult = (Boolean) CONTEXT_HOLDER.remove(KEY_TCC_ROLLBACK_RESULT); + if (LOGGER.isDebugEnabled() && tccRollbackResult != null) { + LOGGER.debug("unbind tcc rollback result"); + } + } + /** * declare local transactions will use global lock check for update/delete/insert/selectForUpdate SQL */ diff --git a/core/src/main/java/org/apache/seata/core/model/TccLocalTxActive.java b/core/src/main/java/org/apache/seata/core/model/TccLocalTxActive.java new file mode 100644 index 00000000000..d74428a74e7 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/model/TccLocalTxActive.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.model; + +/** + * Identifies whether tcc transactions are activated on the business side + */ +public enum TccLocalTxActive { + + /** + * The tcc transaction is not activated on the service side. + */ + UN_ACTIVE(0), + + /** + * The tcc transaction is activated on the service side. + */ + ACTIVE(1); + + + private final int code; + + TccLocalTxActive(int code) { + this.code = code; + } +} diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/DefaultCommonFenceHandler.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/DefaultCommonFenceHandler.java index 3e8307628d7..c393696c7ea 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/DefaultCommonFenceHandler.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/DefaultCommonFenceHandler.java @@ -45,9 +45,9 @@ private void check() { } @Override - public Object prepareFence(String xid, Long branchId, String actionName, Callback targetCallback) { + public Object prepareFence(Method prepareMethod, Object targetTCCBean, String xid, Long branchId, String actionName, Callback targetCallback) { check(); - return fenceHandler.prepareFence(xid, branchId, actionName, targetCallback); + return fenceHandler.prepareFence(prepareMethod, targetTCCBean, xid, branchId, actionName, targetCallback); } @Override diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/FenceHandler.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/FenceHandler.java index 11978290bff..083f0e7a013 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/FenceHandler.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/FenceHandler.java @@ -24,7 +24,7 @@ public interface FenceHandler { - Object prepareFence(String xid, Long branchId, String actionName, Callback targetCallback); + Object prepareFence(Method prepareMethod, Object targetTCCBean, String xid, Long branchId, String actionName, Callback targetCallback); boolean commitFence(Method commitMethod, Object targetTCCBean, String xid, Long branchId, Object[] args); diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/ActionInterceptorHandler.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/ActionInterceptorHandler.java index b3600cdc55a..d4dd3a0979d 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/ActionInterceptorHandler.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/ActionInterceptorHandler.java @@ -16,14 +16,6 @@ */ package org.apache.seata.integration.tx.api.interceptor; -import javax.annotation.Nonnull; -import java.lang.annotation.Annotation; -import java.lang.reflect.Method; -import java.lang.reflect.UndeclaredThrowableException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - import org.apache.seata.common.Constants; import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.exception.SkipCallbackWrapperException; @@ -42,6 +34,14 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import javax.annotation.Nonnull; +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + /** * Handler the Tx Participant Aspect : Setting Context, Creating Branch Record * @@ -53,16 +53,17 @@ public class ActionInterceptorHandler { /** * Handler the Tx Aspect * - * @param method the method - * @param arguments the arguments + * @param invocation the invocation wrapper * @param xid the xid * @param businessActionParam the business action params - * @param targetCallback the target callback * @return the business result * @throws Throwable the throwable */ - public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessActionParam businessActionParam, - Callback targetCallback) throws Throwable { + public Object proceed(InvocationWrapper invocation, String xid, TwoPhaseBusinessActionParam businessActionParam) throws Throwable { + Method method = invocation.getMethod(); + Object targetTCCBean = invocation.getProxy(); + Object[] arguments = invocation.getArguments(); + Callback targetCallback = invocation::proceed; //Get action context from arguments, or create a new one and then reset to arguments BusinessActionContext actionContext = getOrCreateActionContextAndResetToArguments(method.getParameterTypes(), arguments); @@ -87,11 +88,13 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus try { //share actionContext implicitly BusinessActionContextUtil.setContext(actionContext); + RootContext.bindBranchId(branchId); + RootContext.bindResourceId(actionName); if (businessActionParam.getUseCommonFence()) { try { // Use common Fence, and return the business result - return DefaultCommonFenceHandler.get().prepareFence(xid, Long.valueOf(branchId), actionName, targetCallback); + return DefaultCommonFenceHandler.get().prepareFence(method, targetTCCBean, xid, Long.valueOf(branchId), actionName, targetCallback); } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) { Throwable originException = e.getCause(); if (originException instanceof FrameworkException) { @@ -104,6 +107,8 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus return targetCallback.execute(); } } finally { + RootContext.unbindBranchId(); + RootContext.unbindResourceId(); try { //to report business action context finally if the actionContext.getUpdated() is true BusinessActionContextUtil.reportContext(actionContext); diff --git a/spring/src/main/java/org/apache/seata/rm/fence/SpringFenceHandler.java b/spring/src/main/java/org/apache/seata/rm/fence/SpringFenceHandler.java index 95ada3dc090..dca95634d31 100644 --- a/spring/src/main/java/org/apache/seata/rm/fence/SpringFenceHandler.java +++ b/spring/src/main/java/org/apache/seata/rm/fence/SpringFenceHandler.java @@ -16,23 +16,13 @@ */ package org.apache.seata.rm.fence; -import java.lang.reflect.Method; -import java.sql.Connection; -import java.util.ArrayList; -import java.util.Date; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import javax.sql.DataSource; - import org.apache.seata.common.exception.ExceptionUtil; import org.apache.seata.common.exception.FrameworkErrorCode; import org.apache.seata.common.exception.SkipCallbackWrapperException; import org.apache.seata.common.executor.Callback; import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.core.context.RootContext; +import org.apache.seata.core.model.TccLocalTxActive; import org.apache.seata.integration.tx.api.fence.DefaultCommonFenceHandler; import org.apache.seata.integration.tx.api.fence.FenceHandler; import org.apache.seata.integration.tx.api.fence.constant.CommonFenceConstant; @@ -41,12 +31,32 @@ import org.apache.seata.integration.tx.api.fence.store.CommonFenceStore; import org.apache.seata.integration.tx.api.fence.store.db.CommonFenceStoreDataBaseDAO; import org.apache.seata.integration.tx.api.remoting.TwoPhaseResult; +import org.apache.seata.rm.fence.interceptor.TccFenceCommitInterceptor; +import org.apache.seata.rm.fence.interceptor.TccFencePrepareInterceptor; +import org.apache.seata.rm.fence.interceptor.TccFenceRollbackInterceptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.aop.Advisor; +import org.springframework.aop.framework.Advised; import org.springframework.jdbc.datasource.DataSourceUtils; import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.interceptor.TransactionAttributeSource; +import org.springframework.transaction.interceptor.TransactionInterceptor; +import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionTemplate; +import javax.sql.DataSource; +import java.lang.reflect.Method; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Date; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + /** * Common Fence Handler(idempotent, non_rollback, suspend) * @@ -55,9 +65,9 @@ public class SpringFenceHandler implements FenceHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SpringFenceHandler.class); - private static final CommonFenceStore COMMON_FENCE_DAO = CommonFenceStoreDataBaseDAO.getInstance(); + protected static final CommonFenceStore COMMON_FENCE_DAO = CommonFenceStoreDataBaseDAO.getInstance(); - private static DataSource dataSource; + protected static DataSource dataSource; private static TransactionTemplate transactionTemplate; @@ -100,6 +110,8 @@ public static void setTransactionTemplate(TransactionTemplate transactionTemplat /** * common prepare method enhanced * + * @param prepareMethod prepare method + * @param targetTCCBean target common bean * @param xid the global transaction id * @param branchId the branch transaction id * @param actionName the action name @@ -107,30 +119,46 @@ public static void setTransactionTemplate(TransactionTemplate transactionTemplat * @return the boolean */ @Override - public Object prepareFence(String xid, Long branchId, String actionName, Callback targetCallback) { - return transactionTemplate.execute(status -> { + public Object prepareFence(Method prepareMethod, Object targetTCCBean, String xid, Long branchId, String actionName, Callback targetCallback) { + if (hasTransactionAspectBeforeInterceptor(prepareMethod, targetTCCBean, TccFencePrepareInterceptor.class)) { try { - Connection conn = DataSourceUtils.getConnection(dataSource); - boolean result = insertCommonFenceLog(conn, xid, branchId, actionName, CommonFenceConstant.STATUS_TRIED); - LOGGER.info("Common fence prepare result: {}. xid: {}, branchId: {}", result, xid, branchId); - if (result) { - return targetCallback.execute(); - } else { - throw new CommonFenceException(String.format("Insert common fence record error, prepare fence failed. xid= %s, branchId= %s", xid, branchId), - FrameworkErrorCode.InsertRecordError); - } - } catch (CommonFenceException e) { - if (e.getErrcode() == FrameworkErrorCode.DuplicateKeyException) { - LOGGER.error("Branch transaction has already rollbacked before,prepare fence failed. xid= {},branchId = {}", xid, branchId); - addToLogCleanQueue(xid, branchId); - } - status.setRollbackOnly(); - throw new SkipCallbackWrapperException(e); + // The tcc transaction is activated on the service side. + RootContext.bindTccLocalTxActive(TccLocalTxActive.ACTIVE); + return targetCallback.execute(); } catch (Throwable t) { - status.setRollbackOnly(); - throw new SkipCallbackWrapperException(t); + throw new SkipCallbackWrapperException(ExceptionUtil.unwrap(t)); + } finally { + RootContext.unbindTccLocalTxActive(); } - }); + } else { + return transactionTemplate.execute(status -> { + try { + // The tcc transaction is not activated on the service side. + RootContext.bindTccLocalTxActive(TccLocalTxActive.UN_ACTIVE); + Connection conn = DataSourceUtils.getConnection(dataSource); + boolean result = insertCommonFenceLog(conn, xid, branchId, actionName, CommonFenceConstant.STATUS_TRIED); + LOGGER.info("Common fence prepare result: {}. xid: {}, branchId: {}", result, xid, branchId); + if (result) { + return targetCallback.execute(); + } else { + throw new CommonFenceException(String.format("Insert common fence record error, prepare fence failed. xid= %s, branchId= %s", xid, branchId), + FrameworkErrorCode.InsertRecordError); + } + } catch (CommonFenceException e) { + if (e.getErrcode() == FrameworkErrorCode.DuplicateKeyException) { + LOGGER.error("Branch transaction has already rollbacked before,prepare fence failed. xid= {},branchId = {}", xid, branchId); + addToLogCleanQueue(xid, branchId); + } + status.setRollbackOnly(); + throw new SkipCallbackWrapperException(e); + } catch (Throwable t) { + status.setRollbackOnly(); + throw new SkipCallbackWrapperException(t); + } finally { + RootContext.unbindTccLocalTxActive(); + } + }); + } } /** @@ -145,33 +173,53 @@ public Object prepareFence(String xid, Long branchId, String actionName, Callbac */ @Override public boolean commitFence(Method commitMethod, Object targetTCCBean, - String xid, Long branchId, Object[] args) { - return transactionTemplate.execute(status -> { + String xid, Long branchId, Object[] args) { + if (hasTransactionAspectBeforeInterceptor(commitMethod, targetTCCBean, TccFenceCommitInterceptor.class)) { try { - Connection conn = DataSourceUtils.getConnection(dataSource); - CommonFenceDO commonFenceDO = COMMON_FENCE_DAO.queryCommonFenceDO(conn, xid, branchId); - if (commonFenceDO == null) { - throw new CommonFenceException(String.format("Common fence record not exists, commit fence method failed. xid= %s, branchId= %s", xid, branchId), - FrameworkErrorCode.RecordNotExists); - } - if (CommonFenceConstant.STATUS_COMMITTED == commonFenceDO.getStatus()) { - LOGGER.info("Branch transaction has already committed before. idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, commonFenceDO.getStatus()); - return true; - } - if (CommonFenceConstant.STATUS_ROLLBACKED == commonFenceDO.getStatus() || CommonFenceConstant.STATUS_SUSPENDED == commonFenceDO.getStatus()) { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, commonFenceDO.getStatus()); - } - return false; - } - boolean result = updateStatusAndInvokeTargetMethod(conn, commitMethod, targetTCCBean, xid, branchId, CommonFenceConstant.STATUS_COMMITTED, status, args); - LOGGER.info("Common fence commit result: {}. xid: {}, branchId: {}", result, xid, branchId); + // The tcc transaction is activated on the service side. + RootContext.bindTccLocalTxActive(TccLocalTxActive.ACTIVE); + RootContext.bindTccCommitResult(false); + commitMethod.invoke(targetTCCBean, args); + Boolean result = RootContext.getTccCommitResult(); return result; - } catch (Throwable t) { - status.setRollbackOnly(); - throw new SkipCallbackWrapperException(t); + } catch (Throwable e) { + throw new SkipCallbackWrapperException(ExceptionUtil.unwrap(e)); + } finally { + RootContext.unbindTccLocalTxActive(); + RootContext.unbindTccCommitResult(); } - }); + } else { + return transactionTemplate.execute(status -> { + try { + // The tcc transaction is not activated on the service side. + RootContext.bindTccLocalTxActive(TccLocalTxActive.UN_ACTIVE); + Connection conn = DataSourceUtils.getConnection(dataSource); + CommonFenceDO commonFenceDO = COMMON_FENCE_DAO.queryCommonFenceDO(conn, xid, branchId); + if (commonFenceDO == null) { + throw new CommonFenceException(String.format("Common fence record not exists, commit fence method failed. xid= %s, branchId= %s", xid, branchId), + FrameworkErrorCode.RecordNotExists); + } + if (CommonFenceConstant.STATUS_COMMITTED == commonFenceDO.getStatus()) { + LOGGER.info("Branch transaction has already committed before. idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, commonFenceDO.getStatus()); + return true; + } + if (CommonFenceConstant.STATUS_ROLLBACKED == commonFenceDO.getStatus() || CommonFenceConstant.STATUS_SUSPENDED == commonFenceDO.getStatus()) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, commonFenceDO.getStatus()); + } + return false; + } + boolean result = updateStatusAndInvokeTargetMethod(conn, commitMethod, targetTCCBean, xid, branchId, CommonFenceConstant.STATUS_COMMITTED, status, args); + LOGGER.info("Common fence commit result: {}. xid: {}, branchId: {}", result, xid, branchId); + return result; + } catch (Throwable t) { + status.setRollbackOnly(); + throw new SkipCallbackWrapperException(t); + } finally { + RootContext.unbindTccLocalTxActive(); + } + }); + } } /** @@ -187,40 +235,111 @@ public boolean commitFence(Method commitMethod, Object targetTCCBean, */ @Override public boolean rollbackFence(Method rollbackMethod, Object targetTCCBean, - String xid, Long branchId, Object[] args, String actionName) { - return transactionTemplate.execute(status -> { + String xid, Long branchId, Object[] args, String actionName) { + if (hasTransactionAspectBeforeInterceptor(rollbackMethod, targetTCCBean, TccFenceRollbackInterceptor.class)) { try { - Connection conn = DataSourceUtils.getConnection(dataSource); - CommonFenceDO commonFenceDO = COMMON_FENCE_DAO.queryCommonFenceDO(conn, xid, branchId); - // non_rollback - if (commonFenceDO == null) { - boolean result = insertCommonFenceLog(conn, xid, branchId, actionName, CommonFenceConstant.STATUS_SUSPENDED); - LOGGER.info("Insert common fence record result: {}. xid: {}, branchId: {}", result, xid, branchId); - if (!result) { - throw new CommonFenceException(String.format("Insert common fence record error, rollback fence method failed. xid= %s, branchId= %s", xid, branchId), - FrameworkErrorCode.InsertRecordError); - } - return true; - } else { - if (CommonFenceConstant.STATUS_ROLLBACKED == commonFenceDO.getStatus() || CommonFenceConstant.STATUS_SUSPENDED == commonFenceDO.getStatus()) { - LOGGER.info("Branch transaction had already rollbacked before, idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, commonFenceDO.getStatus()); + // The tcc transaction is activated on the service side. + RootContext.bindTccLocalTxActive(TccLocalTxActive.ACTIVE); + RootContext.bindTccRollbackResult(false); + rollbackMethod.invoke(targetTCCBean, args); + Boolean result = RootContext.getTccRollbackResult(); + return result; + } catch (Exception e) { + throw new SkipCallbackWrapperException(ExceptionUtil.unwrap(e)); + } finally { + RootContext.unbindTccLocalTxActive(); + RootContext.unbindTccRollbackResult(); + } + } else { + return transactionTemplate.execute(status -> { + try { + // The tcc transaction is not activated on the service side. + RootContext.bindTccLocalTxActive(TccLocalTxActive.UN_ACTIVE); + Connection conn = DataSourceUtils.getConnection(dataSource); + CommonFenceDO commonFenceDO = COMMON_FENCE_DAO.queryCommonFenceDO(conn, xid, branchId); + // non_rollback + if (commonFenceDO == null) { + boolean result = insertCommonFenceLog(conn, xid, branchId, actionName, CommonFenceConstant.STATUS_SUSPENDED); + LOGGER.info("Insert common fence record result: {}. xid: {}, branchId: {}", result, xid, branchId); + if (!result) { + throw new CommonFenceException(String.format("Insert common fence record error, rollback fence method failed. xid= %s, branchId= %s", xid, branchId), + FrameworkErrorCode.InsertRecordError); + } return true; + } else { + if (CommonFenceConstant.STATUS_ROLLBACKED == commonFenceDO.getStatus() || CommonFenceConstant.STATUS_SUSPENDED == commonFenceDO.getStatus()) { + LOGGER.info("Branch transaction had already rollbacked before, idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, commonFenceDO.getStatus()); + return true; + } + if (CommonFenceConstant.STATUS_COMMITTED == commonFenceDO.getStatus()) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, commonFenceDO.getStatus()); + } + return false; + } } - if (CommonFenceConstant.STATUS_COMMITTED == commonFenceDO.getStatus()) { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, commonFenceDO.getStatus()); + boolean result = updateStatusAndInvokeTargetMethod(conn, rollbackMethod, targetTCCBean, xid, branchId, CommonFenceConstant.STATUS_ROLLBACKED, status, args); + LOGGER.info("Common fence rollback result: {}. xid: {}, branchId: {}", result, xid, branchId); + return result; + } catch (Throwable t) { + status.setRollbackOnly(); + Throwable cause = t.getCause(); + if (cause != null && cause instanceof SQLException) { + SQLException sqlException = (SQLException) cause; + String sqlState = sqlException.getSQLState(); + int errorCode = sqlException.getErrorCode(); + if ("40001".equals(sqlState) && errorCode == 1213) { + // MySQL deadlock exception + LOGGER.error("Common fence rollback fail. xid: {}, branchId: {}, This exception may be due to the deadlock caused by the transaction isolation level being Repeatable Read. The seata server will try to roll back again, so you can ignore this exception. (To avoid this exception, you can set transaction isolation to Read Committed.)", xid, branchId); } - return false; } + throw new SkipCallbackWrapperException(t); + } finally { + RootContext.unbindTccLocalTxActive(); + } + }); + } + } + + /** + * Check whether the transaction has been activated + * @return boolean + */ + public static boolean isTransactionActive() { + return TransactionSynchronizationManager.isActualTransactionActive(); + } + + /** + * Check that the business method's transaction interceptor precedes the target interceptor + * @param method business method + * @param proxy proxy + * @param targetInterceptorClass the target interceptor class + * @return boolean + */ + public static boolean hasTransactionAspectBeforeInterceptor(Method method, Object proxy, Class targetInterceptorClass) { + // Gets the interceptor for the proxy object + if (proxy instanceof Advised) { + Advised advised = (Advised) proxy; + Advisor[] advisors = advised.getAdvisors(); + + // Iterate over all interceptors to check if the transaction plane is before the target interceptor + boolean foundTransactionAspect = false; + for (Advisor advisor : advisors) { + if (targetInterceptorClass.isInstance(advisor.getAdvice())) { + break; + } + if (advisor.getAdvice() instanceof TransactionInterceptor) { + // Check if the method is matched by the transaction interceptor's transaction attribute source + TransactionAttributeSource attributeSource = ((TransactionInterceptor) advisor.getAdvice()).getTransactionAttributeSource(); + if (attributeSource != null) { + foundTransactionAspect = attributeSource.getTransactionAttribute(method, proxy.getClass()) != null; + } + break; } - boolean result = updateStatusAndInvokeTargetMethod(conn, rollbackMethod, targetTCCBean, xid, branchId, CommonFenceConstant.STATUS_ROLLBACKED, status, args); - LOGGER.info("Common fence rollback result: {}. xid: {}, branchId: {}", result, xid, branchId); - return result; - } catch (Throwable t) { - status.setRollbackOnly(); - throw new SkipCallbackWrapperException(t); } - }); + return foundTransactionAspect; + } + return false; } /** @@ -232,7 +351,7 @@ public boolean rollbackFence(Method rollbackMethod, Object targetTCCBean, * @param status the status * @return the boolean */ - private static boolean insertCommonFenceLog(Connection conn, String xid, Long branchId, String actionName, Integer status) { + protected static boolean insertCommonFenceLog(Connection conn, String xid, Long branchId, String actionName, Integer status) { CommonFenceDO commonFenceDO = new CommonFenceDO(); commonFenceDO.setXid(xid); commonFenceDO.setBranchId(branchId); @@ -342,7 +461,7 @@ private static void initLogCleanExecutor() { logCleanExecutor.submit(fenceLogCleanRunnable); } - private static void addToLogCleanQueue(final String xid, final long branchId) { + protected static void addToLogCleanQueue(final String xid, final long branchId) { FenceLogIdentity logIdentity = new FenceLogIdentity(); logIdentity.setXid(xid); logIdentity.setBranchId(branchId); diff --git a/spring/src/main/java/org/apache/seata/rm/fence/interceptor/TccFenceCommitInterceptor.java b/spring/src/main/java/org/apache/seata/rm/fence/interceptor/TccFenceCommitInterceptor.java new file mode 100644 index 00000000000..ca696c7f472 --- /dev/null +++ b/spring/src/main/java/org/apache/seata/rm/fence/interceptor/TccFenceCommitInterceptor.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.rm.fence.interceptor; + +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; +import org.apache.seata.common.exception.ExceptionUtil; +import org.apache.seata.common.exception.FrameworkErrorCode; +import org.apache.seata.core.context.RootContext; +import org.apache.seata.core.model.TccLocalTxActive; +import org.apache.seata.integration.tx.api.fence.constant.CommonFenceConstant; +import org.apache.seata.integration.tx.api.fence.exception.CommonFenceException; +import org.apache.seata.integration.tx.api.fence.store.CommonFenceDO; +import org.apache.seata.integration.tx.api.remoting.TwoPhaseResult; +import org.apache.seata.rm.fence.SpringFenceHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.datasource.DataSourceUtils; + +import java.sql.Connection; + +public class TccFenceCommitInterceptor extends SpringFenceHandler implements MethodInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(TccFenceCommitInterceptor.class); + + @Override + public Object invoke(MethodInvocation invocation) throws Throwable { + TccLocalTxActive tccLocalTxActive = RootContext.getTccLocalTxActive(); + if (tccLocalTxActive == null || TccLocalTxActive.UN_ACTIVE == tccLocalTxActive) { + // The tcc transaction is not activated on the service side. + return invocation.proceed(); + } + // The tcc transaction is activated on the service side. + // Do not wrap business exceptions to avoid invalidation of transactional annotations on the business side + String xid = RootContext.getXID(); + Long branchId = Long.valueOf(RootContext.getBranchId()); + String resourceId = RootContext.getResourceId(); + Connection conn = DataSourceUtils.getConnection(dataSource); + CommonFenceDO commonFenceDO = COMMON_FENCE_DAO.queryCommonFenceDO(conn, xid, branchId); + if (commonFenceDO == null) { + throw new CommonFenceException(String.format("Common fence record not exists, commit fence method failed. xid= %s, branchId= %s, resourceId= %s", xid, branchId, resourceId), + FrameworkErrorCode.RecordNotExists); + } + if (CommonFenceConstant.STATUS_COMMITTED == commonFenceDO.getStatus()) { + LOGGER.info("Branch transaction has already committed before. idempotency rejected. xid: {}, branchId: {}, resourceId: {}, status: {}", xid, branchId, resourceId, commonFenceDO.getStatus()); + boolean result = true; + RootContext.bindTccCommitResult(result); + return result; + } + if (CommonFenceConstant.STATUS_ROLLBACKED == commonFenceDO.getStatus() || CommonFenceConstant.STATUS_SUSPENDED == commonFenceDO.getStatus()) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, resourceId: {}, status: {}", xid, branchId, resourceId, commonFenceDO.getStatus()); + } + boolean result = false; + RootContext.bindTccCommitResult(result); + return result; + } + boolean result = updateStatusAndInvokeTargetMethodForCommit(conn, invocation, xid, branchId, CommonFenceConstant.STATUS_COMMITTED); + LOGGER.info("Common fence commit result: {}. xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId); + RootContext.bindTccCommitResult(result); + return result; + } + + + /** + * Update Common Fence status and invoke target method for commit + * + * @param conn connection + * @param invocation invocation + * @param xid the global transaction id + * @param branchId the branch transaction id + * @param status the common fence status + * @return the boolean + */ + protected static boolean updateStatusAndInvokeTargetMethodForCommit(Connection conn, MethodInvocation invocation, + String xid, Long branchId, int status) throws Throwable { + boolean result = COMMON_FENCE_DAO.updateCommonFenceDO(conn, xid, branchId, status, CommonFenceConstant.STATUS_TRIED); + if (result) { + try { + // invoke two phase method + Object ret = invocation.proceed(); + if (null != ret) { + if (ret instanceof TwoPhaseResult) { + result = ((TwoPhaseResult) ret).isSuccess(); + } else { + result = (boolean) ret; + } + // If the business execution result is false, the transaction will be rolled back + if (!result) { + // Trigger rollback + throw new RuntimeException("the tcc fence tx failed to commit, please try again"); + } + } + } catch (Exception e) { + throw ExceptionUtil.unwrap(e); + } + } + return result; + } +} diff --git a/spring/src/main/java/org/apache/seata/rm/fence/interceptor/TccFencePrepareInterceptor.java b/spring/src/main/java/org/apache/seata/rm/fence/interceptor/TccFencePrepareInterceptor.java new file mode 100644 index 00000000000..50a55fde3a5 --- /dev/null +++ b/spring/src/main/java/org/apache/seata/rm/fence/interceptor/TccFencePrepareInterceptor.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.rm.fence.interceptor; + +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; +import org.apache.seata.common.exception.FrameworkErrorCode; +import org.apache.seata.common.exception.SkipCallbackWrapperException; +import org.apache.seata.core.context.RootContext; +import org.apache.seata.core.model.TccLocalTxActive; +import org.apache.seata.integration.tx.api.fence.constant.CommonFenceConstant; +import org.apache.seata.integration.tx.api.fence.exception.CommonFenceException; +import org.apache.seata.rm.fence.SpringFenceHandler; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.datasource.DataSourceUtils; + +import java.sql.Connection; + +public class TccFencePrepareInterceptor extends SpringFenceHandler implements MethodInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(TccFencePrepareInterceptor.class); + + @Override + public Object invoke(@NotNull MethodInvocation invocation) throws Throwable { + TccLocalTxActive tccLocalTxActive = RootContext.getTccLocalTxActive(); + if (tccLocalTxActive == null || TccLocalTxActive.UN_ACTIVE == tccLocalTxActive) { + // The tcc transaction is not activated on the service side. + return invocation.proceed(); + } + + // The tcc transaction is activated on the service side. + // Do not wrap business exceptions to avoid invalidation of transactional annotations on the business side + String xid = null, resourceId = null; + Long branchId = null; + try { + xid = RootContext.getXID(); + branchId = Long.valueOf(RootContext.getBranchId()); + resourceId = RootContext.getResourceId(); + Connection conn = DataSourceUtils.getConnection(dataSource); + boolean result = insertCommonFenceLog(conn, xid, branchId, resourceId, CommonFenceConstant.STATUS_TRIED); + LOGGER.info("Common fence prepare result: {}. xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId); + if (result) { + return invocation.proceed(); + } else { + throw new CommonFenceException(String.format("Insert common fence record error, prepare fence failed. xid= %s, branchId= %s, resourceId= %s", xid, branchId, resourceId), + FrameworkErrorCode.InsertRecordError); + } + } catch (CommonFenceException e) { + if (e.getErrcode() == FrameworkErrorCode.DuplicateKeyException) { + LOGGER.error("Branch transaction has already rollbacked before,prepare fence failed. xid= {},branchId = {},resourceId = {}", xid, branchId, resourceId); + addToLogCleanQueue(xid, branchId); + } + throw new SkipCallbackWrapperException(e); + } + } +} diff --git a/spring/src/main/java/org/apache/seata/rm/fence/interceptor/TccFenceRollbackInterceptor.java b/spring/src/main/java/org/apache/seata/rm/fence/interceptor/TccFenceRollbackInterceptor.java new file mode 100644 index 00000000000..6c720435df2 --- /dev/null +++ b/spring/src/main/java/org/apache/seata/rm/fence/interceptor/TccFenceRollbackInterceptor.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.rm.fence.interceptor; + +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; +import org.apache.seata.common.exception.ExceptionUtil; +import org.apache.seata.common.exception.FrameworkErrorCode; +import org.apache.seata.core.context.RootContext; +import org.apache.seata.core.model.TccLocalTxActive; +import org.apache.seata.integration.tx.api.fence.constant.CommonFenceConstant; +import org.apache.seata.integration.tx.api.fence.exception.CommonFenceException; +import org.apache.seata.integration.tx.api.fence.store.CommonFenceDO; +import org.apache.seata.integration.tx.api.remoting.TwoPhaseResult; +import org.apache.seata.rm.fence.SpringFenceHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.datasource.DataSourceUtils; + +import java.sql.Connection; + +public class TccFenceRollbackInterceptor extends SpringFenceHandler implements MethodInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(TccFenceRollbackInterceptor.class); + + @Override + public Object invoke(MethodInvocation invocation) throws Throwable { + TccLocalTxActive tccLocalTxActive = RootContext.getTccLocalTxActive(); + if (tccLocalTxActive == null || TccLocalTxActive.UN_ACTIVE == tccLocalTxActive) { + // The tcc transaction is not activated on the service side. + return invocation.proceed(); + } + + // The tcc transaction is activated on the service side. + // Do not wrap business exceptions to avoid invalidation of transactional annotations on the business side + String xid = RootContext.getXID(); + Long branchId = Long.valueOf(RootContext.getBranchId()); + String resourceId = RootContext.getResourceId(); + Connection conn = DataSourceUtils.getConnection(dataSource); + CommonFenceDO commonFenceDO = COMMON_FENCE_DAO.queryCommonFenceDO(conn, xid, branchId); + // non_rollback + if (commonFenceDO == null) { + boolean result = insertCommonFenceLog(conn, xid, branchId, resourceId, CommonFenceConstant.STATUS_SUSPENDED); + LOGGER.info("Insert common fence record result: {}. xid: {}, branchId: {}", result, xid, branchId); + if (!result) { + throw new CommonFenceException(String.format("Insert common fence record error, rollback fence method failed. xid= %s, branchId= %s", xid, branchId), + FrameworkErrorCode.InsertRecordError); + } + boolean rollbackResult = true; + RootContext.bindTccRollbackResult(rollbackResult); + return rollbackResult; + } else { + if (CommonFenceConstant.STATUS_ROLLBACKED == commonFenceDO.getStatus() || CommonFenceConstant.STATUS_SUSPENDED == commonFenceDO.getStatus()) { + LOGGER.info("Branch transaction had already rollbacked before, idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, commonFenceDO.getStatus()); + boolean rollbackResult = true; + RootContext.bindTccRollbackResult(rollbackResult); + return rollbackResult; + } + if (CommonFenceConstant.STATUS_COMMITTED == commonFenceDO.getStatus()) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, commonFenceDO.getStatus()); + } + boolean rollbackResult = false; + RootContext.bindTccRollbackResult(rollbackResult); + return rollbackResult; + } + } + boolean result = updateStatusAndInvokeTargetMethodForRollback(conn, invocation, xid, branchId, CommonFenceConstant.STATUS_ROLLBACKED); + LOGGER.info("Common fence rollback result: {}. xid: {}, branchId: {}", result, xid, branchId); + RootContext.bindTccRollbackResult(result); + return result; + } + + + /** + * Update Common Fence status and invoke target method for rollback + * + * @param conn connection + * @param invocation invocation + * @param xid the global transaction id + * @param branchId the branch transaction id + * @param status the common fence status + * @return the boolean + */ + protected static boolean updateStatusAndInvokeTargetMethodForRollback(Connection conn, MethodInvocation invocation, + String xid, Long branchId, int status) throws Throwable { + boolean result = COMMON_FENCE_DAO.updateCommonFenceDO(conn, xid, branchId, status, CommonFenceConstant.STATUS_TRIED); + if (result) { + try { + // invoke two phase method + Object ret = invocation.proceed(); + if (null != ret) { + if (ret instanceof TwoPhaseResult) { + result = ((TwoPhaseResult) ret).isSuccess(); + } else { + result = (boolean) ret; + } + // If the business execution result is false, the transaction will be rolled back + if (!result) { + // Trigger rollback + throw new RuntimeException("the tcc fence tx failed to rollback, please try again"); + } + } + } catch (Exception e) { + throw ExceptionUtil.unwrap(e); + } + } + return result; + } +} diff --git a/spring/src/main/java/org/apache/seata/spring/annotation/AdapterInvocationWrapper.java b/spring/src/main/java/org/apache/seata/spring/annotation/AdapterInvocationWrapper.java index 08a67e11aac..c96b391b15e 100644 --- a/spring/src/main/java/org/apache/seata/spring/annotation/AdapterInvocationWrapper.java +++ b/spring/src/main/java/org/apache/seata/spring/annotation/AdapterInvocationWrapper.java @@ -20,14 +20,21 @@ import org.apache.seata.integration.tx.api.interceptor.InvocationWrapper; import org.aopalliance.intercept.MethodInvocation; +import org.springframework.aop.framework.ReflectiveMethodInvocation; public class AdapterInvocationWrapper implements InvocationWrapper { - private MethodInvocation invocation; - + private final MethodInvocation invocation; + private final Object proxy; public AdapterInvocationWrapper(MethodInvocation invocation) { this.invocation = invocation; + if (invocation instanceof ReflectiveMethodInvocation) { + ReflectiveMethodInvocation reflectiveInvocation = (ReflectiveMethodInvocation) invocation; + this.proxy = reflectiveInvocation.getProxy(); + } else { + this.proxy = null; + } } @Override @@ -37,7 +44,7 @@ public Method getMethod() { @Override public Object getProxy() { - return null; + return this.proxy; } @Override diff --git a/spring/src/main/java/org/apache/seata/spring/annotation/GlobalTransactionScanner.java b/spring/src/main/java/org/apache/seata/spring/annotation/GlobalTransactionScanner.java index 2170e59958f..c11f9c00670 100644 --- a/spring/src/main/java/org/apache/seata/spring/annotation/GlobalTransactionScanner.java +++ b/spring/src/main/java/org/apache/seata/spring/annotation/GlobalTransactionScanner.java @@ -17,11 +17,7 @@ package org.apache.seata.spring.annotation; import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; @@ -30,6 +26,7 @@ import org.aopalliance.aop.Advice; import org.aopalliance.intercept.MethodInterceptor; import org.apache.commons.lang.ArrayUtils; +import org.apache.seata.common.exception.ShouldNeverHappenException; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.StringUtils; import org.apache.seata.config.CachedConfigurationChangeListener; @@ -50,6 +47,11 @@ import org.apache.seata.integration.tx.api.interceptor.parser.NeedEnhanceEnum; import org.apache.seata.integration.tx.api.remoting.parser.DefaultRemotingParser; import org.apache.seata.rm.RMClient; +import org.apache.seata.rm.fence.interceptor.TccFenceCommitInterceptor; +import org.apache.seata.rm.fence.interceptor.TccFencePrepareInterceptor; +import org.apache.seata.rm.fence.interceptor.TccFenceRollbackInterceptor; +import org.apache.seata.rm.tcc.api.LocalTCC; +import org.apache.seata.rm.tcc.api.TwoPhaseBusinessAction; import org.apache.seata.spring.annotation.scannercheckers.PackageScannerChecker; import org.apache.seata.spring.remoting.parser.RemotingFactoryBeanParser; import org.apache.seata.spring.util.OrderUtil; @@ -61,9 +63,13 @@ import org.slf4j.LoggerFactory; import org.springframework.aop.Advisor; import org.springframework.aop.TargetSource; +import org.springframework.aop.aspectj.AspectJExpressionPointcut; import org.springframework.aop.framework.AdvisedSupport; +import org.springframework.aop.framework.AopProxyUtils; +import org.springframework.aop.framework.ProxyFactory; import org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator; import org.springframework.aop.support.AopUtils; +import org.springframework.aop.support.DefaultPointcutAdvisor; import org.springframework.beans.BeansException; import org.springframework.beans.PropertyValue; import org.springframework.beans.factory.DisposableBean; @@ -76,6 +82,8 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.Ordered; +import org.springframework.core.annotation.AnnotatedElementUtils; +import org.springframework.transaction.interceptor.TransactionInterceptor; import static org.apache.seata.common.DefaultValues.DEFAULT_DISABLE_GLOBAL_TRANSACTION; import static org.apache.seata.common.DefaultValues.DEFAULT_TX_GROUP; @@ -327,6 +335,7 @@ protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) advised.addAdvisor(pos, avr); } } + addTccFenceAdvisorsForBean(bean); PROXYED_SET.add(beanName); return bean; } @@ -335,6 +344,161 @@ protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) } } + /** + * Add tcc fence advisors to the bean + * @param bean bean + * @throws Exception + */ + private static void addTccFenceAdvisorsForBean(Object bean) throws Exception { + List advisorsBeforeTransactionalInterceptor = getTccFenceAdvisorsForBean(bean); + AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean); + if (CollectionUtils.isNotEmpty(advisorsBeforeTransactionalInterceptor)) { + for (Advisor avr : advisorsBeforeTransactionalInterceptor) { + addAdvisorAfterTransactionalInterceptor(avr, advised); + } + } + } + + /** + * get tcc fence advisors from Bean + * @param bean the raw bean instance + * @return tcc fence advisors + */ + private static List getTccFenceAdvisorsForBean(Object bean) { + Class beanClass = bean.getClass(); + + // Check the @LocalTCC annotation on the class + LocalTCC localTCCAnnotation = AnnotatedElementUtils.findMergedAnnotation(beanClass, LocalTCC.class); + if (localTCCAnnotation != null) { + ProxyFactory proxyFactory = new ProxyFactory(bean); + // Get all methods + Method[] methods = beanClass.getDeclaredMethods(); + Set processedMethods = new HashSet<>(); + List advisors = new ArrayList<>(); + for (Method prepareMethod : methods) { + // Check the @TwoPhaseBusinessAction annotation and its properties on the method + TwoPhaseBusinessAction annotation = AnnotatedElementUtils.findMergedAnnotation(prepareMethod, TwoPhaseBusinessAction.class); + if (annotation != null && annotation.useTCCFence() && !processedMethods.contains(prepareMethod)) { + String commitMethodName = annotation.commitMethod(); + Class[] commitArgsClasses = annotation.commitArgsClasses(); + String rollbackMethodName = annotation.rollbackMethod(); + Class[] rollbackArgsClasses = annotation.rollbackArgsClasses(); + Method commitMethod = findMethodBySignature(beanClass, commitMethodName, commitArgsClasses); + Method rollbackMethod = findMethodBySignature(beanClass, rollbackMethodName, rollbackArgsClasses); + processedMethods.add(prepareMethod); + advisors.add(createAdvisorForTccFence(bean, prepareMethod, new TccFencePrepareInterceptor())); + if (commitMethod != null) { + processedMethods.add(commitMethod); + advisors.add(createAdvisorForTccFence(bean, commitMethod, new TccFenceCommitInterceptor())); + } + if (rollbackMethod != null) { + processedMethods.add(rollbackMethod); + advisors.add(createAdvisorForTccFence(bean, rollbackMethod, new TccFenceRollbackInterceptor())); + } + } + } + + // add all Advisor + advisors.forEach(proxyFactory::addAdvisor); + return advisors; + } + return null; + } + + /** + * find method by signature + * @param clazz the clazz + * @param methodName the method name + * @param parameterTypes the parameter types + * @return the specific signature method + */ + private static Method findMethodBySignature(Class clazz, String methodName, Class[] parameterTypes) { + try { + return clazz.getMethod(methodName, parameterTypes); + } catch (NoSuchMethodException e) { + LOGGER.warn("no such method, methodName:[{}], error: {}", methodName, e.getMessage()); + return null; + } + } + + /** + * Create an tcc fence advisor for a specific expression + * @param proxy proxy + * @param proxyMethod proxy method + * @param advice advice + * @return the tcc fence advisor for specific expressions + */ + private static Advisor createAdvisorForTccFence(Object proxy, Method proxyMethod, Advice advice) { + // Create tangential expressions dynamically + Method interfaceMethod = getInterfaceMethod(proxy, proxyMethod); + String expression = buildPointcutExpression(interfaceMethod); + AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut(); + pointcut.setExpression(expression); + return new DefaultPointcutAdvisor(pointcut, advice); + } + + /** + * Create a tangent expression + * @param method method + * @return the expression + */ + private static String buildPointcutExpression(Method method) { + StringBuilder sb = new StringBuilder(); + sb.append("execution("); + sb.append(method.getReturnType().getName()).append(" "); + sb.append(method.getDeclaringClass().getName()).append("."); + sb.append(method.getName()).append("("); + Class[] paramTypes = method.getParameterTypes(); + for (int i = 0; i < paramTypes.length; i++) { + sb.append(paramTypes[i].getName()); + if (i < paramTypes.length - 1) { + sb.append(", "); + } + } + sb.append("))"); + return sb.toString(); + } + + /** + * get interface method + * @param proxy proxy + * @param proxyMethod proxy method + * @return the interface method + */ + private static Method getInterfaceMethod(Object proxy, Method proxyMethod) { + Class targetClass = AopProxyUtils.ultimateTargetClass(proxy); + Class[] interfaces = targetClass.getInterfaces(); + for (Class interfaceClass : interfaces) { + try { + return interfaceClass.getMethod(proxyMethod.getName(), proxyMethod.getParameterTypes()); + } catch (NoSuchMethodException e) { + throw new ShouldNeverHappenException(e); + } + } + throw new ShouldNeverHappenException("Interface method not found for proxy method: " + proxyMethod.getName()); + } + + /** + * join the tcc fence Advisor after TransactionalInterceptor + * @param advisor tcc fence advisor + * @param advised bean advise + */ + private static void addAdvisorAfterTransactionalInterceptor(Advisor advisor, AdvisedSupport advised) { + Advisor[] advisors = advised.getAdvisors(); + Integer transactionInterceptorIndex = null; + for (int i = 0; i < advisors.length; i++) { + if (advisors[i].getAdvice() instanceof TransactionInterceptor) { + transactionInterceptorIndex = i; + break; + } + } + if (transactionInterceptorIndex != null) { + advised.addAdvisor(transactionInterceptorIndex + 1, advisor); + } else { + advised.addAdvisor(advisor); + } + } + private boolean doCheckers(Object bean, String beanName) { if (PROXYED_SET.contains(beanName) || EXCLUDE_BEAN_NAME_SET.contains(beanName) || FactoryBean.class.isAssignableFrom(bean.getClass())) { diff --git a/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java b/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java index eaaa76494cd..92778994d33 100644 --- a/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java +++ b/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java @@ -26,6 +26,7 @@ import org.apache.seata.common.exception.RepeatRegistrationException; import org.apache.seata.common.exception.ShouldNeverHappenException; import org.apache.seata.common.exception.SkipCallbackWrapperException; +import org.apache.seata.core.context.RootContext; import org.apache.seata.core.exception.TransactionException; import org.apache.seata.core.model.BranchStatus; import org.apache.seata.core.model.BranchType; @@ -121,6 +122,10 @@ public BranchStatus branchCommit(BranchType branchType, String xid, long branchI Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext); Object ret; boolean result; + RootContext.bind(xid); + RootContext.bindBranchId(String.valueOf(branchId)); + RootContext.bindResourceId(resourceId); + RootContext.bindBranchType(BranchType.TCC); // add idempotent and anti hanging if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_COMMON_FENCE))) { try { @@ -146,6 +151,11 @@ public BranchStatus branchCommit(BranchType branchType, String xid, long branchI String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid); LOGGER.error(msg, ExceptionUtil.unwrap(t)); return BranchStatus.PhaseTwo_CommitFailed_Retryable; + } finally { + RootContext.unbind(); + RootContext.unbindBranchId(); + RootContext.unbindResourceId(); + RootContext.unbindBranchType(); } } @@ -179,6 +189,10 @@ public BranchStatus branchRollback(BranchType branchType, String xid, long branc Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext); Object ret; boolean result; + RootContext.bind(xid); + RootContext.bindBranchId(String.valueOf(branchId)); + RootContext.bindResourceId(resourceId); + RootContext.bindBranchType(BranchType.TCC); // add idempotent and anti hanging if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_COMMON_FENCE))) { try { @@ -205,6 +219,11 @@ public BranchStatus branchRollback(BranchType branchType, String xid, long branc String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid); LOGGER.error(msg, ExceptionUtil.unwrap(t)); return BranchStatus.PhaseTwo_RollbackFailed_Retryable; + } finally { + RootContext.unbind(); + RootContext.unbindBranchId(); + RootContext.unbindResourceId(); + RootContext.unbindBranchType(); } } diff --git a/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java b/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java index acf84663c1d..3cad8dfcbcb 100644 --- a/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java +++ b/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java @@ -83,8 +83,7 @@ protected Object doInvoke(InvocationWrapper invocation) throws Throwable { try { TwoPhaseBusinessActionParam businessActionParam = createTwoPhaseBusinessActionParam(businessAction); //Handler the TCC Aspect, and return the business result - return actionInterceptorHandler.proceed(method, invocation.getArguments(), xid, businessActionParam, - invocation::proceed); + return actionInterceptorHandler.proceed(invocation, xid, businessActionParam); } finally { //if not TCC, unbind branchType if (getBranchType() != previousBranchType) {