首頁 > 軟體

Java CompletableFuture實現多執行緒非同步編排

2022-09-08 18:04:46

一 :問題背景

問題:當查詢介面較複雜時候,資料的獲取都需要[遠端呼叫],必然需要花費更多的時間。 假如查詢文章詳情頁面,需要如下標註的時間才能完成,比如如下場景:

1. 查詢文章詳情 0.5s

2. 查詢文章博主個人資訊 0.5s

3. 查詢文章評論 1s

4. 查詢博主相關文章分類 1s

5. 相關推薦文章 1s

上面的描述只是舉個例子不要在意這裡的查詢描述,看實際情況使用,有些相關的查詢我們可以拆分介面實現,上面的描述只是為了舉例子。

那麼,使用者需要4s後才能統計的資料。很顯然是不能接受的。 如果有多個執行緒同時完成這4步操作,也許只需要1s左右即可完成響應。

二 :CompletableFuture介紹

在Java 8中, 新增加了一個包含50個方法左右的類: CompletableFuture,提供了非常強大的Future的擴充套件功能,可以幫助我們簡化非同步程式設計的複雜性,提供了函數語言程式設計的能力,可以通過回撥的方式處理計算結果,並且提供了轉換和組合CompletableFuture的方法。

CompletableFuture類實現了Future介面,所以你還是可以像以前一樣通過get方法阻塞或者輪詢的方式獲得結果,但是這種方式不推薦使用。 CompletableFuture和FutureTask同屬於Future介面的實現類,都可以獲取執行緒的執行結果。

三 :具體場景

1.0 單個任務

1.0.1 runAsync:無返回值

/**
 * runAsync無返回值
 */
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
    System.out.println("當前執行緒" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("執行結果:" + i);
}, executor);

1.0.2 supplyAsync:有返回值

whenComplete:能感知異常,能感知結果,但沒辦法給返回值

exceptionally:能感知異常,不能感知結果,能給返回值。相當於,如果出現異常就返回這個值

/**
 * supplyAsync有返回值
 * whenComplete能感知異常,能感知結果,但沒辦法給返回值
 * exceptionally能感知異常,不能感知結果,能給返回值。相當於,如果出現異常就返回這個值
 */
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("當前執行緒" + Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("執行結果:" + i);
    return i;
}, executor).whenComplete((res,excption)->{
    //whenComplete雖然能得到異常資訊,但是沒辦法修改返回值
    System.out.println("非同步任務成功完成...結果是:"+res+";異常是:"+excption);
}).exceptionally(throwable -> {
    //exceptionally能感知異常,而且能返回一個預設值,相當於,如果出現異常就返回這個值
    return 10;
});

1.0.3 supplyAsync:有返回值

handle能拿到返回結果,也能得到異常資訊,也能修改返回值

/**
* supplyAsync有返回值
* handle能拿到返回結果,也能得到異常資訊,也能修改返回值
*/
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("當前執行緒" + Thread.currentThread().getId());
    int i = 10 / 4;
    System.out.println("執行結果:" + i);
    return i;
}, executor).handle((res,excption)->{
    if(excption!=null){
        return 0;
    }else {
        return res * 2;
    }
});

2.0 兩個任務編排

兩任務組合(執行緒序列化)

可以是兩任務的序列化,就是一個任務執行完了再執行下一個

也可以是多個任務的序列化,就是按照順序一個個的執行

2.0.1 thenRunAsync

不能接收上一次的執行結果,也沒返回值

        /**
         * thenRunXXX 不能接收上一次的執行結果,也沒返回值
         * .thenRunAsync(() -> {
         *      System.out.println("任務2啟動了...");
         * }, executor);
         */
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("當前執行緒" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("執行結果:" + i);
            return i;
        }, executor).thenRunAsync(() -> {
            System.out.println("任務2啟動了...");
        }, executor);

2.0.2 thenAcceptAsync

