引言
在上一篇《深入剖析Spring源码解析系列-Spring事务原理(上)》中,我们已经探讨了Spring事务的动态代{过}{滤}理机制。在本篇文章中,我们将继续深入研究事务拦截器的实现以及事务传播机制。
spring版本: 5.0.2
掘金原文
一、事务拦截器
Spring事务拦截器(TransactionInterceptor
)是声明式事务切面的重要组件,这是我们研究spring声明式事务机制的重要入口,接下进入TransactionInterceptor
源码看看都做了些什么?
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
//...省略
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@Code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
/**
* 【重点】事务拦截器核心逻辑
*/
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
//...省略
}
TransactionInterceptor
继承了TransactionAspectSupport
,TransactionAspectSupport
是提供事务支持的抽象基类,由子类继承去支持不同事务管理方式和集成情况。
例如:
TransactionInterceptor
:用于实现声明式事务切面的基本功能,例如 @Transactional
注解进行声明式事务管理
JtaTransactionAspect
: AbstractTransactionAspect
的一个扩展类,专门用于与 JTA(Java Transaction API)集成的事务切面。支持全局分布式事务,与多个资源(例如数据库、JMS 队列等)的事务协调。
AspectJTransactionManagementConfiguration
:支持使用 AspectJ 来实现事务切面,为 AspectJ 事务切面提供配置和集成。
HibernateJpaDialect
:虽然不是直接的子类,但是 HibernateJpaDialect
实现了 Spring 的 JpaDialect
接口,用于将 Hibernate 与 Spring 事务管理集成,实现将 Hibernate 的 Session 绑定到 Spring 事务中的功能。
TransactionAspectSupport#invokeWithinTransaction
是事务拦截器的核心逻辑,我们继续跟踪下去看看做了什么?
invokeWithinTransaction分析
//org.springframework.transaction.interceptor.TransactionInterceptor
@Nullable
protected Object invokeWithinTransaction(Method method, @nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 获取之前在配置类中注册了一个AnnotationTransactionAttributeSource 事务注解解析器
// 这里通过它去获取事务属性
TransactionAttributeSource tas = getTransactionAttributeSource();
// 【重点】解析@Transactional注解获取事务属性
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 根据别名获取事务管理器,没有别名则根据默认事务名称获取
// 事务管理器 提供三种方法,获取事务,事务回滚,事务提交
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 切点名称(类名+方法名),会被作为事务的名称
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
//判断是标准事务管理还是响应式事务管理,一般我们使用标准事务管理
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
//【重点】创建事务信息
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
//【重点】执行模版方法的proceed,最终调用目标类的方法
retVal = invocation.proceedWithInvocation();
} catch (Throwable ex) {
// target invocation exception
//【重点】回滚事务
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
} finally {
//清空事务信息
cleanupTransactionInfo(txInfo);
}
//【重点】提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
} else {
// 省略响应式事务管理....
}
}
通过上述代码,我们发现在事务拦截器核心逻辑里,主要分为标准事务管理和回调事务管理 (spring 5.2版本增加响应式事务管理)。回调事务管理简单来说是通过回调方法进行事务管理。由于回调事务管理不常用,所以在这里只对标准事务管理进行分析。
事务拦截器核心步骤:
-
获取事务属性: tas.getTransactionAttribute(method, targetClass)
-
创建事务 :createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
-
执行业务逻辑:invocation.proceedWithInvocation();
-
异常回滚事务:completeTransactionAfterThrowing(txInfo, ex);
-
清空事务信息:cleanupTransactionInfo(txInfo);
-
提交事务:commitTransactionAfterReturning(txInfo);
二、获取事务属性
// 获取事务对应的属性
@Override
@Nullable
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
if (method.getDeclaringClass() == Object.class) {
return null;
}
// First, see if we have a cached value.
// 在缓存中查找
Object cacheKey = getCacheKey(method, targetClass);
Object cached = this.attributeCache.get(cacheKey);
if (cached != null) {
// Value will either be canonical value indicating there is no transaction attribute,
// or an actual transaction attribute.
if (cached == NULL_TRANSACTION_ATTRIBUTE) {
return null;
} else {
return (TransactionAttribute) cached;
}
} else {
// We need to work it out.
// 【重点】解析类或者方法上@Transactional事务注解
TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
// Put it in the cache.
// 将解析结果缓存
// 若为null也放入一个特殊标记,表示该方法不需要事务管理
if (txAttr == null) {
this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
} else {
String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
if (txAttr instanceof DefaultTransactionAttribute) {
((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
}
this.attributeCache.put(cacheKey, txAttr);
}
return txAttr;
}
}
这块逻辑很简单,先从AnnotationTransactionAttributeSource
本地缓存attributeCache
获取事务属性,如果获取不到,则调用computeTransactionAttribute()
解析方法/类上@Transactional
注解,然后将解析的事务属性放入本地缓存attributeCache
。
这里重点关注computeTransactionAttribute()
如何实现
computeTransactionAttribute分析
@Nullable
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// 默认 allowPublicMethodsOnly为true
// 如果@Transactional放在非public方法上返回null,不生效事务
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}
// 获取目标类的用户类,以避免代{过}{滤}理类的影响
Class<?> userClass = (targetClass != null ? ClassUtils.getUserClass(targetClass) : null);
// 找到具体实现类的方法,解决从父类继承的方法
Method specificMethod = ClassUtils.getMostSpecificMethod(method, userClass);
// 处理可能的桥接方法
specificMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// First try is the method in the target class.
// 尝试在目标类方法上找@Transactional
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}
// 尝试在目标类上找@Transactional
// Second try is the transaction attribute on the target class.
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
// 降级到接口跟接口中的方法上找@Transactional
if (specificMethod != method) {
// Fallback is to look at the original method.
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
// Last fallback is the class of the original method.
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}
return null;
}
通过上述代码,可以看到对非public方法不生效事务,然后根据优先级查找目标方法上的@Transactional
注解,接着查找目标类上的@Transactional
注解,最后回退到接口或父类的方法/类上查找。@Transactional
注解解析的具体细节是在findTransactionAttribute
方法中完成的,我们继续往下看findTransactionAttribute
实现逻辑
findTransactionAttributefe分析
@Override
@Nullable
protected TransactionAttribute findTransactionAttribute(Method method) {
return determineTransactionAttribute(method);
}
@Nullable
protected TransactionAttribute determineTransactionAttribute(AnnotatedElement ae) {
for (TransactionAnnotationParser annotationParser : this.annotationParsers) {
// 遍历注解解析器,尝试解析@Transactional
TransactionAttribute attr = annotationParser.parseTransactionAnnotation(ae);
if (attr != null) {
return attr;
}
}
return null;
}
AnnotationTransactionAttributeSource
默认使用SpringTransactionAnnotationParser
来解析@Transactional
注解,我们继续往下分析这个解析器的实现。
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
// 定义了在出现异常时如何回滚
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
Propagation propagation = attributes.getEnum("propagation"); //事务传播级别
rbta.setPropagationBehavior(propagation.value());
Isolation isolation = attributes.getEnum("isolation"); //事务隔离级别
rbta.setIsolationLevel(isolation.value());
rbta.setTimeout(attributes.getNumber("timeout").intValue()); //事务超时时间
rbta.setReadOnly(attributes.getBoolean("readOnly")); //默认只读事务
rbta.setQualifier(attributes.getString("value")); //事务名称
//添加回滚规则
ArrayList<RollbackRuleAttribute> rollBackRules = new ArrayList<>();
Class<?>[] rbf = attributes.getClassArray("rollbackFor");
for (Class<?> rbRule : rbf) {
RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule);
rollBackRules.add(rule);
}
String[] rbfc = attributes.getStringArray("rollbackForClassName");
for (String rbRule : rbfc) {
RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule);
rollBackRules.add(rule);
}
Class<?>[] nrbf = attributes.getClassArray("noRollbackFor");
for (Class<?> rbRule : nrbf) {
NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule);
rollBackRules.add(rule);
}
String[] nrbfc = attributes.getStringArray("noRollbackForClassName");
for (String rbRule : nrbfc) {
NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule);
rollBackRules.add(rule);
}
rbta.getRollbackRules().addAll(rollBackRules);
return rbta;
}
可以看到解析完@Transactional
属性后,将这些信息封装成RuleBasedTransactionAttribute
返回。
三、创建事务
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
// 如果没有为事务指定名称,使用切点作为事务名称
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 【重点】调用事务管理器的方法,获取一个事务并返回事务的状态
status = tm.getTransaction(txAttr);
}
//...省略日志
}
// 将事务相关信息封装到TransactionInfo对象中
// 并将TransactionInfo绑定到当前线程
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
上述代码主要是根据给定的PlatformTransactionManager
调用getTransaction(txAttr)
创建事务,最后调用prepareTransactionInfo(...)
将事务相关信息封装到 TransactionInfo
对象中,并将该对象绑定到当前线程resource
线程上下文(ThreadLocal
),以便在方法执行期间传递事务相关的信息。
我们继续往下跟踪tm.getTransaction(txAttr)
做了些什么?
getTransaction(txAttr)分析
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
//【重点】创建数据源事务对象,封装连接持有器等相关信息
Object transaction = doGetTransaction();
// 缓存调试标志,避免重复检查
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// 如果没有提供事务定义,使用默认事务定义
definition = new DefaultTransactionDefinition();
}
//【重点】是否存在事务
//判断是否满足有连接持有器以及事务激活的
//第一次调用不满足走外部事物,第二次调用满足走嵌套事务
if (isExistingTransaction(transaction)) {
/**
* 【重点】嵌套事务处理事务传播机制
**/
return handleExistingTransaction(definition, transaction, debugEnabled);
}
/**
*【重点】 外部事务处理事务传播机制
**/
//事务属性配置事务超时 小于 事务默认超时-1,抛异常
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
//事务传播类型为 MANDATORY (强制性要求必须开启事务),表示如果当前没有事务,则抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
} else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
/**
* REQUIRED : 如果当前没有事务,就新建一个事务
* REQUIRES_NEW: 如果当前存在事务,把当前事务挂起,然后新建一个事务
* NESTED: 如果一个活动事务存在,则运行在一个嵌套事务中
*/
// 如果存在同步,将注册的同步挂起,一般不存在
SuspendedResourcesHolder suspendedResources = suspend(null);
//...省略日志
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
//根据事务注解属性,创建事务状态,包含事务对象,事务属性、是否是新事务标识true、
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//开启事务 -> 绑定数据连接到ThreadLocal
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
} catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
} else {
//非事务方式执行。事务传播级别:NEVER、NOT_SUPPORTED、SUPPORTS
// 创建一个空事务,没有实际的事务提交以及回滚机制
// 会激活同步:将数据库连接绑定到当前线程上
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
说白了,上述代码doGetTransaction()
创建一个事务对象设置ConnectionHolder
,然后通过isExistingTransaction(transaction)
方法判断ConnectionHolder
是否存在,用于区分外部事务和嵌套事务,便于对事务传播级别进行不同处理。
我们从doGetTransaction()
开始入手
@Override
protected Object doGetTransaction() {
// 创建一个数据源事务对象
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
// 根据事务管理器的配置,设置是否允许使用保存点(Savepoint)
// 默认true允许
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// 从当前事务同步管理器中获取连接持有器,连接持有器包含了数据源连接池中的连接
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
// 如果当前事务上下文中已经存在连接持有器(已经存在一个连接),则将连接持有器设置给事务对象
// 并将 newConnectionHolder 标志设置为 false,表示复用之前的连接,不是创建一个新连接
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
上述代码流程:
- 首先,创建一个
DataSourceTransactionObject
对象,该对象用于封装数据源的事务相关信息,包括连接持有器和事务状态等。在创建该对象时,还设置了保存点(Savepoint),用于在事务过程中设置回滚点或部分提交点。
- 然后,通过
TransactionSynchronizationManager.getResource(obtainDataSource())
方法从 TransactionSynchronizationManager
的 ThreadLocal
中获取一个 HashMap
,这个 HashMap
用于维护与数据源相关的事务状态。
- 在获取到的
HashMap
中,以数据源为key获取相应的 ConnectionHolder
对象,包含了从数据源连接池中获取的连接。
- 将获取到的
ConnectionHolder
设置到之前创建的 DataSourceTransactionObject
事务对象中,这个操作为后续的事务处理提供了条件,包括外部事务和嵌套事务的传播机制处理,以及传递数据源连接。
- 当第一次调用事务时,无法从
TransactionSynchronizationManager
中获取绑定到线程的 ConnectionHolder
,说明此时尚未开启事务。因此,会走外部事务的传播机制,将 ConnectionHolder
绑定到线程上,以便在事务过程中进行复用。
- 当后续再次调用事务时,可以从
TransactionSynchronizationManager
中获取已绑定的 ConnectionHolder
,从而可以进入嵌套事务的逻辑,复用已有的连接,实现嵌套事务对传播机制的处理。
继续往下跟踪if (isExistingTransaction(transaction))
判断是否满足有连接持有器以及事务激活后,事务传播机制在外部事务和嵌套事务中如何处理?
揭秘Spring事务传播机制
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
// 创建事务对象,略...
// 处理嵌套事务,略...
/**
* 外部事务处理事务传播
*/
//事务属性配置事务超时 小于 事务默认超时-1,抛异常
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
//事务传播类型为 MANDATORY (强制性要求必须开启事务),表示如果当前没有事务,则抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
} else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
/**
* REQUIRED : 如果当前没有事务,就新建一个事务
* REQUIRES_NEW: 如果当前存在事务,把当前事务挂起,然后新建一个事务
* NESTED: 如果一个活动事务存在,则运行在一个嵌套事务中
*/
// 如果存在同步,将注册的同步挂起,一般不存在
SuspendedResourcesHolder suspendedResources = suspend(null);
//...省略日志
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
//根据事务注解属性,创建事务状态,包含事务对象,事务属性、是否是新事务标识true、
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//开启事务 -> 绑定数据连接到ThreadLocal
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
} catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
} else {
//非事务方式执行。事务传播级别:NEVER、NOT_SUPPORTED、SUPPORTS
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
// 创建一个空事务,没有实际的事务提交以及回滚机制
// 会激活同步:将数据库连接绑定到当前线程上
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
外部事务事务传播级别处理:
- 如果传播级别为
MANDATORY
,要求必须存在已有事务,如果事务不存在,就会抛出 IllegalTransactionStateException
异常
- 如果传播级别为
REQUIRED
、REQUIRES_NEW
、 NESTED
,则doBegin()
新建一个事务。
- 如果传播级别为
NEVER
、NOT_SUPPORTED
、SUPPORTS
传播类型,以非事务方式执行,创建空事务,没有实际的事务提交和回滚机制,但会激活同步,将数据库连接绑定到当前线程上。
重点关注传播级别为 REQUIRED
、REQUIRES_NEW
、 NESTED
时,它的核心处理逻辑doBegin(transaction, definition)
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
//第一次开启事务都没ConnectionHolder
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
//获取数据源连接
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
//设置连接持有器,是否新建为 true
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
//获取数据源连接
con = txObject.getConnectionHolder().getConnection();
//对连接设置 事务隔离级别、是否只读
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
//是否自动提交,如果自动提交事务,则设置为false
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
//如果配置连接只读,则执行SQL ( SET TRANSACTION READ ONLY) 设置事务只读
prepareTransactionalConnection(con, definition);
//事务激活标识设置为true
txObject.getConnectionHolder().setTransactionActive(true);
//如果配置了事务超时,则进行设置
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
//key为数据源,value为连接持有器 的map形式与线程绑定,存储到TransactionSynchronizationManager的resource线程上下文
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
//省略异常...
}
在 doBegin()
方法中,从事务对象中获取ConnectionHolder
,如果没有,则调用obtainDataSource().getConnection()
从DataSource
获取Connection
封装成ConnectionHolder
设置到事务对象,然后对ConnectionHolder
设置 事务隔离级别、是否只读、事务超时、是否自动提交,最后以HashMap
形式将DataSource
为key
与ConnectionHolder
为value
绑定到TransactionSynchronizationManager
的resource
线程上下文(ThreadLocal
)。
if (isExistingTransaction(transaction)) {
// 嵌套事务处理事务传播机制
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
当事务已经存在时,就会进入了处理嵌套事务的阶段,这时会调用 handleExistingTransaction
方法,其内部实现如下:
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
//嵌套事务处理传播级别
//NEVER 以非事务方式执行,如果当前存在事务,则抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
//NOT_SUPPORTED 非事务方式执行,如果存在事务,则将当前事务挂起
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
//挂起事务
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
//创建事务状态
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
//REQUIRES_NEW, 如果存在事务,则将当前事务挂起,然后新建一个事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
//挂起事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
// 创建新事务状态
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//开启事务
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
} catch (RuntimeException | Error beginEx) {\
// 开启事务失败,恢复事务状态并抛出异常
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
//NESTED 如果一个活动事务存在,则运行在一个嵌套事务中
// 如果嵌套事务的传播级别为nested,会获取当前线程绑定的数据库连接
// 并通过数据库连接创建一个保存点(save point)
// 其实就是调用Connection的setSavepoint方法
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 默认是允许的,所以这个判断不会成立
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
//省略日志...
// 默认是true, 使用保存点
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
//创建保存点
status.createAndHoldSavepoint();
return status;
} else {
// JTA进行事务管理才会进入这这里,我们不做考虑
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// 省略日志...
// 嵌套事务传播级别为supports、required、mandatory时,是否需要校验嵌套事务的属性
// 主要校验的是个隔离级别跟只读属性
// 默认是不需要校验的
// 如果开启了校验,那么会判断如果外围事务的隔离级别跟嵌套事务的隔离级别是否一致
// 如果不一致,直接抛出异常
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
// 嵌套事务的只读为false
if (!definition.isReadOnly()) {
// 但是外部事务的只读为true,那么直接抛出异常
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
每个传播级别在代码中都有特定的处理逻辑,下面将对各个级别进行详细解释:
-
如果事务传播级别为NEVER
不允许存在事务,如果事务存在,就会抛出 IllegalTransactionStateException
异常,没啥好分析的
-
如果事务传播级别为NOT_SUPPORTED
时,代码如下
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
//挂起事务
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
//创建事务状态,transaction = null,newTransaction = false,表示不存在事务
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
这里主要做了挂起事务suspend(transaction)
和创建事务状态prepareTransactionStatus
-
挂起事务
-
清理外部事务使用TransactionSynchronizationManager
中绑定到resource
线程上下文的数据源连接
-
将清理的外部事务数据源连接保存到当前事务,以便后续嵌套事务完成后重新绑定到TransactionSynchronizationManager
中绑定到resource
线程上下文,让外部事务能继续运行
-
创建事务状态
- 传递参数
transaction = null
,newTransaction = false
创建事务状态,这两个属性表示不存在事务。后续二阶段提交/回滚判断时,需要依赖两个属性判断是否进行真正的提交/回滚。
- 如果事务传播级别为
REQUIRES_NEW
时,代码如下:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
//省略日志...
//挂起事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
// 创建新事务状态
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//开启事务
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
} catch (RuntimeException | Error beginEx) {\
// 开启事务失败,恢复事务状态并抛出异常
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
通过前文对挂起事务suspend(transaction)
和开启事务doBegin()
已经了解了,我们重点关注创建新事务状态的属性transaction 不为 null
,newTransaction = true
,表示存在事务,可以真正进行二阶段提交/回滚。
- 如果事务传播级别为
NESTED
时,代码如下:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 默认是允许的,所以这个判断不会成立
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
//省略日志...
// 默认是true, 使用保存点
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
//创建保存点
status.createAndHoldSavepoint();
return status;
} else {
// JTA进行事务管理才会进入这这里,我们不做考虑
}
}
创建保存点调用Connection#setSavepoint
方法没啥好说的,我们还是重点关注创建新事务状态的属性transaction 不为 null
,newTransaction = false
,表示加入到外部事务,后续在外部事务提交或回滚时,嵌套事务的操作会随之提交或回滚。但嵌套事务增加了保存点方式,可以将嵌套事务的操作隔离出来,以便在需要时选择性地回滚这些操作,而不会影响整个外部事务。
- 如果事务传播级别为
SUPPORTS`、`REQUIRED`、`MANDATORY
时,代码如下:
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
与事务传播级别NESTED
只是少了保存点,不过创建事务状态的属性transaction 不为 null
,newTransaction = false
是一致的,表示加入到外部事务,与外部事务一起提交或回滚。
- 传播级别为
NEVER
,不允许存在事务,如果事务存在,就会抛出 IllegalTransactionStateException
异常
- 传播级别为
NOT_SUPPORTED
,以非事务方式执行,如果存在事务,则suspend()
将当前事务挂起,新事务状态(transaction = null
,newTransaction = false
),表示不存在事务
- 传播级别为
REQUIRES_NEW
,如果存在事务,那么suspend()
将当前事务挂起,doBegin()
新建一个事务,新事务状态(transaction 不为 null
,newTransaction = true
),表示存在事务,可以真正进行二阶段提交/回滚。
- 传播级别为
NESTED
,获取外部事务线程绑定的数据库连接, 并通过数据库连接创建一个保存点,新事务状态(transaction 不为 null
,newTransaction = false
),表示加入外部事务,将嵌套事务隔离于外部事务,可以根据需要选择性地回滚
- 传播级别为
SUPPORTS
、REQUIRED
、MANDATORY
,新事务状态(transaction 不为 null
,newTransaction = false
),表示加入到外部事务,与外部事务一起提交或回滚。
四、执行业务逻辑
在实际业务中,通常会使用 MyBatis
这样的ORM框架来访问数据库。所以我们从 MyBatis
的角度深入探讨它如何与 Spring 事务进行整合
//--- org.apache.ibatis.executor.SimpleExecutor ----
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
//获取数据连接
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt);
return stmt;
}
protected Connection getConnection(Log statementLog) throws SQLException {
//Spring事务管理器SpringManagedTransaction#getConnection()
Connection connection = transaction.getConnection();
if (statementLog.isDebugEnabled()) {
return ConnectionLogger.newInstance(connection, statementLog, queryStack);
} else {
return connection;
}
}
//--- SpringManagedTransaction ---
public Connection getConnection() throws SQLException {
// 如果连接为null,获取一个新的数据源连接
if (this.connection == null) {
openConnection();
}
return this.connection;
}
private void openConnection() throws SQLException {
//DataSourceUtils获取数据源连接
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
//省略日志...
}
//--- DataSourceUtils ---
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
Assert.notNull(dataSource, "No DataSource specified");
//事务同步管理器获取绑定到resource线程上下文的ConnectionHolder
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();
if (!conHolder.hasConnection()) {
logger.debug("Fetching resumed JDBC Connection from DataSource");
conHolder.setConnection(fetchConnection(dataSource));
}
return conHolder.getConnection();
}
//省略...
}
在MyBatis
中,当Executor
创建PreparedStatementHandler
执行prepareStatement()
方法时,通过SpringManagedTransaction
使用DataSourceUtils
工具类从TransactionSynchronizationManager
的resource
线程上下文中,以DataSource
为key获取绑定到线程的Connection
,然后执行业务SQL
五、异常回滚事务
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
// 事务信息和事务状态不为空
if (txInfo != null && txInfo.getTransactionStatus() != null) {
// 检查事务属性是否需要回滚
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 回滚事务
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
// 如果回滚异常,记录错误信息并包装异常再次抛出
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
// 如果回滚异常,记录错误信息并抛出异常
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
} else {
// 当前异常不需要回滚,但如果事务状态设置为回滚,则仍然会回滚
try {
// 提交事务
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
// 如果提交异常,记录错误信息并包装异常再次抛出
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
// 如果提交异常,记录错误信息并抛出异常
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
txInfo.transactionAttribute.rollbackOn(ex)
判断是否应该回滚事务。如果给定的异常类型与事务属性的回滚规则匹配,即异常类型满足回滚条件,那么会返回 true
,表示需要回滚事务;否则返回 false
,表示不需要回滚事务。默认情况下,@Transationa
注解没有配置rollnackFor
属性,那么只对RunTimeException
和Error
进行回滚。
继续往下看真正执行回滚的rollback
方法,代码如下:
public final void rollback(TransactionStatus status) throws TransactionException {
// 检查事务状态是否已经完成
if (status.isCompleted()) {
// 如果事务已经完成,则抛出异常,不允许多次调用 commit 或 rollback
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
// 事务状态强制转换为 DefaultTransactionStatus 类型
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 调用 processRollback 方法进行实际的回滚操作
// 第二个参数传入 false,表示不是回滚异常导致的回滚操作
processRollback(defStatus, false);
}
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
//传入为false
boolean unexpectedRollback = unexpected;
try {
// 触发事务完成前置处理 (Hook)
triggerBeforeCompletion(status);
// 判断是否存在保存点,如果存在则回滚到保存点
// 嵌套事务传播级别:nested
if (status.hasSavepoint()) {
status.rollbackToHeldSavepoint();
}
// 如果是新事务,则直接执行回滚
// 外部事务传播级别:nested、required、requires_new
// 嵌套事务传播级别:requires_new
else if (status.isNewTransaction()) {
doRollback(status);
}
// 如果参与到外部事务中
// 嵌套事务传播级别为:supports、required、mandatory
else {
if (status.hasTransaction()) {
// isLocalRollbackOnly 本地事务被标记为只回滚 , 默认 false
// isGlobalRollbackOnParticipationFailure 全局标记为在参与事务失败时回滚,默认true
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
// 设置事务为只回滚
// 说白了就是嵌套事务发生异常时,设置为只回滚
doSetRollbackOnly(status);
}
//...省略日志
}
// 如果不在全局回滚标记失败时立即失败的情况下,不将未预期回滚标志传递下去
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
} catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
// 触发事务完成后置处理(Hook),标记为回滚完成
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// 如果存在未预期回滚标记,抛出 UnexpectedRollbackException 异常
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
} finally {
// 清理事务状态
cleanupAfterCompletion(status);
}
}
可以看到根据事务传播级别进行不同回滚处理,当在回滚完成后,最终都会调用cleanupAfterCompletion
清理事务状态
cleanupAfterCompletion
代码如下:
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
// 标记事务已完成
status.setCompleted();
// 如果是新的同步,清除当前线程上下文中的事务同步信息
// 外部事务:任务事务传播级别
// 嵌套事务:not_supported、require_new
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
// 如果是新事务,执行清理操作
//外部事务:required、requires_new、nested
//嵌套事务:requires_new
if (status.isNewTransaction()) {
//对数据源连接清理
//包括解除数据源的线程绑定、重置连接状态、释放连接以及清除连接持有器的内容
doCleanupAfterCompletion(status.getTransaction());
}
// 如果存在挂起的资源(嵌套事务),恢复挂起的资源
if (status.getSuspendedResources() != null) {
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
// 恢复挂起的事务资源
// 将之前解绑并缓存在当前事务对象的线程资源ConnectionHolder 重新绑定回线程
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
简单来说标记事务已完成,并根据不同的传播级别进行相应的清理操作。对于新的同步,清除当前线程上下文中的事务同步信息。对于新事务,执行数据源连接的清理操作。对于存在挂起资源的情况,恢复这些资源,确保外部事务得到正确处理。
六、事务提交
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
//事务管理进行提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
在这段代码中,当事务信息和事务状态不为空时,会触发事务提交。通过事务信息中的事务管理器获取事务状态,并调用其 commit()
方法来完成实际的事务提交操作。
接下来,我们将深入了解事务提交的整体流程,主要涉及到抽象父类 AbstractPlatformTransactionManager
的 commit()
方法,代码如下:
@Override
public final void commit(TransactionStatus status) throws TransactionException {
// 如果事务已经完成,则抛出异常,不允许多次调用 commit 或 rollback
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
// 检查事务状态中,rollbackOnly属性是否设置只回滚
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
// 执行事务回滚
// 前文已经分析过,这里不再展开
processRollback(defStatus, false);
return;
}
// shouldCommitOnGlobalRollbackOnly 全局标记回滚是否进行提交,默认false
// isGlobalRollbackOnly 检查事务对象中,rollbackOnly属性是否设置只回滚
// 通过之前嵌套事务分析,当发生回滚时,会标志为全局回滚为true
// 那么外部事务就可以通过全局回滚标识,执行回滚操作
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
// 处理全局回滚标记,执行回滚操作
processRollback(defStatus, true);
return;
}
// 事务提交
processCommit(defStatus);
}
上述代码逻辑很简单,spring事务的commit()
实际上包含了回滚和提交操作。在处理事务回滚时,回滚条件主要分为两种。一种根据事务状态的属性rollbackOnly
判断进行回滚,另一种根据事务对象的属性rollbackOnly
判断是否进行回滚。两者主要的区别是:事务状态的回滚并不会抛出异常,而事务对象的回滚则会将异常抛给调用者。
接下我们看看processCommit(defStatus)
事务提交逻辑
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
//事务管理器子策略在事务同步提交前置处理(Hook), 实现扩展操作
prepareForCommit(status);
//事务同步提交的前置处理
triggerBeforeCommit(status);
//事务同步提交的前置完成处理
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
//存在保存点,则清理清理保存点
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
// 外部事务:事务传播级别 required、nested、requires_new
// 嵌套事务:事务传播级别 requires_new
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
//真正进行事务提交,例如 mysql数据库,调用connection#commit进行事务提交
doCommit(status);
} else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
//事务同步提交的后置处理
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
} catch (TransactionException ex) {
// can only be caused by doCommit
//isRollbackOnCommitFailure 默认为false
if (isRollbackOnCommitFailure()) {
//当connection#commit失败抛SQLException异常时,进行事务回滚
doRollbackOnCommitException(status, ex);
} else {
//事务同步提交的后置处理
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
} catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
//其他异常,进行事务回滚
doRollbackOnCommitException(status, ex);
throw ex;
}
//事务同步提交的后置处理操作
try {
triggerAfterCommit(status);
} finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
} finally {
//清理事务状态,前文已分析过
cleanupAfterCompletion(status);
}
}
说白了上述代码也不难,主要逻辑就是调用doCommit(status)
对数据源连接进行提交,从而实现了数据的持久化
七、小结
本文深入剖析了 Spring 框架事务的核心流程和关键代码,主要围绕事务的传播机制、异常回滚、事务提交等方面展开了详细的解析。通过对 Spring 事务管理的内部原理进行剖析,我们可以更好地理解 Spring 如何实现事务的控制和管理,以及其背后的设计理念。
抛一个小问题,不知小伙伴们对spring事务能否在多线程执行有何见解?欢迎在评论区进行探讨,后续将会对这块内容分析,敬请期待~