Spring事务注解切面实现类

我爱海鲸 2024-05-08 11:29:25 暂无标签

简介记录一位同事的spring切面问题

Spring事务注解切面实现类

org.springframework.transaction.interceptor.TransactionAspectSupport

入口方法:

/**
 * Entry point of the transaction aspect (excerpt; parts of the original method are omitted).
 * Looks up the transaction attribute for the method, creates a transaction if necessary,
 * invokes the target through the callback, and then completes the transaction:
 * completeTransactionAfterThrowing when the target throws, commitTransactionAfterReturning otherwise.
 */
@Nullable
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,final InvocationCallback invocation) throws Throwable {
​
        // If the transaction attribute is null, the method is non-transactional.
        TransactionAttributeSource tas = getTransactionAttributeSource();
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        final TransactionManager tm = determineTransactionManager(txAttr);
​
        /**
        * ... reactive-framework logic omitted
        */
        // Obtain the manager as a PlatformTransactionManager
        // (author's note: this checks that the Spring transaction manager is valid).
        PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
​
        if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
            // Standard transaction demarcation with getTransaction and commit/rollback calls.
            // Create the transaction info; per the author's note, the database connection
            // is also obtained during this step.
            TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
​
            Object retVal;
            try {
                // Invoke the proxied target method.
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // The target method threw: complete the transaction for this exception, then rethrow.
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                // Cleanup step; runs whether the target returned normally or threw.
                cleanupTransactionInfo(txInfo);
            }
​
            if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                // Set rollback-only in case of Vavr failure matching our rollback rules...
                TransactionStatus status = txInfo.getTransactionStatus();
                if (status != null && txAttr != null) {
                    retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                }
            }
​
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }
        /**
        * Omitted: the else branch, i.e. txAttr is non-null and ptm is a
        * CallbackPreferringPlatformTransactionManager (a null txAttr is handled by the branch above).
        */
    }

createTransactionIfNecessary方法

/**
 * Creates the TransactionInfo for a joinpoint. When a transaction attribute and a
 * transaction manager are both present, a TransactionStatus is obtained from the manager;
 * otherwise the status stays null. The result of prepareTransactionInfo is returned either way.
 */
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
            @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
​
        // If no name specified, apply method identification as transaction name.
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }
​
        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                // Obtain the transaction status from the manager. Author's note: the key piece
                // of state behind it is the ConnectionHolder (important).
                status = tm.getTransaction(txAttr);
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        }
        // Build the TransactionInfo, passing along the status obtained above.
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }
 

事务管理器

org.springframework.transaction.support.AbstractPlatformTransactionManager

获取事务状态方法 getTransaction

/**
 * Returns a TransactionStatus for the given definition. An already existing transaction is
 * delegated to handleExistingTransaction; otherwise the propagation behavior decides whether
 * to fail (MANDATORY), start a new transaction (REQUIRED / REQUIRES_NEW / NESTED), or
 * return a status without an actual transaction.
 */
@Override
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException {
​
        // Use defaults if no transaction definition given.
        TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
​
        Object transaction = doGetTransaction();
        boolean debugEnabled = logger.isDebugEnabled();
​
        // If a transaction already exists, take the existing-transaction path.
        if (isExistingTransaction(transaction)) {
            // Existing transaction found -> check propagation behavior to find out how to behave.
            return handleExistingTransaction(def, transaction, debugEnabled);
        }
​
        // Validate the configured timeout: a value below TIMEOUT_DEFAULT is rejected as invalid.
        // (This checks the setting itself; it does not detect a transaction that has timed out.)
        if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
        }
​
        // No existing transaction at this point: PROPAGATION_MANDATORY fails with an exception.
        if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        // REQUIRED, REQUIRES_NEW and NESTED: all three start a new transaction here.
        else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
            }
            try {
                // Create the TransactionStatus; per the author's note, the Connection is
                // obtained from the DataSource as part of this call.
                return startTransaction(def, transaction, debugEnabled, suspendedResources);
            }
            catch (RuntimeException | Error ex) {
                // Starting failed: resume what was suspended above, then rethrow.
                resume(null, suspendedResources);
                throw ex;
            }
        }
        // Any other propagation behavior: no actual transaction is started
        // (the author notes this branch was not studied in detail).
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + def);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
        }
    }

生成TransactionStatus方法 startTransaction

/**
 * Creates a new TransactionStatus for the given definition and transaction object,
 * begins the transaction via doBegin, prepares synchronization, and returns the status.
 */
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
            boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
​
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        // doBegin is the core step. Author's note: it acquires the database connection and
        // binds the data source / connection to the current thread.
        doBegin(transaction, definition);
        prepareSynchronization(status, definition);
        return status;
    }
 

当前线程已存在ConnectionHolder时调用的方法 handleExistingTransaction

/**
 * Builds the TransactionStatus when a transaction already exists (excerpt; some logic omitted).
 * The propagation behavior of the new definition decides the outcome: NEVER throws,
 * NOT_SUPPORTED suspends the existing transaction, REQUIRES_NEW suspends it and starts a new one,
 * NESTED uses a savepoint or a nested begin, and the remaining path reuses the existing transaction.
 */
