Spring编程式事务原理

类型 说明 举例
PROPAGATION_REQUIRED 支持当前事务 如果不存,就建一个(默认的) A,B,如果A有事务,则B使用A事务。如果A没有事务,则B开启一个新的事务。(A,B在同一个事务中)
PROPAGATION_SUPPORTS 支持当前事务 如果不存在,就不使用事务 A,B,如果A有事务,则B就使用A的事务。如果A没有事务,则B就不使用事务
PROPAGATION_MANDATORY 支持当前事务 如果不存在,抛出异常 A,B,如果A有事务,则B就使用A的事务。如果A没有事务,则B抛出异常
PROPAGATION_REQUIRES_NEW 如果有事务存在 挂起当前事务,创建一个新的事务 A,B,如果A有事务,B将A的事务挂起,并重新创建一个事务(A,B不在一个事务中,两个事务互不影响)
PROPAGATION_NOT_SUPPORTED 以非事务方式运行 如果有事务存在,挂起当前事务 A,B,非事务方式运行,A有事务就挂起事务
PROPAGATION_NEVER 以非事务方式运行 如果有事务存在,抛出异常 总是以非事务运行
PROPAGATION_NESTED 如果当前事务存在,则嵌套事务执行 基于SavePonit技术。(保存点) A,B,A有事务,A执行之后,将A事务执行之后的内容保存到SavePonit。B事务有异常的情况,用户需要自己设置事务是提交还是回滚到保存点
@Autowired
private DataSourceTransactionManager dstManager;
@Autowired
private UserDao userDao;

public void updateTest(){
  DefaultTransactionDefinition def = new DefaultTransactionDefinition();
  def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
  TransactionStatus transaction= dstManager.getTransaction(def);
  try{
    userDao.updateTest();
  }catch(Exception e){
    log.error(e.toString(),e);
    dstManager.rollback(transaction);
  }
  dstManager.commit(transaction);
}

DataSourceTransactionManagerPlatformTransactionManager的一个实现。PlatformTransactionManager是Spring事务处理抽象层中的管理器。

统一一致的事务抽象是Spring框架的一大优势,无论是全局事务还是本地事务,JTA、JDBC、Hibernate还是JPA,Spring都使用统一的编程模型,使得应用程序可以很容易地在全局事务与本地事务,或者不同的事务框架之间进行切换。下图是Spring事务抽象的核心类图:

PlatformTransactionManager

PlatformTransactionManager:事务管理器
getTransaction方法:事务获取操作,根据事务属性定义,获取当前事务或者创建新事物;
commit方法:事务提交操作,注意这里所说的提交并非直接提交事务,而是根据当前事务状态执行提交或者回滚操作;
rollback方法:事务回滚操作,同样,也并非一定直接回滚事务,也有可能只是标记事务为只读,等待其他调用方执行回滚。

TransactionDefinition:事务属性定义
getPropagationBehavior方法:返回事务的传播属性,默认是PROPAGATION_REQUIRED;
getIsolationLevel方法:返回事务隔离级别,事务隔离级别只有在创建新事务时才有效,也就是说只对应传播属性PROPAGATION_REQUIRED和PROPAGATION_REQUIRES_NEW;
getTimeout方法:返回事务超时时间,以秒为单位,同样只有在创建新事务时才有效;
isReadOnly方法:是否优化为只读事务,支持这项属性的事务管理器会将事务标记为只读,只读事务不允许有写操作,不支持只读属性的事务管理器需要忽略这项设置,这一点跟其他事务属性定义不同,针对其他不支持的属性设置,事务管理器应该抛出异常。
getName方法:返回事务名称,声明式事务中默认值为“类的完全限定名.方法名”。

TransactionStatus:当前事务状态
isNewTransaction方法:当前方法是否创建了新事务(区别于使用现有事务以及没有事务);
hasSavepoint方法:在嵌套事务场景中,判断当前事务是否包含保存点;
setRollbackOnly和isRollbackOnly方法:只读属性设置(主要用于标记事务,等待回滚)和查询;
flush方法:刷新底层会话中的修改到数据库,一般用于刷新如Hibernate/JPA的会话,是否生效由具体事务资源实现决定;
isCompleted方法:判断当前事务是否已完成(已提交或者已回滚)。

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    // 创建DataSourceTransactionObject
		Object transaction = doGetTransaction();

		// Cache debug flag to avoid repeated checks.
		boolean debugEnabled = logger.isDebugEnabled();

		if (definition == null) {
			// Use defaults if no transaction definition given.
			definition = new DefaultTransactionDefinition();
		}

    // 已有事务逻辑,通过handleExistingTransaction方法处理
		if (isExistingTransaction(transaction)) {
			// Existing transaction found -> check propagation behavior to find out how to behave.
			return handleExistingTransaction(definition, transaction, debugEnabled);
		}

		// Check definition settings for new transaction.
		if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
			throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
		}

    // 以下是当前无事务的情况
    // - PROPAGATION_MANDATORY:当前事务不存在,抛出异常
		// No existing transaction found -> check propagation behavior to find out how to proceed.
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
    // 无事务便创建事务
		else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			SuspendedResourcesHolder suspendedResources = suspend(null);
			if (debugEnabled) {
				logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
			}
			try {
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
			catch (RuntimeException | Error ex) {
				resume(null, suspendedResources);
				throw ex;
			}
		}
    // 无事务不需创建
		else {
			// Create "empty" transaction: no actual transaction, but potentially synchronization.
			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
						"isolation level will effectively be ignored: " + definition);
			}
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
		}
	}
