Skip to content

Commit

Permalink
feature: add TCC three-phase hooks (apache#6731)
Browse files Browse the repository at this point in the history
  • Loading branch information
chengliefeng committed Aug 23, 2024
1 parent 5601c36 commit 135f273
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.integration.tx.api.fence.hook;


public interface TccHook {

/**
* before tcc prepare
*/
void beforeTccPrepare(String xid, Long branchId, String actionName);

/**
* after tcc prepare
*/
void afterTccPrepare(String xid, Long branchId, String actionName);

/**
* before tcc commit
*/
void beforeTccCommit(String xid, Long branchId, String actionName);

/**
* after tcc commit
*/
void afterTccCommit(String xid, Long branchId, String actionName);

/**
* before tcc rollback
*/
void beforeTccRollback(String xid, Long branchId, String actionName);

/**
* after tcc rollback
*/
void afterTccRollback(String xid, Long branchId, String actionName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.integration.tx.api.fence.hook;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TccHookManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TccHookManager.class);

private TccHookManager() {

}

private static final List<TccHook> TCC_HOOKS = new CopyOnWriteArrayList<>();
// Cache unmodifiable lists
private volatile static List<TccHook> CACHED_UNMODIFIABLE_HOOKS = null;

/**
* get the hooks
* @return tccHook list
*/
public static List<TccHook> getHooks() {
if (CACHED_UNMODIFIABLE_HOOKS == null) {
synchronized (TccHookManager.class) {
if (CACHED_UNMODIFIABLE_HOOKS == null) {
CACHED_UNMODIFIABLE_HOOKS = Collections.unmodifiableList(TCC_HOOKS);
}
}
}
return CACHED_UNMODIFIABLE_HOOKS;
}

/**
* add new hook
* @param tccHook tccHook
*/
public static void registerHook(TccHook tccHook) {
if (tccHook == null) {
throw new NullPointerException("tccHook must not be null");
}
TCC_HOOKS.add(tccHook);
CACHED_UNMODIFIABLE_HOOKS = null;
LOGGER.info("TccHook registered succeeded! TccHooks size: {}", TCC_HOOKS.size());
}

/**
* clear hooks
*/
public static void clear() {
TCC_HOOKS.clear();
CACHED_UNMODIFIABLE_HOOKS = null;
LOGGER.info("All TccHooks have been cleared.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.seata.common.Constants;
Expand All @@ -32,6 +33,8 @@
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.core.context.RootContext;
import org.apache.seata.integration.tx.api.fence.DefaultCommonFenceHandler;
import org.apache.seata.integration.tx.api.fence.hook.TccHook;
import org.apache.seata.integration.tx.api.fence.hook.TccHookManager;
import org.apache.seata.integration.tx.api.util.JsonUtil;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.seata.rm.tcc.api.BusinessActionContext;
Expand Down Expand Up @@ -87,7 +90,7 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus
try {
//share actionContext implicitly
BusinessActionContextUtil.setContext(actionContext);

doBeforeTccPrepare(xid, branchId, actionName);
if (businessActionParam.getUseCommonFence()) {
try {
// Use common Fence, and return the business result
Expand All @@ -105,6 +108,7 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus
}
} finally {
try {
doAfterTccPrepare(xid, branchId, actionName);
//to report business action context finally if the actionContext.getUpdated() is true
BusinessActionContextUtil.reportContext(actionContext);
} finally {
Expand All @@ -119,6 +123,46 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus
}
}

/**
* to do some business operations before tcc prepare
* @param xid the xid
* @param branchId the branchId
* @param actionName the actionName
*/
private void doBeforeTccPrepare(String xid, String branchId, String actionName) {
List<TccHook> hooks = TccHookManager.getHooks();
if (hooks.isEmpty()) {
return;
}
for (TccHook hook : hooks) {
try {
hook.beforeTccPrepare(xid, Long.valueOf(branchId), actionName);
} catch (Exception e) {
LOGGER.error("Failed execute beforeTccPrepare in hook {}", e.getMessage(), e);
}
}
}

/**
* to do some business operations after tcc prepare
* @param xid the xid
* @param branchId the branchId
* @param actionName the actionName
*/
private void doAfterTccPrepare(String xid, String branchId, String actionName) {
List<TccHook> hooks = TccHookManager.getHooks();
if (hooks.isEmpty()) {
return;
}
for (TccHook hook : hooks) {
try {
hook.afterTccPrepare(xid, Long.valueOf(branchId), actionName);
} catch (Exception e) {
LOGGER.error("Failed execute afterTccPrepare in hook {}", e.getMessage(), e);
}
}
}

/**
* Get or create action context, and reset to arguments
*
Expand Down
87 changes: 87 additions & 0 deletions tcc/src/main/java/org/apache/seata/rm/tcc/TCCResourceManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -31,6 +32,8 @@
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.model.Resource;
import org.apache.seata.integration.tx.api.fence.DefaultCommonFenceHandler;
import org.apache.seata.integration.tx.api.fence.hook.TccHook;
import org.apache.seata.integration.tx.api.fence.hook.TccHookManager;
import org.apache.seata.integration.tx.api.remoting.TwoPhaseResult;
import org.apache.seata.rm.AbstractResourceManager;
import org.apache.seata.rm.tcc.api.BusinessActionContext;
Expand Down Expand Up @@ -121,6 +124,7 @@ public BranchStatus branchCommit(BranchType branchType, String xid, long branchI
Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
//share actionContext implicitly
BusinessActionContextUtil.setContext(businessActionContext);
doBeforeTccCommit(xid, branchId, tccResource.getActionName());
Object ret;
boolean result;
// add idempotent and anti hanging
Expand Down Expand Up @@ -149,6 +153,7 @@ public BranchStatus branchCommit(BranchType branchType, String xid, long branchI
LOGGER.error(msg, ExceptionUtil.unwrap(t));
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
} finally {
doAfterTccCommit(xid, branchId, tccResource.getActionName());
// clear the action context
BusinessActionContextUtil.clear();
}
Expand Down Expand Up @@ -184,6 +189,7 @@ public BranchStatus branchRollback(BranchType branchType, String xid, long branc
Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);
//share actionContext implicitly
BusinessActionContextUtil.setContext(businessActionContext);
doBeforeTccRollback(xid, branchId, tccResource.getActionName());
Object ret;
boolean result;
// add idempotent and anti hanging
Expand Down Expand Up @@ -213,11 +219,92 @@ public BranchStatus branchRollback(BranchType branchType, String xid, long branc
LOGGER.error(msg, ExceptionUtil.unwrap(t));
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
} finally {
doAfterTccRollback(xid, branchId, tccResource.getActionName());
// clear the action context
BusinessActionContextUtil.clear();
}
}

/**
* to do some business operations before tcc rollback
* @param xid the xid
* @param branchId the branchId
* @param actionName the actionName
*/
private void doBeforeTccRollback(String xid, long branchId, String actionName) {
List<TccHook> hooks = TccHookManager.getHooks();
if (hooks.isEmpty()) {
return;
}
for (TccHook hook : hooks) {
try {
hook.beforeTccRollback(xid, branchId, actionName);
} catch (Exception e) {
LOGGER.error("Failed execute beforeTccRollback in hook {}", e.getMessage(), e);
}
}
}

/**
* to do some business operations after tcc rollback
* @param xid the xid
* @param branchId the branchId
* @param actionName the actionName
*/
private void doAfterTccRollback(String xid, long branchId, String actionName) {
List<TccHook> hooks = TccHookManager.getHooks();
if (hooks.isEmpty()) {
return;
}
for (TccHook hook : hooks) {
try {
hook.afterTccRollback(xid, branchId, actionName);
} catch (Exception e) {
LOGGER.error("Failed execute afterTccRollback in hook {}", e.getMessage(), e);
}
}
}

/**
* to do some business operations before tcc commit
* @param xid the xid
* @param branchId the branchId
* @param actionName the actionName
*/
private void doBeforeTccCommit(String xid, long branchId, String actionName) {
List<TccHook> hooks = TccHookManager.getHooks();
if (hooks.isEmpty()) {
return;
}
for (TccHook hook : hooks) {
try {
hook.beforeTccCommit(xid, branchId, actionName);
} catch (Exception e) {
LOGGER.error("Failed execute beforeTccCommit in hook {}", e.getMessage(), e);
}
}
}

/**
* to do some business operations after tcc commit
* @param xid the xid
* @param branchId the branchId
* @param actionName the actionName
*/
private void doAfterTccCommit(String xid, long branchId, String actionName) {
List<TccHook> hooks = TccHookManager.getHooks();
if (hooks.isEmpty()) {
return;
}
for (TccHook hook : hooks) {
try {
hook.afterTccCommit(xid, branchId, actionName);
} catch (Exception e) {
LOGGER.error("Failed execute afterTccCommit in hook {}", e.getMessage(), e);
}
}
}

/**
* get phase two commit method's args
* @param tccResource tccResource
Expand Down

0 comments on commit 135f273

Please sign in to comment.