private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {
​
        // PROPAGATION_NEVER: an existing transaction is not allowed, so throw.
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }
        // PROPAGATION_NOT_SUPPORTED: no exception is thrown here; the current transaction is
        // suspended and a status without a transaction object (null) is returned.
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }
        // PROPAGATION_REQUIRES_NEW: suspend the current transaction and start a new one.
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            // Suspend the current transaction. Author's note: this removes the transaction
            // information that TransactionSynchronizationManager had stored for the current thread.
            // The suspended resources are handed to the new transaction's status.
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
                return startTransaction(definition, transaction, debugEnabled, suspendedResources);
            }
            catch (RuntimeException | Error beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
        }
        // PROPAGATION_NESTED: nested transaction.
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }
            if (debugEnabled) {
                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            // Check whether a savepoint should be used for the nested transaction.
            if (useSavepointForNestedTransaction()) {
                // Create savepoint within existing Spring-managed transaction,
                // through the SavepointManager API implemented by TransactionStatus.
                // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                status.createAndHoldSavepoint();
                return status;
            }
            else {
                // Savepoints are not used: start a transaction directly via startTransaction.
                // Nested transaction through nested begin and commit/rollback calls.
                // Usually only for JTA: Spring synchronization might get activated here
                // in case of a pre-existing JTA transaction.
                return startTransaction(definition, transaction, debugEnabled, null);
            }
        }
​
        /**
        * Some logic omitted
        **/
    
        // PROPAGATION_REQUIRED falls straight through to here: the status is prepared
        // around the existing transaction object; nothing is suspended or newly begun.
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }

 

数据源事务管理器

org.springframework.jdbc.datasource.DataSourceTransactionManager

同时是org.springframework.transaction.support.AbstractPlatformTransactionManager的实现类

开启事务方法 doBegin

  
  /**
   * Begins a JDBC transaction: makes sure the transaction object has a ConnectionHolder
   * (obtaining a new connection from the data source when needed), prepares the connection,
   * switches off auto-commit if it is on, marks the holder's transaction as active, and binds
   * a newly created holder to the current thread. Any failure is rethrown as
   * CannotCreateTransactionException.
   */
  @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;
​
        try {
            // A new connection is needed when there is no ConnectionHolder yet (author's note:
            // always the case on first entry) or the holder is already synchronized with a transaction.
            if (!txObject.hasConnectionHolder() ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                // Get a connection from the data source.
                Connection newCon = obtainDataSource().getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                // Wrap the database connection in a ConnectionHolder.
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }
​
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
​
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            txObject.setReadOnly(definition.isReadOnly());
​
            // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
            // so we don't want to do it unnecessarily (for example if we've explicitly
            // configured the connection pool to set it already).
            // If the connection is in auto-commit mode, turn it off so that Spring commits manually.
            // Author's note: without this, SQL issued later through iBATIS/MyBatis would be
            // auto-committed and the Spring transaction would not take effect.
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                con.setAutoCommit(false);
            }
​
            prepareTransactionalConnection(con, definition);
            txObject.getConnectionHolder().setTransactionActive(true);
​
            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
​
            // Bind the connection holder to the thread.
            // Key step: the ConnectionHolder is bound with the data source in use as the key
            // and the ConnectionHolder as the value.
            if (txObject.isNewConnectionHolder()) {
                // Author's note: TransactionSynchronizationManager is the key to how Spring
                // controls transaction commit for iBATIS/MyBatis.
                TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
            }
        }
​
        catch (Throwable ex) {
            // On failure, release the connection and clear the holder, but only when the
            // holder was newly created.
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, obtainDataSource());
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

 

测试验证代码:

/**
* Demo endpoint for the walkthrough: updates the user name of row id=1, then calls
* customerService.test() and ignores any exception it throws.
* Precondition: in the database, the row with id=1 currently has
* username 13022043489-del-1691402199.643
**/
@GetMapping("/test")
    @Transactional(rollbackFor = Exception.class)
    @UnAuth
    public AjaxResult<?> test() {
        // 01 - update performed directly in this method
        customerService.update(Wrappers.lambdaUpdate(CustomerDO.class)
                .set(CustomerDO::getUserName, "13022043489-del-1691402199.645")
                .eq(CustomerDO::getId, 1L)
        );
        try{
            // 02 - call into the second @Transactional method
            customerService.test();
        }catch (Exception e){
            // Left empty: the exception from customerService.test() is swallowed so this
            // method still returns success (presumably deliberate for the demo).
​
        }
        return AjaxResult.success();
    }
​
// customerService.test() method: declared with Propagation.NESTED; updates the user name
// of row id=1 and then always throws a RuntimeException.
@Override
    @Transactional(rollbackFor = Exception.class,propagation = Propagation.NESTED)
    public void test() {
        this.update(Wrappers.lambdaUpdate(CustomerDO.class)
                .set(CustomerDO::getUserName, "13022043489-del-1691402199.644")
                .eq(CustomerDO::getId, 1L)
        );
        // Unconditional throw; the if(true) wrapper presumably keeps the compiler from
        // treating the end of the method as unreachable.
        if(true) throw new RuntimeException("测试");
    }

 

当用postman请求test接口会先进入

org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction

先生成一次TransactionInfo,然后进入目标方法,即测试代码中注释01的位置

当走到注释02时候,再次进入

org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction

与第一次有所不同的是,当前线程的ThreadLocal中已存在上一次的TransactionInfo,所以会走

org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction

做一次事务传递。这时候根据customerService.test()方法的事务传播属性,做不同的调整。

  • Propagation.REQUIRED

    继续使用第一次生成的TransactionInfo

  • Propagation.REQUIRES_NEW

    挂起(suspend)线程中绑定的第一次生成的TransactionInfo,再绑定一个新的上去

  • Propagation.NESTED

    在第一次生成的TransactionInfo基础上,加入savePoint

你好:我的2025

上一篇:我的故事

下一篇:vue3学习之自定义Hooks