日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無先后,達者為師

網(wǎng)站首頁 編程語言 正文

Spring在多線程環(huán)境下如何確保事務(wù)一致性

作者:莽夫三拳有點疼 更新時間: 2024-04-08 編程語言

1.問題在現(xiàn)
我先把問題拋出來,大家就明白本文目的在于解決什么樣的業(yè)務(wù)痛點了:

public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
    //1.查詢出當前資源模塊下所有資源,查詢出來后進行刪除
    deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService);
    //2.查詢出當前資源模塊下所有子模塊,遞歸查詢,當刪除完所有子模塊下的資源后,再刪除所有子模塊,最終刪除當前資源模塊
    deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService);
    //3.刪除當前資源模塊
    removeById(authorityModuleId);
}

如果我希望將步驟1和步驟2并行執(zhí)行,然后確保步驟1和步驟2執(zhí)行成功后,再執(zhí)行步驟3,等到步驟3執(zhí)行完畢后,再提交全部事務(wù),這個需求該如何實現(xiàn)呢?

2.如何解決異步執(zhí)行

上面需求第一點是: 如何讓任務(wù)異步并行執(zhí)行,如何實現(xiàn)二元依賴呢?

說到異步執(zhí)行,很多小伙伴首先想到Spring中提供的@Async注解,但是Spring提供的異步執(zhí)行任務(wù)能力并不足以解決我們當前的需求。

@Async注解原理簡單來說,就是掃描IOC中的bean,給方法上標注有@Async注解的bean進行代理,代理的核心是添加一個MethodInterceptor即AsyncExecutionInterceptor,該方法攔截器負責(zé)將方法真正的執(zhí)行包裝為任務(wù),放入線程池中執(zhí)行。

下面我們先使用CompletableFuture來完成我們第一步需求:

public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
    CompletableFuture.runAsync(()->{
        //兩個并行執(zhí)行的任務(wù)
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() ->
                deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService),executor);
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() ->
                deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor);
        //等待兩個并行任務(wù)執(zhí)行完后,再執(zhí)行最后一個步驟
        CompletableFuture.allOf(future1,future2).thenRun(()->removeById(authorityModuleId));
    },executor);
}

3.多線程環(huán)境下如何確保事務(wù)一致性

我們已經(jīng)完成了任務(wù)的異步執(zhí)行化,那么又如何確保多線程環(huán)境下的事務(wù)一致性問題呢?

public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
    CompletableFuture.runAsync(()->{
        //兩個并行執(zhí)行的任務(wù)
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() ->
                deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService),executor);
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() ->
                deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor);
        //等待兩個并行任務(wù)執(zhí)行完后,再執(zhí)行最后一個步驟
        CompletableFuture.allOf(future1,future2).thenRun(()->removeById(authorityModuleId));
    },executor);
}

在Spring環(huán)境下說到事務(wù)控制,大家第一反應(yīng)就想到使用@Transactional注解解決問題,但是這里顯然行不通,為什么行不通呢?

我還是簡單的對Spring事務(wù)實現(xiàn)原理進行一番概括:

4.事務(wù)王國回顧

事務(wù)管理大體分為三個流程: 事務(wù)創(chuàng)建 ,事務(wù)執(zhí)行,事務(wù)結(jié)束

事務(wù)創(chuàng)建涉及到一些屬性的配置,如:

  • 事務(wù)的隔離級別
  • 事務(wù)的傳播行為
  • 事務(wù)的超時時間
  • 是否為只讀事務(wù)

由于涉及屬性頗多,并且后期還有可能進行擴展,因此必須通過一個類來封裝這些屬性,在Spring中對應(yīng)TransactionDefinition。

有了事務(wù)相關(guān)屬性定義后,我們就可以利用TransactionDefinition來創(chuàng)建一個事務(wù)了,在Spring中局部事務(wù)由PlatformTransactionManager負責(zé)管理,創(chuàng)建事務(wù)也是由PlatformTransactionManager負責(zé)提供:

TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
   throws TransactionException;

