From 135f2732efd713d20f9b3cba9f6127a91701460f Mon Sep 17 00:00:00 2001 From: chengliefeng Date: Fri, 23 Aug 2024 15:14:37 +0800 Subject: [PATCH] feature: add TCC three-phase hooks (#6731) --- .../tx/api/fence/hook/TccHook.java | 51 +++++++++++ .../tx/api/fence/hook/TccHookManager.java | 73 ++++++++++++++++ .../interceptor/ActionInterceptorHandler.java | 46 +++++++++- .../seata/rm/tcc/TCCResourceManager.java | 87 +++++++++++++++++++ 4 files changed, 256 insertions(+), 1 deletion(-) create mode 100644 integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHook.java create mode 100644 integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHook.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHook.java new file mode 100644 index 00000000000..365880f8be0 --- /dev/null +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHook.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.tx.api.fence.hook; + + +public interface TccHook { + + /** + * before tcc prepare + */ + void beforeTccPrepare(String xid, Long branchId, String actionName); + + /** + * after tcc prepare + */ + void afterTccPrepare(String xid, Long branchId, String actionName); + + /** + * before tcc commit + */ + void beforeTccCommit(String xid, Long branchId, String actionName); + + /** + * after tcc commit + */ + void afterTccCommit(String xid, Long branchId, String actionName); + + /** + * before tcc rollback + */ + void beforeTccRollback(String xid, Long branchId, String actionName); + + /** + * after tcc rollback + */ + void afterTccRollback(String xid, Long branchId, String actionName); +} diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java new file mode 100644 index 00000000000..e6d537c73f2 --- /dev/null +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/fence/hook/TccHookManager.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.tx.api.fence.hook; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class TccHookManager { + private static final Logger LOGGER = LoggerFactory.getLogger(TccHookManager.class); + + private TccHookManager() { + + } + + private static final List TCC_HOOKS = new CopyOnWriteArrayList<>(); + // Cache unmodifiable lists + private volatile static List CACHED_UNMODIFIABLE_HOOKS = null; + + /** + * get the hooks + * @return tccHook list + */ + public static List getHooks() { + if (CACHED_UNMODIFIABLE_HOOKS == null) { + synchronized (TccHookManager.class) { + if (CACHED_UNMODIFIABLE_HOOKS == null) { + CACHED_UNMODIFIABLE_HOOKS = Collections.unmodifiableList(TCC_HOOKS); + } + } + } + return CACHED_UNMODIFIABLE_HOOKS; + } + + /** + * add new hook + * @param tccHook tccHook + */ + public static void registerHook(TccHook tccHook) { + if (tccHook == null) { + throw new NullPointerException("tccHook must not be null"); + } + TCC_HOOKS.add(tccHook); + CACHED_UNMODIFIABLE_HOOKS = null; + LOGGER.info("TccHook registered succeeded! TccHooks size: {}", TCC_HOOKS.size()); + } + + /** + * clear hooks + */ + public static void clear() { + TCC_HOOKS.clear(); + CACHED_UNMODIFIABLE_HOOKS = null; + LOGGER.info("All TccHooks have been cleared."); + } +} \ No newline at end of file diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/ActionInterceptorHandler.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/ActionInterceptorHandler.java index b3600cdc55a..6773faa108d 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/ActionInterceptorHandler.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/ActionInterceptorHandler.java @@ -22,6 +22,7 @@ import java.lang.reflect.UndeclaredThrowableException; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.seata.common.Constants; @@ -32,6 +33,8 @@ import org.apache.seata.common.util.NetUtil; import org.apache.seata.core.context.RootContext; import org.apache.seata.integration.tx.api.fence.DefaultCommonFenceHandler; +import org.apache.seata.integration.tx.api.fence.hook.TccHook; +import org.apache.seata.integration.tx.api.fence.hook.TccHookManager; import org.apache.seata.integration.tx.api.util.JsonUtil; import org.apache.seata.rm.DefaultResourceManager; import org.apache.seata.rm.tcc.api.BusinessActionContext; @@ -87,7 +90,7 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus try { //share actionContext implicitly BusinessActionContextUtil.setContext(actionContext); - + doBeforeTccPrepare(xid, branchId, actionName); if (businessActionParam.getUseCommonFence()) { try { // Use common Fence, and return the business result @@ -105,6 +108,7 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus } } finally { try { + doAfterTccPrepare(xid, branchId, actionName); //to report business action context finally if the actionContext.getUpdated() is true BusinessActionContextUtil.reportContext(actionContext); } finally { @@ -119,6 +123,46 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus } } + /** + * to do some business operations before tcc prepare + * @param xid the xid + * @param branchId the branchId + * @param actionName the actionName + */ + private void doBeforeTccPrepare(String xid, String branchId, String actionName) { + List hooks = TccHookManager.getHooks(); + if (hooks.isEmpty()) { + return; + } + for (TccHook hook : hooks) { + try { + hook.beforeTccPrepare(xid, Long.valueOf(branchId), actionName); + } catch (Exception e) { + LOGGER.error("Failed execute beforeTccPrepare in hook {}", e.getMessage(), e); + } + } + } + + /** + * to do some business operations after tcc prepare + * @param xid the xid + * @param branchId the branchId + * @param actionName the actionName + */ + private void doAfterTccPrepare(String xid, String branchId, String actionName) { + List hooks = TccHookManager.getHooks(); + if (hooks.isEmpty()) { + return; + } + for (TccHook hook : hooks) { + try { + hook.afterTccPrepare(xid, Long.valueOf(branchId), actionName); + } catch (Exception e) { + LOGGER.error("Failed execute afterTccPrepare in hook {}", e.getMessage(), e); + } + } + } + /** * Get or create action context, and reset to arguments * diff --git a/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java b/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java index 2ad3c2b373f..a905d616c50 100644 --- a/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java +++ b/tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java @@ -18,6 +18,7 @@ import java.lang.reflect.Method; import java.lang.reflect.UndeclaredThrowableException; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -31,6 +32,8 @@ import org.apache.seata.core.model.BranchType; import org.apache.seata.core.model.Resource; import org.apache.seata.integration.tx.api.fence.DefaultCommonFenceHandler; +import org.apache.seata.integration.tx.api.fence.hook.TccHook; +import org.apache.seata.integration.tx.api.fence.hook.TccHookManager; import org.apache.seata.integration.tx.api.remoting.TwoPhaseResult; import org.apache.seata.rm.AbstractResourceManager; import org.apache.seata.rm.tcc.api.BusinessActionContext; @@ -121,6 +124,7 @@ public BranchStatus branchCommit(BranchType branchType, String xid, long branchI Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext); //share actionContext implicitly BusinessActionContextUtil.setContext(businessActionContext); + doBeforeTccCommit(xid, branchId, tccResource.getActionName()); Object ret; boolean result; // add idempotent and anti hanging @@ -149,6 +153,7 @@ public BranchStatus branchCommit(BranchType branchType, String xid, long branchI LOGGER.error(msg, ExceptionUtil.unwrap(t)); return BranchStatus.PhaseTwo_CommitFailed_Retryable; } finally { + doAfterTccCommit(xid, branchId, tccResource.getActionName()); // clear the action context BusinessActionContextUtil.clear(); } @@ -184,6 +189,7 @@ public BranchStatus branchRollback(BranchType branchType, String xid, long branc Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext); //share actionContext implicitly BusinessActionContextUtil.setContext(businessActionContext); + doBeforeTccRollback(xid, branchId, tccResource.getActionName()); Object ret; boolean result; // add idempotent and anti hanging @@ -213,11 +219,92 @@ public BranchStatus branchRollback(BranchType branchType, String xid, long branc LOGGER.error(msg, ExceptionUtil.unwrap(t)); return BranchStatus.PhaseTwo_RollbackFailed_Retryable; } finally { + doAfterTccRollback(xid, branchId, tccResource.getActionName()); // clear the action context BusinessActionContextUtil.clear(); } } + /** + * to do some business operations before tcc rollback + * @param xid the xid + * @param branchId the branchId + * @param actionName the actionName + */ + private void doBeforeTccRollback(String xid, long branchId, String actionName) { + List hooks = TccHookManager.getHooks(); + if (hooks.isEmpty()) { + return; + } + for (TccHook hook : hooks) { + try { + hook.beforeTccRollback(xid, branchId, actionName); + } catch (Exception e) { + LOGGER.error("Failed execute beforeTccRollback in hook {}", e.getMessage(), e); + } + } + } + + /** + * to do some business operations after tcc rollback + * @param xid the xid + * @param branchId the branchId + * @param actionName the actionName + */ + private void doAfterTccRollback(String xid, long branchId, String actionName) { + List hooks = TccHookManager.getHooks(); + if (hooks.isEmpty()) { + return; + } + for (TccHook hook : hooks) { + try { + hook.afterTccRollback(xid, branchId, actionName); + } catch (Exception e) { + LOGGER.error("Failed execute afterTccRollback in hook {}", e.getMessage(), e); + } + } + } + + /** + * to do some business operations before tcc commit + * @param xid the xid + * @param branchId the branchId + * @param actionName the actionName + */ + private void doBeforeTccCommit(String xid, long branchId, String actionName) { + List hooks = TccHookManager.getHooks(); + if (hooks.isEmpty()) { + return; + } + for (TccHook hook : hooks) { + try { + hook.beforeTccCommit(xid, branchId, actionName); + } catch (Exception e) { + LOGGER.error("Failed execute beforeTccCommit in hook {}", e.getMessage(), e); + } + } + } + + /** + * to do some business operations after tcc commit + * @param xid the xid + * @param branchId the branchId + * @param actionName the actionName + */ + private void doAfterTccCommit(String xid, long branchId, String actionName) { + List hooks = TccHookManager.getHooks(); + if (hooks.isEmpty()) { + return; + } + for (TccHook hook : hooks) { + try { + hook.afterTccCommit(xid, branchId, actionName); + } catch (Exception e) { + LOGGER.error("Failed execute afterTccCommit in hook {}", e.getMessage(), e); + } + } + } + /** * get phase two commit method's args * @param tccResource tccResource