org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction

	protected Object doGetTransaction() {
		DataSourceTransactionObject txObject = new DataSourceTransactionObject();
		txObject.setSavepointAllowed(isNestedTransactionAllowed());
		ConnectionHolder conHolder =
				(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
		txObject.setConnectionHolder(conHolder, false);
		return txObject;
	}

org.springframework.jdbc.datasource.DataSourceTransactionManager#isExistingTransaction

	protected boolean isExistingTransaction(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
	}

TransactionSynchronizationManager 是事务同步管理器,TransactionSynchronizationManager通过ThreadLocal对象在当前线程记录了resources和synchronizations属性。resources是一个HashMap,用于记录当前参与事务的事务资源,方便进行事务同步,在DataSourceTransactionManager的例子中就是以dataSource作为key,保存了数据库连接,这样在同一个线程中,不同的方法调用就可以通过dataSource获取相同的数据库连接,从而保证所有操作在一个事务中进行。

org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction

private TransactionStatus handleExistingTransaction(
		TransactionDefinition definition, Object transaction, boolean debugEnabled)
		throws TransactionException {
   // PROPAGATION_NEVER 直接抛出异常
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
		throw new IllegalTransactionStateException(
				"Existing transaction found for transaction marked with propagation 'never'");
	}
  	// PROPAGATION_NOT_SUPPORTED suspend挂起当前事务,当前方法不使用事务
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
		if (debugEnabled) {
			logger.debug("Suspending current transaction");
		}
		Object suspendedResources = suspend(transaction);
		boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
		return prepareTransactionStatus(
				definition, null, false, newSynchronization, debugEnabled, suspendedResources);
	}
  // PROPAGATION_REQUIRES_NEW suspend挂起当前事务,doBegin创建一个新事务
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
		if (debugEnabled) {
			logger.debug("Suspending current transaction, creating new transaction with name [" +
					definition.getName() + "]");
		}
		SuspendedResourcesHolder suspendedResources = suspend(transaction);
		try {
			boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
			DefaultTransactionStatus status = newTransactionStatus(
					definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
			doBegin(transaction, definition);
			prepareSynchronization(status, definition);
			return status;
		}
		catch (RuntimeException | Error beginEx) {
			resumeAfterBeginException(transaction, suspendedResources, beginEx);
			throw beginEx;
		}
	}
  // PROPAGATION_NESTED 嵌套事务
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
		if (!isNestedTransactionAllowed()) {
			throw new NestedTransactionNotSupportedException(
					"Transaction manager does not allow nested transactions by default - " +
					"specify 'nestedTransactionAllowed' property with value 'true'");
		}
		if (debugEnabled) {
			logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
		}
		if (useSavepointForNestedTransaction()) {
			// Create savepoint within existing Spring-managed transaction,
			// through the SavepointManager API implemented by TransactionStatus.
			// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
			DefaultTransactionStatus status =
					prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
			status.createAndHoldSavepoint();
			return status;
		}
     // PROPAGATION_REQUIRED、PROPAGATION_SUPPORTS、PROPAGATION_MANDATORY 使用当前事务
		else {
			// Nested transaction through nested begin and commit/rollback calls.
			// Usually only for JTA: Spring synchronization might get activated here
			// in case of a pre-existing JTA transaction.
			boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
			DefaultTransactionStatus status = newTransactionStatus(
					definition, transaction, true, newSynchronization, debugEnabled, null);
			doBegin(transaction, definition);
			prepareSynchronization(status, definition);
			return status;
		}
	}

	// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
	if (debugEnabled) {
		logger.debug("Participating in existing transaction");
	}
	if (isValidateExistingTransaction()) {
		if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
			Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
			if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
				Constants isoConstants = DefaultTransactionDefinition.constants;
				throw new IllegalTransactionStateException("Participating transaction with definition [" +
						definition + "] specifies isolation level which is incompatible with existing transaction: " +
						(currentIsolationLevel != null ?
								isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
								"(unknown)"));
			}
		}
		if (!definition.isReadOnly()) {
			if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
				throw new IllegalTransactionStateException("Participating transaction with definition [" +
						definition + "] is not marked as read-only but existing transaction is");
			}
		}
	}
	boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
	return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

【技术干货】Spring事务原理一探

spring事务传播性源码解析

Spring事务源码解析


Spring编程式事务原理
https://blog.yjll.blog/post/65703737.html
作者
简斋
发布于
2020年11月6日
许可协议