日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無先后,達(dá)者為師

網(wǎng)站首頁 編程語言 正文

CompletableFuture解決多線程返回結(jié)果問題

作者:wl_Honest 更新時(shí)間: 2022-10-14 編程語言

什么是CompletableFuture?

在Java中CompletableFuture用于異步編程,異步編程是編寫非阻塞的代碼,運(yùn)行的任務(wù)在一個(gè)單獨(dú)的線程,與主線程隔離,并且會(huì)通知主線程它的進(jìn)度,成功或者失敗。

在這種方式中,主線程不會(huì)被阻塞,不需要一直等到子線程完成。主線程可以并行的執(zhí)行其他任務(wù)。

使用這種并行方式,可以極大的提高程序的性能。

Future vs CompletableFuture

CompletableFuture 是?Future API的擴(kuò)展。

Future 被用于作為一個(gè)異步計(jì)算結(jié)果的引用。提供一個(gè)?isDone()?方法來檢查計(jì)算任務(wù)是否完成。當(dāng)任務(wù)完成時(shí),get()?方法用來接收計(jì)算任務(wù)的結(jié)果。

從?Callbale和 Future 教程可以學(xué)習(xí)更多關(guān)于 Future 知識(shí).

Future API 是非常好的 Java 異步編程進(jìn)階,但是它缺乏一些非常重要和有用的特性。

CompletableFuture 實(shí)現(xiàn)了?Future?和?CompletionStage接口,并且提供了許多關(guān)于創(chuàng)建,鏈?zhǔn)秸{(diào)用和組合多個(gè) Future 的便利方法集,而且有廣泛的異常處理支持。

之前項(xiàng)目中需要對一組集合進(jìn)行處理,集合內(nèi)的所有元素處理完后更新任務(wù)狀態(tài)。當(dāng)時(shí)經(jīng)過詢問得知可以用到CompletableFuture,于是經(jīng)過短暫的研究寫了一個(gè)很粗略的代碼,如下:

public void calculateAvgSensorData() {
        //0.獲取同步鎖
        Boolean isLock = redisUtils.getLockWithExpire(LOCK_CAL_CABINET_AVG_ENV, LOCK_EXPIRE);
        if (!isLock) {
            log.warn("獲取互斥鎖: {}失敗", LOCK_CAL_CABINET_AVG_ENV);
            return;
        }
        //1.查詢昨日每個(gè)機(jī)柜一整天的環(huán)境數(shù)據(jù)均值,如果有報(bào)警,則用報(bào)警數(shù)據(jù)
        //1.1.獲取當(dāng)前日期
        Date today = DateUtil.date();
        //1.2.獲取昨天日期
        Date yesterday = DateUtil.offsetDay(today, DAY_OFFSET);
        String queryDate = DateUtil.formatDate(yesterday);
        //1.3.查詢機(jī)房列表
        List<BaseProps> roomList = roomService.getAllByUsed(1);
        //1.4.查詢傳感器列表
        List<Sensor> sensorList = sensorService.list();
        if (CollectionUtil.isEmpty(roomList) || CollectionUtil.isEmpty(sensorList)) {
            return;
        }
        //1.5.記錄任務(wù)執(zhí)行時(shí)間
        AutoTask autoTask = autoTaskService.getById(AutoTaskId.CAL_CABINET_AVG_ENV_TASK);
        autoTask.setTaskStatus(AutoTaskStatus.EXECUTING);
        HistoryAutoTask historyAutoTask = BeanUtil.copyProperties(autoTask, HistoryAutoTask.class);
        historyAutoTask.setExecuteTime(new Date());
        //記錄任務(wù)開始時(shí)間
        historyAutoTaskService.startAutoTask(historyAutoTask);
        //1.6.組裝待查詢的機(jī)房隊(duì)列
        roomList.forEach(room -> {
            queue.add(room);
        });

        for(int i = 0; i < MAX_THREAD; i++) {
            //創(chuàng)建異步執(zhí)行任務(wù)
            CompletableFuture cf = CompletableFuture.runAsync(()->{
                do{
                    try {
                        queryCabinetEnv(sensorList, queryDate);
                    } catch (Exception e) {
                        log.error("插入機(jī)房環(huán)境數(shù)據(jù)均值失敗,失敗原因: {}", e);
                        historyAutoTask.setTaskStatus(AutoTaskStatus.FAILED);
                        historyAutoTaskService.updateAutoTask(historyAutoTask, AutoTaskStatus.EXECUTING);
                    }
                }while(queue.size() > 0);
            }, taskExecutor).whenComplete((res,excption)-> {
                //3.記錄定時(shí)任務(wù)執(zhí)行狀態(tài)
                historyAutoTask.setTaskStatus(AutoTaskStatus.EXECUTED);
                historyAutoTask.setFinishTime(new Date());
                historyAutoTaskService.updateAutoTask(historyAutoTask, AutoTaskStatus.EXECUTING);
            });
        }
    }

如上代碼,雖然用到了線程池,也用到了CompletableFuture,不過一眼可以看出問題所在,就是在每個(gè)遍歷結(jié)束后都會(huì)更新一次任務(wù)狀態(tài),這明顯是不對的。明明在所有任務(wù)執(zhí)行完后再執(zhí)行一次任務(wù)狀態(tài)更新就可以了,這里卻每個(gè)線程執(zhí)行完任務(wù)后更新一次任務(wù)狀態(tài)。

對于如上代碼,肯定是有更加優(yōu)雅的寫法的,再次經(jīng)過深入學(xué)習(xí)后,寫出以下例子僅供改造參考:

package com.tct.ii.ucr.task;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
 * @author wl
 * @date 2022/10/12
 */
@Slf4j
public class TestFuture {
    public static CompletableFuture<String> printStr(String str, ExecutorService executorService) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("str:{}", str);
            return str;
        }, executorService);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        List<String> list = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
        List<CompletableFuture<String>> futureList = list.stream()
                .map(str -> printStr(str, executorService)).collect(Collectors.toList());
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
        CompletableFuture<List<String>> resultFuture = allFuture.thenApply(v -> futureList.stream().map(future -> future.join()).collect(Collectors.toList()));
        log.info("result:{}", resultFuture.get());
        executorService.shutdown();
    }
}

在多任務(wù)組合中,allOf:等待所有任務(wù)完成,anyOf:只要有一個(gè)任務(wù)完成。

測試結(jié)果如圖:

可以看到這里既用到了線程池,最后調(diào)用allOf方法等待所有任務(wù)執(zhí)行完后可以一次性獲取結(jié)果,非常方便和優(yōu)雅。?

原文鏈接:https://blog.csdn.net/wl_Honest/article/details/127286938

欄目分類
最近更新