Skip to content

Commit

Permalink
bugfix: add three tcc fence interceptors to fix tcc fence deadlock ex…
Browse files Browse the repository at this point in the history
…ception (apache#6679)
  • Loading branch information
chengliefeng committed Jul 29, 2024
1 parent 1c0a442 commit 3447a5d
Show file tree
Hide file tree
Showing 13 changed files with 871 additions and 115 deletions.
96 changes: 96 additions & 0 deletions core/src/main/java/org/apache/seata/core/context/RootContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seata.common.exception.ShouldNeverHappenException;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.model.TccLocalTxActive;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand Down Expand Up @@ -60,6 +61,26 @@ private RootContext() {
*/
public static final String KEY_TIMEOUT = "TX_TIMEOUT";

/**
* The constant KEY_RESOURCE_ID.
*/
public static final String KEY_RESOURCE_ID = "TX_RESOURCE_ID";

/**
* The constant KEY_TCC_LOCAL_TX_ACTIVE.
*/
public static final String KEY_TCC_LOCAL_TX_ACTIVE = "TCC_LOCAL_TX_ACTIVE";

/**
* The constant KEY_TCC_COMMIT_RESULT.
*/
public static final String KEY_TCC_COMMIT_RESULT = "KEY_TCC_COMMIT_RESULT";

/**
* The constant KEY_TCC_ROLLBACK_RESULT.
*/
public static final String KEY_TCC_ROLLBACK_RESULT = "KEY_TCC_ROLLBACK_RESULT";

/**
* The constant MDC_KEY_XID for logback
* @since 1.5.0
Expand Down Expand Up @@ -142,6 +163,81 @@ public static void setTimeout(Integer timeout) {
CONTEXT_HOLDER.put(KEY_TIMEOUT,timeout);
}

public static String getBranchId() {
return (String) CONTEXT_HOLDER.get(KEY_BRANCHID);
}

public static void bindBranchId(String branchId) {
CONTEXT_HOLDER.put(KEY_BRANCHID, branchId);
}

public static void unbindBranchId() {
String branchId = (String) CONTEXT_HOLDER.remove(KEY_BRANCHID);
if (LOGGER.isDebugEnabled() && branchId != null) {
LOGGER.debug("unbind branch id");
}
}

public static String getResourceId() {
return (String) CONTEXT_HOLDER.get(KEY_RESOURCE_ID);
}

public static void bindResourceId(String resourceId) {
CONTEXT_HOLDER.put(KEY_RESOURCE_ID, resourceId);
}

public static void unbindResourceId() {
String resourceId = (String) CONTEXT_HOLDER.remove(KEY_RESOURCE_ID);
if (LOGGER.isDebugEnabled() && resourceId != null) {
LOGGER.debug("unbind tcc resource id");
}
}

public static TccLocalTxActive getTccLocalTxActive() {
return (TccLocalTxActive) CONTEXT_HOLDER.get(KEY_TCC_LOCAL_TX_ACTIVE);
}

public static void bindTccLocalTxActive(TccLocalTxActive tccLocalTxActive) {
CONTEXT_HOLDER.put(KEY_TCC_LOCAL_TX_ACTIVE, tccLocalTxActive);
}

public static void unbindTccLocalTxActive() {
TccLocalTxActive tccLocalTxActive = (TccLocalTxActive) CONTEXT_HOLDER.remove(KEY_TCC_LOCAL_TX_ACTIVE);
if (LOGGER.isDebugEnabled() && tccLocalTxActive != null) {
LOGGER.debug("unbind tcc local tx active identification");
}
}

public static Boolean getTccCommitResult() {
return (Boolean) CONTEXT_HOLDER.get(KEY_TCC_COMMIT_RESULT);
}

public static void bindTccCommitResult(Boolean tccCommitResult) {
CONTEXT_HOLDER.put(KEY_TCC_COMMIT_RESULT, tccCommitResult);
}

public static void unbindTccCommitResult() {
Boolean tccCommitResult = (Boolean) CONTEXT_HOLDER.remove(KEY_TCC_COMMIT_RESULT);
if (LOGGER.isDebugEnabled() && tccCommitResult != null) {
LOGGER.debug("unbind tcc commit result");
}
}

public static Boolean getTccRollbackResult() {
return (Boolean) CONTEXT_HOLDER.get(KEY_TCC_ROLLBACK_RESULT);
}

public static void bindTccRollbackResult(Boolean tccRollbackResult) {
CONTEXT_HOLDER.put(KEY_TCC_ROLLBACK_RESULT, tccRollbackResult);
}

public static void unbindTccRollbackResult() {
Boolean tccRollbackResult = (Boolean) CONTEXT_HOLDER.remove(KEY_TCC_ROLLBACK_RESULT);
if (LOGGER.isDebugEnabled() && tccRollbackResult != null) {
LOGGER.debug("unbind tcc rollback result");
}
}

/**
* declare local transactions will use global lock check for update/delete/insert/selectForUpdate SQL
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.model;

/**
* Identifies whether tcc transactions are activated on the business side
*/
public enum TccLocalTxActive {

/**
* The tcc transaction is not activated on the service side.
*/
UN_ACTIVE(0),

/**
* The tcc transaction is activated on the service side.
*/
ACTIVE(1);


private final int code;

TccLocalTxActive(int code) {
this.code = code;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ private void check() {
}

@Override
public Object prepareFence(String xid, Long branchId, String actionName, Callback<Object> targetCallback) {
public Object prepareFence(Method prepareMethod, Object targetTCCBean, String xid, Long branchId, String actionName, Callback<Object> targetCallback) {
check();
return fenceHandler.prepareFence(xid, branchId, actionName, targetCallback);
return fenceHandler.prepareFence(prepareMethod, targetTCCBean, xid, branchId, actionName, targetCallback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public interface FenceHandler {

Object prepareFence(String xid, Long branchId, String actionName, Callback<Object> targetCallback);
Object prepareFence(Method prepareMethod, Object targetTCCBean, String xid, Long branchId, String actionName, Callback<Object> targetCallback);

boolean commitFence(Method commitMethod, Object targetTCCBean, String xid, Long branchId, Object[] args);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,6 @@
*/
package org.apache.seata.integration.tx.api.interceptor;

import javax.annotation.Nonnull;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.seata.common.Constants;
import org.apache.seata.common.exception.FrameworkException;
import org.apache.seata.common.exception.SkipCallbackWrapperException;
Expand All @@ -42,6 +34,14 @@
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import javax.annotation.Nonnull;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* Handler the Tx Participant Aspect : Setting Context, Creating Branch Record
*
Expand All @@ -53,16 +53,17 @@ public class ActionInterceptorHandler {
/**
* Handler the Tx Aspect
*
* @param method the method
* @param arguments the arguments
* @param invocation the invocation wrapper
* @param xid the xid
* @param businessActionParam the business action params
* @param targetCallback the target callback
* @return the business result
* @throws Throwable the throwable
*/
public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessActionParam businessActionParam,
Callback<Object> targetCallback) throws Throwable {
public Object proceed(InvocationWrapper invocation, String xid, TwoPhaseBusinessActionParam businessActionParam) throws Throwable {
Method method = invocation.getMethod();
Object targetTCCBean = invocation.getProxy();
Object[] arguments = invocation.getArguments();
Callback<Object> targetCallback = invocation::proceed;
//Get action context from arguments, or create a new one and then reset to arguments
BusinessActionContext actionContext = getOrCreateActionContextAndResetToArguments(method.getParameterTypes(), arguments);

Expand All @@ -87,11 +88,13 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus
try {
//share actionContext implicitly
BusinessActionContextUtil.setContext(actionContext);
RootContext.bindBranchId(branchId);
RootContext.bindResourceId(actionName);

if (businessActionParam.getUseCommonFence()) {
try {
// Use common Fence, and return the business result
return DefaultCommonFenceHandler.get().prepareFence(xid, Long.valueOf(branchId), actionName, targetCallback);
return DefaultCommonFenceHandler.get().prepareFence(method, targetTCCBean, xid, Long.valueOf(branchId), actionName, targetCallback);
} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
Throwable originException = e.getCause();
if (originException instanceof FrameworkException) {
Expand All @@ -104,6 +107,8 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus
return targetCallback.execute();
}
} finally {
RootContext.unbindBranchId();
RootContext.unbindResourceId();
try {
//to report business action context finally if the actionContext.getUpdated() is true
BusinessActionContextUtil.reportContext(actionContext);
Expand Down
Loading

0 comments on commit 3447a5d

Please sign in to comment.