日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

CompletableFuture解決多線程返回結果問題

作者:wl_Honest 更新時間: 2022-10-14 編程語言

什么是CompletableFuture?

在Java中CompletableFuture用于異步編程,異步編程是編寫非阻塞的代碼,運行的任務在一個單獨的線程,與主線程隔離,并且會通知主線程它的進度,成功或者失敗。

在這種方式中,主線程不會被阻塞,不需要一直等到子線程完成。主線程可以并行的執行其他任務。

使用這種并行方式,可以極大的提高程序的性能。

Future vs CompletableFuture

CompletableFuture 是?Future API的擴展。

Future 被用于作為一個異步計算結果的引用。提供一個?isDone()?方法來檢查計算任務是否完成。當任務完成時,get()?方法用來接收計算任務的結果。

從?Callbale和 Future 教程可以學習更多關于 Future 知識.

Future API 是非常好的 Java 異步編程進階,但是它缺乏一些非常重要和有用的特性。

CompletableFuture 實現了?Future?和?CompletionStage接口,并且提供了許多關于創建,鏈式調用和組合多個 Future 的便利方法集,而且有廣泛的異常處理支持。

之前項目中需要對一組集合進行處理,集合內的所有元素處理完后更新任務狀態。當時經過詢問得知可以用到CompletableFuture,于是經過短暫的研究寫了一個很粗略的代碼,如下:

public void calculateAvgSensorData() {
        //0.獲取同步鎖
        Boolean isLock = redisUtils.getLockWithExpire(LOCK_CAL_CABINET_AVG_ENV, LOCK_EXPIRE);
        if (!isLock) {
            log.warn("獲取互斥鎖: {}失敗", LOCK_CAL_CABINET_AVG_ENV);
            return;
        }
        //1.查詢昨日每個機柜一整天的環境數據均值,如果有報警,則用報警數據
        //1.1.獲取當前日期
        Date today = DateUtil.date();
        //1.2.獲取昨天日期
        Date yesterday = DateUtil.offsetDay(today, DAY_OFFSET);
        String queryDate = DateUtil.formatDate(yesterday);
        //1.3.查詢機房列表
        List<BaseProps> roomList = roomService.getAllByUsed(1);
        //1.4.查詢傳感器列表
        List<Sensor> sensorList = sensorService.list();
        if (CollectionUtil.isEmpty(roomList) || CollectionUtil.isEmpty(sensorList)) {
            return;
        }
        //1.5.記錄任務執行時間
        AutoTask autoTask = autoTaskService.getById(AutoTaskId.CAL_CABINET_AVG_ENV_TASK);
        autoTask.setTaskStatus(AutoTaskStatus.EXECUTING);
        HistoryAutoTask historyAutoTask = BeanUtil.copyProperties(autoTask, HistoryAutoTask.class);
        historyAutoTask.setExecuteTime(new Date());
        //記錄任務開始時間
        historyAutoTaskService.startAutoTask(historyAutoTask);
        //1.6.組裝待查詢的機房隊列
        roomList.forEach(room -> {
            queue.add(room);
        });

        for(int i = 0; i < MAX_THREAD; i++) {
            //創建異步執行任務
            CompletableFuture cf = CompletableFuture.runAsync(()->{
                do{
                    try {
                        queryCabinetEnv(sensorList, queryDate);
                    } catch (Exception e) {
                        log.error("插入機房環境數據均值失敗,失敗原因: {}", e);
                        historyAutoTask.setTaskStatus(AutoTaskStatus.FAILED);
                        historyAutoTaskService.updateAutoTask(historyAutoTask, AutoTaskStatus.EXECUTING);
                    }
                }while(queue.size() > 0);
            }, taskExecutor).whenComplete((res,excption)-> {
                //3.記錄定時任務執行狀態
                historyAutoTask.setTaskStatus(AutoTaskStatus.EXECUTED);
                historyAutoTask.setFinishTime(new Date());
                historyAutoTaskService.updateAutoTask(historyAutoTask, AutoTaskStatus.EXECUTING);
            });
        }
    }

如上代碼,雖然用到了線程池,也用到了CompletableFuture,不過一眼可以看出問題所在,就是在每個遍歷結束后都會更新一次任務狀態,這明顯是不對的。明明在所有任務執行完后再執行一次任務狀態更新就可以了,這里卻每個線程執行完任務后更新一次任務狀態。

對于如上代碼,肯定是有更加優雅的寫法的,再次經過深入學習后,寫出以下例子僅供改造參考:

package com.tct.ii.ucr.task;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
 * @author wl
 * @date 2022/10/12
 */
@Slf4j
public class TestFuture {
    public static CompletableFuture<String> printStr(String str, ExecutorService executorService) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("str:{}", str);
            return str;
        }, executorService);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        List<String> list = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
        List<CompletableFuture<String>> futureList = list.stream()
                .map(str -> printStr(str, executorService)).collect(Collectors.toList());
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
        CompletableFuture<List<String>> resultFuture = allFuture.thenApply(v -> futureList.stream().map(future -> future.join()).collect(Collectors.toList()));
        log.info("result:{}", resultFuture.get());
        executorService.shutdown();
    }
}

在多任務組合中,allOf:等待所有任務完成,anyOf:只要有一個任務完成。

測試結果如圖:

可以看到這里既用到了線程池,最后調用allOf方法等待所有任務執行完后可以一次性獲取結果,非常方便和優雅。?

原文鏈接:https://blog.csdn.net/wl_Honest/article/details/127286938

欄目分類
最近更新