能接收上一次的執行結果,但沒返回值

    /**
     * thenAcceptXXX 能接收上一次的執行結果,但沒返回值
     * .thenAcceptAsync(res->{
     *      System.out.println("任務2啟動了..."+res);
     *  },executor);
     */
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("當前執行緒" + Thread.currentThread().getId());
        int i = 10 / 4;
        System.out.println("執行結果:" + i);
        return i;
    }, executor).thenAcceptAsync(res -> {
        System.out.println("任務2啟動了..." + res);
    }, executor);

2.0.3 thenApplyAsync

能接收上一次的執行結果,又可以有返回值

        /**
         * thenApplyXXX 能接收上一次的執行結果,又可以有返回值
         * .thenApplyAsync(res -> {
         *      System.out.println("任務2啟動了..." + res);
         *      return "hello " + res;
         *  }, executor);
         */
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() ->         {
            System.out.println("當前執行緒" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("執行結果:" + i);
            return i;
        }, executor).thenApplyAsync(res -> {
            System.out.println("任務2啟動了..." + res);
            return "hello " + res;
        }, executor);

3.0 三任務編排

先準備兩個任務

       CompletableFuture<Object> future01 =CompletableFuture.supplyAsync(() -> {
            System.out.println("任務1執行緒" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("任務1結束:");
            return i;
        }, executor);
        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任務2執行緒" + Thread.currentThread().getId());
            try {
                Thread.sleep(3000);
                System.out.println("任務2結束:");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }, executor);

3.0.1 三任務組合

前兩個任務都完成,才執行任務3

3.0.1-1、runAfterBothAsync:任務01 任務02都完成了,再開始執行任務3,不感知任務1、2的結果的,也沒返回值

CompletableFuture<Void> future = future01.runAfterBothAsync(future02, () -> {
    System.out.println("任務3開始");
}, executor);

3.0.1-2、thenAcceptBothAsync:任務01 任務02都完成了,再開始執行任務3,能感知到任務1、2的結果,但沒返回值

CompletableFuture<Void> future = future01.thenAcceptBothAsync(future02, (f1, f2) -> {
            System.out.println("任務3開始...得到之前的結果:f1:" + f1 + ", f2:" + f2);
        }, executor);

3.0.1-3、 thenCombineAsync:任務01 任務02都完成了,再開始執行任務3,能感知到任務1、2的結果,而且自己可以帶返回值

 CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {
            return f1+":"+f2+":哈哈";
        }, executor);

3.0.2 三任務組合二

前兩個任務只要有一個完成,就執行任務3

3.0.2-1、runAfterEitherAsync:兩個任務只要有一個完成,就執行任務3,不感知結果,自己沒返回值

CompletableFuture<Void> future = future01.runAfterEitherAsync(future02, () -> {
            System.out.println("任務3開始...");
        }, executor);

3.0.2-2、 acceptEitherAsync:兩個任務只要有一個完成,就執行任務3,感知結果,自己沒返回值

CompletableFuture<Void> future = future01.acceptEitherAsync(future02, (res) -> {
        System.out.println("任務3開始...之前的結果" + res);
    }, executor);

3.0.2-3、applyToEitherAsync:兩個任務只要有一個完成,就執行任務3,感知結果,自己有返回值

CompletableFuture<String> future = future01.applyToEitherAsync(future02, (res) -> {
        System.out.println("任務3開始...之前的結果" + res);
        return "任務3的結果...";
    }, executor);

4.0 多工的編排

 /**
         * 多工組合
         */
        CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
            System.out.println("查詢商品圖片資訊");
            return "hello.jpg";
        },executor);
        CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
            System.out.println("查詢商品屬性資訊");
            return "黑色+256G";
        },executor);
        CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("查詢商品介紹資訊");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "華為...";
        },executor);

4.0.1、allOf:所有任務都執行完

        /**
         * allOf 所有任務都執行完
         */
        CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
        allOf.get();//等待所有結果完成

4.0.2、anyOf:其中有一個任務執行完就可以

        /**
         * anyOf 其中有一個任務執行完就可以
         */
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc);
        anyOf.get();