如果我們希望追蹤事務(wù)的狀態(tài),例如: 事務(wù)已完成,事務(wù)回滾等,那么就需要一個事務(wù)狀態(tài)類貫穿當前事務(wù)的執(zhí)行流程,在Spring中由TransactionStatus負責(zé)完成。

對于常見的數(shù)據(jù)源而言,通常需要記錄的事務(wù)狀態(tài)有如下幾點:

  • 當前事務(wù)是否是新事務(wù)
  • 當前事務(wù)是否結(jié)束
  • 當前事務(wù)是否需要回滾(通過標記來判斷,因此我也可以在業(yè)務(wù)流程中手動設(shè)置標記為true,來讓事務(wù)在沒有發(fā)生異常的情況下進行回滾)
  • 當前事務(wù)是否設(shè)置了回滾點(savePoint)

事務(wù)的執(zhí)行過程就是具體業(yè)務(wù)代碼的執(zhí)行流程,這里就不多說了。

事務(wù)的結(jié)束分為兩種情況: 需要進行事務(wù)回滾或者事務(wù)正常提交,如果是事務(wù)回滾,還需要判斷TransactionStatus 中的savePoint是否被設(shè)置了。

5.事務(wù)實現(xiàn)方式回顧

Spring中常見的事務(wù)實現(xiàn)方式有兩種: 編程式和聲明式。

編程式事務(wù)使用是本文重點,因此這里按下不表,我們先來復(fù)習(xí)一下聲明式事務(wù)的使用。

聲明式事務(wù)就是使用我們常見的@Transactional注解完成的,聲明式事務(wù)優(yōu)點就在于讓事務(wù)代碼與業(yè)務(wù)代碼解耦,通過Spring中提供的聲明式事務(wù)使用,我們也可以發(fā)覺我們只需要編寫業(yè)務(wù)代碼即可,而事務(wù)的管理基本不需要我們操心,Spring就像使用了魔法一樣,幫我們自動完成了。

之所以那么神奇,本質(zhì)還是依靠Spring框架提供的Bean生命周期相關(guān)回調(diào)接口和AOP結(jié)合完成的,簡述如下:

  • 通過自動代理創(chuàng)建器依次嘗試為每個放入容器中的bean嘗試進行代理
    嘗試進行代理的過程對于事務(wù)管理來說,就是利用事務(wù)管理涉及到的增強器advisor,即TransactionAttributeSourceAdvisor
  • 判斷當前增強器是否能夠應(yīng)用與當前bean上,怎么判斷呢? —> advisor內(nèi)部的pointCut嘍 !
    如果能夠應(yīng)用,那么好,為當前bean創(chuàng)建代理對象返回,并且往代理對象內(nèi)部添加一個TransactionInterceptor攔截器。
  • 此時我們再從容器中獲取,拿到的就是代理對象了,當我們調(diào)用代理對象的方法時,首先要經(jīng)過代理對象內(nèi)部攔截器鏈的處理,處理完后,最終才會調(diào)用被代理對象的方法。(這里其實就是責(zé)任鏈模式的應(yīng)用)

對于被事務(wù)增強器TransactionAttributeSourceAdvisor代理的bean而言,代理對象內(nèi)部會存在一個TransactionInterceptor,該攔截器內(nèi)部構(gòu)造了一個事務(wù)執(zhí)行的模板流程:

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
   final InvocationCallback invocation) throws Throwable {
  //TransactionAttributeSource內(nèi)部保存著當前類某個方法對應(yīng)的TransactionAttribute---事務(wù)屬性源
  //可以看做是一個存放TransactionAttribute與method方法映射的池子
  TransactionAttributeSource tas = getTransactionAttributeSource();
  //獲取當前事務(wù)方法對應(yīng)的TransactionAttribute
  final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
  //定位TransactionManager
  final TransactionManager tm = determineTransactionManager(txAttr);
        .....
        //類型轉(zhuǎn)換為局部事務(wù)管理器
  PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
  final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

  if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
   //TransactionManager根據(jù)TransactionAttribute創(chuàng)建事務(wù)后返回
   //TransactionInfo封裝了當前事務(wù)的信息--包括TransactionStatus
   TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

   Object retVal;
   try {
    //繼續(xù)執(zhí)行過濾器鏈---過濾鏈最終會調(diào)用目標方法
    //因此可以理解為這里是調(diào)用目標方法
    retVal = invocation.proceedWithInvocation();
   }
   catch (Throwable ex) {
    //目標方法拋出異常則進行判斷是否需要回滾
    completeTransactionAfterThrowing(txInfo, ex);
    throw ex;
   }
   finally {
       //清除當前事務(wù)信息
    cleanupTransactionInfo(txInfo);
   }
            ...
            //正常返回,那么就正常提交事務(wù)唄(當然還是需要判斷TransactionStatus狀態(tài)先)
   commitTransactionAfterReturning(txInfo);
   return retVal;
  }
  ...

