Spring编程式事务原理
类型 | 说明 | 举例 |
---|---|---|
PROPAGATION_REQUIRED | 支持当前事务 如果不存,就建一个(默认的) | A,B,如果A有事务,则B使用A事务。如果A没有事务,则B开启一个新的事务。(A,B在同一个事务中) |
PROPAGATION_SUPPORTS | 支持当前事务 如果不存在,就不使用事务 | A,B,如果A有事务,则B就使用A的事务。如果A没有事务,则B就不使用事务 |
PROPAGATION_MANDATORY | 支持当前事务 如果不存在,抛出异常 | A,B,如果A有事务,则B就使用A的事务。如果A没有事务,则B抛出异常 |
PROPAGATION_REQUIRES_NEW | 如果有事务存在 挂起当前事务,创建一个新的事务 | A,B,如果A有事务,B将A的事务挂起,并重新创建一个事务(A,B不在一个事务中,两个事务互不影响) |
PROPAGATION_NOT_SUPPORTED | 以非事务方式运行 如果有事务存在,挂起当前事务 | A,B,非事务方式运行,A有事务就挂起事务 |
PROPAGATION_NEVER | 以非事务方式运行 如果有事务存在,抛出异常 | 总是以非事务运行 |
PROPAGATION_NESTED | 如果当前事务存在,则嵌套事务执行 | 基于SavePonit技术。(保存点) A,B,A有事务,A执行之后,将A事务执行之后的内容保存到SavePonit。B事务有异常的情况,用户需要自己设置事务是提交还是回滚到保存点 |
@Autowired
private DataSourceTransactionManager dstManager;
@Autowired
private UserDao userDao;
public void updateTest(){
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
TransactionStatus transaction= dstManager.getTransaction(def);
try{
userDao.updateTest();
}catch(Exception e){
log.error(e.toString(),e);
dstManager.rollback(transaction);
}
dstManager.commit(transaction);
}
DataSourceTransactionManager
是PlatformTransactionManager
的一个实现。PlatformTransactionManager
是Spring事务处理抽象层中的管理器。
统一一致的事务抽象是Spring框架的一大优势,无论是全局事务还是本地事务,JTA、JDBC、Hibernate还是JPA,Spring都使用统一的编程模型,使得应用程序可以很容易地在全局事务与本地事务,或者不同的事务框架之间进行切换。下图是Spring事务抽象的核心类图:
PlatformTransactionManager:事务管理器
getTransaction方法:事务获取操作,根据事务属性定义,获取当前事务或者创建新事物;
commit方法:事务提交操作,注意这里所说的提交并非直接提交事务,而是根据当前事务状态执行提交或者回滚操作;
rollback方法:事务回滚操作,同样,也并非一定直接回滚事务,也有可能只是标记事务为只读,等待其他调用方执行回滚。
TransactionDefinition:事务属性定义
getPropagationBehavior方法:返回事务的传播属性,默认是PROPAGATION_REQUIRED;
getIsolationLevel方法:返回事务隔离级别,事务隔离级别只有在创建新事务时才有效,也就是说只对应传播属性PROPAGATION_REQUIRED和PROPAGATION_REQUIRES_NEW;
getTimeout方法:返回事务超时时间,以秒为单位,同样只有在创建新事务时才有效;
isReadOnly方法:是否优化为只读事务,支持这项属性的事务管理器会将事务标记为只读,只读事务不允许有写操作,不支持只读属性的事务管理器需要忽略这项设置,这一点跟其他事务属性定义不同,针对其他不支持的属性设置,事务管理器应该抛出异常。
getName方法:返回事务名称,声明式事务中默认值为“类的完全限定名.方法名”。
TransactionStatus:当前事务状态
isNewTransaction方法:当前方法是否创建了新事务(区别于使用现有事务以及没有事务);
hasSavepoint方法:在嵌套事务场景中,判断当前事务是否包含保存点;
setRollbackOnly和isRollbackOnly方法:只读属性设置(主要用于标记事务,等待回滚)和查询;
flush方法:刷新底层会话中的修改到数据库,一般用于刷新如Hibernate/JPA的会话,是否生效由具体事务资源实现决定;
isCompleted方法:判断当前事务是否已完成(已提交或者已回滚)。
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
// 创建DataSourceTransactionObject
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
// 已有事务逻辑,通过handleExistingTransaction方法处理
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// 以下是当前无事务的情况
// - PROPAGATION_MANDATORY:当前事务不存在,抛出异常
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 无事务便创建事务
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
// 无事务不需创建
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
org.springframework.jdbc.datasource.DataSourceTransactionManager#isExistingTransaction
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
TransactionSynchronizationManager
是事务同步管理器,TransactionSynchronizationManager通过ThreadLocal对象在当前线程记录了resources和synchronizations属性。resources是一个HashMap,用于记录当前参与事务的事务资源,方便进行事务同步,在DataSourceTransactionManager的例子中就是以dataSource作为key,保存了数据库连接,这样在同一个线程中,不同的方法调用就可以通过dataSource获取相同的数据库连接,从而保证所有操作在一个事务中进行。
org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// PROPAGATION_NEVER 直接抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// PROPAGATION_NOT_SUPPORTED suspend挂起当前事务,当前方法不使用事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// PROPAGATION_REQUIRES_NEW suspend挂起当前事务,doBegin创建一个新事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// PROPAGATION_NESTED 嵌套事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
// PROPAGATION_REQUIRED、PROPAGATION_SUPPORTS、PROPAGATION_MANDATORY 使用当前事务
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}