四: 一個實際的例子

public SkuItemVo item(Long skuId) {
        SkuItemVo skuItemVo = new SkuItemVo();
        //1、sku詳細資訊 sku_info
        SkuInfoEntity skuInfo = getById(skuId);
        skuItemVo.setInfo(skuInfo);
        //2、sku 圖片資訊 sku_img
        List<SkuImagesEntity> images = skuImagesService.getImagesBySkuId(skuId);
        skuItemVo.setImages(images);
        //3、spu 銷售屬性組合
        List<SkuItemSaleAttrVo> saleAttr = skuSaleAttrValueService.getSaleAttrBySpuId(skuInfo.getSpuId());
        skuItemVo.setSaleAttr(saleAttr);
        //4、spu 的介紹
        SpuInfoDescEntity spuInfoDesc = spuInfoDescService.getById(skuInfo.getSpuId());
        skuItemVo.setDesc(spuInfoDesc);
        //5、spu 規格引數資訊
        List<SpuItemAttrGroupVo> groupAttrs = attrGroupService.getAttrGroupWithAttrsBySpuId(skuInfo.getSpuId(),skuInfo.getCatalogId());
        skuItemVo.setGroupAttrs(groupAttrs);
        return skuItemVo;
    }

使用CompletableFuture非同步編排後

private SkuItemVo item(Long skuId) {
        SkuItemVo skuItemVo = new SkuItemVo();
        /**
         * 3、4、5需要依賴1的執行結果,需要返回skuInfo後從中獲取spuId和catalogId
         * 而2不需要依賴1的執行結果
         */
        //1、sku詳細資訊 sku_info
        CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
                SkuInfoEntity skuInfo = getById(skuId);
                skuItemVo.setInfo(skuInfo);
                return skuInfo;
        }, executor);
        //2、sku 圖片資訊 sku_img  2不需要等待上邊1的執行結果
        CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
                List<SkuImagesEntity> images = skuImagesService.getImagesBySkuId(skuId);
                skuItemVo.setImages(images);
        }, executor);
        //下邊的3、4、5都需要上邊1的執行結果
        //所以下邊的3、4、5都是基於上邊1的執行結果 infoFuture 開始的
        //都是以infoFuture.thenAcceptAsync(skuInfo -> {})開始的
        CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync(skuInfo -> {
                //3、spu 銷售屬性組合  3
                List<SkuItemSaleAttrVo> saleAttr = skuSaleAttrValueService.getSaleAttrBySpuId(skuInfo.getSpuId());
                skuItemVo.setSaleAttr(saleAttr);
                System.out.println(saleAttr);
        }, executor);
        CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync(skuInfo -> {
                //4、spu 的介紹
                SpuInfoDescEntity spuInfoDesc = spuInfoDescService.getById(skuInfo.getSpuId());
                skuItemVo.setDesc(spuInfoDesc);
        }, executor);
        CompletableFuture<Void> attrGroupFuture = infoFuture.thenAcceptAsync(skuInfo -> {
                //5、spu 規格引數資訊
                List<SpuItemAttrGroupVo> groupAttrs = attrGroupService.getAttrGroupWithAttrsBySpuId(skuInfo.getSpuId(),skuInfo.getCatalogId());
                System.out.println(groupAttrs);
                skuItemVo.setGroupAttrs(groupAttrs);
        }, executor);
        //等待所有任務完成
        try {
                CompletableFuture.allOf(saleAttrFuture,descFuture,attrGroupFuture,imageFuture).get() ;
        } catch (InterruptedException e) {
                log.error("查詢商品詳情非同步編排錯誤: ");
                log.error(e.getMessage() );
        } catch (ExecutionException e) {
                log.error(e.getMessage() );
        }
        return skuItemVo;
}

以上就是Java CompletableFuture實現多執行緒非同步編排的詳細內容,更多關於Java CompletableFuture的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com