6.編程式事務(wù)

還記得本文一開始提出的業(yè)務(wù)需求嗎?

不清楚,可以回看一下,在上文,我們已經(jīng)解決了任務(wù)異步并行執(zhí)行的難題,下面我們需要解決的就是如何確保Spring在多線程環(huán)境下也能保持事務(wù)一致性。

通過上文對Spring事務(wù)基礎(chǔ)和聲明式事務(wù)的原理回顧,相信大家也發(fā)現(xiàn)了,聲明式事務(wù)并不能解決我們當前的問題,那么就只能求助于編程式事務(wù)了。

那么編程式事務(wù)是什么樣子呢?

其實上面TransactionInterceptor給出的那套模板流程,就是編程式事務(wù)使用的模范案例,我們可以簡化上面的模板流程,簡單使用如下:

public class TransactionMain {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        test();
    }

    private static void test() {
        DataSource dataSource = getDS();
        JdbcTransactionManager jtm = new JdbcTransactionManager(dataSource);
        //JdbcTransactionManager根據(jù)TransactionDefinition信息來進行一些連接屬性的設(shè)置
        //包括隔離級別和傳播行為等
        DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
        //開啟一個新事務(wù)---此時autocommit已經(jīng)被設(shè)置為了false,并且當前沒有事務(wù),這里創(chuàng)建的是一個新事務(wù)
        TransactionStatus ts = jtm.getTransaction(transactionDef);
        //進行業(yè)務(wù)邏輯操作
        try {
            update(dataSource);
            jtm.commit(ts);
        }catch (Exception e){
            jtm.rollback(ts);
            System.out.println("發(fā)生異常,我已回滾");
        }
    }

    private static void update(DataSource dataSource) throws Exception {
        JdbcTemplate jt = new JdbcTemplate();
        jt.setDataSource(dataSource);
        jt.update("UPDATE Department SET Dname=\"大忽悠\" WHERE id=6");
        throw new Exception("我是來搗亂的");
    }
}

7.利用編程式事務(wù)解決問題

我們明白了編程式事務(wù)的使用,相信大家也都知道問題如何解決了,下面我給出一份看似正確的解決方案:

/**
 * 多線程事務(wù)一致性管理 <br>
 * 聲明式事務(wù)管理無法完成,此時我們只能采用初期的編程式事務(wù)管理才行
 * @author 大忽悠
 * @create 2022/10/19 21:34
 */
@Component
@RequiredArgsConstructor
public class MultiplyThreadTransactionManager {
    /**
     * 如果是多數(shù)據(jù)源的情況下,需要指定具體是哪一個數(shù)據(jù)源
     */
    private final DataSource dataSource;

    /**
     * 執(zhí)行的是無返回值的任務(wù)
     * @param tasks 異步執(zhí)行的任務(wù)列表
     * @param executor 異步執(zhí)行任務(wù)需要用到的線程池,考慮到線程池需要隔離,這里強制要求傳
     */
    public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) {
        if(executor==null){
            throw new IllegalArgumentException("線程池不能為空");
        }
        DataSourceTransactionManager transactionManager = getTransactionManager();
        //是否發(fā)生了異常
        AtomicBoolean ex=new AtomicBoolean();

        List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size());
        List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size());

        tasks.forEach(task->{
            taskFutureList.add(CompletableFuture.runAsync(
                    () -> {
                        try{
                            //1.開啟新事務(wù)
                            transactionStatusList.add(openNewTransaction(transactionManager));
                            //2.異步任務(wù)執(zhí)行
                            task.run();
                        }catch (Throwable throwable){
                            //打印異常
                            throwable.printStackTrace();
                            //其中某個異步任務(wù)執(zhí)行出現(xiàn)了異常,進行標記
                            ex.set(Boolean.TRUE);
                            //其他任務(wù)還沒執(zhí)行的不需要執(zhí)行了
                            taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
                        }
                    }
                    , executor)
            );
        });

        try {
            //阻塞直到所有任務(wù)全部執(zhí)行結(jié)束---如果有任務(wù)被取消,這里會拋出異常滴,需要捕獲
            CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        //發(fā)生了異常則進行回滾操作,否則提交
        if(ex.get()){
            System.out.println("發(fā)生異常,全部事務(wù)回滾");
            transactionStatusList.forEach(transactionManager::rollback);
        }else {
            System.out.println("全部事務(wù)正常提交");
            transactionStatusList.forEach(transactionManager::commit);
        }
    }

    private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
        //JdbcTransactionManager根據(jù)TransactionDefinition信息來進行一些連接屬性的設(shè)置
        //包括隔離級別和傳播行為等
        DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
        //開啟一個新事務(wù)---此時autocommit已經(jīng)被設(shè)置為了false,并且當前沒有事務(wù),這里創(chuàng)建的是一個新事務(wù)
        return transactionManager.getTransaction(transactionDef);
    }

    private DataSourceTransactionManager getTransactionManager() {
        return new DataSourceTransactionManager(dataSource);
    }
}

大家思考上面的代碼存在問題嗎?

測試:

public void test(){
    List<Runnable> tasks=new ArrayList<>();

    tasks.add(()->{
       userMapper.deleteById(26);
    });

    tasks.add(()->{
        signMapper.deleteById(10);
    });
    multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool());
}

任務(wù)正常都執(zhí)行完畢,事務(wù)進行提交,但是會拋出異常,導(dǎo)致事務(wù)回滾:

一次事務(wù)的完成通常都是默認在當前線程內(nèi)完成的,又因為一次事務(wù)的執(zhí)行過程中,涉及到對當前數(shù)據(jù)庫連接Connection的操作,因此為了避免將Connection在事務(wù)執(zhí)行過程中來回傳遞,我們可以將Connextion綁定到當前事務(wù)執(zhí)行線程對應(yīng)的ThreadLocalMap內(nèi)部,順便還可以將一些其他屬性也放入其中進行保存,在Spring中,負責(zé)保存這些ThreadLocal屬性的實現(xiàn)類由TransactionSynchronizationManager承擔。

TransactionSynchronizationManager類內(nèi)部默認提供了下面六個ThreadLocal屬性,分別保存當前線程對應(yīng)的不同事務(wù)資源:

//保存當前事務(wù)關(guān)聯(lián)的資源--默認只會在新建事務(wù)的時候保存當前獲取到的DataSource和當前事務(wù)對應(yīng)Connection的映射關(guān)系--當然這里Connection被包裝為了ConnectionHolder
 private static final ThreadLocal<Map<Object, Object>> resources =
   new NamedThreadLocal<>("Transactional resources");
    //事務(wù)監(jiān)聽者--在事務(wù)執(zhí)行到某個階段的過程中,會去回調(diào)監(jiān)聽者對應(yīng)的回調(diào)接口(典型觀察者模式的應(yīng)用)---默認為空集合
 private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
   new NamedThreadLocal<>("Transaction synchronizations");
   //見名知意: 存放當前事務(wù)名字
 private static final ThreadLocal<String> currentTransactionName =
   new NamedThreadLocal<>("Current transaction name");
   //見名知意: 存放當前事務(wù)是否是只讀事務(wù)
 private static final ThreadLocal<Boolean> currentTransactionReadOnly =
   new NamedThreadLocal<>("Current transaction read-only status");
   //見名知意: 存放當前事務(wù)的隔離級別
 private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
   new NamedThreadLocal<>("Current transaction isolation level");
   //見名知意: 存放當前事務(wù)是否處于激活狀態(tài)
 private static final ThreadLocal<Boolean> actualTransactionActive =
   new NamedThreadLocal<>("Actual transaction active");

那么上面拋出的異常的原因也就很清楚了,無法在main線程找到當前事務(wù)對應(yīng)的資源,原因如下:

開啟新事務(wù)時,事務(wù)相關(guān)資源都被綁定到了thread-cache-pool-1線程對應(yīng)的threadLocalMap內(nèi)部,而當執(zhí)行事務(wù)提交代碼時,commit內(nèi)部需要從TransactionSynchronizationManager中獲取當前事務(wù)的資源,顯然我們無法從main線程對應(yīng)的threadLocalMap中獲取到對應(yīng)的事務(wù)資源,這也就是異常拋出的原因。

8.問題分析完了,那么如何解決問題呢?

這里給出一個我首先想到的簡單粗暴的方法—CopyTransactionResource—將事務(wù)資源在兩個線程間來回復(fù)制

這里給出解決后問題后的代碼示例:

/**
 * 多線程事務(wù)一致性管理 <br>
 * 聲明式事務(wù)管理無法完成,此時我們只能采用初期的編程式事務(wù)管理才行
 * @author 大忽悠
 * @create 2022/10/19 21:34
 */
@Component
@RequiredArgsConstructor
public class MultiplyThreadTransactionManager {
    /**
     * 如果是多數(shù)據(jù)源的情況下,需要指定具體是哪一個數(shù)據(jù)源
     */
    private final DataSource dataSource;

    /**
     * 執(zhí)行的是無返回值的任務(wù)
     * @param tasks 異步執(zhí)行的任務(wù)列表
     * @param executor 異步執(zhí)行任務(wù)需要用到的線程池,考慮到線程池需要隔離,這里強制要求傳
     */
    public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) {
        if(executor==null){
            throw new IllegalArgumentException("線程池不能為空");
        }
        DataSourceTransactionManager transactionManager = getTransactionManager();
        //是否發(fā)生了異常
        AtomicBoolean ex=new AtomicBoolean();

        List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size());
        List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size());
        List<TransactionResource> transactionResources=new ArrayList<>(tasks.size());

        tasks.forEach(task->{
            taskFutureList.add(CompletableFuture.runAsync(
                    () -> {
                        try{
                            //1.開啟新事務(wù)
                            transactionStatusList.add(openNewTransaction(transactionManager));
                            //2.copy事務(wù)資源
                         transactionResources.add(TransactionResource.copyTransactionResource());
                            //3.異步任務(wù)執(zhí)行
                            task.run();
                        }catch (Throwable throwable){
                            //打印異常
                            throwable.printStackTrace();
                            //其中某個異步任務(wù)執(zhí)行出現(xiàn)了異常,進行標記
                            ex.set(Boolean.TRUE);
                            //其他任務(wù)還沒執(zhí)行的不需要執(zhí)行了
                            taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
                        }
                    }
                    , executor)
            );
        });

        try {
            //阻塞直到所有任務(wù)全部執(zhí)行結(jié)束---如果有任務(wù)被取消,這里會拋出異常滴,需要捕獲
            CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        //發(fā)生了異常則進行回滾操作,否則提交
        if(ex.get()){
            System.out.println("發(fā)生異常,全部事務(wù)回滾");
            for (int i = 0; i < tasks.size(); i++) {
                transactionResources.get(i).autoWiredTransactionResource();
                transactionManager.rollback(transactionStatusList.get(i));
                transactionResources.get(i).removeTransactionResource();
            }
        }else {
            System.out.println("全部事務(wù)正常提交");
            for (int i = 0; i < tasks.size(); i++) {
                transactionResources.get(i).autoWiredTransactionResource();
                transactionManager.commit(transactionStatusList.get(i));
                transactionResources.get(i).removeTransactionResource();
            }
        }
    }

    private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
        //JdbcTransactionManager根據(jù)TransactionDefinition信息來進行一些連接屬性的設(shè)置
        //包括隔離級別和傳播行為等
        DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
        //開啟一個新事務(wù)---此時autocommit已經(jīng)被設(shè)置為了false,并且當前沒有事務(wù),這里創(chuàng)建的是一個新事務(wù)
        return transactionManager.getTransaction(transactionDef);
    }

    private DataSourceTransactionManager getTransactionManager() {
        return new DataSourceTransactionManager(dataSource);
    }

    /**
     * 保存當前事務(wù)資源,用于線程間的事務(wù)資源COPY操作
     */
    @Builder
    private static class TransactionResource{
        //事務(wù)結(jié)束后默認會移除集合中的DataSource作為key關(guān)聯(lián)的資源記錄
        private  Map<Object, Object> resources = new HashMap<>();

        //下面五個屬性會在事務(wù)結(jié)束后被自動清理,無需我們手動清理
        private  Set<TransactionSynchronization> synchronizations =new HashSet<>();

        private  String currentTransactionName;

        private Boolean currentTransactionReadOnly;

        private Integer currentTransactionIsolationLevel;

        private Boolean actualTransactionActive;

        public static TransactionResource copyTransactionResource(){
            return TransactionResource.builder()
                    //返回的是不可變集合
                    .resources(TransactionSynchronizationManager.getResourceMap())
                    //如果需要注冊事務(wù)監(jiān)聽者,這里記得修改--我們這里不需要,就采用默認負責(zé)--spring事務(wù)內(nèi)部默認也是這個值
                    .synchronizations(new LinkedHashSet<>())
                    .currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName())
                    .currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly())
                    .currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel())
                    .actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive())
                    .build();
        }

        public void autoWiredTransactionResource(){
             resources.forEach(TransactionSynchronizationManager::bindResource);
             //如果需要注冊事務(wù)監(jiān)聽者,這里記得修改--我們這里不需要,就采用默認負責(zé)--spring事務(wù)內(nèi)部默認也是這個值
             TransactionSynchronizationManager.initSynchronization();
             TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
             TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
             TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel);
             TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly);
        }

        public void removeTransactionResource() {
            //事務(wù)結(jié)束后默認會移除集合中的DataSource作為key關(guān)聯(lián)的資源記錄
            //DataSource如果重復(fù)移除,unbindResource時會因為不存在此key關(guān)聯(lián)的事務(wù)資源而報錯
            resources.keySet().forEach(key->{
                if(!(key instanceof  DataSource)){
                    TransactionSynchronizationManager.unbindResource(key);
                }
            });
        }
    }
}

增加異常拋出,測試是否能夠保證多線程間的事務(wù)一致性:

@SpringBootTest(classes = UserMain.class)
public class Test {
    @Resource
    private UserMapper userMapper;
    @Resource
    private SignMapper signMapper;
    @Resource
    private MultiplyThreadTransactionManager multiplyThreadTransactionManager;

    @SneakyThrows
    @org.junit.jupiter.api.Test
    public void test(){
        List<Runnable> tasks=new ArrayList<>();

        tasks.add(()->{
                userMapper.deleteById(26);
                throw new RuntimeException("我就要拋出異常!");
        });

        tasks.add(()->{
            signMapper.deleteById(10);
        });

        multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool());
    }
}

事務(wù)都進行了回滾,數(shù)據(jù)庫數(shù)據(jù)沒變。

9.小結(jié)

本文給出的只是一個方法,為了實現(xiàn)多線程事務(wù)一致性,我們還有很多方法,例如和本文一樣的思想,直接利用JDBC提供的API來手動控制事務(wù)提交和回滾,或者可以嘗試采用分布式事務(wù)的思路來解決問題。

大家之所以會被這個問題難住,主要是因為對Spring框架提供的便捷聲明式事務(wù)支持中毒太深,以至于腦海中對事務(wù)的認知完全停留在@Transactional注解的層面,多了解底層基礎(chǔ)設(shè)施,才能做到遇事不慌。

原文鏈接:https://blog.csdn.net/weixin_52100990/article/details/137210487

  • 上一篇:沒有了
  • 下一篇:沒有了
欄目分類
最近更新